Author: aconway
Date: Fri Jun 13 19:08:16 2014
New Revision: 1602494
URL: http://svn.apache.org/r1602494
Log:
DISPATCH-56: Refactor qdstat tool and system_test framework to use
management.amqp.Node.
Added:
qpid/dispatch/trunk/tools/qdstat
- copied, changed from r1602493, qpid/dispatch/trunk/tools/qdstat.in
Removed:
qpid/dispatch/trunk/tools/qdstat.in
Modified:
qpid/dispatch/trunk/CMakeLists.txt
qpid/dispatch/trunk/python/qpid_dispatch_internal/management/amqp.py
qpid/dispatch/trunk/python/qpid_dispatch_internal/management/entity.py
qpid/dispatch/trunk/python/qpid_dispatch_internal/management/schema.py
qpid/dispatch/trunk/tests/system_test.py
qpid/dispatch/trunk/tests/system_tests_broker.py
qpid/dispatch/trunk/tests/system_tests_management.py
Modified: qpid/dispatch/trunk/CMakeLists.txt
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/CMakeLists.txt?rev=1602494&r1=1602493&r2=1602494&view=diff
==============================================================================
--- qpid/dispatch/trunk/CMakeLists.txt (original)
+++ qpid/dispatch/trunk/CMakeLists.txt Fri Jun 13 19:08:16 2014
@@ -97,9 +97,6 @@ set(CATCH_UNDEFINED "-Wl,--no-undefined"
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/src/conditionals.h.in
${CMAKE_CURRENT_BINARY_DIR}/conditionals.h)
-configure_file(${CMAKE_CURRENT_SOURCE_DIR}/tools/qdstat.in
- ${CMAKE_CURRENT_BINARY_DIR}/tools/qdstat)
-
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/build_env.py.in
${CMAKE_CURRENT_BINARY_DIR}/build_env.py)
@@ -157,7 +154,7 @@ install(FILES etc/qdrouterd.conf DESTINA
## Python modules installation
##
set(TOOLS_EXECUTABLES
- ${CMAKE_CURRENT_BINARY_DIR}/tools/qdstat
+ ${CMAKE_CURRENT_SOURCE_DIR}/tools/qdstat
)
set(DOC_FILES
Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/management/amqp.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/management/amqp.py?rev=1602494&r1=1602493&r2=1602494&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/management/amqp.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/management/amqp.py Fri Jun 13 19:08:16 2014
@@ -23,6 +23,7 @@ AMQP management tools for Qpid dispatch.
import proton, re, threading, httplib
from collections import namedtuple
+from entity import Entity
class Error(Exception): pass
@@ -67,8 +68,6 @@ class Url:
self.password = kwargs.get('password')
self.host = kwargs.get('host')
self.port = kwargs.get('port')
- if self.host is None:
- raise ValueError('Host required for url')
self.path = kwargs.get('path')
elif isinstance(s, Url):
self.scheme = s.scheme
@@ -97,10 +96,10 @@ class Url:
s += self.user
if self.password:
s += ":%s@" % self.password
- if ':' not in self.host:
- s += self.host
- else:
+ if self.host and ':' in self.host:
s += "[%s]" % self.host
+ else:
+ s += self.host or '0.0.0.0'
if self.port:
s += ":%s" % self.port
if self.path:
@@ -140,7 +139,7 @@ class Node(object):
NODE_TYPE='org.amqp.management' # AMQP management node type
NODE_PROPERTIES={'name':SELF, 'type':NODE_TYPE}
- def __init__(self, address, router=None, locales=None):
+ def __init__(self, address=None, router=None, locales=None):
"""
@param address: AMQP address of the management node.
@param router: If address does not contain a path, use the management
node for this router ID.
@@ -171,7 +170,8 @@ class Node(object):
self.messenger = None
def __del__(self):
- self.stop()
+ if hasattr(self, 'messenger'):
+ self.stop()
def _flush(self):
"""Call self.messenger.work() till there is no work left."""
@@ -235,14 +235,18 @@ class Node(object):
self.check_response(response)
return response
- class QueryResult(namedtuple('QueryResult', ['attribute_names', 'results'])):
+ class QueryResponse(list):
"""
- Result returned by L{query}
+ Result returned by L{query}. Behaves as a list of L{Entity}.
@ivar attribute_names: List of attribute names for the results.
- @ivar results: List of lists. Each entry is a list of attribute values
- corresponding to the attribute_names.
"""
- pass
+ def __init__(self, response):
+ """
+ @param response: the respose message to a query.
+ """
+ self.attribute_names = response.body['attributeNames']
+ for r in response.body['results']:
+ self.append(Entity(attributes=dict(zip(self.attribute_names, r))))
def query(self, entity_type=None, attribute_names=None, offset=None, count=None):
"""
@@ -252,9 +256,10 @@ class Node(object):
@keyword attribute_names: A list of attribute names to query.
@keyword offset: An integer offset into the list of results to return.
@keyword count: A count of the maximum number of results to return.
- @return: A L{QueryResult}
+ @return: A L{QueryResponse}
"""
+ attribute_names = attribute_names or []
response = self.call(self.node_request(
operation='QUERY', entityType=entity_type, offset=offset, count=count,
- body={'attributeNames':attribute_names or []}))
- return Node.QueryResult(response.body['attributeNames'], response.body['results'])
+ body={'attributeNames':attribute_names}))
+ return Node.QueryResponse(response)
Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/management/entity.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/management/entity.py?rev=1602494&r1=1602493&r2=1602494&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/management/entity.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/management/entity.py Fri Jun 13 19:08:16 2014
@@ -26,7 +26,7 @@ An entity has a set of named attributes
from schema import EntityType
from copy import copy
-class Entity(dict):
+class Entity(object):
"""
A management entity: a set of attributes with an associated entity-type.
@@ -37,18 +37,19 @@ class Entity(dict):
@ivar I{attribute-name}: Access an entity attribute as a python attribute.
"""
- def __init__(self, entity_type, attributes=None, schema=None, **kw_attributes):
+ def __init__(self, entity_type=None, attributes=None, schema=None, **kw_attributes):
"""
@param entity_type: An L{EntityType} or the name of an entity type in
the schema.
- @param schema: The L{Schema} defining entity_type.
@param attributes: An attribute mapping.
+ @param schema: The L{Schema} defining entity_type.
@param kw_attributes: Attributes as keyword arguments.
"""
super(Entity, self).__init__()
if schema and entity_type in schema.entity_types:
self.entity_type = schema.entity_types[entity_type]
else:
- assert isinstance(entity_type, EntityType), "'%s' is not an entity type"%entity_type
+ assert entity_type is None or \
+ isinstance(entity_type, EntityType), "'%s' is not an entity type"%entity_type
self.entity_type = entity_type
self.attributes = attributes or {}
self.attributes.update(kw_attributes)
@@ -76,6 +77,10 @@ class Entity(dict):
else:
return (self.entity_type.name, self.attributes)
+ def __str__(self):
+ return str(self.dump)
+
+
class EntityList(list):
"""
A list of entities with some convenience methods for finding entities
@@ -150,6 +155,9 @@ class EntityList(list):
"""
return [e.dump(as_map) for e in self]
+ def __str__(self):
+ return str(self.dump())
+
def replace(self, contents):
"""
Replace the contents of the list.
Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/management/schema.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/management/schema.py?rev=1602494&r1=1602493&r2=1602494&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/management/schema.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/management/schema.py Fri Jun 13 19:08:16 2014
@@ -274,6 +274,9 @@ class AttributeTypeHolder(object):
('description', self.description or None)
])
+ def __str__(self):
+ print self.name
+
class IncludeType(AttributeTypeHolder):
def __init__(self, name, schema, attributes=None, description=""):
Modified: qpid/dispatch/trunk/tests/system_test.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_test.py?rev=1602494&r1=1602493&r2=1602494&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_test.py (original)
+++ qpid/dispatch/trunk/tests/system_test.py Fri Jun 13 19:08:16 2014
@@ -56,6 +56,7 @@ import os, time, socket, random, subproc
from copy import copy
import proton
from proton import Message
+from qpid_dispatch_internal.management import amqp
# Optional modules
MISSING_MODULES = []
@@ -296,37 +297,6 @@ class Qdrouterd(Process):
self.defaults()
return "".join(["%s {\n%s}\n"%(n, props(p)) for n, p in self])
- class Agent(object):
- """Management agent"""
- def __init__(self, router):
- self.router = router
- self.messenger = Messenger()
- self.messenger.route("amqp:/*", "amqp://0.0.0.0:%s/$1"%router.ports[0])
- self.address = "amqp:/$management"
- self.subscription = self.messenger.subscribe("amqp:/#")
- self.reply_to = self.subscription.address
-
- def stop(self):
- """Stop the agent's messenger"""
- self.messenger.stop()
-
- def get(self, entity_type):
- """Return a list of attribute dicts for each instance of entity_type"""
- request = message(
- address=self.address, reply_to=self.reply_to,
- correlation_id=1,
- properties={u'operation':u'QUERY', u'entityType':entity_type},
- body={'attributeNames':[]})
- self.messenger.put(request)
- response = self.messenger.fetch()
- if response.properties['statusCode'] != 200:
- raise Exception("Agent error: %d %s" % (
- response.properties['statusCode'],
- response.properties['statusDescription']))
- attrs = response.body['attributeNames']
- return [dict(zip(attrs, values)) for values in response.body['results']]
-
-
def __init__(self, name, config=Config(), wait=True):
"""
@param name: name used for for output files.
@@ -337,13 +307,14 @@ class Qdrouterd(Process):
super(Qdrouterd, self).__init__(
name, ['qdrouterd', '-c', config.write(name)],
expect=Process.RUNNING)
self._agent = None
- if wait: self.wait_ready()
+ if wait:
+ self.wait_ready()
@property
def agent(self):
"""Return an management Agent for this router"""
if not self._agent:
- self._agent = self.Agent(self)
+ self._agent = amqp.Node(self.addresses[0])
return self._agent
def teardown(self):
@@ -369,9 +340,9 @@ class Qdrouterd(Process):
def is_connected(self, port, host='0.0.0.0'):
"""If router has a connection to host:port return the management info.
Otherwise return None"""
- connections = self.agent.get('org.apache.qpid.dispatch.connection')
+ connections = self.agent.query('org.apache.qpid.dispatch.connection')
for c in connections:
- if c['name'] == '%s:%s'%(host, port):
+ if c.name == '%s:%s'%(host, port):
return c
return None
@@ -393,7 +364,7 @@ class Qpidd(Process):
def __str__(self):
return "".join(["%s=%s\n"%(k, v) for k, v in self.iteritems()])
- def __init__(self, name, config=Config(), port=None):
+ def __init__(self, name, config=Config(), port=None, wait=True):
self.config = Qpidd.Config(
{'auth':'no',
'log-to-stderr':'false', 'log-to-file':name+".log",
@@ -406,6 +377,8 @@ class Qpidd(Process):
self.port = self.config['port'] or 5672
self.address = "127.0.0.1:%s"%self.port
self._agent = None
+ if wait:
+ self.wait_ready()
def qm_connect(self):
"""Make a qpid_messaging connection to the broker"""
@@ -422,7 +395,8 @@ class Qpidd(Process):
self._agent = qpidtoollibs.BrokerAgent(self.qm_connect(), **kwargs)
return self._agent
-
+ def wait_ready(self):
+ wait_port(self.port)
# Decorator to add an optional flush argument to a method, defaulting to
# the _flush value for the messenger.
Modified: qpid/dispatch/trunk/tests/system_tests_broker.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_broker.py?rev=1602494&r1=1602493&r2=1602494&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_broker.py (original)
+++ qpid/dispatch/trunk/tests/system_tests_broker.py Fri Jun 13 19:08:16 2014
@@ -32,10 +32,10 @@ class DistributedQueueTest(system_test.T
def setUpClass(cls):
"""Start 3 qpidd brokers, wait for them to be ready."""
super(DistributedQueueTest, cls).setUpClass()
- cls.qpidds = [cls.tester.qpidd('qpidd%s'%i, port=cls.get_port())
+ cls.qpidds = [cls.tester.qpidd('qpidd%s'%i, port=cls.get_port(), wait=False)
for i in xrange(3)]
for q in cls.qpidds:
- wait_port(q.port)
+ q.wait_ready()
@classmethod
def tearDownClass(cls):
Modified: qpid/dispatch/trunk/tests/system_tests_management.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_management.py?rev=1602494&r1=1602493&r2=1602494&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_management.py (original)
+++ qpid/dispatch/trunk/tests/system_tests_management.py Fri Jun 13 19:08:16 2014
@@ -68,9 +68,9 @@ class ManagementTest(system_test.TestCas
address = 'org.apache.qpid.dispatch.router.address'
response = self.node.query(entity_type=address)
self.assertEqual(response.attribute_names[0:3], ['type', 'name', 'identity'])
- for r in response.results: # Check types
- self.assertEqual(r[0], address)
- names = [r[1] for r in response.results]
+ for r in response: # Check types
+ self.assertEqual(r.type, address)
+ names = [r.name for r in response]
self.assertTrue('L$management' in names)
self.assertTrue('M0$management' in names)
@@ -79,9 +79,9 @@ class ManagementTest(system_test.TestCas
# Try offset, count
self.assertGreater(len(names), 2)
response0 = self.node.query(entity_type=address, count=1)
- self.assertEqual(names[0:1], [r[1] for r in response0.results])
+ self.assertEqual(names[0:1], [r[1] for r in response0])
response1_2 = self.node.query(entity_type=address, count=2, offset=1)
- self.assertEqual(names[1:3], [r[1] for r in response1_2.results])
+ self.assertEqual(names[1:3], [r[1] for r in response1_2])
self.fail("Negative test passed!")
except: pass
@@ -90,7 +90,7 @@ class ManagementTest(system_test.TestCas
# FIXME aconway 2014-06-05: negative test: attribute_names query doesn't work.
# Need a better test.
try:
- self.assertNotEqual([], response.results)
+ self.assertNotEqual([], response)
self.fail("Negative test passed!")
except: pass
Copied: qpid/dispatch/trunk/tools/qdstat (from r1602493, qpid/dispatch/trunk/tools/qdstat.in)
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tools/qdstat?p2=qpid/dispatch/trunk/tools/qdstat&p1=qpid/dispatch/trunk/tools/qdstat.in&r1=1602493&r2=1602494&rev=1602494&view=diff
==============================================================================
--- qpid/dispatch/trunk/tools/qdstat.in (original)
+++ qpid/dispatch/trunk/tools/qdstat Fri Jun 13 19:08:16 2014
@@ -26,8 +26,10 @@ import locale
import socket
import re
from proton import Messenger, Message, Timeout
+from qpid_dispatch_internal.management.amqp import Url, Node
+from qpid_dispatch_internal.management.entity import Entity
-home = os.environ.get("QPID_DISPATCH_HOME", os.path.normpath("${QPID_DISPATCH_HOME_INSTALLED}"))
+home = os.environ.get("QPID_DISPATCH_HOME", os.path.normpath(os.path.dirname(__file__)))
sys.path.append(os.path.join(home, "python"))
from qpid_dispatch_internal.tools import Display, Header, Sorter, YN, Commas, TimeLong
@@ -35,7 +37,6 @@ from qpid_dispatch_internal.tools import
class Config:
def __init__(self):
- self._host = "0.0.0.0"
self._connTimeout = 5
self._types = ""
self._limit = 50
@@ -88,75 +89,15 @@ def OptionsAndArguments(argv):
parser.error("You must specify one of these options: -g, -c, -l, -n, -a, or -m. For details, try $ qdstat --help")
config._types = opts.show
- config._host = opts.bus
+ config._address = opts.bus
config._router = opts.router
config._connTimeout = opts.timeout
return args
-class AmqpEntity(object):
- def __init__(self, types, values):
- if len(types) != len(values):
- raise Exception("Mismatched types and values for entity")
- self.values = {}
- for idx in range(len(types)):
- self.values[types[idx]] = values[idx]
-
- def __getattr__(self, attr):
- if attr in self.values:
- return self.values[attr]
- raise Exception("Unknown attribute: %s" % attr)
- def __repr__(self):
- return "%r" % self.values
-
-
-class BusManager:
- def __init__(self):
- pass
-
- def SetHost(self, host, router):
- self.M = Messenger()
- self.M.start()
- self.M.timeout = config._connTimeout
- self.M.route("amqp:/*", "amqp://%s/$1" % host)
- if router:
- self.address = "amqp:/_topo/0/%s/$management" % router
- else:
- self.address = "amqp:/$management"
- self.subscription = self.M.subscribe("amqp:/#")
- self.reply = self.subscription.address
-
- def Disconnect(self):
- self.M.stop()
-
- def _get_object(self, cls):
- request = Message()
- response = Message()
-
- request.address = self.address
- request.reply_to = self.reply
- request.correlation_id = 1
- request.properties = {u'operation':u'QUERY', u'entityType':cls}
- request.body = {'attributeNames': []}
-
- self.M.put(request)
- self.M.send()
- self.M.recv()
- self.M.get(response)
-
- if response.properties['statusCode'] != 200:
- raise Exception("Agent reports: %d %s" % (response.properties['statusCode'], response.properties['statusDescription']))
-
- entities = []
- anames = response.body['attributeNames']
- results = response.body['results']
- for e in results:
- entities.append(AmqpEntity(anames, e))
-
- return entities
-
+class BusManager(Node):
def displayConnections(self):
disp = Display(prefix=" ")
@@ -170,7 +111,7 @@ class BusManager:
rows = []
- objects = self._get_object('org.apache.qpid.dispatch.connection')
+ objects = self.query('org.apache.qpid.dispatch.connection')
for conn in objects:
row = []
@@ -216,7 +157,7 @@ class BusManager:
heads.append(Header("value"))
rows = []
- objects = self._get_object('org.apache.qpid.dispatch.router')
+ objects = self.query('org.apache.qpid.dispatch.router')
router = objects[0]
rows.append(('Mode', router.mode))
@@ -243,7 +184,7 @@ class BusManager:
heads.append(Header("msg-fifo"))
rows = []
- objects = self._get_object('org.apache.qpid.dispatch.router.link')
+ objects = self.query('org.apache.qpid.dispatch.router.link')
for link in objects:
row = []
@@ -275,8 +216,8 @@ class BusManager:
heads.append(Header("valid-origins"))
rows = []
- objects = self._get_object('org.apache.qpid.dispatch.router.node')
- attached = self._get_object('org.apache.qpid.dispatch.router')[0]
+ objects = self.query('org.apache.qpid.dispatch.router.node')
+ attached = self.query('org.apache.qpid.dispatch.router')[0]
nodes = {}
for node in objects:
@@ -325,7 +266,7 @@ class BusManager:
heads.append(Header("from-proc", Header.COMMAS))
rows = []
- objects = self._get_object('org.apache.qpid.dispatch.router.address')
+ objects = self.query('org.apache.qpid.dispatch.router.address')
for addr in objects:
row = []
@@ -359,7 +300,7 @@ class BusManager:
heads.append(Header("rebal-out", Header.COMMAS))
rows = []
- objects = self._get_object('org.apache.qpid.dispatch.allocator')
+ objects = self.query('org.apache.qpid.dispatch.allocator')
for t in objects:
row = []
@@ -392,12 +333,10 @@ class BusManager:
def main(argv=None):
args = OptionsAndArguments(argv)
- bm = BusManager()
-
try:
- bm.SetHost(config._host, config._router)
+ bm = BusManager(config._address, config._router)
bm.display(args)
- bm.Disconnect()
+ bm.stop()
return 0
except KeyboardInterrupt:
print
@@ -409,7 +348,7 @@ def main(argv=None):
except Exception,e:
print "Failed: %s - %s" % (e.__class__.__name__, e)
- bm.Disconnect()
+ bm.stop()
return 1
if __name__ == "__main__":
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]