Author: aconway
Date: Tue May 20 20:20:34 2014
New Revision: 1596394
URL: http://svn.apache.org/r1596394
Log:
QPID-DISPATCH-52: Extended test of distributed work-queue, multiple routers,
multiple brokers.
Modified:
qpid/dispatch/trunk/tests/system_test.py
qpid/dispatch/trunk/tests/system_tests_broker.py
Modified: qpid/dispatch/trunk/tests/system_test.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_test.py?rev=1596394&r1=1596393&r2=1596394&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_test.py (original)
+++ qpid/dispatch/trunk/tests/system_test.py Tue May 20 20:20:34 2014
@@ -247,6 +247,10 @@ class Config(object):
f.write(str(self))
return name
+ # def __getitem(self, key):
+ # """Get an item, make sure any defaults have been set first"""
+ # # defaults()
+ # return super(Config, self).__getitem__(self, key)
class Qdrouterd(Process):
"""Run a Qpid Dispatch Router Daemon"""
@@ -257,27 +261,28 @@ class Qdrouterd(Process):
"""
DEFAULTS = {
- 'listener':{'sasl-mechanisms':'ANONYMOUS'},
- 'connector':{'sasl-mechanisms':'ANONYMOUS', 'role':'on-demand'}
+ 'listener':{'addr':'0.0.0.0', 'sasl-mechanisms':'ANONYMOUS'},
+ 'connector':{'addr':'0.0.0.0', 'sasl-mechanisms':'ANONYMOUS',
'role':'on-demand'}
}
def sections(self, name):
"""Return list of sections named name"""
return [p for n, p in self if n == name]
+ def defaults(self):
+ """Fill in default values in configuration"""
+ for name, props in self:
+ if name in Qdrouterd.Config.DEFAULTS:
+ for n,p in Qdrouterd.Config.DEFAULTS[name].iteritems():
+ props.setdefault(n,p)
+
def __str__(self):
- """Generate config file content. Fills in defaults for some
require values"""
- def defs(name, props):
- """Fill in defaults for required values"""
- if not name in Qdrouterd.Config.DEFAULTS:
- return props
- p = copy(Qdrouterd.Config.DEFAULTS[name])
- p.update(props)
- return p
+ """Generate config file content. Calls default() first."""
def props(p):
"""qpidd.conf format of dict p"""
return "".join([" %s: %s\n"%(k, v) for k, v in
p.iteritems()])
- return "".join(["%s {\n%s}\n"%(n, props(defs(n, p))) for n, p in
self])
+ self.defaults()
+ return "".join(["%s {\n%s}\n"%(n, props(p)) for n, p in self])
class Agent(object):
"""Management agent"""
@@ -567,3 +572,8 @@ class TestCase(unittest.TestCase, Tester
def test_zzzz_teardown_class(self):
"""Fake test to call tearDownClass"""
self.__class__.tearDownClass()
+
+ def assert_fair(self, seq):
+ avg = sum(seq)/len(seq)
+ for i in seq:
+ assert i > avg/2, "Work not fairly distributed: %s"%seq
Modified: qpid/dispatch/trunk/tests/system_tests_broker.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_broker.py?rev=1596394&r1=1596393&r2=1596394&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_broker.py (original)
+++ qpid/dispatch/trunk/tests/system_tests_broker.py Tue May 20 20:20:34 2014
@@ -23,8 +23,9 @@ with waypoints.
"""
import unittest, system_test
from system_test import wait_port, wait_ports, Qdrouterd, retry, message,
MISSING_REQUIREMENTS
+from itertools import cycle
-class BrokerSystemTest(system_test.TestCase): # pylint:
disable=too-many-public-methods
+class DistributedQueueTest(system_test.TestCase): # pylint:
disable=too-many-public-methods
"""System tests involving routers and qpidd brokers"""
# Hack for python 2.6 which does not support setupClass.
@@ -34,7 +35,7 @@ class BrokerSystemTest(system_test.TestC
@classmethod
def setUpClass(cls):
"""Start 3 qpidd brokers, wait for them to be ready."""
- super(BrokerSystemTest, cls).setUpClass()
+ super(DistributedQueueTest, cls).setUpClass()
cls.qpidd = [cls.tester.qpidd('qpidd%s'%i, port=cls.get_port())
for i in xrange(3)]
for q in cls.qpidd:
@@ -45,63 +46,73 @@ class BrokerSystemTest(system_test.TestC
def tearDownClass(cls):
if cls.setup_ok:
cls.setup_ok = False
- super(BrokerSystemTest, cls).tearDownClass()
+ super(DistributedQueueTest, cls).tearDownClass()
- def test_distrbuted_queue(self):
- """Static distributed queue, one router, three brokers"""
- if not self.setup_ok:
- return self.skipTest("setUpClass failed")
- testq = self.id() # The distributed queue name
- for q in self.qpidd:
- q.agent.addQueue(testq)
-
- # Start a qdrouterd
- # We have a waypoint for each broker, on the same testq address.
- # Sending to testq should spread messages to the qpidd queues.
- # Subscribing to testq should gather messages from the qpidd queues.
- router_conf = Qdrouterd.Config([
- ('log', {'module':'DEFAULT', 'level':'NOTICE'}),
+ def setUp(self):
+ super(DistributedQueueTest, self).setUp()
+ self.testq = 'testq.'+self.id().split('.')[-1] # The distributed queue
name
+
+ def common_router_conf(self, name, mode='standalone'):
+ """Common router configuration for the tests"""
+ return Qdrouterd.Config([
+ ('log', {'module':'DEFAULT', 'level':'INFO'}),
('log', {'module':'ROUTER', 'level':'TRACE'}),
('log', {'module':'MESSAGE', 'level':'TRACE'}),
- ('container', {'container-name':self.id()}),
- ('container', {'container-name':self.id()}),
- ('router', {'mode': 'standalone', 'router-id': self.id()}),
- ('listener', {'addr':'0.0.0.0', 'port':self.get_port()}),
- ('fixed-address', {'prefix':testq, 'phase':0, 'fanout':'single',
'bias':'spread'}),
- ('fixed-address', {'prefix':testq, 'phase':1, 'fanout':'single',
'bias':'spread'})
+ ('container', {'container-name':name}),
+ ('router', {'mode': mode, 'router-id': name})
])
- # Add connector and waypoint for each broker.
- for q in self.qpidd:
- router_conf += [
- ('connector', {'name':q.name, 'addr':'0.0.0.0',
'port':q.port}),
- ('waypoint', {'name':testq, 'out-phase':1, 'in-phase':0,
'connector':q.name})]
-
- router = self.qdrouterd('router0', router_conf)
- wait_ports(router.ports)
- for q in self.qpidd:
- retry(lambda: router.is_connected(q.port))
+ def verify_equal_spread(self, send_addresses, receive_addresses):
+ """Verify we send/receive to the queue the load was spread over the
brokers.
+ Send to each of the send_addresses in turn, subscribe to all of the
receive_addresses.
+ """
msgr = self.messenger()
-
- address = router.addresses[0]+"/"+testq
- msgr.subscribe(address, flush=True)
+ for a in receive_addresses:
+ msgr.subscribe(a)
+ msgr.flush()
n = 20 # Messages per broker
r = ["x-%02d"%i for i in range(n*len(self.qpidd))]
- for b in r:
- msgr.put(message(address=address, body=b))
+ for b, a in zip(r, cycle(send_addresses)):
+ msgr.put(message(address=a, body=b))
+ msgr.flush()
+ # FIXME aconway 2014-05-20: From which subscription?
messages = sorted(msgr.fetch().body for i in r)
msgr.flush()
- self.assertEqual(messages, r)
- # Verify we got back exactly what we sent.
- qs = [q.agent.getQueue(testq) for q in self.qpidd]
+ self.assertEqual(r, messages)
+
+ qs = [q.agent.getQueue(self.testq) for q in self.qpidd]
enq = sum(q.msgTotalEnqueues for q in qs)
deq = sum(q.msgTotalDequeues for q in qs)
self.assertEquals((enq, deq), (len(r), len(r)))
- # Verify the messages were spread equally over the brokers.
- self.assertEquals(
- [(q.msgTotalEnqueues, q.msgTotalDequeues) for q in qs],
- [(n, n) for q in qs]
- )
+ # Verify each broker handled a reasonable share of the messages.
+ self.assert_fair([q.msgTotalEnqueues for q in qs])
+
+ def test_distrbuted_queue(self):
+ """Create a distributed queue with N routers and N brokers.
+ Each router is connected to all the brokers."""
+ if not self.setup_ok:
+ return self.skipTest("setUpClass failed")
+ for q in self.qpidd:
+ q.agent.addQueue(self.testq)
+
+ def router(i):
+ """Create router<i> with waypoints to each broker."""
+ name = "router%s"%i
+ rconf = self.common_router_conf(name, mode='interior')
+ rconf += [
+ ('listener', {'port':self.get_port(), 'role':'normal'}),
+ ('fixed-address', {'prefix':self.testq, 'phase':0,
'fanout':'single', 'bias':'spread'}),
+ ('fixed-address', {'prefix':self.testq, 'phase':1,
'fanout':'single', 'bias':'spread'})]
+ for q in self.qpidd:
+ rconf += [
+ ('connector', {'name':q.name, 'port':q.port}),
+ ('waypoint', {'name':self.testq, 'out-phase':1,
'in-phase':0, 'connector':q.name})]
+ return self.qdrouterd(name, rconf)
+ routers = [router(i) for i in xrange(len(self.qpidd))]
+ for r in routers: r.wait_ready()
+ addrs = [r.addresses[0]+"/"+self.testq for r in routers]
+ self.verify_equal_spread(addrs, addrs)
+
if __name__ == '__main__':
if MISSING_REQUIREMENTS:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]