Author: aconway
Date: Tue May 20 20:20:34 2014
New Revision: 1596394

URL: http://svn.apache.org/r1596394
Log:
QPID-DISPATCH-52: Extended test of distributed work-queue, multiple routers, 
multiple brokers.

Modified:
    qpid/dispatch/trunk/tests/system_test.py
    qpid/dispatch/trunk/tests/system_tests_broker.py

Modified: qpid/dispatch/trunk/tests/system_test.py
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_test.py?rev=1596394&r1=1596393&r2=1596394&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_test.py (original)
+++ qpid/dispatch/trunk/tests/system_test.py Tue May 20 20:20:34 2014
@@ -247,6 +247,10 @@ class Config(object):
             f.write(str(self))
         return name
 
+    # def __getitem(self, key):
+    #     """Get an item, make sure any defaults have been set first"""
+    #     # defaults()
+    #     return super(Config, self).__getitem__(self, key)
 
 class Qdrouterd(Process):
     """Run a Qpid Dispatch Router Daemon"""
@@ -257,27 +261,28 @@ class Qdrouterd(Process):
         """
 
         DEFAULTS = {
-            'listener':{'sasl-mechanisms':'ANONYMOUS'},
-            'connector':{'sasl-mechanisms':'ANONYMOUS', 'role':'on-demand'}
+            'listener':{'addr':'0.0.0.0', 'sasl-mechanisms':'ANONYMOUS'},
+            'connector':{'addr':'0.0.0.0', 'sasl-mechanisms':'ANONYMOUS', 
'role':'on-demand'}
         }
 
         def sections(self, name):
             """Return list of sections named name"""
             return [p for n, p in self if n == name]
 
+        def defaults(self):
+            """Fill in default values in configuration"""
+            for name, props in self:
+                if name in Qdrouterd.Config.DEFAULTS:
+                    for n,p in Qdrouterd.Config.DEFAULTS[name].iteritems():
+                        props.setdefault(n,p)
+
         def __str__(self):
-            """Generate config file content. Fills in defaults for some 
require values"""
-            def defs(name, props):
-                """Fill in defaults for required values"""
-                if not name in Qdrouterd.Config.DEFAULTS:
-                    return props
-                p = copy(Qdrouterd.Config.DEFAULTS[name])
-                p.update(props)
-                return p
+            """Generate config file content. Calls default() first."""
             def props(p):
                 """qpidd.conf format of dict p"""
                 return "".join(["    %s: %s\n"%(k, v) for k, v in 
p.iteritems()])
-            return "".join(["%s {\n%s}\n"%(n, props(defs(n, p))) for n, p in 
self])
+            self.defaults()
+            return "".join(["%s {\n%s}\n"%(n, props(p)) for n, p in self])
 
     class Agent(object):
         """Management agent"""
@@ -567,3 +572,8 @@ class TestCase(unittest.TestCase, Tester
         def test_zzzz_teardown_class(self):
             """Fake test to call tearDownClass"""
             self.__class__.tearDownClass()
+
+    def assert_fair(self, seq):
+        avg = sum(seq)/len(seq)
+        for i in seq:
+            assert i > avg/2, "Work not fairly distributed: %s"%seq

Modified: qpid/dispatch/trunk/tests/system_tests_broker.py
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_broker.py?rev=1596394&r1=1596393&r2=1596394&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_broker.py (original)
+++ qpid/dispatch/trunk/tests/system_tests_broker.py Tue May 20 20:20:34 2014
@@ -23,8 +23,9 @@ with waypoints.
 """
 import unittest, system_test
 from system_test import wait_port, wait_ports, Qdrouterd, retry, message, 
MISSING_REQUIREMENTS
+from itertools import cycle
 
-class BrokerSystemTest(system_test.TestCase): # pylint: 
disable=too-many-public-methods
+class DistributedQueueTest(system_test.TestCase): # pylint: 
disable=too-many-public-methods
     """System tests involving routers and qpidd brokers"""
 
     # Hack for python 2.6 which does not support setupClass.
