Modified: qpid/branches/QPID-3799-acl/tools/src/py/qpid-stat URL: http://svn.apache.org/viewvc/qpid/branches/QPID-3799-acl/tools/src/py/qpid-stat?rev=1294242&r1=1294241&r2=1294242&view=diff ============================================================================== --- qpid/branches/QPID-3799-acl/tools/src/py/qpid-stat (original) +++ qpid/branches/QPID-3799-acl/tools/src/py/qpid-stat Mon Feb 27 17:40:42 2012 @@ -31,7 +31,7 @@ home = os.environ.get("QPID_TOOLS_HOME", sys.path.append(os.path.join(home, "python")) from qpidtoollibs.broker import BrokerAgent -from qpidtoollibs.disp import Display, Header, Sorter +from qpidtoollibs.disp import Display, Header, Sorter, YN, Commas, TimeLong class Config: @@ -42,7 +42,6 @@ class Config: self._limit = 50 self._increasing = False self._sortcol = None - self._details = None self._sasl_mechanism = None config = Config() @@ -52,42 +51,43 @@ def OptionsAndArguments(argv): global config - parser = OptionParser(usage="usage: %prog [options] BROKER", - description="Example: $ qpid-stat -q broker-host:10000") + parser = OptionParser(usage="usage: %prog [options] -[gcequm] [object-name]") group1 = OptionGroup(parser, "General Options") - group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>", help="Maximum time to wait for broker connection (in seconds)") - group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.") + group1.add_option("-b", "--broker", action="store", type="string", default="localhost", metavar="<url>", + help="URL of the broker to query") + group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>", + help="Maximum time to wait for broker connection (in seconds)") + group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", + help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.") parser.add_option_group(group1) group2 = OptionGroup(parser, "Display Options") - group2.add_option("-b", "--broker", help="Show Brokers", action="store_const", const="b", dest="show") - group2.add_option("-c", "--connections", help="Show Connections", action="store_const", const="c", dest="show") - group2.add_option("-e", "--exchanges", help="Show Exchanges", action="store_const", const="e", dest="show") - group2.add_option("-q", "--queues", help="Show Queues", action="store_const", const="q", dest="show") - group2.add_option("-u", "--subscriptions", help="Show Subscriptions", action="store_const", const="u", dest="show") - group2.add_option("-m", "--memory", help="Show Broker Memory Stats", action="store_const", const="m", dest="show") + group2.add_option("-g", "--general", help="Show General Broker Stats", action="store_const", const="g", dest="show") + group2.add_option("-c", "--connections", help="Show Connections", action="store_const", const="c", dest="show") + group2.add_option("-e", "--exchanges", help="Show Exchanges", action="store_const", const="e", dest="show") + group2.add_option("-q", "--queues", help="Show Queues", action="store_const", const="q", dest="show") + group2.add_option("-u", "--subscriptions", help="Show Subscriptions", action="store_const", const="u", dest="show") + group2.add_option("-m", "--memory", help="Show Broker Memory Stats", action="store_const", const="m", dest="show") + group2.add_option( "--acl", help="Show Access Control List Stats", action="store_const", const="acl", dest="show") group2.add_option("-S", "--sort-by", metavar="<colname>", help="Sort by column name") group2.add_option("-I", "--increasing", action="store_true", default=False, help="Sort by increasing value (default = decreasing)") group2.add_option("-L", "--limit", type="int", default=50, metavar="<n>", help="Limit output to n rows") - group2.add_option("-D", "--details", action="store", metavar="<name>", dest="detail", default=None, help="Display details on a single object.") + parser.add_option_group(group2) opts, args = parser.parse_args(args=argv) if not opts.show: - parser.error("You must specify one of these options: -b, -c, -e, -q. or -u. For details, try $ qpid-stat --help") + parser.error("You must specify one of these options: -g, -c, -e, -q, -m, or -u. For details, try $ qpid-stat --help") config._types = opts.show config._sortcol = opts.sort_by + config._host = opts.broker config._connTimeout = opts.timeout config._increasing = opts.increasing config._limit = opts.limit config._sasl_mechanism = opts.sasl_mechanism - config._detail = opts.detail - - if args: - config._host = args[0] return args @@ -118,24 +118,23 @@ class IpAddr: class BrokerManager: def __init__(self): - self.brokerName = None - self.connections = [] - self.brokers = [] - self.cluster = None + self.brokerName = None + self.connection = None + self.broker = None + self.cluster = None def SetBroker(self, brokerUrl, mechanism): self.url = brokerUrl - self.connections.append(Connection(self.url, sasl_mechanism=mechanism)) - self.connections[0].open() - self.brokers.append(BrokerAgent(self.connections[0])) + self.connection = Connection(self.url, sasl_mechanisms=mechanism) + self.connection.open() + self.broker = BrokerAgent(self.connection) def Disconnect(self): """ Release any allocated brokers. Ignore any failures as the tool is shutting down. """ try: - for conn in self.connections: - conn.close() + connection.close() except: pass @@ -175,7 +174,7 @@ class BrokerManager: hosts.append(bestUrl) return hosts - def displayBroker(self, subs): + def displayBroker(self): disp = Display(prefix=" ") heads = [] heads.append(Header('uptime', Header.DURATION)) @@ -184,7 +183,7 @@ class BrokerManager: heads.append(Header('exchanges', Header.COMMAS)) heads.append(Header('queues', Header.COMMAS)) rows = [] - broker = self.brokers[0].getBroker() + broker = self.broker.getBroker() connections = self.getConnectionMap() sessions = self.getSessionMap() exchanges = self.getExchangeMap() @@ -229,7 +228,7 @@ class BrokerManager: disp.formattedTable('Aggregate Broker Statistics:', heads, rows) - def displayConn(self, subs): + def displayConn(self): disp = Display(prefix=" ") heads = [] heads.append(Header('client-addr')) @@ -241,8 +240,8 @@ class BrokerManager: heads.append(Header('msgIn', Header.KMG)) heads.append(Header('msgOut', Header.KMG)) rows = [] - connections = self.brokers[0].getAllConnections() - broker = self.brokers[0].getBroker() + connections = self.broker.getAllConnections() + broker = self.broker.getBroker() for conn in connections: row = [] row.append(conn.address) @@ -262,10 +261,10 @@ class BrokerManager: dispRows = rows disp.formattedTable(title, heads, dispRows) - def displaySession(self, subs): + def displaySession(self): disp = Display(prefix=" ") - def displayExchange(self, subs): + def displayExchange(self): disp = Display(prefix=" ") heads = [] heads.append(Header("exchange")) @@ -279,7 +278,7 @@ class BrokerManager: heads.append(Header("byteOut", Header.KMG)) heads.append(Header("byteDrop", Header.KMG)) rows = [] - exchanges = self.brokers[0].getAllExchanges() + exchanges = self.broker.getAllExchanges() for ex in exchanges: row = [] row.append(ex.name) @@ -301,7 +300,7 @@ class BrokerManager: dispRows = rows disp.formattedTable(title, heads, dispRows) - def displayQueues(self, subs): + def displayQueues(self): disp = Display(prefix=" ") heads = [] heads.append(Header("queue")) @@ -317,7 +316,7 @@ class BrokerManager: heads.append(Header("cons", Header.KMG)) heads.append(Header("bind", Header.KMG)) rows = [] - queues = self.brokers[0].getAllQueues() + queues = self.broker.getAllQueues() for q in queues: row = [] row.append(q.name) @@ -341,11 +340,67 @@ class BrokerManager: dispRows = rows disp.formattedTable(title, heads, dispRows) - def displayQueue(self, subs): + + def displayQueue(self, name): + queue = self.broker.getQueue(name) + if not queue: + print "Queue '%s' not found" % name + return + disp = Display(prefix=" ") heads = [] + heads.append(Header('Name')) + heads.append(Header('Durable', Header.YN)) + heads.append(Header('AutoDelete', Header.YN)) + heads.append(Header('Exclusive', Header.YN)) + heads.append(Header('FlowStopped', Header.YN)) + heads.append(Header('FlowStoppedCount', Header.COMMAS)) + heads.append(Header('Consumers', Header.COMMAS)) + heads.append(Header('Bindings', Header.COMMAS)) + rows = [] + rows.append([queue.name, queue.durable, queue.autoDelete, queue.exclusive, + queue.flowStopped, queue.flowStoppedCount, + queue.consumerCount, queue.bindingCount]) + disp.formattedTable("Properties:", heads, rows) + print + + heads = [] + heads.append(Header('Property')) + heads.append(Header('Value')) + rows = [] + rows.append(['arguments', queue.arguments]) + rows.append(['alt-exchange', queue.altExchange]) + disp.formattedTable("Optional Properties:", heads, rows) + print + + heads = [] + heads.append(Header('Statistic')) + heads.append(Header('Messages', Header.COMMAS)) + heads.append(Header('Bytes', Header.COMMAS)) + rows = [] + rows.append(['queue-depth', queue.msgDepth, queue.byteDepth]) + rows.append(['total-enqueues', queue.msgTotalEnqueues, queue.byteTotalEnqueues]) + rows.append(['total-dequeues', queue.msgTotalDequeues, queue.byteTotalDequeues]) + rows.append(['persistent-enqueues', queue.msgPersistEnqueues, queue.bytePersistEnqueues]) + rows.append(['persistent-dequeues', queue.msgPersistDequeues, queue.bytePersistDequeues]) + rows.append(['transactional-enqueues', queue.msgTxnEnqueues, queue.byteTxnEnqueues]) + rows.append(['transactional-dequeues', queue.msgTxnDequeues, queue.byteTxnDequeues]) + rows.append(['flow-to-disk-depth', queue.msgFtdDepth, queue.byteFtdDepth]) + rows.append(['flow-to-disk-enqueues', queue.msgFtdEnqueues, queue.byteFtdEnqueues]) + rows.append(['flow-to-disk-dequeues', queue.msgFtdDequeues, queue.byteFtdDequeues]) + rows.append(['acquires', queue.acquires, None]) + rows.append(['releases', queue.releases, None]) + rows.append(['discards-ttl-expired', queue.discardsTtl, None]) + rows.append(['discards-limit-overflow', queue.discardsOverflow, None]) + rows.append(['discards-ring-overflow', queue.discardsRing, None]) + rows.append(['discards-lvq-replace', queue.discardsLvq, None]) + rows.append(['discards-subscriber-reject', queue.discardsSubscriber, None]) + rows.append(['discards-purged', queue.discardsPurge, None]) + rows.append(['reroutes', queue.reroutes, None]) + disp.formattedTable("Statistics:", heads, rows) - def displaySubscriptions(self, subs): + + def displaySubscriptions(self): disp = Display(prefix=" ") heads = [] heads.append(Header("subscr")) @@ -359,7 +414,7 @@ class BrokerManager: heads.append(Header("creditMode")) heads.append(Header("delivered", Header.KMG)) rows = [] - subscriptions = self.brokers[0].getAllSubscriptions() + subscriptions = self.broker.getAllSubscriptions() sessions = self.getSessionMap() connections = self.getConnectionMap() for s in subscriptions: @@ -392,55 +447,71 @@ class BrokerManager: disp = Display(prefix=" ") heads = [Header('Statistic'), Header('Value', Header.COMMAS)] rows = [] - memory = self.brokers[0].getMemory() + memory = self.broker.getMemory() for k,v in memory.values.items(): if k != 'name': rows.append([k, v]) disp.formattedTable('Broker Memory Statistics:', heads, rows) + def displayAcl(self): + acl = self.broker.getAcl() + if not acl: + print "ACL Policy Module is not installed" + return + disp = Display(prefix=" ") + heads = [Header('Statistic'), Header('Value')] + rows = [] + rows.append(['policy-file', acl.policyFile]) + rows.append(['enforcing', YN(acl.enforcingAcl)]) + rows.append(['has-transfer-acls', YN(acl.transferAcl)]) + rows.append(['last-acl-load', TimeLong(acl.lastAclLoad)]) + rows.append(['acl-denials', Commas(acl.aclDenyCount)]) + disp.formattedTable('ACL Policy Statistics:', heads, rows) + def getExchangeMap(self): - exchanges = self.brokers[0].getAllExchanges() + exchanges = self.broker.getAllExchanges() emap = {} for e in exchanges: emap[e.name] = e return emap def getQueueMap(self): - queues = self.brokers[0].getAllQueues() + queues = self.broker.getAllQueues() qmap = {} for q in queues: qmap[q.name] = q return qmap def getSessionMap(self): - sessions = self.brokers[0].getAllSessions() + sessions = self.broker.getAllSessions() smap = {} for s in sessions: smap[s.name] = s return smap def getConnectionMap(self): - connections = self.brokers[0].getAllConnections() + connections = self.broker.getAllConnections() cmap = {} for c in connections: cmap[c.address] = c return cmap - def displayMain(self, main, subs): - if main == 'b': self.displayBroker(subs) - elif main == 'c': self.displayConn(subs) - elif main == 's': self.displaySession(subs) - elif main == 'e': self.displayExchange(subs) + def displayMain(self, names, main): + if main == 'g': self.displayBroker() + elif main == 'c': self.displayConn() + elif main == 's': self.displaySession() + elif main == 'e': self.displayExchange() elif main == 'q': - if config._detail: - self.displayQueue(subs, config._detail) + if len(names) >= 1: + self.displayQueue(names[0]) else: - self.displayQueues(subs) - elif main == 'u': self.displaySubscriptions(subs) - elif main == 'm': self.displayMemory(subs) + self.displayQueues() + elif main == 'u': self.displaySubscriptions() + elif main == 'm': self.displayMemory() + elif main == 'acl': self.displayAcl() - def display(self): - self.displayMain(config._types[0], config._types[1:]) + def display(self, names): + self.displayMain(names, config._types) def main(argv=None): @@ -450,7 +521,7 @@ def main(argv=None): try: bm.SetBroker(config._host, config._sasl_mechanism) - bm.display() + bm.display(args) bm.Disconnect() return 0 except KeyboardInterrupt:
Modified: qpid/branches/QPID-3799-acl/tools/src/py/qpidtoollibs/broker.py URL: http://svn.apache.org/viewvc/qpid/branches/QPID-3799-acl/tools/src/py/qpidtoollibs/broker.py?rev=1294242&r1=1294241&r2=1294242&view=diff ============================================================================== --- qpid/branches/QPID-3799-acl/tools/src/py/qpidtoollibs/broker.py (original) +++ qpid/branches/QPID-3799-acl/tools/src/py/qpidtoollibs/broker.py Mon Feb 27 17:40:42 2012 @@ -24,6 +24,9 @@ except ImportError: from qpid.datatypes import uuid4 class BrokerAgent(object): + """ + Proxy for a manageable Qpid Broker - Invoke with an opened qpid.messaging.Connection. + """ def __init__(self, conn): self.conn = conn self.sess = self.conn.session() @@ -35,6 +38,9 @@ class BrokerAgent(object): self.next_correlator = 1 def close(self): + """ + Close the proxy session. This will not affect the connection used in creating the object. + """ self.sess.close() def _method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker", timeout=10): @@ -124,9 +130,15 @@ class BrokerAgent(object): return None def getCluster(self): + """ + Get the broker's Cluster object. + """ return self._getAllBrokerObjects(Cluster) def getBroker(self): + """ + Get the Broker object that contains broker-scope statistics and operations. + """ # # getAllBrokerObjects is used instead of getBrokerObject(Broker, 'amqp-broker') because # of a bug that used to be in the broker whereby by-name queries did not return the @@ -173,8 +185,14 @@ class BrokerAgent(object): def getAllBindings(self): return self._getAllBrokerObjects(Binding) - def getBinding(self, exchange=None, queue=None): - pass + def getAllLinks(self): + return self._getAllBrokerObjects(Link) + + def getAcl(self): + objects = self._getAllBrokerObjects(Acl) + if len(objects) > 0: + return objects[0] + return None # Acl module not loaded def echo(self, sequence, body): """Request a response to test the path to the management broker""" @@ -204,23 +222,55 @@ class BrokerAgent(object): """Get the message timestamping configuration""" pass -# def addExchange(self, exchange_type, name, **kwargs): -# pass - -# def delExchange(self, name): -# pass + def addExchange(self, exchange_type, name, options={}, **kwargs): + properties = {} + properties['exchange-type'] = exchange_type + for k,v in options.items(): + properties[k] = v + for k,v in kwargs.items(): + properties[k] = v + args = {'type': 'exchange', + 'name': name, + 'properties': properties, + 'strict': True} + self._method('create', args) + + def delExchange(self, name): + args = {'type': 'exchange', 'name': name} + self._method('delete', args) + + def addQueue(self, name, options={}, **kwargs): + properties = options + for k,v in kwargs.items(): + properties[k] = v + args = {'type': 'queue', + 'name': name, + 'properties': properties, + 'strict': True} + self._method('create', args) + + def delQueue(self, name): + args = {'type': 'queue', 'name': name} + self._method('delete', args) + + def bind(self, exchange, queue, key, options={}, **kwargs): + properties = options + for k,v in kwargs.items(): + properties[k] = v + args = {'type': 'binding', + 'name': "%s/%s/%s" % (exchange, queue, key), + 'properties': properties, + 'strict': True} + self._method('create', args) + + def unbind(self, exchange, queue, key, **kwargs): + args = {'type': 'binding', + 'name': "%s/%s/%s" % (exchange, queue, key), + 'strict': True} + self._method('delete', args) -# def addQueue(self, name, **kwargs): -# pass - -# def delQueue(self, name): -# pass - -# def bind(self, exchange, queue, key, **kwargs): -# pass - -# def unbind(self, exchange, queue, key, **kwargs): -# pass + def reloadAclFile(self): + self._method('reloadACLFile', {}, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker") def create(self, _type, name, properties, strict): """Create an object of the specified type""" @@ -328,3 +378,10 @@ class Queue(BrokerObject): self.broker._method("reroute", {'request':request,'useAltExchange':useAltExchange,'exchange':exchange,'filter':filter}, "org.apache.qpid.broker:queue:%s" % self.name) +class Link(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class Acl(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) Modified: qpid/branches/QPID-3799-acl/tools/src/py/qpidtoollibs/disp.py URL: http://svn.apache.org/viewvc/qpid/branches/QPID-3799-acl/tools/src/py/qpidtoollibs/disp.py?rev=1294242&r1=1294241&r2=1294242&view=diff ============================================================================== --- qpid/branches/QPID-3799-acl/tools/src/py/qpidtoollibs/disp.py (original) +++ qpid/branches/QPID-3799-acl/tools/src/py/qpidtoollibs/disp.py Mon Feb 27 17:40:42 2012 @@ -21,6 +21,31 @@ from time import strftime, gmtime +def YN(val): + if val: + return 'Y' + return 'N' + +def Commas(value): + sval = str(value) + result = "" + while True: + if len(sval) == 0: + return result + left = sval[:-3] + right = sval[-3:] + result = right + result + if len(left) > 0: + result = ',' + result + sval = left + +def TimeLong(value): + return strftime("%c", gmtime(value / 1000000000)) + +def TimeShort(value): + return strftime("%X", gmtime(value / 1000000000)) + + class Header: """ """ NONE = 1 @@ -59,9 +84,9 @@ class Header: return 'Y' return '' if self.format == Header.TIME_LONG: - return strftime("%c", gmtime(value / 1000000000)) + return TimeLong(value) if self.format == Header.TIME_SHORT: - return strftime("%X", gmtime(value / 1000000000)) + return TimeShort(value) if self.format == Header.DURATION: if value < 0: value = 0 sec = value / 1000000000 @@ -78,17 +103,7 @@ class Header: result += "%ds" % (sec % 60) return result if self.format == Header.COMMAS: - sval = str(value) - result = "" - while True: - if len(sval) == 0: - return result - left = sval[:-3] - right = sval[-3:] - result = right + result - if len(left) > 0: - result = ',' + result - sval = left + return Commas(value) except: return "?" --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org