Author: aconway
Date: Mon Feb 13 16:18:02 2012
New Revision: 1243578

URL: http://svn.apache.org/viewvc?rev=1243578&view=rev
Log:
QPID-3603: Speed up qpid-ha-tool with fast QMF2 method calls.

Modified:
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/management-schema.xml
    qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
    qpid/branches/qpid-3603-2/qpid/tools/src/py/qpid-ha-tool

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1243578&r1=1243577&r2=1243578&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp Mon Feb 13 16:18:02 2012
@@ -70,7 +70,7 @@ HaBroker::HaBroker(broker::Broker& b, co
         throw Exception("Cannot start HA: management is disabled");
     if (ma) {
         _qmf::Package  packageInit(ma);
-        mgmtObject = new _qmf::HaBroker(ma, this);
+        mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker");
         mgmtObject->set_status(BACKUP);
         ma->addObject(mgmtObject);
     }

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/management-schema.xml?rev=1243578&r1=1243577&r2=1243578&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/management-schema.xml (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/management-schema.xml Mon Feb 13 16:18:02 2012
@@ -21,6 +21,7 @@
 
   <!-- Monitor and control HA status of a broker. -->
   <class name="HaBroker">
+    <property name="name"   type="sstr" access="RC" index="y" desc="Primary Key"/>
     <property name="status" type="sstr" desc="HA status: primary or backup"/>
     <property name="clientAddresses" type="sstr" desc="List of addresses used by clients to connect to the HA cluster."/>
     <property name="brokerAddresses" type="sstr" desc="List of addresses used by HA brokers to connect to each other."/>

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py?rev=1243578&r1=1243577&r2=1243578&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py Mon Feb 13 16:18:02 2012
@@ -281,12 +281,12 @@ class ShortTests(BrokerTest):
         assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die
         backup.promote()
         n = receiver.received       # Make sure we are still running
-        assert retry(lambda: receiver.received > n + 10)
+        # FIXME aconway 2012-02-01: c++ client has 1 sec min retry, hence long timeout
+        assert retry(lambda: receiver.received > n + 10, timeout=5)
         sender.stop()
         receiver.stop()
 
     def test_backup_failover(self):
-        # FIXME aconway 2012-01-30: UNFINISHED
         brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL)
                     for name in ["a","b","c"] ]
         url = ",".join([b.host_port() for b in brokers])
@@ -300,7 +300,6 @@ class ShortTests(BrokerTest):
         brokers[2].promote()            # c must fail over to b.
         brokers[2].connect().session().sender("q").send("b")
         self.assert_browse_backup(brokers[1], "q", ["a","b"])
-        # FIXME aconway 2012-01-30: finish
         for b in brokers[1:]: b.kill()
 
 if __name__ == "__main__":

Modified: qpid/branches/qpid-3603-2/qpid/tools/src/py/qpid-ha-tool
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/tools/src/py/qpid-ha-tool?rev=1243578&r1=1243577&r2=1243578&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/tools/src/py/qpid-ha-tool (original)
+++ qpid/branches/qpid-3603-2/qpid/tools/src/py/qpid-ha-tool Mon Feb 13 16:18:02 2012
@@ -21,6 +21,114 @@
 
 import qmf.console, optparse, sys
 from qpid.management import managementChannel, managementClient