@@ -34,7 +35,7 @@ class BrokerSystemTest(system_test.TestC
     @classmethod
     def setUpClass(cls):
         """Start 3 qpidd brokers, wait for them to be ready."""
-        super(BrokerSystemTest, cls).setUpClass()
+        super(DistributedQueueTest, cls).setUpClass()
         cls.qpidd = [cls.tester.qpidd('qpidd%s'%i, port=cls.get_port())
                     for i in xrange(3)]
         for q in cls.qpidd:
@@ -45,63 +46,73 @@ class BrokerSystemTest(system_test.TestC
     def tearDownClass(cls):
         if cls.setup_ok:
             cls.setup_ok = False
-            super(BrokerSystemTest, cls).tearDownClass()
+            super(DistributedQueueTest, cls).tearDownClass()
 
-    def test_distrbuted_queue(self):
-        """Static distributed queue, one router, three brokers"""
-        if not self.setup_ok:
-            return self.skipTest("setUpClass failed")
-        testq = self.id()       # The distributed queue name
-        for q in self.qpidd:
-            q.agent.addQueue(testq)
-
-        # Start a qdrouterd
-        # We have a waypoint for each broker, on the same testq address.
-        # Sending to testq should spread messages to the qpidd queues.
-        # Subscribing to testq should gather messages from the qpidd queues.
-        router_conf = Qdrouterd.Config([
-            ('log', {'module':'DEFAULT', 'level':'NOTICE'}),
+    def setUp(self):
+        super(DistributedQueueTest, self).setUp()
+        self.testq = 'testq.'+self.id().split('.')[-1] # The distributed queue 
name
+
+    def common_router_conf(self, name, mode='standalone'):
+        """Common router configuration for the tests"""
+        return Qdrouterd.Config([
+            ('log', {'module':'DEFAULT', 'level':'INFO'}),
             ('log', {'module':'ROUTER', 'level':'TRACE'}),
             ('log', {'module':'MESSAGE', 'level':'TRACE'}),
-            ('container', {'container-name':self.id()}),
-            ('container', {'container-name':self.id()}),
-            ('router', {'mode': 'standalone', 'router-id': self.id()}),
-            ('listener', {'addr':'0.0.0.0', 'port':self.get_port()}),
-            ('fixed-address', {'prefix':testq, 'phase':0, 'fanout':'single', 
'bias':'spread'}),
-            ('fixed-address', {'prefix':testq, 'phase':1, 'fanout':'single', 
'bias':'spread'})
+            ('container', {'container-name':name}),
+            ('router', {'mode': mode, 'router-id': name})
         ])
-        # Add connector and waypoint for each broker.
-        for q in self.qpidd:
-            router_conf += [
-                ('connector', {'name':q.name, 'addr':'0.0.0.0', 
'port':q.port}),
-                ('waypoint', {'name':testq, 'out-phase':1, 'in-phase':0, 
'connector':q.name})]
-
-        router = self.qdrouterd('router0', router_conf)
-        wait_ports(router.ports)
-        for q in self.qpidd:
-            retry(lambda: router.is_connected(q.port))
 
+    def verify_equal_spread(self, send_addresses, receive_addresses):
+        """Verify we send/receive to the queue the load was spread over the 
brokers.
+        Send to each of the send_addresses in turn, subscribe to all of the 
receive_addresses.
+        """
         msgr = self.messenger()
-
-        address = router.addresses[0]+"/"+testq
-        msgr.subscribe(address, flush=True)
+        for a in receive_addresses:
+            msgr.subscribe(a)
+        msgr.flush()
         n = 20                  # Messages per broker
         r = ["x-%02d"%i for i in range(n*len(self.qpidd))]
-        for b in r:
-            msgr.put(message(address=address, body=b))
+        for b, a in zip(r, cycle(send_addresses)):
+            msgr.put(message(address=a, body=b))
+        msgr.flush()
+        # FIXME aconway 2014-05-20: From which subscription?
         messages = sorted(msgr.fetch().body for i in r)
         msgr.flush()
-        self.assertEqual(messages, r)
-        # Verify we got back exactly what we sent.
-        qs = [q.agent.getQueue(testq) for q in self.qpidd]
+        self.assertEqual(r, messages)
+
+        qs = [q.agent.getQueue(self.testq) for q in self.qpidd]
         enq = sum(q.msgTotalEnqueues for q in qs)
         deq = sum(q.msgTotalDequeues for q in qs)
         self.assertEquals((enq, deq), (len(r), len(r)))
-        # Verify the messages were spread equally over the brokers.
-        self.assertEquals(
-            [(q.msgTotalEnqueues, q.msgTotalDequeues) for q in qs],
-            [(n, n) for q in qs]
-        )
+        # Verify each broker handled a reasonable share of the messages.
+        self.assert_fair([q.msgTotalEnqueues for q in qs])
+
+    def test_distrbuted_queue(self):
+        """Create a distributed queue with N routers and N brokers.
+        Each router is connected to all the brokers."""
+        if not self.setup_ok:
+            return self.skipTest("setUpClass failed")
+        for q in self.qpidd:
+            q.agent.addQueue(self.testq)
+
+        def router(i):
+            """Create router<i> with waypoints to each broker."""
+            name = "router%s"%i
+            rconf = self.common_router_conf(name, mode='interior')
+            rconf += [
+                ('listener', {'port':self.get_port(), 'role':'normal'}),
+                ('fixed-address', {'prefix':self.testq, 'phase':0, 
'fanout':'single', 'bias':'spread'}),
+                ('fixed-address', {'prefix':self.testq, 'phase':1, 
'fanout':'single', 'bias':'spread'})]
+            for q in self.qpidd:
+                rconf += [
+                    ('connector', {'name':q.name, 'port':q.port}),
+                    ('waypoint', {'name':self.testq, 'out-phase':1, 
'in-phase':0, 'connector':q.name})]
+            return self.qdrouterd(name, rconf)
+        routers = [router(i) for i in xrange(len(self.qpidd))]
+        for r in routers: r.wait_ready()
+        addrs = [r.addresses[0]+"/"+self.testq for r in routers]
+        self.verify_equal_spread(addrs, addrs)
+
 
 if __name__ == '__main__':
     if MISSING_REQUIREMENTS:



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to