Propchange: qpid/branches/java-config-and-management/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java ------------------------------------------------------------------------------ Merged /qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:r1339579-1339789
Propchange: qpid/branches/java-config-and-management/qpid/java/management/eclipse-plugin/src/ ------------------------------------------------------------------------------ Merged /qpid/trunk/qpid/java/management/eclipse-plugin/src:r1339579-1339789 Propchange: qpid/branches/java-config-and-management/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java ------------------------------------------------------------------------------ Merged /qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:r1339579-1339789 Propchange: qpid/branches/java-config-and-management/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java ------------------------------------------------------------------------------ Merged /qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:r1339579-1339789 Propchange: qpid/branches/java-config-and-management/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java ------------------------------------------------------------------------------ Merged /qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:r1339579-1339789 Propchange: qpid/branches/java-config-and-management/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java ------------------------------------------------------------------------------ Merged /qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:r1339579-1339789 Propchange: qpid/branches/java-config-and-management/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc ------------------------------------------------------------------------------ Merged /qpid/trunk/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:r1339579-1339789 Propchange: qpid/branches/java-config-and-management/qpid/packaging/windows/ ------------------------------------------------------------------------------ Merged /qpid/trunk/qpid/packaging/windows:r1300143-1349442,1349444-1349530,1349532-1353860 Propchange: qpid/branches/java-config-and-management/qpid/python/ ------------------------------------------------------------------------------ Merged /qpid/trunk/qpid/python:r1300143-1349442,1349444-1349530,1349532-1353860 Propchange: qpid/branches/java-config-and-management/qpid/python/examples/api/spout ------------------------------------------------------------------------------ Merged /qpid/trunk/qpid/python/examples/api/spout:r1300143-1349442,1349444-1349530,1349532-1353860 Propchange: qpid/branches/java-config-and-management/qpid/python/qpid/concurrency.py ------------------------------------------------------------------------------ Merged /qpid/trunk/qpid/python/qpid/concurrency.py:r1300143-1349442,1349444-1349530,1349532-1353860 Modified: qpid/branches/java-config-and-management/qpid/python/qpid/connection.py URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/python/qpid/connection.py?rev=1354874&r1=1354873&r2=1354874&view=diff ============================================================================== --- qpid/branches/java-config-and-management/qpid/python/qpid/connection.py (original) +++ qpid/branches/java-config-and-management/qpid/python/qpid/connection.py Thu Jun 28 09:14:52 2012 @@ -170,6 +170,10 @@ class Connection(Framer): if not status: self.detach_all() break + # When we do not use SSL transport, we get periodic + # spurious timeout events on the socket. When using SSL, + # these events show up as timeout *errors*. Both should be + # ignored unless we have aborted. except socket.timeout: if self.aborted(): self.close_code = (None, "connection timed out") @@ -178,9 +182,12 @@ class Connection(Framer): else: continue except socket.error, e: - self.close_code = (None, str(e)) - self.detach_all() - break + if self.aborted() or str(e) != "The read operation timed out": + self.close_code = (None, str(e)) + self.detach_all() + break + else: + continue frame_dec.write(data) seg_dec.write(*frame_dec.read()) op_dec.write(*seg_dec.read()) Modified: qpid/branches/java-config-and-management/qpid/python/qpid/messaging/driver.py URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/python/qpid/messaging/driver.py?rev=1354874&r1=1354873&r2=1354874&view=diff ============================================================================== --- qpid/branches/java-config-and-management/qpid/python/qpid/messaging/driver.py (original) +++ qpid/branches/java-config-and-management/qpid/python/qpid/messaging/driver.py Thu Jun 28 09:14:52 2012 @@ -226,7 +226,11 @@ class LinkIn: def do_link(self, sst, rcv, _rcv, type, subtype, action): link_opts = _rcv.options.get("link", {}) - reliability = link_opts.get("reliability", "at-least-once") + if type == "topic": + default_reliability = "unreliable" + else: + default_reliability = "at-least-once" + reliability = link_opts.get("reliability", default_reliability) declare = link_opts.get("x-declare", {}) subscribe = link_opts.get("x-subscribe", {}) acq_mode = acquire_mode.pre_acquired Modified: qpid/branches/java-config-and-management/qpid/python/qpid/messaging/util.py URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/python/qpid/messaging/util.py?rev=1354874&r1=1354873&r2=1354874&view=diff ============================================================================== --- qpid/branches/java-config-and-management/qpid/python/qpid/messaging/util.py (original) +++ qpid/branches/java-config-and-management/qpid/python/qpid/messaging/util.py Thu Jun 28 09:14:52 2012 @@ -50,10 +50,13 @@ def set_reconnect_urls(conn, msg): reconnect_urls = [] urls = msg.properties["amq.failover"] for u in urls: + # FIXME aconway 2012-06-12: Nasty hack parsing of the C++ broker's URL format. if u.startswith("amqp:"): - for p in u[5:].split(","): - parts = p.split(":") - host, port = parts[1:3] + for a in u[5:].split(","): + parts = a.split(":") + # Handle IPv6 addresses which have : in the host part. + port = parts[-1] # Last : separated field is port + host = ":".join(parts[1:-1]) # First : separated field is protocol, host is the rest. reconnect_urls.append("%s:%s" % (host, port)) conn.reconnect_urls = reconnect_urls log.warn("set reconnect_urls for conn %s: %s", conn, reconnect_urls) Modified: qpid/branches/java-config-and-management/qpid/specs/management-schema.xml URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/specs/management-schema.xml?rev=1354874&r1=1354873&r2=1354874&view=diff ============================================================================== --- qpid/branches/java-config-and-management/qpid/specs/management-schema.xml (original) +++ qpid/branches/java-config-and-management/qpid/specs/management-schema.xml Thu Jun 28 09:14:52 2012 @@ -8,9 +8,9 @@ to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -81,7 +81,6 @@ <property name="systemRef" type="objId" references="System" access="RO" desc="System ID" parentRef="y"/> <property name="port" type="uint16" access="RO" desc="TCP Port for AMQP Service"/> <property name="workerThreads" type="uint16" access="RO" desc="Thread pool size"/> - <property name="maxConns" type="uint16" access="RO" desc="Maximum allowed connections"/> <property name="connBacklog" type="uint16" access="RO" desc="Connection backlog limit for listening socket"/> <property name="stagingThreshold" type="uint32" access="RO" desc="Broker stages messages over this size to disk"/> <property name="mgmtPublish" type="bool" access="RO" desc="Broker's management agent sends unsolicited data on the publish interval"/> @@ -125,8 +124,8 @@ <statistic name="abandonedViaAlt" type="count64" unit="message" desc="Messages routed to alternate exchange from a deleted queue"/> <method name="echo" desc="Request a response to test the path to the management broker"> - <arg name="sequence" dir="IO" type="uint32" default="0"/> - <arg name="body" dir="IO" type="lstr" default=""/> + <arg name="sequence" dir="IO" type="uint32"/> + <arg name="body" dir="IO" type="lstr"/> </method> <method name="connect" desc="Establish a connection to another broker"> @@ -143,7 +142,7 @@ <arg name="srcQueue" dir="I" type="sstr" desc="Source queue"/> <arg name="destQueue" dir="I" type="sstr" desc="Destination queue"/> <arg name="qty" dir="I" type="uint32" desc="# of messages to move. 0 means all messages"/> - <arg name="filter" dir="I" type="map" default="{}" desc="if specified, move only those messages matching this filter"/> + <arg name="filter" dir="I" type="map" desc="if specified, move only those messages matching this filter"/> </method> <method name="setLogLevel" desc="Set the log level"> @@ -164,20 +163,20 @@ <method name="create" desc="Create an object of the specified type"> <arg name="type" dir="I" type="sstr" desc="The type of object to create"/> - <arg name="name" dir="I" type="sstr" desc="The name of the object to create"/> - <arg name="properties" dir="I" type="map" desc="Type specific object properties"/> - <arg name="strict" dir="I" type="bool" desc="If specified, treat unrecognised object properties as an error"/> + <arg name="name" dir="I" type="sstr" desc="The name of the object to create"/> + <arg name="properties" dir="I" type="map" desc="Type specific object properties"/> + <arg name="strict" dir="I" type="bool" desc="If specified, treat unrecognised object properties as an error"/> </method> <method name="delete" desc="Delete an object of the specified type"> <arg name="type" dir="I" type="sstr" desc="The type of object to delete"/> - <arg name="name" dir="I" type="sstr" desc="The name of the object to delete"/> - <arg name="options" dir="I" type="map" desc="Type specific object options for deletion"/> + <arg name="name" dir="I" type="sstr" desc="The name of the object to delete"/> + <arg name="options" dir="I" type="map" desc="Type specific object options for deletion"/> </method> <method name="query" desc="Query the current state of an object."> <arg name="type" dir="I" type="sstr" desc="The type of object to query."/> - <arg name="name" dir="I" type="sstr" desc="The name of the object to query"/> + <arg name="name" dir="I" type="sstr" desc="The name of the object to query"/> <arg name="results" dir="O" type="map" desc="A snapshot of the object's state."/> </method> @@ -272,14 +271,14 @@ <method name="purge" desc="Discard all or some messages on a queue"> <arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/> - <arg name="filter" dir="I" type="map" default="{}" desc="if specified, purge only those messages matching this filter"/> + <arg name="filter" dir="I" type="map" desc="if specified, purge only those messages matching this filter"/> </method> <method name="reroute" desc="Remove all or some messages on this queue and route them to an exchange"> <arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/> <arg name="useAltExchange" dir="I" type="bool" desc="Iff true, use the queue's configured alternate exchange; iff false, use exchange named in the 'exchange' argument"/> <arg name="exchange" dir="I" type="sstr" desc="Name of the exchange to route the messages through"/> - <arg name="filter" dir="I" type="map" default="{}" desc="if specified, reroute only those messages matching this filter"/> + <arg name="filter" dir="I" type="map" desc="if specified, reroute only those messages matching this filter"/> </method> </class> @@ -321,7 +320,7 @@ <statistic name="msgMatched" type="count64"/> </class> - + <!-- =============================================================== Subscription @@ -338,7 +337,7 @@ <property name="arguments" type="map" access="RC"/> <statistic name="delivered" type="count64" unit="message" desc="Messages delivered"/> </class> - + <!-- =============================================================== Connection @@ -366,7 +365,7 @@ <statistic name="msgsFromClient" type="count64"/> <statistic name="msgsToClient" type="count64"/> - <method name="close"/> + <method name="close"/> </class> <!-- @@ -379,15 +378,17 @@ This class represents an inter-broker connection. <property name="vhostRef" type="objId" references="Vhost" access="RC" index="y" parentRef="y"/> - <property name="host" type="sstr" access="RC" index="y"/> - <property name="port" type="uint16" access="RC" index="y"/> - <property name="transport" type="sstr" access="RC"/> + <property name="name" type="sstr" access="RC" index="y"/> + <property name="host" type="sstr" access="RO"/> + <property name="port" type="uint16" access="RO"/> + <property name="transport" type="sstr" access="RO"/> <property name="durable" type="bool" access="RC"/> + <property name="connectionRef" type="objId" references="Connection" access="RO"/> <statistic name="state" type="sstr" desc="Operational state of the link"/> <statistic name="lastError" type="lstr" desc="Reason link is not operational"/> - <method name="close"/> + <method name="close"/> <method name="bridge" desc="Bridge messages over the link"> <arg name="durable" dir="I" type="bool"/> @@ -411,7 +412,8 @@ --> <class name="Bridge"> <property name="linkRef" type="objId" references="Link" access="RC" index="y" parentRef="y"/> - <property name="channelId" type="uint16" access="RC" index="y"/> + <property name="name" type="sstr" access="RC" index="y"/> + <property name="channelId" type="uint16" access="RO"/> <property name="durable" type="bool" access="RC"/> <property name="src" type="sstr" access="RC"/> <property name="dest" type="sstr" access="RC"/> @@ -422,7 +424,7 @@ <property name="excludes" type="sstr" access="RC"/> <property name="dynamic" type="bool" access="RC"/> <property name="sync" type="uint16" access="RC"/> - <method name="close"/> + <method name="close"/> </class> @@ -441,7 +443,7 @@ <property name="expireTime" type="absTime" access="RO" optional="y"/> <property name="maxClientRate" type="uint32" access="RO" unit="msgs/sec" optional="y"/> - <statistic name="framesOutstanding" type="count32"/> + <statistic name="unackedMessages" type="uint64" unit="message" desc="Unacknowledged messages in the session"/> <statistic name="TxnStarts" type="count64" unit="transaction" desc="Total transactions started "/> <statistic name="TxnCommits" type="count64" unit="transaction" desc="Total transactions committed"/> Modified: qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py?rev=1354874&r1=1354873&r2=1354874&view=diff ============================================================================== --- qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py (original) +++ qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py Thu Jun 28 09:14:52 2012 @@ -36,3 +36,4 @@ from extensions import * from msg_groups import * from new_api import * from stats import * +from qmf_events import * Modified: qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/management.py URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/management.py?rev=1354874&r1=1354873&r2=1354874&view=diff ============================================================================== --- qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/management.py (original) +++ qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/management.py Thu Jun 28 09:14:52 2012 @@ -302,9 +302,10 @@ class ManagementTest (TestBase010): twenty = range(1,21) props = session.delivery_properties(routing_key="routing_key") + mp = session.message_properties(application_headers={'x-qpid.trace' : 'A,B,C'}) for count in twenty: body = "Reroute Message %d" % count - msg = Message(props, body) + msg = Message(props, mp, body) session.message_transfer(destination="amq.direct", message=msg) pq = self.qmf.getObjects(_class="queue", name="reroute-queue")[0] @@ -317,6 +318,16 @@ class ManagementTest (TestBase010): self.assertEqual(pq.msgDepth,19) self.assertEqual(aq.msgDepth,1) + "Verify that the trace was cleared on the rerouted message" + url = "%s://%s:%d" % (self.broker.scheme or "amqp", self.broker.host, self.broker.port) + conn = qpid.messaging.Connection(url) + conn.open() + sess = conn.session() + rx = sess.receiver("alt-queue1;{mode:browse}") + rm = rx.fetch(1) + self.assertEqual(rm.properties['x-qpid.trace'], '') + conn.close() + "Reroute top 9 messages from reroute-queue to alt.direct2" result = pq.reroute(9, False, "alt.direct2", {}) self.assertEqual(result.status, 0) @@ -385,6 +396,30 @@ class ManagementTest (TestBase010): # Cleanup for e in ["A", "B"]: session.exchange_delete(exchange=e) + def test_reroute_invalid_alt_exchange(self): + """ + Test that an error is returned for an attempt to reroute to + alternate exchange on a queue for which no such exchange has + been defined. + """ + self.startQmf() + session = self.session + # create queue with no alt-exchange, and send a message to it + session.queue_declare(queue="q", exclusive=True, auto_delete=True) + props = session.delivery_properties(routing_key="q") + session.message_transfer(message=Message(props, "don't reroute me!")) + + # attempt to reroute the message to alt-exchange + q = self.qmf.getObjects(_class="queue", name="q")[0] + result = q.reroute(1, True, "", {}) + # verify the attempt fails... + self.assertEqual(result.status, 4) #invalid parameter + + # ...and message is still on the queue + self.subscribe(destination="d", queue="q") + self.assertEqual("don't reroute me!", session.incoming("d").get(timeout=1).body) + + def test_methods_async (self): """ """ Modified: qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py?rev=1354874&r1=1354873&r2=1354874&view=diff ============================================================================== --- qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py (original) +++ qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py Thu Jun 28 09:14:52 2012 @@ -1122,6 +1122,70 @@ class MultiConsumerMsgGroupTests(Base): snd.close() + def test_ttl_expire(self): + """ Verify that expired (TTL) group messages are skipped correctly + """ + snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + + " node: {x-declare: {arguments:" + + " {'qpid.group_header_key':'THE-GROUP'," + + "'qpid.shared_msg_group':1}}}}") + + groups = ["A","B","C","A","B","C"] + messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] + index = 0 + for m in messages: + m.content['index'] = index + index += 1 + if m.properties['THE-GROUP'] == 'B': + m.ttl = 1; + snd.send(m) + + sleep(2) # let all B's expire + + # create consumers on separate sessions: C1,C2 + s1 = self.setup_session() + c1 = s1.receiver("msg-group-q", options={"capacity":0}) + s2 = self.setup_session() + c2 = s2.receiver("msg-group-q", options={"capacity":0}) + + # C1 should acquire A-0, then C2 should acquire C-2, Group B should + # expire and never be fetched + + m1 = c1.fetch(0); + assert m1.properties['THE-GROUP'] == 'A' + assert m1.content['index'] == 0 + + m2 = c2.fetch(0); + assert m2.properties['THE-GROUP'] == 'C' + assert m2.content['index'] == 2 + + m1 = c1.fetch(0); + assert m1.properties['THE-GROUP'] == 'A' + assert m1.content['index'] == 3 + + m2 = c2.fetch(0); + assert m2.properties['THE-GROUP'] == 'C' + assert m2.content['index'] == 5 + + # there should be no more left for either consumer + try: + mx = c1.fetch(0) + assert False # should never get here + except Empty: + pass + try: + mx = c2.fetch(0) + assert False # should never get here + except Empty: + pass + + c1.session.acknowledge() + c2.session.acknowledge() + c1.close() + c2.close() + snd.close() + + class StickyConsumerMsgGroupTests(Base): """ Tests for the behavior of sticky-consumer message groups. These tests Modified: qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py?rev=1354874&r1=1354873&r2=1354874&view=diff ============================================================================== --- qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py (original) +++ qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py Thu Jun 28 09:14:52 2012 @@ -57,7 +57,7 @@ class GeneralTests(Base): sess2 = self.setup_session() tx = sess1.sender("amq.direct/key") - rx_main = sess1.receiver("amq.direct/key;{link:{x-declare:{alternate-exchange:'amq.fanout'}}}") + rx_main = sess1.receiver("amq.direct/key;{link:{reliability:at-least-once,x-declare:{alternate-exchange:'amq.fanout'}}}") rx_alt = sess2.receiver("amq.fanout") rx_alt.capacity = 10 Propchange: qpid/branches/java-config-and-management/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py ------------------------------------------------------------------------------ Merged /qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py:r1300143-1349442,1349444-1349530,1349532-1353860 Modified: qpid/branches/java-config-and-management/qpid/tools/src/py/.gitignore URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/tools/src/py/.gitignore?rev=1354874&r1=1354873&r2=1354874&view=diff ============================================================================== --- qpid/branches/java-config-and-management/qpid/tools/src/py/.gitignore (original) +++ qpid/branches/java-config-and-management/qpid/tools/src/py/.gitignore Thu Jun 28 09:14:52 2012 @@ -19,4 +19,5 @@ # with the License. You may obtain a copy of the License at /qpid-clusterc /qpid-configc +/qpid-hac /qpid-routec Modified: qpid/branches/java-config-and-management/qpid/tools/src/py/qmf-tool URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/tools/src/py/qmf-tool?rev=1354874&r1=1354873&r2=1354874&view=diff ============================================================================== --- qpid/branches/java-config-and-management/qpid/tools/src/py/qmf-tool (original) +++ qpid/branches/java-config-and-management/qpid/tools/src/py/qmf-tool Thu Jun 28 09:14:52 2012 @@ -266,7 +266,7 @@ class QmfData: self.conn_options = conn_options self.qmf_options = qmf_options self.agent_filter = '[]' - self.connection = cqpid.Connection(self.url, self.conn_options) + self.connection = cqpid.Connection(self.url, **self.conn_options) self.connection.open() self.session = qmf2.ConsoleSession(self.connection, self.qmf_options) self.session.setAgentFilter(self.agent_filter) Modified: qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-config URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-config?rev=1354874&r1=1354873&r2=1354874&view=diff ============================================================================== --- qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-config (original) +++ qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-config Thu Jun 28 09:14:52 2012 @@ -481,7 +481,7 @@ class BrokerManager: if LVQ_KEY in args: print "--lvq-key=%s" % args[LVQ_KEY], if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%s" % args[QUEUE_EVENT_GENERATION], if q.altExchange: - print "--alternate-exchange=%s" % q._altExchange_.name, + print "--alternate-exchange=%s" % q.altExchange, if FLOW_STOP_SIZE in args: print "--flow-stop-size=%s" % args[FLOW_STOP_SIZE], if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%s" % args[FLOW_RESUME_SIZE], if FLOW_STOP_COUNT in args: print "--flow-stop-count=%s" % args[FLOW_STOP_COUNT], Modified: qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-ha URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-ha?rev=1354874&r1=1354873&r2=1354874&view=diff ============================================================================== --- qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-ha (original) +++ qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-ha Thu Jun 28 09:14:52 2012 @@ -35,18 +35,18 @@ HA_BROKER = "org.apache.qpid.ha:habroker class Command: commands = {} - def __init__(self, name, help, args=[]): + def __init__(self, name, help, arg_names=[]): Command.commands[name] = self self.name = name - self.args = args - usage="%s [options] %s\n\n%s"%(name, " ".join(args), help) + self.arg_names = arg_names + usage="%s [options] %s\n\n%s"%(name, " ".join(arg_names), help) self.help = help self.op=optparse.OptionParser(usage) self.op.add_option("-b", "--broker", metavar="<url>", help="Connect to broker at <url>") - def execute(self): - opts, args = self.op.parse_args() - if len(args) != len(self.args)+1: + def execute(self, args): + opts, args = self.op.parse_args(args) + if len(args) != len(self.arg_names)+1: self.op.print_help() raise Exception("Wrong number of arguments") broker = opts.broker or "localhost:5672" @@ -93,29 +93,31 @@ class SetCmd(Command): Command.__init__(self, "set", "Set HA configuration settings") def add(optname, metavar, type, help): self.op.add_option(optname, metavar=metavar, type=type, help=help, action="store") - add("--brokers", "<url>", "string", "HA brokers use <url> to connect to each other") - add("--public-brokers", "<url>", "string", "Clients use <url> to connect to HA brokers") + add("--brokers-url", "<url>", "string", "URL with address of each broker in the cluster. Used by brokers to connect to each other.") + add("--public-url", "<url>", "string", "URL advertised to clients to connect to the cluster. May be a list or a VIP.") add("--backups", "<n>", "int", "Expect <n> backups to be running"), def do_execute(self, qmf_broker, ha_broker, opts, args): - if (opts.brokers): qmf_broker._method("setBrokers", {"url":opts.brokers}, HA_BROKER) - if (opts.public_brokers): qmf_broker._method("setPublicBrokers", {"url":opts.public_brokers}, HA_BROKER) + if (opts.brokers_url): qmf_broker._method("setBrokersUrl", {"url":opts.brokers_url}, HA_BROKER) + if (opts.public_url): qmf_broker._method("setPublicUrl", {"url":opts.public_url}, HA_BROKER) if (opts.backups): qmf_broker._method("setExpectedBackups", {"expectedBackups":opts.backups}, HA_BROKER) SetCmd() class QueryCmd(Command): def __init__(self): - Command.__init__(self, "query", "Print HA configuration settings") + Command.__init__(self, "query", "Print HA configuration and status") def do_execute(self, qmf_broker, ha_broker, opts, args): hb = ha_broker for x in [("Status:", hb.status), - ("Brokers URL:", hb.brokers), - ("Public URL:", hb.publicBrokers), - ("Expected Backups:", hb.expectedBackups) + ("Brokers URL:", hb.brokersUrl), + ("Public URL:", hb.publicUrl), + ("Expected Backups:", hb.expectedBackups), + ("Replicate: ", hb.replicateDefault) ]: print "%-20s %s"%(x[0], x[1]) + QueryCmd() def print_usage(prog): @@ -143,7 +145,7 @@ def main(argv): if not command: print_usage(os.path.basename(argv[0])); return 1; - if command.execute(): return 1 + if command.execute(args): return 1 except Exception, e: print e return 1 Modified: qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-printevents URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-printevents?rev=1354874&r1=1354873&r2=1354874&view=diff ============================================================================== --- qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-printevents (original) +++ qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-printevents Thu Jun 28 09:14:52 2012 @@ -21,34 +21,85 @@ import os import optparse -from optparse import IndentedHelpFormatter import sys -import socket -from time import time, strftime, gmtime, sleep -from qmf.console import Console, Session +from optparse import IndentedHelpFormatter +from time import time, strftime, gmtime, sleep +from threading import Lock, Condition, Thread +from qpid.messaging import Connection +import qpid.messaging.exceptions + +home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools")) +sys.path.append(os.path.join(home, "python")) + +from qpidtoollibs.broker import EventHelper + + +class Printer(object): + """ + This class serializes printed lines so that events coming from different + threads don't overlap each other. + """ + def __init__(self): + self.lock = Lock() - -class EventConsole(Console): - def event(self, broker, event): - print event - sys.stdout.flush() - - def brokerConnected(self, broker): - print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerConnected broker=%s" % broker.getUrl() + def pr(self, text): + self.lock.acquire() + try: + print text + finally: + self.lock.release() sys.stdout.flush() + - def brokerConnectionFailed(self, broker): - print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerConnectionFailed broker=%s %s" % (broker.getUrl(), str(broker.conn_exc)) - sys.stdout.flush() +class EventReceiver(Thread): + """ + One instance of this class is created for each broker that is being monitored. + This class does not use the "reconnect" option because it needs to report as + events when the connection is established and when it's lost. + """ + def __init__(self, printer, url, mechanism, options): + Thread.__init__(self) + self.printer = printer + self.url = url + self.mechanism = mechanism + self.options = options + self.running = True + self.helper = EventHelper() + + def cancel(self): + self.running = False + + def run(self): + isOpen = False + while self.running: + try: + conn = Connection.establish(self.url, sasl_mechanisms=self.mechanism, client_properties=self.options) + isOpen = True + self.printer.pr(strftime("%c", gmtime(time())) + " NOTIC qpid-printevents:brokerConnected broker=%s" % self.url) + + sess = conn.session() + rx = sess.receiver(self.helper.eventAddress()) + + while self.running: + try: + msg = rx.fetch(1) + event = self.helper.event(msg) + self.printer.pr(event.__repr__()) + sess.acknowledge() + except qpid.messaging.exceptions.Empty: + pass + + except Exception, e: + if isOpen: + self.printer.pr(strftime("%c", gmtime(time())) + " NOTIC qpid-printevents:brokerDisconnected broker=%s" % self.url) + isOpen = False + sleep(1) - def brokerDisconnected(self, broker): - print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerDisconnected broker=%s" % broker.getUrl() - sys.stdout.flush() class JHelpFormatter(IndentedHelpFormatter): - """Format usage and description without stripping newlines from usage strings """ - + Format usage and description without stripping newlines from usage strings + """ def format_usage(self, usage): return usage @@ -87,16 +138,23 @@ def main(argv=None): if len(arguments) == 0: arguments.append("localhost") - console = EventConsole() - session = Session(console, rcvObjects=False, rcvHeartbeats=options.heartbeats, manageConnections=True) - brokers = [] + brokers = [] + mechanism = options.sasl_mechanism + props = {'qpid.ha-admin' : 1} + printer = Printer() + + if options.heartbeats: + props['heartbeat'] = 5 + try: try: for host in arguments: - brokers.append(session.addBroker(host, None, options.sasl_mechanism)) + er = EventReceiver(printer, host, mechanism, props) + brokers.append(er) + er.start() - while (True): - sleep(10) + while (True): + sleep(10) except KeyboardInterrupt: print @@ -106,9 +164,10 @@ def main(argv=None): print "Failed: %s - %s" % (e.__class__.__name__, e) return 1 finally: - while len(brokers): - b = brokers.pop() - session.delBroker(b) + for b in brokers: + b.cancel() + for b in brokers: + b.join() if __name__ == '__main__': sys.exit(main()) Modified: qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-stat URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-stat?rev=1354874&r1=1354873&r2=1354874&view=diff ============================================================================== --- qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-stat (original) +++ qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-stat Thu Jun 28 09:14:52 2012 @@ -52,7 +52,16 @@ def OptionsAndArguments(argv): global config - parser = OptionParser(usage="usage: %prog [options] -[gcequm] [object-name]") + usage = \ +"""%prog -g [options] + %prog -c [options] + %prog -e [options] + %prog -q [options] [queue-name] + %prog -u [options] + %prog -m [options] + %prog --acl [options]""" + + parser = OptionParser(usage=usage) group1 = OptionGroup(parser, "General Options") group1.add_option("-b", "--broker", action="store", type="string", default="localhost", metavar="<url>", @@ -64,7 +73,7 @@ def OptionsAndArguments(argv): group1.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.") parser.add_option_group(group1) - group2 = OptionGroup(parser, "Display Options") + group2 = OptionGroup(parser, "Command Options") group2.add_option("-g", "--general", help="Show General Broker Stats", action="store_const", const="g", dest="show") group2.add_option("-c", "--connections", help="Show Connections", action="store_const", const="c", dest="show") group2.add_option("-e", "--exchanges", help="Show Exchanges", action="store_const", const="e", dest="show") @@ -72,12 +81,14 @@ def OptionsAndArguments(argv): group2.add_option("-u", "--subscriptions", help="Show Subscriptions", action="store_const", const="u", dest="show") group2.add_option("-m", "--memory", help="Show Broker Memory Stats", action="store_const", const="m", dest="show") group2.add_option( "--acl", help="Show Access Control List Stats", action="store_const", const="acl", dest="show") - group2.add_option("-S", "--sort-by", metavar="<colname>", help="Sort by column name") - group2.add_option("-I", "--increasing", action="store_true", default=False, help="Sort by increasing value (default = decreasing)") - group2.add_option("-L", "--limit", type="int", default=50, metavar="<n>", help="Limit output to n rows") - parser.add_option_group(group2) + group3 = OptionGroup(parser, "Display Options") + group3.add_option("-S", "--sort-by", metavar="<colname>", help="Sort by column name") + group3.add_option("-I", "--increasing", action="store_true", default=False, help="Sort by increasing value (default = decreasing)") + group3.add_option("-L", "--limit", type="int", default=50, metavar="<n>", help="Limit output to n rows") + parser.add_option_group(group3) + opts, args = parser.parse_args(args=argv) if not opts.show: @@ -416,7 +427,8 @@ class BrokerManager: heads.append(Header("acked", Header.Y)) heads.append(Header("excl", Header.Y)) heads.append(Header("creditMode")) - heads.append(Header("delivered", Header.KMG)) + heads.append(Header("delivered", Header.COMMAS)) + heads.append(Header("sessUnacked", Header.COMMAS)) rows = [] subscriptions = self.broker.getAllSubscriptions() sessions = self.getSessionMap() @@ -436,6 +448,7 @@ class BrokerManager: row.append(s.exclusive) row.append(s.creditMode) row.append(s.delivered) + row.append(session.unackedMessages) rows.append(row) except: pass Modified: qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-tool URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-tool?rev=1354874&r1=1354873&r2=1354874&view=diff ============================================================================== --- qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-tool (original) +++ qpid/branches/java-config-and-management/qpid/tools/src/py/qpid-tool Thu Jun 28 09:14:52 2012 @@ -455,6 +455,7 @@ class QmfData(Console): rows.append(row) else: print "No object found with ID %d" % dispId + return finally: self.lock.release() self.disp.table(caption, heads, rows) Modified: qpid/branches/java-config-and-management/qpid/tools/src/py/qpidtoollibs/broker.py URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/tools/src/py/qpidtoollibs/broker.py?rev=1354874&r1=1354873&r2=1354874&view=diff ============================================================================== --- qpid/branches/java-config-and-management/qpid/tools/src/py/qpidtoollibs/broker.py (original) +++ qpid/branches/java-config-and-management/qpid/tools/src/py/qpidtoollibs/broker.py Thu Jun 28 09:14:52 2012 @@ -18,6 +18,7 @@ # from qpid.messaging import Message +from qpidtoollibs.disp import TimeLong try: from uuid import uuid4 except ImportError: @@ -190,6 +191,9 @@ class BrokerAgent(object): def getAcl(self): return self._getSingleObject(Acl) + def getMemory(self): + return self._getSingleObject(Memory) + def echo(self, sequence, body): """Request a response to test the path to the management broker""" pass @@ -268,6 +272,20 @@ class BrokerAgent(object): def reloadAclFile(self): self._method('reloadACLFile', {}, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker") + def acl_lookup(self, userName, action, aclObj, aclObjName, propMap): + args = {'userId': userName, + 'action': action, + 'object': aclObj, + 'objectName': aclObjName, + 'propertyMap': propMap} + return self._method('Lookup', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker") + + def acl_lookupPublish(self, userName, exchange, key): + args = {'userId': userName, + 'exchangeName': exchange, + 'routingKey': key} + return self._method('LookupPublish', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker") + def create(self, _type, name, properties, strict): """Create an object of the specified type""" pass @@ -281,6 +299,41 @@ class BrokerAgent(object): return self._getBrokerObject(self, _type, oid) +class EventHelper(object): + def eventAddress(self, pkg='*', cls='*', sev='*'): + return "qmf.default.topic/agent.ind.event.%s.%s.%s.#" % (pkg.replace('.', '_'), cls, sev) + + def event(self, msg): + return BrokerEvent(msg) + + +class BrokerEvent(object): + def __init__(self, msg): + self.msg = msg + self.content = msg.content[0] + self.values = self.content['_values'] + self.schema_id = self.content['_schema_id'] + self.name = "%s:%s" % (self.schema_id['_package_name'], self.schema_id['_class_name']) + + def __repr__(self): + rep = "%s %s" % (TimeLong(self.getTimestamp()), self.name) + for k,v in self.values.items(): + rep = rep + " %s=%s" % (k, v) + return rep + + def __getattr__(self, key): + if key not in self.values: + return None + value = self.values[key] + return value + + def getAttributes(self): + return self.values + + def getTimestamp(self): + return self.content['_timestamp'] + + class BrokerObject(object): def __init__(self, broker, content): self.broker = broker @@ -348,7 +401,7 @@ class Connection(BrokerObject): BrokerObject.__init__(self, broker, values) def close(self): - pass + self.broker._method("close", {}, "org.apache.qpid.broker:connection:%s" % self.address) class Session(BrokerObject): def __init__(self, broker, values): Modified: qpid/branches/java-config-and-management/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp URL: http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp?rev=1354874&r1=1354873&r2=1354874&view=diff ============================================================================== --- qpid/branches/java-config-and-management/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp (original) +++ qpid/branches/java-config-and-management/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp Thu Jun 28 09:14:52 2012 @@ -49,6 +49,7 @@ #include "qpid/client/AsyncSession.h" #include "qpid/client/Connection.h" +#include "qpid/framing/FieldValue.h" #include <map> @@ -472,13 +473,15 @@ INT ResourceManager::recover(XID *xids, try { // status if we can't talk to the broker status = XAER_RMFAIL; - std::vector<std::string> wireFormatXids; DtxRecoverResult dtxrr = qpidSession.dtxRecover(true); // status if we can't process the xids status = XAER_RMERR; - dtxrr.getInDoubt().collect(wireFormatXids); + + std::vector<std::string> wireFormatXids(dtxrr.getInDoubt().size()); + std::transform(dtxrr.getInDoubt().begin(), dtxrr.getInDoubt().end(), wireFormatXids.begin(), Array::get<std::string, Array::ValuePtr>); + size_t nXids = wireFormatXids.size(); if (nXids > 0) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