+from qpid.messaging import Connection
+from qpid.messaging import Message as QpidMessage
+try:
+  from uuid import uuid4
+except ImportError:
+  from qpid.datatypes import uuid4
+
+# Utility for doing fast qmf2 operations on a broker.
+class QmfBroker(object):
+  def __init__(self, conn):
+    self.conn = conn
+    self.sess = self.conn.session()
+    self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}" % \
+        str(uuid4())
+    self.reply_rx = self.sess.receiver(self.reply_to)
+    self.reply_rx.capacity = 10
+    self.tx = self.sess.sender("qmf.default.direct/broker")
+    self.next_correlator = 1
+
+  def close(self):
+    self.conn.close()
+
+  def __repr__(self):
+    return "Qpid Broker: %s" % self.url
+
+  def _method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker"):
+    props = {'method'             : 'request',
+             'qmf.opcode'         : '_method_request',
+             'x-amqp-0-10.app-id' : 'qmf2'}
+    correlator = str(self.next_correlator)
+    self.next_correlator += 1
+
+    content = {'_object_id'   : {'_object_name' : addr},
+               '_method_name' : method,
+               '_arguments'   : arguments}
+
+    message = QpidMessage(content, reply_to=self.reply_to, correlation_id=correlator,
+                          properties=props, subject="broker")
+    self.tx.send(message)
+    response = self.reply_rx.fetch(10)
+    if response.properties['qmf.opcode'] == '_exception':
+      raise Exception("Exception from Agent: %r" % response.content['_values'])
+    if response.properties['qmf.opcode'] != '_method_response':
+      raise Exception("bad response: %r" % response.properties)
+    return response.content['_arguments']
+
+  def _sendRequest(self, opcode, content):
+    props = {'method'             : 'request',
+             'qmf.opcode'         : opcode,
+             'x-amqp-0-10.app-id' : 'qmf2'}
+    correlator = str(self.next_correlator)
+    self.next_correlator += 1
+    message = QpidMessage(content, reply_to=self.reply_to, correlation_id=correlator,
+                          properties=props, subject="broker")
+    self.tx.send(message)
+    return correlator
+
+  def _doClassQuery(self, class_name):
+    query = {'_what'      : 'OBJECT',
+             '_schema_id' : {'_class_name' : class_name}}
+    correlator = self._sendRequest('_query_request', query)
+    response = self.reply_rx.fetch(10)
+    if response.properties['qmf.opcode'] != '_query_response':
+      raise Exception("bad response")
+    items = []
+    done = False
+    while not done:
+      for item in response.content:
+        items.append(item['_values'])
+      if 'partial' in response.properties:
+        response = self.reply_rx.fetch(10)
+      else:
+        done = True
+    return items
+
+  def _doNameQuery(self, class_name, object_name, package_name='org.apache.qpid.broker'):
+    query = {'_what'      : 'OBJECT',
+             '_object_id' : {'_object_name' : "%s:%s:%s" % (package_name, class_name, object_name)}}
+    correlator = self._sendRequest('_query_request', query)
+    response = self.reply_rx.fetch(10)
+    if response.properties['qmf.opcode'] != '_query_response':
+      raise Exception("bad response")
+    items = []
+    done = False
+    while not done:
+      for item in response.content:
+        items.append(item['_values'])
+      if 'partial' in response.properties:
+        response = self.reply_rx.fetch(10)
+      else:
+        done = True
+    if len(items) == 1:
+      return items[0]
+    return None
+
+  def _getAllBrokerObjects(self, cls):
+    items = self._doClassQuery(cls.__name__.lower())
+    objs = []
+    for item in items:
+      objs.append(cls(self, item))
+    return objs
+
+  def _getBrokerObject(self, cls, name):
+    obj = self._doNameQuery(cls.__name__.lower(), name)
+    if obj:
+      return cls(self, obj)
+    return None
+
 
 op=optparse.OptionParser(usage="Usage: %prog [options] [broker-address]")
 
@@ -33,42 +141,41 @@ op.add_option("-b", "--broker-addresses"
 op.add_option("-q", "--query", action="store_true",
              help="Show the current HA settings on the broker.")
 
-class HaBroker:
-    def __init__(self, session, broker):
-        self.session = session
-       self.qmf_broker = self.session.addBroker(
-            broker, client_properties={"qpid.ha-admin":1})
-       ha_brokers = self.session.getObjects(
-            _class="habroker", _package="org.apache.qpid.ha")
-       if (not ha_brokers): raise Exception("Broker does not have HA enabled.")
-       self.ha_broker = ha_brokers[0]
-
-    def query(self):
-       self.ha_broker.update()
-       print "status=", self.ha_broker.status
-       print "broker-addresses=", self.ha_broker.brokerAddresses
-       print "client-addresses=", self.ha_broker.clientAddresses
+def get_ha_broker(qmf_broker):
+    ha_brokers = qmf_broker._doClassQuery("habroker")
+    if (not ha_brokers): raise Exception("Broker does not have HA enabled.")
+    return ha_brokers[0]
 
 def main(argv):
     try:
        opts, args = op.parse_args(argv)
        if len(args) >1: broker = args[1]
        else: broker = "localhost:5672"
-        session = qmf.console.Session()
+        conn = Connection.establish(broker, client_properties={"qpid.ha-admin":1})
+        ha_broker = "org.apache.qpid.ha:habroker:ha-broker"
        try:
-            hb = HaBroker(session, broker)
+            qmf_broker = QmfBroker(conn)
+            get_ha_broker(qmf_broker)   # Verify that HA is enabled
            action=False
            if opts.promote:
-                hb.ha_broker.promote(); action=True
+                qmf_broker._method("promote", {}, ha_broker)
+                action=True
            if opts.broker_addresses:
-                hb.ha_broker.setBrokerAddresses(opts.broker_addresses); action=True
+                qmf_broker._method('setBrokerAddresses', {'brokerAddresses':opts.broker_addresses}, ha_broker)
+                action=True
            if opts.client_addresses:
-                hb.ha_broker.setClientAddresses(opts.client_addresses); action=True
-           if opts.query or not action: hb.query()
+                qmf_broker._method('setClientAddresses', {'clientAddresses':opts.client_addresses}, ha_broker)
+                action=True
+           if opts.query or not action:
+                hb = get_ha_broker(qmf_broker)
+                print "status=%s"%hb["status"]
+                print "broker-addresses=%s"%hb["brokerAddresses"]
+                print "client-addresses=%s"%hb["clientAddresses"]
            return 0
        finally:
-           session.close()      # Avoid errors shutting down threads.
+           conn.close()      # Avoid errors shutting down threads.
     except Exception, e:
+        raise                           # FIXME aconway 2012-01-31:
         print e
         return 1
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to