Modified: qpid/trunk/qpid/tools/src/py/qpid-stat
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-stat?rev=1242526&r1=1242525&r2=1242526&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-stat (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-stat Thu Feb  9 21:11:41 2012
@@ -21,13 +21,18 @@
 
 import os
 from optparse import OptionParser, OptionGroup
-from time import sleep ### debug
 import sys
 import locale
 import socket
 import re
-from qmf.console import Session, Console
-from qpid.disp import Display, Header, Sorter
+from qpid.messaging import Connection
+
+home = os.environ.get("QPID_TOOLS_HOME", 
os.path.normpath("/usr/share/qpid-tools"))
+sys.path.append(os.path.join(home, "python"))
+
+from qpidtoollibs.broker import BrokerAgent
+from qpidtoollibs.disp import Display, Header, Sorter
+
 
 class Config:
     def __init__(self):
@@ -37,7 +42,7 @@ class Config:
         self._limit = 50
         self._increasing = False
         self._sortcol = None
-        self._cluster_detail = False
+        self._details = None
         self._sasl_mechanism = None
 
 config = Config()
@@ -56,24 +61,16 @@ def OptionsAndArguments(argv):
     parser.add_option_group(group1)
 
     group2 = OptionGroup(parser, "Display Options")
-    group2.add_option("-b", "--broker", help="Show Brokers", 
-                      action="store_const", const="b", dest="show")
-    group2.add_option("-c", "--connections", help="Show Connections",
-                      action="store_const", const="c", dest="show")
-    group2.add_option("-e", "--exchanges", help="Show Exchanges",
-                      action="store_const", const="e", dest="show")
-    group2.add_option("-q", "--queues", help="Show Queues",
-                  action="store_const", const="q", dest="show")
-    group2.add_option("-u", "--subscriptions", help="Show Subscriptions",
-                  action="store_const", const="u", dest="show")
-    group2.add_option("-S", "--sort-by",  metavar="<colname>",
-                  help="Sort by column name")
-    group2.add_option("-I", "--increasing", action="store_true", default=False,
-                  help="Sort by increasing value (default = decreasing)")
-    group2.add_option("-L", "--limit", type="int", default=50, metavar="<n>",
-                  help="Limit output to n rows")
-    group2.add_option("-C", "--cluster", action="store_true", default=False,
-                  help="Display per-broker cluster detail.")
+    group2.add_option("-b", "--broker", help="Show Brokers",               
action="store_const", const="b", dest="show")
+    group2.add_option("-c", "--connections", help="Show Connections",      
action="store_const", const="c", dest="show")
+    group2.add_option("-e", "--exchanges", help="Show Exchanges",          
action="store_const", const="e", dest="show")
+    group2.add_option("-q", "--queues", help="Show Queues",                
action="store_const", const="q", dest="show")
+    group2.add_option("-u", "--subscriptions", help="Show Subscriptions",  
action="store_const", const="u", dest="show")
+    group2.add_option("-m", "--memory", help="Show Broker Memory Stats",   
action="store_const", const="m", dest="show")
+    group2.add_option("-S", "--sort-by",  metavar="<colname>",                 
  help="Sort by column name")
+    group2.add_option("-I", "--increasing", action="store_true", 
default=False,  help="Sort by increasing value (default = decreasing)")
+    group2.add_option("-L", "--limit", type="int", default=50, metavar="<n>",  
  help="Limit output to n rows")
+    group2.add_option("-D", "--details", action="store", metavar="<name>", 
dest="detail", default=None, help="Display details on a single object.")
     parser.add_option_group(group2)
 
     opts, args = parser.parse_args(args=argv)
@@ -86,8 +83,8 @@ def OptionsAndArguments(argv):
     config._connTimeout = opts.timeout
     config._increasing = opts.increasing
     config._limit = opts.limit
-    config._cluster_detail = opts.cluster
     config._sasl_mechanism = opts.sasl_mechanism
+    config._detail = opts.detail
 
     if args:
         config._host = args[0]
@@ -119,86 +116,26 @@ class IpAddr:
                 bestAddr = addrPort
         return bestAddr
 
-class Broker(object):
-    def __init__(self, qmf, broker):
-        self.broker = broker
-
-        agents = qmf.getAgents()
-        for a in agents:
-            if a.getAgentBank() == '0':
-                self.brokerAgent = a
-
-        bobj = qmf.getObjects(_class="broker", 
_package="org.apache.qpid.broker", _agent=self.brokerAgent)[0]
-        self.currentTime = bobj.getTimestamps()[0]
-        try:
-            self.uptime = bobj.uptime
-        except:
-            self.uptime = 0
-        self.connections = {}
-        self.sessions = {}
-        self.exchanges = {}
-        self.queues = {}
-        self.subscriptions = {}
-        package = "org.apache.qpid.broker"
-
-        list = qmf.getObjects(_class="connection", _package=package, 
_agent=self.brokerAgent)
-        for conn in list:
-            if not conn.shadow:
-                self.connections[conn.getObjectId()] = conn
-
-        list = qmf.getObjects(_class="session", _package=package, 
_agent=self.brokerAgent)
-        for sess in list:
-            if sess.connectionRef in self.connections:
-                self.sessions[sess.getObjectId()] = sess
-
-        list = qmf.getObjects(_class="exchange", _package=package, 
_agent=self.brokerAgent)
-        for exchange in list:
-            self.exchanges[exchange.getObjectId()] = exchange
-
-        list = qmf.getObjects(_class="queue", _package=package, 
_agent=self.brokerAgent)
-        for queue in list:
-            self.queues[queue.getObjectId()] = queue
-
-        list = qmf.getObjects(_class="subscription", _package=package, 
_agent=self.brokerAgent)
-        for subscription in list:
-            self.subscriptions[subscription.getObjectId()] = subscription
-
-    def getName(self):
-        return self.broker.getUrl()
-
-    def getCurrentTime(self):
-        return self.currentTime
-
-    def getUptime(self):
-        return self.uptime
-
-class BrokerManager(Console):
+class BrokerManager:
     def __init__(self):
-        self.brokerName = None
-        self.qmf        = None
-        self.broker     = None
-        self.brokers    = []
-        self.cluster    = None
+        self.brokerName  = None
+        self.connections = []
+        self.brokers     = []
+        self.cluster     = None
 
     def SetBroker(self, brokerUrl, mechanism):
         self.url = brokerUrl
-        self.qmf = Session()
-        self.mechanism = mechanism
-        self.broker = self.qmf.addBroker(brokerUrl, config._connTimeout, 
mechanism)
-        agents = self.qmf.getAgents()
-        for a in agents:
-            if a.getAgentBank() == '0':
-                self.brokerAgent = a
+        self.connections.append(Connection(self.url, sasl_mechanism=mechanism))
+        self.connections[0].open()
+        self.brokers.append(BrokerAgent(self.connections[0]))
 
     def Disconnect(self):
         """ Release any allocated brokers.  Ignore any failures as the tool is
         shutting down.
         """
         try:
-            if self.broker:
-                self.qmf.delBroker(self.broker)
-            else:
-                for b in self.brokers: self.qmf.delBroker(b.broker)
+            for conn in self.connections:
+                conn.close()
         except:
             pass
 
@@ -238,62 +175,63 @@ class BrokerManager(Console):
             hosts.append(bestUrl)
         return hosts
 
-    def displaySubs(self, subs, indent, broker=None, conn=None, sess=None, 
exchange=None, queue=None):
-        if len(subs) == 0:
-            return
-        this = subs[0]
-        remaining = subs[1:]
-        newindent = indent + "  "
-        if this == 'b':
-            pass
-        elif this == 'c':
-            if broker:
-                for oid in broker.connections:
-                    iconn = broker.connections[oid]
-                    self.printConnSub(indent, broker.getName(), iconn)
-                    self.displaySubs(remaining, newindent, broker=broker, 
conn=iconn,
-                                     sess=sess, exchange=exchange, queue=queue)
-        elif this == 's':
-            pass
-        elif this == 'e':
-            pass
-        elif this == 'q':
-            pass
-        print
-
     def displayBroker(self, subs):
         disp = Display(prefix="  ")
         heads = []
-        heads.append(Header('broker'))
-        heads.append(Header('cluster'))
         heads.append(Header('uptime', Header.DURATION))
-        heads.append(Header('conn', Header.KMG))
-        heads.append(Header('sess', Header.KMG))
-        heads.append(Header('exch', Header.KMG))
-        heads.append(Header('queue', Header.KMG))
+        heads.append(Header('connections', Header.COMMAS))
+        heads.append(Header('sessions', Header.COMMAS))
+        heads.append(Header('exchanges', Header.COMMAS))
+        heads.append(Header('queues', Header.COMMAS))
         rows = []
-        for broker in self.brokers:
-            if self.cluster:
-                ctext = "%s(%s)" % (self.cluster.clusterName, 
self.cluster.status)
-            else:
-                ctext = "<standalone>"
-            row = (broker.getName(), ctext, broker.getUptime(),
-                   len(broker.connections), len(broker.sessions),
-                   len(broker.exchanges), len(broker.queues))
-            rows.append(row)
-        title = "Brokers"
-        if config._sortcol:
-            sorter = Sorter(heads, rows, config._sortcol, config._limit, 
config._increasing)
-            dispRows = sorter.getSorted()
-        else:
-            dispRows = rows
-        disp.formattedTable(title, heads, dispRows)
+        broker = self.brokers[0].getBroker()
+        connections = self.getConnectionMap()
+        sessions = self.getSessionMap()
+        exchanges = self.getExchangeMap()
+        queues = self.getQueueMap()
+        row = (broker.getUpdateTime() - broker.getCreateTime(),
+               len(connections), len(sessions),
+               len(exchanges), len(queues))
+        rows.append(row)
+        disp.formattedTable('Broker Summary:', heads, rows)
+
+        if 'queueCount' not in broker.values:
+            return
+
+        print
+        heads = []
+        heads.append(Header('Statistic'))
+        heads.append(Header('Messages', Header.COMMAS))
+        heads.append(Header('Bytes', Header.COMMAS))
+        rows = []
+        rows.append(['queue-depth',         broker.msgDepth, broker.byteDepth])
+        rows.append(['total-enqueues',      broker.msgTotalEnqueues, 
broker.byteTotalEnqueues])
+        rows.append(['total-dequeues',      broker.msgTotalDequeues, 
broker.byteTotalDequeues])
+        rows.append(['persistent-enqueues', broker.msgPersistEnqueues, 
broker.bytePersistEnqueues])
+        rows.append(['persistent-dequeues', broker.msgPersistDequeues, 
broker.bytePersistDequeues])
+        rows.append(['transactional-enqueues', broker.msgTxnEnqueues, 
broker.byteTxnEnqueues])
+        rows.append(['transactional-dequeues', broker.msgTxnDequeues, 
broker.byteTxnDequeues])
+        rows.append(['flow-to-disk-depth', broker.msgFtdDepth, 
broker.byteFtdDepth])
+        rows.append(['flow-to-disk-enqueues', broker.msgFtdEnqueues, 
broker.byteFtdEnqueues])
+        rows.append(['flow-to-disk-dequeues', broker.msgFtdDequeues, 
broker.byteFtdDequeues])
+        rows.append(['acquires', broker.acquires, None])
+        rows.append(['releases', broker.releases, None])
+        rows.append(['discards-no-route', broker.discardsNoRoute, None])
+        rows.append(['discards-ttl-expired', broker.discardsTtl, None])
+        rows.append(['discards-limit-overflow', broker.discardsOverflow, None])
+        rows.append(['discards-ring-overflow', broker.discardsRing, None])
+        rows.append(['discards-lvq-replace', broker.discardsLvq, None])
+        rows.append(['discards-subscriber-reject', broker.discardsSubscriber, 
None])
+        rows.append(['discards-purged', broker.discardsPurge, None])
+        rows.append(['reroutes', broker.reroutes, None])
+        rows.append(['abandoned', broker.abandoned, None])
+        rows.append(['abandoned-via-alt', broker.abandonedViaAlt, None])
+        disp.formattedTable('Aggregate Broker Statistics:', heads, rows)
+
 
     def displayConn(self, subs):
         disp = Display(prefix="  ")
         heads = []
-        if self.cluster:
-            heads.append(Header('broker'))
         heads.append(Header('client-addr'))
         heads.append(Header('cproc'))
         heads.append(Header('cpid'))
@@ -303,25 +241,20 @@ class BrokerManager(Console):
         heads.append(Header('msgIn', Header.KMG))
         heads.append(Header('msgOut', Header.KMG))
         rows = []
-        for broker in self.brokers:
-            for oid in broker.connections:
-                conn = broker.connections[oid]
-                row = []
-                if self.cluster:
-                    row.append(broker.getName())
-                row.append(conn.address)
-                row.append(conn.remoteProcessName)
-                row.append(conn.remotePid)
-                row.append(conn.authIdentity)
-                row.append(broker.getCurrentTime() - conn.getTimestamps()[1])
-                idle = broker.getCurrentTime() - conn.getTimestamps()[0]
-                row.append(broker.getCurrentTime() - conn.getTimestamps()[0])
-                row.append(conn.msgsFromClient)
-                row.append(conn.msgsToClient)
-                rows.append(row)
+        connections = self.brokers[0].getAllConnections()
+        broker = self.brokers[0].getBroker()
+        for conn in connections:
+            row = []
+            row.append(conn.address)
+            row.append(conn.remoteProcessName)
+            row.append(conn.remotePid)
+            row.append(conn.authIdentity)
+            row.append(broker.getUpdateTime() - conn.getCreateTime())
+            row.append(broker.getUpdateTime() - conn.getUpdateTime())
+            row.append(conn.msgsFromClient)
+            row.append(conn.msgsToClient)
+            rows.append(row)
         title = "Connections"
-        if self.cluster:
-            title += " for cluster '%s'" % self.cluster.clusterName
         if config._sortcol:
             sorter = Sorter(heads, rows, config._sortcol, config._limit, 
config._increasing)
             dispRows = sorter.getSorted()
@@ -335,8 +268,6 @@ class BrokerManager(Console):
     def displayExchange(self, subs):
         disp = Display(prefix="  ")
         heads = []
-        if self.cluster:
-            heads.append(Header('broker'))
         heads.append(Header("exchange"))
         heads.append(Header("type"))
         heads.append(Header("dur", Header.Y))
@@ -348,26 +279,21 @@ class BrokerManager(Console):
         heads.append(Header("byteOut", Header.KMG))
         heads.append(Header("byteDrop", Header.KMG))
         rows = []
-        for broker in self.brokers:
-            for oid in broker.exchanges:
-                ex = broker.exchanges[oid]
-                row = []
-                if self.cluster:
-                    row.append(broker.getName())
-                row.append(ex.name)
-                row.append(ex.type)
-                row.append(ex.durable)
-                row.append(ex.bindingCount)
-                row.append(ex.msgReceives)
-                row.append(ex.msgRoutes)
-                row.append(ex.msgDrops)
-                row.append(ex.byteReceives)
-                row.append(ex.byteRoutes)
-                row.append(ex.byteDrops)
-                rows.append(row)
+        exchanges = self.brokers[0].getAllExchanges()
+        for ex in exchanges:
+            row = []
+            row.append(ex.name)
+            row.append(ex.type)
+            row.append(ex.durable)
+            row.append(ex.bindingCount)
+            row.append(ex.msgReceives)
+            row.append(ex.msgRoutes)
+            row.append(ex.msgDrops)
+            row.append(ex.byteReceives)
+            row.append(ex.byteRoutes)
+            row.append(ex.byteDrops)
+            rows.append(row)
         title = "Exchanges"
-        if self.cluster:
-            title += " for cluster '%s'" % self.cluster.clusterName
         if config._sortcol:
             sorter = Sorter(heads, rows, config._sortcol, config._limit, 
config._increasing)
             dispRows = sorter.getSorted()
@@ -375,11 +301,9 @@ class BrokerManager(Console):
             dispRows = rows
         disp.formattedTable(title, heads, dispRows)
 
-    def displayQueue(self, subs):
+    def displayQueues(self, subs):
         disp = Display(prefix="  ")
         heads = []
-        if self.cluster:
-            heads.append(Header('broker'))
         heads.append(Header("queue"))
         heads.append(Header("dur", Header.Y))
         heads.append(Header("autoDel", Header.Y))
@@ -393,28 +317,23 @@ class BrokerManager(Console):
         heads.append(Header("cons", Header.KMG))
         heads.append(Header("bind", Header.KMG))
         rows = []
-        for broker in self.brokers:
-            for oid in broker.queues:
-                q = broker.queues[oid]
-                row = []
-                if self.cluster:
-                    row.append(broker.getName())
-                row.append(q.name)
-                row.append(q.durable)
-                row.append(q.autoDelete)
-                row.append(q.exclusive)
-                row.append(q.msgDepth)
-                row.append(q.msgTotalEnqueues)
-                row.append(q.msgTotalDequeues)
-                row.append(q.byteDepth)
-                row.append(q.byteTotalEnqueues)
-                row.append(q.byteTotalDequeues)
-                row.append(q.consumerCount)
-                row.append(q.bindingCount)
-                rows.append(row)
+        queues = self.brokers[0].getAllQueues()
+        for q in queues:
+            row = []
+            row.append(q.name)
+            row.append(q.durable)
+            row.append(q.autoDelete)
+            row.append(q.exclusive)
+            row.append(q.msgDepth)
+            row.append(q.msgTotalEnqueues)
+            row.append(q.msgTotalDequeues)
+            row.append(q.byteDepth)
+            row.append(q.byteTotalEnqueues)
+            row.append(q.byteTotalDequeues)
+            row.append(q.consumerCount)
+            row.append(q.bindingCount)
+            rows.append(row)
         title = "Queues"
-        if self.cluster:
-            title += " for cluster '%s'" % self.cluster.clusterName
         if config._sortcol:
             sorter = Sorter(heads, rows, config._sortcol, config._limit, 
config._increasing)
             dispRows = sorter.getSorted()
@@ -422,46 +341,46 @@ class BrokerManager(Console):
             dispRows = rows
         disp.formattedTable(title, heads, dispRows)
 
+    def displayQueue(self, subs):
+        disp = Display(prefix="  ")
+        heads = []
+
     def displaySubscriptions(self, subs):
         disp = Display(prefix="  ")
         heads = []
-        if self.cluster:
-            heads.append(Header('broker'))
-        heads.append(Header("subscription"))
+        heads.append(Header("subscr"))
         heads.append(Header("queue"))
-        heads.append(Header("connection"))
-        heads.append(Header("processName"))
-        heads.append(Header("processId"))
-        heads.append(Header("browsing", Header.Y))
-        heads.append(Header("acknowledged", Header.Y))
-        heads.append(Header("exclusive", Header.Y))
+        heads.append(Header("conn"))
+        heads.append(Header("procName"))
+        heads.append(Header("procId"))
+        heads.append(Header("browse", Header.Y))
+        heads.append(Header("acked", Header.Y))
+        heads.append(Header("excl", Header.Y))
         heads.append(Header("creditMode"))
         heads.append(Header("delivered", Header.KMG))
         rows = []
-        for broker in self.brokers:
-            for oid in broker.subscriptions:
-                s = broker.subscriptions[oid]
-                row = []
-                try:
-                    if self.cluster:
-                        row.append(broker.getName())
-                    row.append(s.name)
-                    
row.append(self.qmf.getObjects(_objectId=s.queueRef)[0].name)
-                    connectionRef = 
self.qmf.getObjects(_objectId=s.sessionRef)[0].connectionRef
-                    
row.append(self.qmf.getObjects(_objectId=connectionRef)[0].address)
-                    
row.append(self.qmf.getObjects(_objectId=connectionRef)[0].remoteProcessName)
-                    
row.append(self.qmf.getObjects(_objectId=connectionRef)[0].remotePid)
-                    row.append(s.browsing)
-                    row.append(s.acknowledged)
-                    row.append(s.exclusive)
-                    row.append(s.creditMode)
-                    row.append(s.delivered)
-                    rows.append(row)
-                except:
-                    pass
+        subscriptions = self.brokers[0].getAllSubscriptions()
+        sessions = self.getSessionMap()
+        connections = self.getConnectionMap()
+        for s in subscriptions:
+            row = []
+            try:
+                row.append(s.name)
+                row.append(s.queueRef)
+                session = sessions[s.sessionRef]
+                connection = connections[session.connectionRef]
+                row.append(connection.address)
+                row.append(connection.remoteProcessName)
+                row.append(connection.remotePid)
+                row.append(s.browsing)
+                row.append(s.acknowledged)
+                row.append(s.exclusive)
+                row.append(s.creditMode)
+                row.append(s.delivered)
+                rows.append(row)
+            except:
+                pass
         title = "Subscriptions"
-        if self.cluster:
-            title += " for cluster '%s'" % self.cluster.clusterName
         if config._sortcol:
             sorter = Sorter(heads, rows, config._sortcol, config._limit, 
config._increasing)
             dispRows = sorter.getSorted()
@@ -469,33 +388,58 @@ class BrokerManager(Console):
             dispRows = rows
         disp.formattedTable(title, heads, dispRows)
 
+    def displayMemory(self, unused):
+        disp = Display(prefix="  ")
+        heads = [Header('Statistic'), Header('Value', Header.COMMAS)]
+        rows = []
+        memory = self.brokers[0].getMemory()
+        for k,v in memory.values.items():
+            if k != 'name':
+                rows.append([k, v])
+        disp.formattedTable('Broker Memory Statistics:', heads, rows)
+
+    def getExchangeMap(self):
+        exchanges = self.brokers[0].getAllExchanges()
+        emap = {}
+        for e in exchanges:
+            emap[e.name] = e
+        return emap
+
+    def getQueueMap(self):
+        queues = self.brokers[0].getAllQueues()
+        qmap = {}
+        for q in queues:
+            qmap[q.name] = q
+        return qmap
+
+    def getSessionMap(self):
+        sessions = self.brokers[0].getAllSessions()
+        smap = {}
+        for s in sessions:
+            smap[s.name] = s
+        return smap
+
+    def getConnectionMap(self):
+        connections = self.brokers[0].getAllConnections()
+        cmap = {}
+        for c in connections:
+            cmap[c.address] = c
+        return cmap
+
     def displayMain(self, main, subs):
         if   main == 'b': self.displayBroker(subs)
         elif main == 'c': self.displayConn(subs)
         elif main == 's': self.displaySession(subs)
         elif main == 'e': self.displayExchange(subs)
-        elif main == 'q': self.displayQueue(subs)
+        elif main == 'q':
+            if config._detail:
+                self.displayQueue(subs, config._detail)
+            else:
+                self.displayQueues(subs)
         elif main == 'u': self.displaySubscriptions(subs)
+        elif main == 'm': self.displayMemory(subs)
 
     def display(self):
-        if config._cluster_detail or config._types[0] == 'b':
-            # always show cluster detail when dumping broker stats
-            self._getCluster()
-        if self.cluster:
-            memberList = self.cluster.members.split(";")
-            hostList = self._getHostList(memberList)
-            self.qmf.delBroker(self.broker)
-            self.broker = None
-            if config._host.find("@") > 0:
-                authString = config._host.split("@")[0] + "@"
-            else:
-                authString = ""
-            for host in hostList:
-                b = self.qmf.addBroker(authString + host, config._connTimeout)
-                self.brokers.append(Broker(self.qmf, b))
-        else:
-            self.brokers.append(Broker(self.qmf, self.broker))
-
         self.displayMain(config._types[0], config._types[1:])
 
 

Added: qpid/trunk/qpid/tools/src/py/qpidtoollibs/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpidtoollibs/__init__.py?rev=1242526&view=auto
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpidtoollibs/__init__.py (added)
+++ qpid/trunk/qpid/tools/src/py/qpidtoollibs/__init__.py Thu Feb  9 21:11:41 2012
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#

Added: qpid/trunk/qpid/tools/src/py/qpidtoollibs/broker.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpidtoollibs/broker.py?rev=1242526&view=auto
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpidtoollibs/broker.py (added)
+++ qpid/trunk/qpid/tools/src/py/qpidtoollibs/broker.py Thu Feb  9 21:11:41 2012
@@ -0,0 +1,322 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.messaging import Message
+try:
+  from uuid import uuid4
+except ImportError:
+  from qpid.datatypes import uuid4
+
+class BrokerAgent(object):
+  """
+  Client-side proxy for the broker's QMFv2 management agent.
+
+  Requests are sent to "qmf.default.direct/broker"; replies arrive on a
+  per-instance reply address built from a fresh uuid4.
+  """
+
+  def __init__(self, conn):
+    """
+    Create a session on 'conn' plus the request sender and reply receiver.
+
+    conn -- connection object; only its session() method is used here.
+    """
+    self.conn = conn
+    self.sess = self.conn.session()
+    # The address is split over two literals only to keep the line short;
+    # the resulting string is unchanged.
+    self.reply_to = ("qmf.default.topic/direct.%s;{node:{type:topic}, "
+                     "link:{x-declare:{auto-delete:True,exclusive:True}}}") % \
+        str(uuid4())
+    self.reply_rx = self.sess.receiver(self.reply_to)
+    self.reply_rx.capacity = 10
+    self.tx = self.sess.sender("qmf.default.direct/broker")
+    self.next_correlator = 1
+
+  def close(self):
+    """Close the session opened by the constructor."""
+    self.sess.close()
+
+  def _method(self, method, arguments,
+              addr="org.apache.qpid.broker:broker:amqp-broker", timeout=10):
+    """
+    Invoke a management method and return the '_arguments' map of the reply.
+
+    method    -- method name placed in '_method_name'
+    arguments -- map placed in '_arguments'
+    addr      -- '_object_name' of the target object
+    timeout   -- passed to the reply receiver's fetch()
+
+    Raises Exception when the reply opcode is '_exception' or is anything
+    other than '_method_response'.
+    """
+    props = {'method'             : 'request',
+             'qmf.opcode'         : '_method_request',
+             'x-amqp-0-10.app-id' : 'qmf2'}
+    correlator = str(self.next_correlator)
+    self.next_correlator += 1
+
+    content = {'_object_id'   : {'_object_name' : addr},
+               '_method_name' : method,
+               '_arguments'   : arguments}
+
+    message = Message(content, reply_to=self.reply_to,
+                      correlation_id=correlator,
+                      properties=props, subject="broker")
+    self.tx.send(message)
+    # NOTE(review): the reply is taken as the next message on reply_rx; its
+    # correlation id is not compared with 'correlator'.
+    response = self.reply_rx.fetch(timeout)
+    self.sess.acknowledge()
+    if response.properties['qmf.opcode'] == '_exception':
+      raise Exception("Exception from Agent: %r" % response.content['_values'])
+    if response.properties['qmf.opcode'] != '_method_response':
+      raise Exception("bad response: %r" % response.properties)
+    return response.content['_arguments']
+
+  def _sendRequest(self, opcode, content):
+    """
+    Send 'content' to the broker agent with the given qmf.opcode.
+
+    Returns the correlation id (a string) assigned to the request.
+    """
+    props = {'method'             : 'request',
+             'qmf.opcode'         : opcode,
+             'x-amqp-0-10.app-id' : 'qmf2'}
+    correlator = str(self.next_correlator)
+    self.next_correlator += 1
+    message = Message(content, reply_to=self.reply_to,
+                      correlation_id=correlator,
+                      properties=props, subject="broker")
+    self.tx.send(message)
+    return correlator
+
+  def _fetchQueryItems(self, timeout=10):
+    """
+    Read one query reply, following 'partial' continuations, and return the
+    accumulated list of content items.
+
+    Only the first message's opcode is checked; Exception("bad response") is
+    raised when it is not '_query_response'.
+    """
+    response = self.reply_rx.fetch(timeout)
+    if response.properties['qmf.opcode'] != '_query_response':
+      raise Exception("bad response")
+    items = []
+    while True:
+      items.extend(response.content)
+      partial = 'partial' in response.properties
+      if partial:
+        # More segments follow: fetch the next one before acknowledging.
+        response = self.reply_rx.fetch(timeout)
+      self.sess.acknowledge()
+      if not partial:
+        return items
+
+  def _doClassQuery(self, class_name):
+    """Return the raw content items for every object of class 'class_name'."""
+    query = {'_what'      : 'OBJECT',
+             '_schema_id' : {'_class_name' : class_name}}
+    self._sendRequest('_query_request', query)
+    return self._fetchQueryItems()
+
+  def _doNameQuery(self, class_name, object_name,
+                   package_name='org.apache.qpid.broker'):
+    """
+    Query the object named "<package_name>:<class_name>:<object_name>".
+
+    Returns its raw content item, or None unless exactly one item came back.
+    """
+    query = {'_what'      : 'OBJECT',
+             '_object_id' : {'_object_name' : "%s:%s:%s" %
+                             (package_name, class_name, object_name)}}
+    self._sendRequest('_query_request', query)
+    items = self._fetchQueryItems()
+    if len(items) == 1:
+      return items[0]
+    return None
+
+  def _getAllBrokerObjects(self, cls):
+    """Wrap every object of the class named cls.__name__.lower() in 'cls'."""
+    items = self._doClassQuery(cls.__name__.lower())
+    return [cls(self, item) for item in items]
+
+  def _getBrokerObject(self, cls, name):
+    """Return the object called 'name' wrapped in 'cls', or None if absent."""
+    obj = self._doNameQuery(cls.__name__.lower(), name)
+    if obj:
+      return cls(self, obj)
+    return None
+
+  def getCluster(self):
+    return self._getAllBrokerObjects(Cluster)
+
+  def getBroker(self):
+    return self._getBrokerObject(Broker, "amqp-broker")
+
+  def getMemory(self):
+    return self._getAllBrokerObjects(Memory)[0]
+
+  def getAllConnections(self):
+    return self._getAllBrokerObjects(Connection)
+
+  def getConnection(self, name):
+    return self._getBrokerObject(Connection, name)
+
+  def getAllSessions(self):
+    return self._getAllBrokerObjects(Session)
+
+  def getSession(self, name):
+    return self._getBrokerObject(Session, name)
+
+  def getAllSubscriptions(self):
+    return self._getAllBrokerObjects(Subscription)
+
+  def getSubscription(self, name):
+    return self._getBrokerObject(Subscription, name)
+
+  def getAllExchanges(self):
+    return self._getAllBrokerObjects(Exchange)
+
+  def getExchange(self, name):
+    return self._getBrokerObject(Exchange, name)
+
+  def getAllQueues(self):
+    return self._getAllBrokerObjects(Queue)
+
+  def getQueue(self, name):
+    return self._getBrokerObject(Queue, name)
+
+  def getAllBindings(self):
+    return self._getAllBrokerObjects(Binding)
+
+  def getBinding(self, exchange=None, queue=None):
+    pass
+
+  def echo(self, sequence, body):
+    """Request a response to test the path to the management broker"""
+    pass
+
+  def connect(self, host, port, durable, authMechanism, username, password,
+              transport):
+    """Establish a connection to another broker"""
+    pass
+
+  def queueMoveMessages(self, srcQueue, destQueue, qty):
+    """Move messages from one queue to another"""
+    pass
+
+  def setLogLevel(self, level):
+    """Set the log level"""
+    pass
+
+  def getLogLevel(self):
+    """Get the log level"""
+    pass
+
+  def setTimestampConfig(self, receive):
+    """Set the message timestamping configuration"""
+    pass
+
+  def getTimestampConfig(self):
+    """Get the message timestamping configuration"""
+    pass
+
+#  def addExchange(self, exchange_type, name, **kwargs):
+#    pass
+
+#  def delExchange(self, name):
+#    pass
+
+#  def addQueue(self, name, **kwargs):
+#    pass
+
+#  def delQueue(self, name):
+#    pass
+
+#  def bind(self, exchange, queue, key, **kwargs):
+#    pass
+
+#  def unbind(self, exchange, queue, key, **kwargs):
+#    pass
+
+  def create(self, _type, name, properties, strict):
+    """Create an object of the specified type"""
+    pass
+
+  def delete(self, _type, name, options):
+    """Delete an object of the specified type"""
+    pass
+
+  def query(self, _type, name):
+    """
+    Query the current state of an object.
+
+    _type -- wrapper class (its lower-cased __name__ is the class queried)
+    name  -- object name
+
+    Returns an instance of _type, or None when the object is not found.
+    """
+    # 'self' is already bound; passing it again raised TypeError on every call.
+    return self._getBrokerObject(_type, name)
+
+
+class BrokerObject(object):
+  """
+  Base wrapper around one object returned by a broker query.
+
+  Entries of the object's '_values' map are readable as attributes.
+  """
+
+  def __init__(self, broker, content):
+    """
+    broker  -- the agent used later by update() to re-query this object
+    content -- raw query item; must contain a '_values' map
+    """
+    self.broker = broker
+    self.content = content
+    self.values = content['_values']
+
+  def __getattr__(self, key):
+    """
+    Look 'key' up in the '_values' map.
+
+    Unknown keys yield None.  A value that is a map carrying '_object_name'
+    of the form "a:b:rest" is reduced to "rest"; any other value is returned
+    unchanged.
+    """
+    # __getattr__ only runs when normal lookup fails.  If 'values' itself is
+    # missing (instance rebuilt by copy/pickle before its state is restored),
+    # reading self.values below would recurse without end.
+    if key == 'values':
+      raise AttributeError(key)
+    values = self.values
+    if key not in values:
+      # Protocol probes such as __getstate__/__setstate__ must see a missing
+      # attribute, not None, or the caller will try to use None.
+      if key.startswith('__') and key.endswith('__'):
+        raise AttributeError(key)
+      return None
+    value = values[key]
+    if isinstance(value, dict) and '_object_name' in value:
+      full_name = value['_object_name']
+      colon = full_name.find(':')
+      if colon > 0:
+        full_name = full_name[colon+1:]
+        colon = full_name.find(':')
+        if colon > 0:
+          return full_name[colon+1:]
+    return value
+
+  def getAttributes(self):
+    """Return the '_values' map itself (not a copy)."""
+    return self.values
+
+  def getCreateTime(self):
+    """Return the '_create_ts' field of the raw content."""
+    return self.content['_create_ts']
+
+  def getDeleteTime(self):
+    """Return the '_delete_ts' field of the raw content."""
+    return self.content['_delete_ts']
+
+  def getUpdateTime(self):
+    """Return the '_update_ts' field of the raw content."""
+    return self.content['_update_ts']
+
+  def update(self):
+    """
+    Reload the property values from the agent.
+
+    Raises Exception when the object can no longer be found by name.
+    """
+    refreshed = self.broker._getBrokerObject(self.__class__, self.name)
+    if refreshed:
+      self.content = refreshed.content
+      self.values = self.content['_values']
+    else:
+      raise Exception("No longer exists on the broker")
+
+class Broker(BrokerObject):
+  """Wrapper for query results of QMF class 'broker'."""
+  def __init__(self, broker, values):
+    super(Broker, self).__init__(broker, values)
+
+class Memory(BrokerObject):
+  """Wrapper for query results of QMF class 'memory'."""
+  def __init__(self, broker, values):
+    super(Memory, self).__init__(broker, values)
+
+class Connection(BrokerObject):
+  """Wrapper for query results of QMF class 'connection'."""
+  def __init__(self, broker, values):
+    super(Connection, self).__init__(broker, values)
+
+  def close(self):
+    """Placeholder: not implemented, does nothing and returns None."""
+
+class Session(BrokerObject):
+  """Wrapper for query results of QMF class 'session'."""
+  def __init__(self, broker, values):
+    super(Session, self).__init__(broker, values)
+
+class Subscription(BrokerObject):
+  """Wrapper for query results of QMF class 'subscription'."""
+  def __init__(self, broker, values):
+    super(Subscription, self).__init__(broker, values)
+
+  def __repr__(self):
+    # Fixed text: no per-object name is rendered here.
+    text = "subscription name undefined"
+    return text
+
+class Exchange(BrokerObject):
+  """Wrapper for query results of QMF class 'exchange'."""
+  def __init__(self, broker, values):
+    super(Exchange, self).__init__(broker, values)
+
+class Binding(BrokerObject):
+  """Wrapper for query results of QMF class 'binding'."""
+  def __init__(self, broker, values):
+    super(Binding, self).__init__(broker, values)
+
+  def __repr__(self):
+    # Direct indexing: a missing 'bindingKey' raises KeyError.
+    key = self.values['bindingKey']
+    return "Binding key: %s" % key
+
+class Queue(BrokerObject):
+  """Wrapper for query results of QMF class 'queue', with queue methods."""
+  def __init__(self, broker, values):
+    BrokerObject.__init__(self, broker, values)
+
+  def purge(self, request):
+    """
+    Discard all or some messages on a queue.
+
+    request -- forwarded as the 'request' argument of the 'purge' method
+    """
+    addr = "org.apache.qpid.broker:queue:%s" % self.name
+    self.broker._method("purge", {'request':request}, addr)
+
+  def reroute(self, request, useAltExchange, exchange, filter=None):
+    """
+    Remove all or some messages on this queue and route them to an exchange.
+
+    All four parameters are forwarded under the same names as arguments of
+    the 'reroute' method; 'filter' defaults to an empty map.
+    """
+    # A fresh dict per call: a literal {} default would be one shared object
+    # that any callee mutation would leak into later calls.
+    if filter is None:
+      filter = {}
+    addr = "org.apache.qpid.broker:queue:%s" % self.name
+    arguments = {'request':request,
+                 'useAltExchange':useAltExchange,
+                 'exchange':exchange,
+                 'filter':filter}
+    self.broker._method("reroute", arguments, addr)
+

Added: qpid/trunk/qpid/tools/src/py/qpidtoollibs/disp.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpidtoollibs/disp.py?rev=1242526&view=auto
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpidtoollibs/disp.py (added)
+++ qpid/trunk/qpid/tools/src/py/qpidtoollibs/disp.py Thu Feb  9 21:11:41 2012
@@ -0,0 +1,249 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from time import strftime, gmtime
+
+class Header:
+  """
+  A table column heading that also knows how to render its cells.
+
+  The class constants select the rendering applied by formatted():
+    NONE       value returned unchanged
+    KMG        number scaled with a k/m/g suffix (see num())
+    YN         'Y' or 'N' by truthiness
+    Y          'Y' or '' by truthiness
+    TIME_LONG  nanosecond timestamp via strftime("%c") in GMT
+    TIME_SHORT nanosecond timestamp via strftime("%X") in GMT
+    DURATION   nanosecond count as "[Nd ][Nh ][Nm ]Ns"
+    COMMAS     digits grouped in threes with commas
+  """
+  NONE = 1
+  KMG = 2
+  YN = 3
+  Y = 4
+  TIME_LONG = 5
+  TIME_SHORT = 6
+  DURATION = 7
+  COMMAS = 8
+
+  def __init__(self, text, format=NONE):
+    """
+    text   -- heading text
+    format -- one of the class constants above
+    """
+    self.text = text
+    self.format = format
+
+  def __repr__(self):
+    return self.text
+
+  def __str__(self):
+    return self.text
+
+  def formatted(self, value):
+    """
+    Render 'value' according to self.format.
+
+    None renders as ''.  Any error while rendering yields "?".  An
+    unrecognised format code returns None.
+    """
+    try:
+      if value is None:
+        return ''
+      if self.format == Header.NONE:
+        return value
+      if self.format == Header.KMG:
+        return self.num(value)
+      if self.format == Header.YN:
+        if value:
+          return 'Y'
+        return 'N'
+      if self.format == Header.Y:
+        if value:
+          return 'Y'
+        return ''
+      if self.format == Header.TIME_LONG:
+        return strftime("%c", gmtime(value // 1000000000))
+      if self.format == Header.TIME_SHORT:
+        return strftime("%X", gmtime(value // 1000000000))
+      if self.format == Header.DURATION:
+        return self._duration(value)
+      if self.format == Header.COMMAS:
+        return self._commas(value)
+    except Exception:
+      # Best effort: a cell that cannot be rendered must not abort the table.
+      # KeyboardInterrupt/SystemExit are deliberately not caught.
+      return "?"
+
+  def _duration(self, value):
+    """Render a nanosecond count as days/hours/minutes/seconds."""
+    if value < 0: value = 0
+    # Floor division keeps every unit integral; with true division a
+    # fractional 'day' would be > 0 and print a spurious "0d 0h" prefix.
+    sec = value // 1000000000
+    min = sec // 60
+    hour = min // 60
+    day = hour // 24
+    result = ""
+    if day > 0:
+      result = "%dd " % day
+    if hour > 0 or result != "":
+      result += "%dh " % (hour % 24)
+    if min > 0 or result != "":
+      result += "%dm " % (min % 60)
+    result += "%ds" % (sec % 60)
+    return result
+
+  def _commas(self, value):
+    """Render str(value) with a comma between each group of three digits."""
+    digits = str(value)
+    sign = ""
+    # Keep the sign out of the grouping, otherwise -123 became "-,123".
+    if digits.startswith("-"):
+      sign = "-"
+      digits = digits[1:]
+    groups = []
+    while len(digits) > 0:
+      groups.insert(0, digits[-3:])
+      digits = digits[:-3]
+    return sign + ",".join(groups)
+
+  def numCell(self, value, tag):
+    """Render value/1000 followed by 'tag', with at most two decimals."""
+    fp = float(value) / 1000.
+    if fp < 10.0:
+      return "%1.2f%c" % (fp, tag)
+    if fp < 100.0:
+      return "%2.1f%c" % (fp, tag)
+    return "%4d%c" % (value // 1000, tag)
+
+  def num(self, value):
+    """Render a number, unscaled below 1000, otherwise with k, m or g."""
+    if value < 1000:
+      return "%4d" % value
+    if value < 1000000:
+      return self.numCell(value, 'k')
+    value //= 1000
+    if value < 1000000:
+      return self.numCell(value, 'm')
+    value //= 1000
+    return self.numCell(value, 'g')
+
+
+class Display:
+  """ Display formatting for QPID Management CLI """
+
+  def __init__(self, spacing=2, prefix="    "):
+    """
+    spacing -- blank columns added after the widest cell of each column
+    prefix  -- text printed at the start of every table line
+    """
+    self.tableSpacing    = spacing
+    self.tablePrefix     = prefix
+    self.timestampFormat = "%X"
+
+  def formattedTable(self, title, heads, rows):
+    """
+    Print a table whose cells are first rendered by their column's Header.
+
+    heads -- sequence of objects providing .text and .formatted(cell)
+    rows  -- sequence of rows; cell i is rendered by heads[i]
+    """
+    fRows = []
+    for row in rows:
+      fRow = []
+      for col, cell in enumerate(row):
+        fRow.append(heads[col].formatted(cell))
+      fRows.append(fRow)
+    headtext = [head.text for head in heads]
+    self.table(title, headtext, fRows)
+
+  def table(self, title, heads, rows):
+    """
+    Print a table with autosized columns.
+
+    Rows shorter than 'heads' are padded in place with "".  Only the title is
+    printed when there are no rows.  The last column is not right-padded.
+    """
+    # 'unicode' exists only on Python 2; on Python 3 str is the same thing.
+    try:
+      text = unicode
+    except NameError:
+      text = str
+
+    # Pad the rows to the number of heads
+    for row in rows:
+      diff = len(heads) - len(row)
+      for idx in range(diff):
+        row.append("")
+
+    # Single-argument print(...) is valid as both statement and function.
+    print(title)
+    if len(rows) == 0:
+      return
+
+    lastCol = len(heads) - 1
+    colWidth = []
+    line = self.tablePrefix
+    for col, head in enumerate(heads):
+      width = len(head)
+      for row in rows:
+        cellWidth = len(text(row[col]))
+        if cellWidth > width:
+          width = cellWidth
+      colWidth.append(width + self.tableSpacing)
+      line = line + head
+      if col < lastCol:
+        line = line + " " * (colWidth[col] - len(head))
+    print(line)
+
+    # Underline spanning the full width of all columns.
+    print(self.tablePrefix + "=" * sum(colWidth))
+
+    for row in rows:
+      line = self.tablePrefix
+      for col, width in enumerate(colWidth):
+        cell = text(row[col])
+        line = line + cell
+        if col < lastCol:
+          line = line + " " * (width - len(cell))
+      print(line)
+
+  def do_setTimeFormat (self, fmt):
+    """ Select timestamp format: "long" or "short"; other values are ignored """
+    if fmt == "long":
+      self.timestampFormat = "%c"
+    elif fmt == "short":
+      self.timestampFormat = "%X"
+
+  def timestamp (self, nsec):
+    """ Format a nanosecond-since-the-epoch timestamp for printing """
+    return strftime (self.timestampFormat, gmtime (nsec // 1000000000))
+
+  def duration(self, nsec):
+    """ Format a nanosecond count as "[Nd ][Nh ][Nm ]Ns"; negatives count as 0 """
+    if nsec < 0: nsec = 0
+    # Floor division keeps every unit integral; with true division a
+    # fractional 'day' would be > 0 and print a spurious "0d 0h" prefix.
+    sec = nsec // 1000000000
+    min = sec // 60
+    hour = min // 60
+    day = hour // 24
+    result = ""
+    if day > 0:
+      result = "%dd " % day
+    if hour > 0 or result != "":
+      result += "%dh " % (hour % 24)
+    if min > 0 or result != "":
+      result += "%dm " % (min % 60)
+    result += "%ds" % (sec % 60)
+    return result
+
+class Sortable:
+  """ Wraps a row so that rows order by the cell at one fixed index """
+  def __init__(self, row, sortIndex):
+    """
+    row       -- indexable row
+    sortIndex -- index of the cell used for ordering
+
+    Raises Exception when sortIndex is not below len(row).
+    """
+    self.row = row
+    self.sortIndex = sortIndex
+    if sortIndex >= len(row):
+      raise Exception("sort index exceeds row boundary")
+
+  def __cmp__(self, other):
+    """ Three-way comparison of the two sort cells: -1, 0 or 1 """
+    mine = self.row[self.sortIndex]
+    theirs = other.row[other.sortIndex]
+    # Same result as cmp(mine, theirs) without needing the cmp builtin.
+    return (mine > theirs) - (mine < theirs)
+
+  def __lt__(self, other):
+    """ Ordering used by list.sort() where __cmp__ is not consulted """
+    return self.row[self.sortIndex] < other.row[other.sortIndex]
+
+  def getRow(self):
+    return self.row
+
+class Sorter:
+  """ Sorts table rows by the column whose heading text equals sortCol """
+  def __init__(self, heads, rows, sortCol, limit=0, inc=True):
+    """
+    heads   -- sequence of objects with a .text attribute
+    rows    -- rows to sort
+    sortCol -- heading text of the column to sort on
+    limit   -- stop after this many rows; the default 0 keeps every row
+    inc     -- True for increasing order, False for decreasing
+
+    Raises Exception when no heading matches sortCol.
+    """
+    # len(heads) doubles as the "not found" marker.
+    col = len(heads)
+    for idx, head in enumerate(heads):
+      if head.text == sortCol:
+        col = idx
+        break
+    if col == len(heads):
+      raise Exception("sortCol '%s', not found in headers" % sortCol)
+
+    wrapped = [Sortable(row, col) for row in rows]
+    wrapped.sort()
+    if not inc:
+      wrapped.reverse()
+
+    self.sorted = []
+    for item in wrapped:
+      self.sorted.append(item.getRow())
+      # Equality test: a limit of 0 is never reached, so nothing is cut.
+      if len(self.sorted) == limit:
+        break
+
+  def getSorted(self):
+    return self.sorted



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to