Modified: qpid/branches/qpid-2920/qpid/java/test-profiles/Java010Excludes URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/test-profiles/Java010Excludes?rev=1081634&r1=1081633&r2=1081634&view=diff ============================================================================== --- qpid/branches/qpid-2920/qpid/java/test-profiles/Java010Excludes (original) +++ qpid/branches/qpid-2920/qpid/java/test-profiles/Java010Excludes Tue Mar 15 01:54:07 2011 @@ -47,6 +47,7 @@ org.apache.qpid.server.logging.Subscript // 0-10 Broker does not have a JMX connection MBean org.apache.qpid.management.jmx.ManagementActorLoggingTest#testConnectionCloseViaManagement +org.apache.qpid.management.jmx.MessageConnectionStatisticsTest#* // 0-10 has different ideas about clientid and ownership of queues org.apache.qpid.server.queue.ModelTest#* @@ -54,9 +55,6 @@ org.apache.qpid.server.queue.ModelTest#* // 0-10 is not supported by the MethodRegistry org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#* -// QPID-2084 : this test needs more work for 0-10 -org.apache.qpid.test.unit.client.DynamicQueueExchangeCreateTest#* - //QPID-942 : Implemented Channel.Flow based Producer Side flow control to the Java Broker (not in CPP Broker) org.apache.qpid.server.queue.ProducerFlowControlTest#* @@ -74,6 +72,9 @@ org.apache.qpid.test.unit.publish.DirtyT org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage org.apache.qpid.test.unit.ack.RecoverTest#testRecoverInAutoAckListener +// This test uses 0-8 channel frames +org.apache.qpid.test.unit.client.channelclose.ChannelCloseTest#* + //Temporarily adding the following until the issues are sorted out. //Should probably raise JIRAs for them. org.apache.qpid.transport.network.mina.MINANetworkDriverTest#*
Modified: qpid/branches/qpid-2920/qpid/python/qpid/codec010.py URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/python/qpid/codec010.py?rev=1081634&r1=1081633&r2=1081634&view=diff ============================================================================== --- qpid/branches/qpid-2920/qpid/python/qpid/codec010.py (original) +++ qpid/branches/qpid-2920/qpid/python/qpid/codec010.py Tue Mar 15 01:54:07 2011 @@ -17,7 +17,7 @@ # under the License. # -import datetime +import datetime, string from packer import Packer from datatypes import serial, timestamp, RangedSet, Struct, UUID from ops import Compound, PRIMITIVE, COMPOUND @@ -241,15 +241,20 @@ class Codec(Packer): v = sc.read_primitive(type) result[k] = v return result + + def _write_map_elem(self, k, v): + type = self.encoding(v) + sc = StringCodec() + sc.write_str8(k) + sc.write_uint8(type.CODE) + sc.write_primitive(type, v) + return sc.encoded + def write_map(self, m): sc = StringCodec() if m is not None: sc.write_uint32(len(m)) - for k, v in m.items(): - type = self.encoding(v) - sc.write_str8(k) - sc.write_uint8(type.CODE) - sc.write_primitive(type, v) + sc.write(string.joinfields(map(self._write_map_elem, m.keys(), m.values()), "")) self.write_vbin32(sc.encoded) def read_array(self): Modified: qpid/branches/qpid-2920/qpid/python/qpid/messaging/endpoints.py URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/python/qpid/messaging/endpoints.py?rev=1081634&r1=1081633&r2=1081634&view=diff ============================================================================== --- qpid/branches/qpid-2920/qpid/python/qpid/messaging/endpoints.py (original) +++ qpid/branches/qpid-2920/qpid/python/qpid/messaging/endpoints.py Tue Mar 15 01:54:07 2011 @@ -197,7 +197,7 @@ class Connection(Endpoint): return result def check_closed(self): - if self.closed: + if not self._connected: self._condition.gc() raise ConnectionClosed() Modified: qpid/branches/qpid-2920/qpid/python/qpid/tests/messaging/endpoints.py 
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/python/qpid/tests/messaging/endpoints.py?rev=1081634&r1=1081633&r2=1081634&view=diff ============================================================================== --- qpid/branches/qpid-2920/qpid/python/qpid/tests/messaging/endpoints.py (original) +++ qpid/branches/qpid-2920/qpid/python/qpid/tests/messaging/endpoints.py Tue Mar 15 01:54:07 2011 @@ -186,6 +186,9 @@ class ConnectionTests(Base): def setup_connection(self): return Connection.establish(self.broker, **self.connection_options()) + def testCheckClosed(self): + assert not self.conn.check_closed() + def testSessionAnon(self): ssn1 = self.conn.session() ssn2 = self.conn.session() Modified: qpid/branches/qpid-2920/qpid/python/setup.py URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/python/setup.py?rev=1081634&r1=1081633&r2=1081634&view=diff ============================================================================== --- qpid/branches/qpid-2920/qpid/python/setup.py (original) +++ qpid/branches/qpid-2920/qpid/python/setup.py Tue Mar 15 01:54:07 2011 @@ -298,7 +298,7 @@ class install_lib(_install_lib): return outfiles + extra setup(name="qpid-python", - version="0.9", + version="0.11", author="Apache Qpid", author_email="[email protected]", packages=["mllib", "qpid", "qpid.messaging", "qpid.tests", Modified: qpid/branches/qpid-2920/qpid/specs/management-schema.xml URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/specs/management-schema.xml?rev=1081634&r1=1081633&r2=1081634&view=diff ============================================================================== --- qpid/branches/qpid-2920/qpid/specs/management-schema.xml (original) +++ qpid/branches/qpid-2920/qpid/specs/management-schema.xml Tue Mar 15 01:54:07 2011 @@ -176,6 +176,7 @@ <statistic name="unackedMessages" type="hilo32" unit="message" desc="Messages consumed but not yet acked"/> <statistic name="messageLatency" type="mmaTime" unit="nanosecond" desc="Broker 
latency through this queue"/> <statistic name="flowStopped" type="bool" desc="Flow control active."/> + <statistic name="flowStoppedCount" type="count32" desc="Number of times flow control was activated for this queue"/> <method name="purge" desc="Discard all or some messages on a queue"> <arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/> Modified: qpid/branches/qpid-2920/qpid/tests/setup.py URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/tests/setup.py?rev=1081634&r1=1081633&r2=1081634&view=diff ============================================================================== --- qpid/branches/qpid-2920/qpid/tests/setup.py (original) +++ qpid/branches/qpid-2920/qpid/tests/setup.py Tue Mar 15 01:54:07 2011 @@ -20,7 +20,7 @@ from distutils.core import setup setup(name="qpid-tests", - version="0.9", + version="0.11", author="Apache Qpid", author_email="[email protected]", packages=["qpid_tests", "qpid_tests.broker_0_10", "qpid_tests.broker_0_9", Modified: qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py?rev=1081634&r1=1081633&r2=1081634&view=diff ============================================================================== --- qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py (original) +++ qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py Tue Mar 15 01:54:07 2011 @@ -18,7 +18,7 @@ # import traceback from qpid.queue import Empty -from qpid.datatypes import Message +from qpid.datatypes import Message, RangedSet from qpid.testlib import TestBase010 from qpid.session import SessionException @@ -77,13 +77,7 @@ class AlternateExchangeTests(TestBase010 """ session = self.session #set up a 'dead letter queue': - session.exchange_declare(exchange="dlq", type="fanout") - 
session.queue_declare(queue="deleted", exclusive=True, auto_delete=True) - session.exchange_bind(exchange="dlq", queue="deleted") - session.message_subscribe(destination="dlq", queue="deleted") - session.message_flow(destination="dlq", unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination="dlq", unit=session.credit_unit.byte, value=0xFFFFFFFFL) - dlq = session.incoming("dlq") + dlq = self.setup_dlq() #create a queue using the dlq as its alternate exchange: session.queue_declare(queue="delete-me", alternate_exchange="dlq") @@ -236,6 +230,121 @@ class AlternateExchangeTests(TestBase010 self.assertEqual("Three", dlq.get(timeout=1).body) self.assertEmpty(dlq) + def test_queue_delete_loop(self): + """ + Test that if a queue is bound to its own alternate exchange, + then on deletion there is no infinite looping + """ + session = self.session + dlq = self.setup_dlq() + + #create a queue using the dlq as its alternate exchange: + session.queue_declare(queue="delete-me", alternate_exchange="dlq") + #bind that queue to the dlq as well: + session.exchange_bind(exchange="dlq", queue="delete-me") + #send it some messages: + dp=self.session.delivery_properties(routing_key="delete-me") + for m in ["One", "Two", "Three"]: + session.message_transfer(message=Message(dp, m)) + #delete it: + session.queue_delete(queue="delete-me") + #cleanup: + session.exchange_delete(exchange="dlq") + + #check the messages were delivered to the dlq: + for m in ["One", "Two", "Three"]: + self.assertEqual(m, dlq.get(timeout=1).body) + self.assertEmpty(dlq) + + def test_queue_delete_no_match(self): + """ + Test that on queue deletion, if the queues own alternate + exchange cannot find a match for the message, the + alternate-exchange of that exchange will be tried. Note: + though the spec rules out going to the alternate-exchanges + alternate exchange when sending to an exchange, it does not + cover this case. 
+ """ + session = self.session + dlq = self.setup_dlq() + + #setu up an 'intermediary' exchange + session.exchange_declare(exchange="my-exchange", type="direct", alternate_exchange="dlq") + + #create a queue using the intermediary as its alternate exchange: + session.queue_declare(queue="delete-me", alternate_exchange="my-exchange") + #bind that queue to the dlq as well: + session.exchange_bind(exchange="dlq", queue="delete-me") + #send it some messages: + dp=self.session.delivery_properties(routing_key="delete-me") + for m in ["One", "Two", "Three"]: + session.message_transfer(message=Message(dp, m)) + + #delete it: + session.queue_delete(queue="delete-me") + #cleanup: + session.exchange_delete(exchange="my-exchange") + session.exchange_delete(exchange="dlq") + + #check the messages were delivered to the dlq: + for m in ["One", "Two", "Three"]: + self.assertEqual(m, dlq.get(timeout=1).body) + self.assertEmpty(dlq) + + def test_reject_no_match(self): + """ + Test that on rejecting a message, if the queues own alternate + exchange cannot find a match for the message, the + alternate-exchange of that exchange will be tried. Note: + though the spec rules out going to the alternate-exchanges + alternate exchange when sending to an exchange, it does not + cover this case. 
+ """ + session = self.session + dlq = self.setup_dlq() + + #setu up an 'intermediary' exchange + session.exchange_declare(exchange="my-exchange", type="direct", alternate_exchange="dlq") + + #create a queue using the intermediary as its alternate exchange: + session.queue_declare(queue="delivery-queue", alternate_exchange="my-exchange", auto_delete=True) + #bind that queue to the dlq as well: + session.exchange_bind(exchange="dlq", queue="delivery-queue") + #send it some messages: + dp=self.session.delivery_properties(routing_key="delivery-queue") + for m in ["One", "Two", "Three"]: + session.message_transfer(message=Message(dp, m)) + + #get and reject those messages: + session.message_subscribe(destination="a", queue="delivery-queue") + session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + incoming = session.incoming("a") + for m in ["One", "Two", "Three"]: + msg = incoming.get(timeout=1) + self.assertEqual(m, msg.body) + session.message_reject(RangedSet(msg.id)) + session.message_cancel(destination="a") + + #check the messages were delivered to the dlq: + for m in ["One", "Two", "Three"]: + self.assertEqual(m, dlq.get(timeout=1).body) + self.assertEmpty(dlq) + #cleanup: + session.exchange_delete(exchange="my-exchange") + session.exchange_delete(exchange="dlq") + + def setup_dlq(self): + session = self.session + #set up 'dead-letter' handling: + session.exchange_declare(exchange="dlq", type="fanout") + session.queue_declare(queue="deleted", exclusive=True, auto_delete=True) + session.exchange_bind(exchange="dlq", queue="deleted") + session.message_subscribe(destination="dlq", queue="deleted") + session.message_flow(destination="dlq", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="dlq", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + dlq = session.incoming("dlq") + return dlq def 
assertEmpty(self, queue): try: Modified: qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py?rev=1081634&r1=1081633&r2=1081634&view=diff ============================================================================== --- qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py (original) +++ qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py Tue Mar 15 01:54:07 2011 @@ -448,9 +448,9 @@ class MiscellaneousErrorsTests(TestHelpe def testTypeNotKnown(self): try: self.session.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type") - self.fail("Expected 503 for declaration of unknown exchange type.") + self.fail("Expected 404 for declaration of unknown exchange type.") except SessionException, e: - self.assertEquals(503, e.args[0].error_code) + self.assertEquals(404, e.args[0].error_code) def testDifferentDeclaredType(self): self.exchange_declare(exchange="test_different_declared_type_exchange", type="direct") Modified: qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py?rev=1081634&r1=1081633&r2=1081634&view=diff ============================================================================== --- qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py (original) +++ qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py Tue Mar 15 01:54:07 2011 @@ -20,6 +20,8 @@ from qpid.client import Client, Closed from qpid.queue import Empty from qpid.content import Content from qpid.testlib import TestBase010 +from qpid.session import SessionException +from qpid.datatypes import uuid4 from time import sleep class ExtensionTests(TestBase010): @@ -28,10 +30,57 @@ class ExtensionTests(TestBase010): def 
test_timed_autodelete(self): session = self.session session2 = self.conn.session("another-session") - session2.queue_declare(queue="my-queue", exclusive=True, auto_delete=True, arguments={"qpid.auto_delete_timeout":5}) + session2.queue_declare(queue="my-queue", exclusive=True, auto_delete=True, arguments={"qpid.auto_delete_timeout":3}) session2.close() result = session.queue_query(queue="my-queue") self.assertEqual("my-queue", result.queue) sleep(5) result = session.queue_query(queue="my-queue") self.assert_(not result.queue) + + def valid_policy_args(self, args, name="test-queue"): + try: + self.session.queue_declare(queue=name, arguments=args) + self.session.queue_delete(queue=name) # cleanup + except SessionException, e: + self.fail("declare with valid policy args failed: %s" % (args)) + self.session = self.conn.session("replacement", 2) + + def invalid_policy_args(self, args, name="test-queue"): + # go through invalid declare attempts twice to make sure that + # the queue doesn't actually get created first time around + # even if exception is thrown + for i in range(1, 3): + try: + self.session.queue_declare(queue=name, arguments=args) + self.session.queue_delete(queue=name) # cleanup + self.fail("declare with invalid policy args suceeded: %s (iteration %d)" % (args, i)) + except SessionException, e: + self.session = self.conn.session(str(uuid4())) + + def test_policy_max_size_as_valid_string(self): + self.valid_policy_args({"qpid.max_size":"3"}) + + def test_policy_max_count_as_valid_string(self): + self.valid_policy_args({"qpid.max_count":"3"}) + + def test_policy_max_count_and_size_as_valid_strings(self): + self.valid_policy_args({"qpid.max_count":"3","qpid.max_size":"0"}) + + def test_policy_negative_count(self): + self.invalid_policy_args({"qpid.max_count":-1}) + + def test_policy_negative_size(self): + self.invalid_policy_args({"qpid.max_size":-1}) + + def test_policy_size_as_invalid_string(self): + self.invalid_policy_args({"qpid.max_size":"foo"}) + + 
def test_policy_count_as_invalid_string(self): + self.invalid_policy_args({"qpid.max_count":"foo"}) + + def test_policy_size_as_float(self): + self.invalid_policy_args({"qpid.max_size":3.14159}) + + def test_policy_count_as_float(self): + self.invalid_policy_args({"qpid.max_count":"2222222.22222"}) Modified: qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/management.py URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/management.py?rev=1081634&r1=1081633&r2=1081634&view=diff ============================================================================== --- qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/management.py (original) +++ qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/management.py Tue Mar 15 01:54:07 2011 @@ -242,6 +242,38 @@ class ManagementTest (TestBase010): pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] self.assertEqual (pq.msgDepth,0) + def test_reroute_priority_queue(self): + self.startQmf() + session = self.session + + #setup test queue supporting multiple priority levels + session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True, arguments={'x-qpid-priorities':10}) + + #send some messages of varying priority to that queue: + for i in range(0, 5): + deliveryProps = session.delivery_properties(routing_key="test-queue", priority=i+5) + session.message_transfer(message=Message(deliveryProps, "Message %d" % (i+1))) + + + #declare and bind a queue to amq.fanout through which rerouted + #messages can be verified: + session.queue_declare(queue="rerouted", exclusive=True, auto_delete=True, arguments={'x-qpid-priorities':10}) + session.exchange_bind(queue="rerouted", exchange="amq.fanout") + + #reroute messages from test queue to amq.fanout (and hence to + #rerouted queue): + pq = self.qmf.getObjects(_class="queue", name="test-queue")[0] + result = pq.reroute(0, False, "amq.fanout") + self.assertEqual(result.status, 0) + + 
#verify messages are all rerouted: + self.subscribe(destination="incoming", queue="rerouted") + incoming = session.incoming("incoming") + for i in range(0, 5): + msg = incoming.get(timeout=1) + self.assertEqual("Message %d" % (5-i), msg.body) + + def test_reroute_queue(self): """ Test ability to reroute messages from the head of a queue. @@ -309,7 +341,40 @@ class ManagementTest (TestBase010): self.assertEqual(result.status, 0) pq.update() self.assertEqual(pq.msgDepth,20) - + + def test_reroute_alternate_exchange(self): + """ + Test that when rerouting, the alternate-exchange is considered if relevant + """ + self.startQmf() + session = self.session + # 1. Create 2 exchanges A and B (fanout) where B is the + # alternate exchange for A + session.exchange_declare(exchange="B", type="fanout") + session.exchange_declare(exchange="A", type="fanout", alternate_exchange="B") + + # 2. Bind queue X to B + session.queue_declare(queue="X", exclusive=True, auto_delete=True) + session.exchange_bind(queue="X", exchange="B") + + # 3. Send 1 message to queue Y + session.queue_declare(queue="Y", exclusive=True, auto_delete=True) + props = session.delivery_properties(routing_key="Y") + session.message_transfer(message=Message(props, "reroute me!")) + + # 4. Call reroute on queue Y and specify that messages should + # be sent to exchange A + y = self.qmf.getObjects(_class="queue", name="Y")[0] + result = y.reroute(1, False, "A") + self.assertEqual(result.status, 0) + + # 5. 
verify that the message is rerouted through B (as A has + # no matching bindings) to X + self.subscribe(destination="x", queue="X") + self.assertEqual("reroute me!", session.incoming("x").get(timeout=1).body) + + # Cleanup + for e in ["A", "B"]: session.exchange_delete(exchange=e) def test_methods_async (self): """ Modified: qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/message.py URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/message.py?rev=1081634&r1=1081633&r2=1081634&view=diff ============================================================================== --- qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/message.py (original) +++ qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/message.py Tue Mar 15 01:54:07 2011 @@ -245,9 +245,19 @@ class MessageTests(TestBase010): self.fail("Got message after cancellation: " + msg) except Empty: None - #cancellation of non-existant consumers should be handled without error - session.message_cancel(destination="my-consumer") - session.message_cancel(destination="this-never-existed") + #cancellation of non-existant consumers should be result in 404s + try: + session.message_cancel(destination="my-consumer") + self.fail("Expected 404 for recancellation of subscription.") + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) + + session = self.conn.session("alternate-session", timeout=10) + try: + session.message_cancel(destination="this-never-existed") + self.fail("Expected 404 for cancellation of unknown subscription.") + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) def test_ack(self): Modified: qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py?rev=1081634&r1=1081633&r2=1081634&view=diff 
============================================================================== --- qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py (original) +++ qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py Tue Mar 15 01:54:07 2011 @@ -33,13 +33,13 @@ class PriorityTests (Base): def setup_session(self): return self.conn.session() - def prioritised_delivery(self, priorities, levels=10): + def prioritised_delivery(self, priorities, levels=10, key="x-qpid-priorities"): """ Test that message on a queue are delivered in priority order. """ msgs = [Message(content=str(uuid4()), priority = p) for p in priorities] - snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{x-qpid-priorities:%s}}}}" % levels, + snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{'%s':%s}}}}" % (key, levels), durable=self.durable()) for m in msgs: snd.send(m) @@ -50,16 +50,16 @@ class PriorityTests (Base): assert msg.content == expected.content self.ssn.acknowledge(msg) - def fairshare_delivery(self, priorities, default_limit=5, limits=None, levels=10): + def fairshare_delivery(self, priorities, default_limit=5, limits=None, levels=10, level_key="x-qpid-priorities", fairshare_key="x-qpid-fairshare"): msgs = [Message(content=str(uuid4()), priority = p) for p in priorities] - limit_policy = "x-qpid-fairshare:%s" % default_limit + limit_policy = "'%s':%s" % (fairshare_key, default_limit) if limits: for k, v in limits.items(): - limit_policy += ", x-qpid-fairshare-%s:%s" % (k, v) + limit_policy += ", '%s-%s':%s" % (fairshare_key, k, v) - snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{x-qpid-priorities:%s, %s}}}}" - % (levels, limit_policy), + snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{'%s':%s, %s}}}}" + % (level_key, levels, limit_policy), 
durable=self.durable()) for m in msgs: snd.send(m) @@ -79,12 +79,18 @@ class PriorityTests (Base): def test_prioritised_delivery_1(self): self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,15,7,8,10,10,2], levels = 10) + def test_prioritised_delivery_with_alias(self): + self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,15,7,8,10,10,2], levels = 10, key="qpid.priorities") + def test_prioritised_delivery_2(self): self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,15,7,8,10,10,2], levels = 5) def test_fairshare_1(self): self.fairshare_delivery(priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,3,10,3,10,10,10,10,10,10,2,3]) + def test_fairshare_with_alias(self): + self.fairshare_delivery(priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,2,3], level_key="qpid.priorities", fairshare_key="qpid.fairshare") + def test_fairshare_2(self): self.fairshare_delivery(priorities = [10 for i in range(30)]) Modified: qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py?rev=1081634&r1=1081633&r2=1081634&view=diff ============================================================================== --- qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py (original) +++ qpid/branches/qpid-2920/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py Tue Mar 15 01:54:07 2011 @@ -60,3 +60,18 @@ class ThresholdTests (Base): def test_alert_size_alias(self): self.do_threshold_test("x-qpid-maximum-message-size", 15, [Message("msg-%s" % i) for i in range(3)]) + + def test_alert_on_alert_queue(self): + rcv = self.ssn.receiver("qmf.default.topic/agent.ind.event.org_apache_qpid_broker.queueThresholdExceeded.#; {link:{x-declare:{arguments:{'qpid.alert_count':1}}}}") + rcvQMFv1 = self.ssn.receiver("qpid.management/console.event.#; {link:{x-declare:{arguments:{'qpid.alert_count':1}}}}") + snd = 
self.ssn.sender("ttq; {create:always, node: {x-declare:{auto_delete:True,exclusive:True,arguments:{'qpid.alert_count':1}}}}") + snd.send(Message("my-message")) + queues = [] + for i in range(2): + event = rcv.fetch() + schema = event.content[0]["_schema_id"] + assert schema["_class_name"] == "queueThresholdExceeded" + values = event.content[0]["_values"] + queues.append(values["qName"]) + assert "ttq" in queues, "expected event for ttq (%s)" % (queues) + Modified: qpid/branches/qpid-2920/qpid/tools/setup.py URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/tools/setup.py?rev=1081634&r1=1081633&r2=1081634&view=diff ============================================================================== --- qpid/branches/qpid-2920/qpid/tools/setup.py (original) +++ qpid/branches/qpid-2920/qpid/tools/setup.py Tue Mar 15 01:54:07 2011 @@ -20,7 +20,7 @@ from distutils.core import setup setup(name="qpid-tools", - version="0.9", + version="0.11", author="Apache Qpid", author_email="[email protected]", scripts=["src/py/qpid-cluster", @@ -30,7 +30,8 @@ setup(name="qpid-tools", "src/py/qpid-queue-stats", "src/py/qpid-route", "src/py/qpid-stat", - "src/py/qpid-tool"], + "src/py/qpid-tool", + "src/py/qmf-tool"], url="http://qpid.apache.org/", license="Apache Software License", description="Diagnostic and management tools for Apache Qpid brokers.") Modified: qpid/branches/qpid-2920/qpid/tools/src/py/qpid-config URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/tools/src/py/qpid-config?rev=1081634&r1=1081633&r2=1081634&view=diff ============================================================================== --- qpid/branches/qpid-2920/qpid/tools/src/py/qpid-config (original) +++ qpid/branches/qpid-2920/qpid/tools/src/py/qpid-config Tue Mar 15 01:54:07 2011 @@ -101,6 +101,7 @@ class Config: self._flowResumeCount = None self._flowStopSize = None self._flowResumeSize = None + self._extra_arguments = [] config = Config() @@ -119,6 +120,13 @@ FLOW_STOP_COUNT = 
"qpid.flow_stop_coun FLOW_RESUME_COUNT = "qpid.flow_resume_count" FLOW_STOP_SIZE = "qpid.flow_stop_size" FLOW_RESUME_SIZE = "qpid.flow_resume_size" +#There are various arguments to declare that have specific program +#options in this utility. However there is now a generic mechanism for +#passing arguments as well. The SPECIAL_ARGS list contains the +#arguments for which there are specific program options defined +#i.e. the arguments for which there is special processing on add and +#list +SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ,LVQNB,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE] class JHelpFormatter(IndentedHelpFormatter): """Format usage and description without stripping newlines from usage strings @@ -178,6 +186,8 @@ def OptionsAndArguments(argv): help="Turn on sender flow control when the number of queued messages exceeds this value.") group3.add_option("--flow-resume-count", action="store", type="int", metavar="<n>", help="Turn off sender flow control when the number of queued messages drops below this value.") + group3.add_option("--argument", dest="extra_arguments", action="append", default=[], + metavar="<NAME=VALUE>", help="Specify a key-value pair to add to queue arguments") # no option for declaring an exclusive queue - which can only be used by the session that creates it. 
parser.add_option_group(group3) @@ -257,6 +267,8 @@ def OptionsAndArguments(argv): config._flowStopCount = opts.flow_stop_count if opts.flow_resume_count: config._flowResumeCount = opts.flow_resume_count + if opts.extra_arguments: + config._extra_arguments = opts.extra_arguments return args @@ -360,6 +372,7 @@ class BrokerManager: if self.match(ex.name, filter): print "%-10s%-*s " % (ex.type, maxNameLen, ex.name), args = ex.arguments + if not args: args = {} if ex.durable: print "--durable", if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1: print "--sequence", if IVE in args and args[IVE] == 1: print "--ive", @@ -401,25 +414,26 @@ class BrokerManager: if self.match(q.name, filter): print "%-*s " % (maxNameLen, q.name), args = q.arguments + if not args: args = {} if q.durable: print "--durable", if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1: print "--cluster-durable", if q.autoDelete: print "auto-del", if q.exclusive: print "excl", - if FILESIZE in args: print "--file-size=%d" % args[FILESIZE], - if FILECOUNT in args: print "--file-count=%d" % args[FILECOUNT], - if MAX_QUEUE_SIZE in args: print "--max-queue-size=%d" % args[MAX_QUEUE_SIZE], - if MAX_QUEUE_COUNT in args: print "--max-queue-count=%d" % args[MAX_QUEUE_COUNT], + if FILESIZE in args: print "--file-size=%s" % args[FILESIZE], + if FILECOUNT in args: print "--file-count=%s" % args[FILECOUNT], + if MAX_QUEUE_SIZE in args: print "--max-queue-size=%s" % args[MAX_QUEUE_SIZE], + if MAX_QUEUE_COUNT in args: print "--max-queue-count=%s" % args[MAX_QUEUE_COUNT], if POLICY_TYPE in args: print "--limit-policy=%s" % args[POLICY_TYPE].replace("_", "-"), if LVQ in args and args[LVQ] == 1: print "--order lvq", if LVQNB in args and args[LVQNB] == 1: print "--order lvq-no-browse", - if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%d" % args[QUEUE_EVENT_GENERATION], + if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%s" % args[QUEUE_EVENT_GENERATION], if q.altExchange: print 
"--alternate-exchange=%s" % q._altExchange_.name, - if FLOW_STOP_SIZE in args: print "--flow-stop-size=%d" % args[FLOW_STOP_SIZE], - if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%d" % args[FLOW_RESUME_SIZE], - if FLOW_STOP_COUNT in args: print "--flow-stop-count=%d" % args[FLOW_STOP_COUNT], - if FLOW_RESUME_COUNT in args: print "--flow-resume-count=%d" % args[FLOW_RESUME_COUNT], - print + if FLOW_STOP_SIZE in args: print "--flow-stop-size=%s" % args[FLOW_STOP_SIZE], + if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%s" % args[FLOW_RESUME_SIZE], + if FLOW_STOP_COUNT in args: print "--flow-stop-count=%s" % args[FLOW_STOP_COUNT], + if FLOW_RESUME_COUNT in args: print "--flow-resume-count=%s" % args[FLOW_RESUME_COUNT], + print " ".join(["--argument %s=%s" % (k, v) for k,v in args.iteritems() if not k in SPECIAL_ARGS]) def QueueListRecurse(self, filter): exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) @@ -464,6 +478,12 @@ class BrokerManager: Usage() qname = args[0] declArgs = {} + for a in config._extra_arguments: + r = a.split("=", 1) + if len(r) == 2: value = r[1] + else: value = None + declArgs[r[0]] = value + if config._durable: declArgs[FILECOUNT] = config._fileCount declArgs[FILESIZE] = config._fileSize Modified: qpid/branches/qpid-2920/qpid/tools/src/py/qpid-printevents URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/tools/src/py/qpid-printevents?rev=1081634&r1=1081633&r2=1081634&view=diff ============================================================================== --- qpid/branches/qpid-2920/qpid/tools/src/py/qpid-printevents (original) +++ qpid/branches/qpid-2920/qpid/tools/src/py/qpid-printevents Tue Mar 15 01:54:07 2011 @@ -20,7 +20,7 @@ # import os -import optparse +import optparse from optparse import IndentedHelpFormatter import sys import socket @@ -62,11 +62,11 @@ _usage = "%prog [options] [broker-addr]. _description = \ """ -Collect and print events from one or more Qpid message brokers. 
+Collect and print events from one or more Qpid message brokers. If no broker-addr is supplied, %prog connects to 'localhost:5672'. -[broker-addr] syntax: +[broker-addr] syntax: [username/password@] hostname ip-address [:<port>] @@ -91,20 +91,20 @@ def main(argv=None): session = Session(console, rcvObjects=False, rcvHeartbeats=options.heartbeats, manageConnections=True) brokers = [] try: - for host in arguments: - brokers.append(session.addBroker(host, None, options.sasl_mechanism)) - - while (True): - sleep(10) - - except KeyboardInterrupt: - print - return 0 - - except Exception, e: - print "Failed: %s - %s" % (e.__class__.__name__, e) - return 1 - + try: + for host in arguments: + brokers.append(session.addBroker(host, None, options.sasl_mechanism)) + + while (True): + sleep(10) + + except KeyboardInterrupt: + print + return 0 + + except Exception, e: + print "Failed: %s - %s" % (e.__class__.__name__, e) + return 1 finally: while len(brokers): b = brokers.pop() Modified: qpid/branches/qpid-2920/qpid/tools/src/py/qpid-route URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/tools/src/py/qpid-route?rev=1081634&r1=1081633&r2=1081634&view=diff ============================================================================== --- qpid/branches/qpid-2920/qpid/tools/src/py/qpid-route (original) +++ qpid/branches/qpid-2920/qpid/tools/src/py/qpid-route Tue Mar 15 01:54:07 2011 @@ -27,18 +27,18 @@ import locale from qmf.console import Session, BrokerURL usage = """ -Usage: qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list] +Usage: qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list] [mechanism] qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange> qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list] [mechanism] qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key> - qpid-route [OPTIONS] 
queue add <dest-broker> <src-broker> <exchange> <queue> + qpid-route [OPTIONS] queue add <dest-broker> <src-broker> <exchange> <queue> [mechanism] qpid-route [OPTIONS] queue del <dest-broker> <src-broker> <exchange> <queue> qpid-route [OPTIONS] route list [<dest-broker>] qpid-route [OPTIONS] route flush [<dest-broker>] qpid-route [OPTIONS] route map [<broker>] - qpid-route [OPTIONS] link add <dest-broker> <src-broker> + qpid-route [OPTIONS] link add <dest-broker> <src-broker> [mechanism] qpid-route [OPTIONS] link del <dest-broker> <src-broker> qpid-route [OPTIONS] link list [<dest-broker>]""" @@ -61,7 +61,7 @@ class Config: self._transport = "tcp" self._ack = 0 self._connTimeout = 10 - self._sasl_mechanism = None + self._client_sasl_mechanism = None config = Config() @@ -95,7 +95,7 @@ def OptionsAndArguments(argv): parser.add_option("--ack", action="store", type="int", metavar="<n>", help="Acknowledge transfers over the bridge in batches of N") parser.add_option("-t", "--transport", action="store", type="string", default="tcp", metavar="<transport>", help="Transport to use for links, defaults to tcp") - parser.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). Used when the client connects to the destination broker (not for authentication between the source and destination brokers - that is specified using the [mechanisms] argument to 'add route'). SASL automatically picks the most secure available mechanism - use this option to override.") + parser.add_option("--client-sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). Used when the client connects to the destination broker (not for authentication between the source and destination brokers - that is specified using the [mechanisms] argument to 'add route'). 
SASL automatically picks the most secure available mechanism - use this option to override.") opts, encArgs = parser.parse_args(args=argv) @@ -131,8 +131,8 @@ def OptionsAndArguments(argv): if opts.ack: config._ack = opts.ack - if opts.sasl_mechanism: - config._sasl_mechanism = opts.sasl_mechanism + if opts.client_sasl_mechanism: + config._client_sasl_mechanism = opts.client_sasl_mechanism return args @@ -143,7 +143,7 @@ class RouteManager: self.local = BrokerURL(localBroker) self.remote = None self.qmf = Session() - self.broker = self.qmf.addBroker(localBroker, config._connTimeout, config._sasl_mechanism) + self.broker = self.qmf.addBroker(localBroker, config._connTimeout, config._client_sasl_mechanism) self.broker._waitForStable() self.agent = self.broker.getBrokerAgent() @@ -166,7 +166,7 @@ class RouteManager: return link return None - def addLink(self, remoteBroker, mech="PLAIN"): + def addLink(self, remoteBroker, interbroker_mechanism=""): self.remote = BrokerURL(remoteBroker) if self.local.match(self.remote.host, self.remote.port): raise Exception("Linking broker to itself is not permitted") @@ -176,7 +176,7 @@ class RouteManager: link = self.getLink() if link == None: res = broker.connect(self.remote.host, self.remote.port, config._durable, - mech, self.remote.authName or "", self.remote.authPass or "", + interbroker_mechanism, self.remote.authName or "", self.remote.authPass or "", config._transport) if config._verbose: print "Connect method returned:", res.status, res.text @@ -295,11 +295,11 @@ class RouteManager: if b[0] != self.local.name(): self.qmf.delBroker(b[1]) - def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, mech="PLAIN", dynamic=False): + def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, interbroker_mechanism="", dynamic=False): if dynamic and config._srclocal: raise Exception("--src-local is not permitted on dynamic routes") - self.addLink(remoteBroker, mech) + self.addLink(remoteBroker, 
interbroker_mechanism) link = self.getLink() if link == None: raise Exception("Link failed to create") @@ -320,8 +320,8 @@ class RouteManager: if config._verbose: print "Bridge method returned:", res.status, res.text - def addQueueRoute(self, remoteBroker, exchange, queue): - self.addLink(remoteBroker) + def addQueueRoute(self, remoteBroker, interbroker_mechanism, exchange, queue ): + self.addLink(remoteBroker, interbroker_mechanism) link = self.getLink() if link == None: raise Exception("Link failed to create") @@ -504,10 +504,12 @@ def main(argv=None): rm = RouteManager(localBroker) if group == "link": if cmd == "add": - if nargs != 4: + if nargs < 3 or nargs > 5: Usage() return(-1) - rm.addLink(remoteBroker) + interbroker_mechanism = "" + if nargs > 4: interbroker_mechanism = args[4] + rm.addLink(remoteBroker, interbroker_mechanism) elif cmd == "del": if nargs != 4: Usage() @@ -518,16 +520,17 @@ def main(argv=None): elif group == "dynamic": if cmd == "add": - if nargs < 5 or nargs > 7: + if nargs < 5 or nargs > 8: Usage() return(-1) tag = "" excludes = "" - mech = "PLAIN" + interbroker_mechanism = "" if nargs > 5: tag = args[5] if nargs > 6: excludes = args[6] - rm.addRoute(remoteBroker, args[4], "", tag, excludes, mech, dynamic=True) + if nargs > 7: interbroker_mechanism = args[7] + rm.addRoute(remoteBroker, args[4], "", tag, excludes, interbroker_mechanism, dynamic=True) elif cmd == "del": if nargs != 5: Usage() @@ -543,11 +546,11 @@ def main(argv=None): tag = "" excludes = "" - mech = "PLAIN" + interbroker_mechanism = "" if nargs > 6: tag = args[6] if nargs > 7: excludes = args[7] - if nargs > 8: mech = args[8] - rm.addRoute(remoteBroker, args[4], args[5], tag, excludes, mech, dynamic=False) + if nargs > 8: interbroker_mechanism = args[8] + rm.addRoute(remoteBroker, args[4], args[5], tag, excludes, interbroker_mechanism, dynamic=False) elif cmd == "del": if nargs != 6: Usage() @@ -565,11 +568,13 @@ def main(argv=None): return(-1) elif group == "queue": - if 
nargs != 6: + if nargs < 6 or nargs > 7: Usage() return(-1) if cmd == "add": - rm.addQueueRoute(remoteBroker, exchange=args[4], queue=args[5]) + interbroker_mechanism = "" + if nargs > 6: interbroker_mechanism = args[6] + rm.addQueueRoute(remoteBroker, interbroker_mechanism, exchange=args[4], queue=args[5] ) elif cmd == "del": rm.delQueueRoute(remoteBroker, exchange=args[4], queue=args[5]) else: Modified: qpid/branches/qpid-2920/qpid/tools/src/py/qpid-tool URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/tools/src/py/qpid-tool?rev=1081634&r1=1081633&r2=1081634&view=diff ============================================================================== --- qpid/branches/qpid-2920/qpid/tools/src/py/qpid-tool (original) +++ qpid/branches/qpid-2920/qpid/tools/src/py/qpid-tool Tue Mar 15 01:54:07 2011 @@ -259,7 +259,24 @@ class QmfData(Console): return displayId = long(tokens[0]) methodName = tokens[1] - args = tokens[2:] + args = [] + for arg in tokens[2:]: + ## + ## If the argument is a map, list, boolean, integer, or floating (one decimal point), + ## run it through the Python evaluator so it is converted to the correct type. + ## + ## TODO: use a regex for this instead of this convoluted logic, + ## or even consider passing all args through eval() [which would + ## be a minor change to the interface as string args would then + ## always need to be quoted as strings within a map/list would + ## now] + if arg[0] == '{' or arg[0] == '[' or arg == "True" or arg == "False" or \ + ((arg.count('.') < 2 and (arg.count('-') == 0 or \ + (arg.count('-') == 1 and arg[0] == '-')) and \ + arg.replace('.','').replace('-','').isdigit())): + args.append(eval(arg)) + else: + args.append(arg) obj = None try: --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:[email protected]
