Author: aconway
Date: Fri Jun 13 19:08:37 2014
New Revision: 1602498

URL: http://svn.apache.org/r1602498
Log:
NO-JIRA: convert system_tests_two_routers to system_test framework.

Modified:
    qpid/dispatch/trunk/python/qpid_dispatch_internal/management/node.py
    qpid/dispatch/trunk/tests/system_test.py
    qpid/dispatch/trunk/tests/system_tests_two_routers.py

Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/management/node.py
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/management/node.py?rev=1602498&r1=1602497&r2=1602498&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/management/node.py 
(original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/management/node.py Fri 
Jun 13 19:08:37 2014
@@ -137,7 +137,7 @@ class Node(object):
     NODE_TYPE='org.amqp.management' # AMQP management node type
     NODE_PROPERTIES={'name':SELF, 'type':NODE_TYPE}
 
-    def __init__(self, address=None, router=None, locales=None):
+    def __init__(self, address=None, router=None, locales=None, timeout=10):
         """
         @param address: AMQP address of the management node.
         @param router: If address does not contain a path, use the management 
node for this router ID.
@@ -155,7 +155,7 @@ class Node(object):
 
         self.messenger = proton.Messenger()
         self.messenger.start()
-        self.messenger.timeout = 1 # FIXME aconway 2014-06-02: config
+        self.messenger.timeout = timeout
         subscribe_address = Url(address)
         subscribe_address.path = "#"
         self.subscription = self.messenger.subscribe(str(subscribe_address))
@@ -173,7 +173,7 @@ class Node(object):
 
     def _flush(self):
         """Call self.messenger.work() till there is no work left."""
-        while self.messenger.work(0.01):
+        while self.messenger.work(0.1):
             pass
 
     CORRELATION_ID = 0

Modified: qpid/dispatch/trunk/tests/system_test.py
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_test.py?rev=1602498&r1=1602497&r2=1602498&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_test.py (original)
+++ qpid/dispatch/trunk/tests/system_test.py Fri Jun 13 19:08:37 2014
@@ -297,15 +297,17 @@ class Qdrouterd(Process):
             self.defaults()
             return "".join(["%s {\n%s}\n"%(n, props(p)) for n, p in self])
 
-    def __init__(self, name, config=Config(), wait=True):
+    def __init__(self, name, config=Config(), pyinclude=None, wait=True):
         """
         @param name: name used for for output files.
         @param config: router configuration
         @keyword wait: wait for router to be ready (call self.wait_ready())
         """
         self.config = copy(config)
+        if not pyinclude and os.environ['QPID_DISPATCH_HOME']:
+            pyinclude = os.path.join(os.environ['QPID_DISPATCH_HOME'], 
'python')
         super(Qdrouterd, self).__init__(
-            name, ['qdrouterd', '-c', config.write(name)], 
expect=Process.RUNNING)
+            name, ['qdrouterd', '-c', config.write(name), '-I', pyinclude], 
expect=Process.RUNNING)
         self._agent = None
         if wait:
             self.wait_ready()
@@ -346,10 +348,31 @@ class Qdrouterd(Process):
                 return c
         return None
 
-    def wait_connectors(self):
-        """Wait for all connectors to be connected"""
+    def wait_address(self, address, subscribers=0, remotes=0, **retry_kwargs):
+        """
+        Wait for an address to be visible on the router.
+        @keyword subscribers: Wait till subscriberCount >= subscribers
+        @keyword remotes: Wait till remoteCount >= remotes
+        @param retry_kwargs: keyword args for L{retry}
+        """
+        def check():
+            # FIXME aconway 2014-06-12: this should be a request by name, not 
a query.
+            addrs = self.agent.query(
+                entity_type='org.apache.qpid.dispatch.router.address',
+                attribute_names=['name', 'subscriberCount', 'remoteCount'])
+            # FIXME aconway 2014-06-12: endswith check is because of M0/L 
prefixes
+            addrs = [a for a in addrs if a.name.endswith(address)]
+            return addrs and addrs[0].subscriberCount >= subscribers and 
addrs[0].remoteCount >= remotes
+        assert retry(check, **retry_kwargs)
+
+
+    def wait_connectors(self, **retry_kwargs):
+        """
+        Wait for all connectors to be connected
+        @param retry_kwargs: keyword args for L{retry}
+        """
         for c in self.config.sections('connector'):
-            retry(lambda: self.is_connected(c['port']))
+            assert retry(lambda: self.is_connected(c['port']), **retry_kwargs)
 
     def wait_ready(self):
         """Wait for ports and connectors to be ready"""
@@ -531,7 +554,8 @@ class TestCase(unittest.TestCase, Tester
     @classmethod
     def base_dir(cls):
         if not cls._base_dir:
-            cls._base_dir = os.path.abspath(os.path.join(__name__+'.dir', 
cls.__name__))
+            cls._base_dir = os.path.abspath(
+                os.path.join(__name__+'.dir', cls.__module__, cls.__name__))
         return cls._base_dir
 
     @classmethod

Modified: qpid/dispatch/trunk/tests/system_tests_two_routers.py
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_two_routers.py?rev=1602498&r1=1602497&r2=1602498&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_two_routers.py (original)
+++ qpid/dispatch/trunk/tests/system_tests_two_routers.py Fri Jun 13 19:08:37 
2014
@@ -17,161 +17,86 @@
 # under the License.
 #
 
-import sys
-import os
-import time
-import unittest
-import subprocess
-from proton import Messenger, Message, PENDING, ACCEPTED, REJECTED, RELEASED
-
-def wait_for_addr(messenger, addr, local_count, remote_count):
-    msub  = messenger.subscribe("amqp:/#")
-    reply = msub.address
-    req   = Message()
-    rsp   = Message()
-
-    done = False
-    while not done:
-        req.address    = "amqp:/_local/$management"
-        req.reply_to   = reply
-        req.properties = {u'operation':u'QUERY', 
u'entityType':u'org.apache.qpid.dispatch.router.address'}
-        req.body       = {u'attributeNames': [u'name', u'subscriberCount', 
u'remoteCount']}
-        messenger.put(req)
-        messenger.send()
-        messenger.recv()
-        messenger.get(rsp)
-        for item in rsp.body[u'results']:
-            if item[0][2:] == addr and \
-               local_count == item[1] and \
-               remote_count == item[2]:
-                done = True
-        time.sleep(0.2)
-
-def wait_for_routethrough(messenger, addr):
-    msub  = messenger.subscribe("amqp:/#")
-    reply = msub.address
-    req   = Message()
-    rsp   = Message()
-
-    done = False
-    while not done:
-        req.address    = "amqp:/_topo/0/%s/$management" % addr
-        req.reply_to   = reply
-        req.properties = {u'operation':u'GET-OPERATIONS', 
u'type':u'org.amqp.management', u'name':u'self'}
-        messenger.put(req)
-        messenger.send()
-        try:
-            messenger.recv()
-            done = True
-        except Exception:
-            pass
-        time.sleep(0.2)
-
-def startRouter(obj):
-    default_home = os.path.normpath('/usr/lib/qpid-dispatch')
-    home = os.environ.get('QPID_DISPATCH_HOME', default_home)
-    if obj.ssl_option == "ssl":
-        configA_file = '%s/tests/config-2/A-ssl.conf' % home
-        configB_file = '%s/tests/config-2/B-ssl.conf' % home 
-    else:
-        configA_file = '%s/tests/config-2/A.conf' % home
-        configB_file = '%s/tests/config-2/B.conf' % home
-
-    obj.routerA = subprocess.Popen(['qdrouterd', '-c', configA_file],
-                                   stderr=subprocess.PIPE,
-                                   stdout=subprocess.PIPE)
-    obj.routerB = subprocess.Popen(['qdrouterd', '-c', configB_file],
-                                   stderr=subprocess.PIPE,
-                                   stdout=subprocess.PIPE)
-    time.sleep(1)
-    print "Waiting for router topology to stabilize..."
-
-    M1 = Messenger()
-    M2 = Messenger()
-
-    M1.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
-    M2.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
-
-    M1.start()
-    M2.start()
-
-    M1.timeout = 0.5
-    M2.timeout = 0.5
-
-    wait_for_routethrough(M1, "QDR.B")
-    wait_for_routethrough(M2, "QDR.A")
-
-    M1.stop()
-    M2.stop()
-
-
-def stopRouter(obj):
-    obj.routerA.terminate()
-    obj.routerB.terminate()
-    obj.routerA.wait()
-    obj.routerB.wait()
+import unittest, sys, time, os
+from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED
+from system_test import TestCase, Messenger, Qdrouterd, retry_exception
+from qpid_dispatch_internal.management import Node
 
-
-class RouterTest(unittest.TestCase):
+class RouterTest(TestCase):
     ssl_option = None
 
-    if (sys.version_info[0] == 2) and (sys.version_info[1] < 7):
-        def setUp(self):
-            startRouter(self)
-
-        def tearDown(self):
-            stopRouter(self)
-    else:
-        @classmethod
-        def setUpClass(cls):
-            startRouter(cls)
-
-        @classmethod
-        def tearDownClass(cls):
-            stopRouter(cls)
-
-    def flush(self, messenger):
-        while messenger.work(0.1):
-            pass
-
-    def subscribe(self, messenger, address):
-        sub = messenger.subscribe(address)
-        self.flush(messenger)
-        return sub
+
+    @classmethod
+    def setUpClass(cls):
+        """Start a router and a messenger"""
+        super(RouterTest, cls).setUpClass()
+
+        def ssl_config(password):
+            if not cls.ssl_option: return []
+            def ssl_file(name):
+                return os.path.join(os.path.dirname(__file__), 'config-2', 
name)
+            return [
+                ('ssl-profile', {
+                    'name': 'ssl-profile-name',
+                    'cert-db': ssl_file('ca-certificate.pem'),
+                    'cert-file': ssl_file('server-certificate.pem'),
+                    'key-file': ssl_file('server-private-key.pem'),
+                    'password': password})]
+
+        def router(name, password, connection):
+            if cls.ssl_option:
+                connection[1]['ssl-profile'] = 'ssl-profile-name'
+            config = Qdrouterd.Config(ssl_config(password) + [
+                ('log', {'module':'DEFAULT', 'level':'trace', 
'output':name+".log"}),
+                ('container', {'worker-threads': 4, 'container-name': 
'Qpid.Dispatch.Router.%s'%name}),
+                ('router', {'mode': 'interior', 'router-id': 'QDR.%s'%name}),
+                ('listener', {'port': cls.tester.get_port()}),
+                ('fixed-address', {'prefix': '/closest/', 'fanout': 'single', 
'bias': 'closest'}),
+                ('fixed-address', {'prefix': '/spread/', 'fanout': 'single', 
'bias': 'spread'}),
+                ('fixed-address', {'prefix': '/multicast/', 'fanout': 
'multiple'}),
+                ('fixed-address', {'prefix': '/', 'fanout': 'multiple'}),
+                connection
+            ])
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+        cls.routers = []
+        router('A', 'server-password',
+               ('listener', {'role': 'inter-router', 'port': 
cls.tester.get_port()}))
+        router('B', 'client-password',
+               ('connector', {'role': 'inter-router', 'port': 
cls.routers[0].ports[1]}))
+
+        def query_through(address, router):
+            n = Node(address, router, timeout=0.2)
+            retry_exception(lambda: n.query('org.apache.qpid.dispatch.router'))
+        # Wait till we can query through each router to the other.
+        query_through(cls.routers[0].addresses[0], 'QDR.B')
+        query_through(cls.routers[1].addresses[0], 'QDR.A')
+
 
     def test_00_discard(self):
-        addr = "amqp://0.0.0.0:20100/discard/1"
-        M1 = Messenger()
-        M1.timeout = 1.0
-        M1.start()
+        addr = self.routers[0].addresses[0]+"/discard/1"
+        M1 = self.messenger()
         tm = Message()
         tm.address = addr
         for i in range(100):
             tm.body = {'number': i}
             M1.put(tm)
         M1.send()
-        M1.stop()
-
 
     def test_01_pre_settled(self):
         addr = "amqp:/pre_settled/1"
-        M1 = Messenger()
-        M2 = Messenger()
-
-        M1.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
-        M2.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
+        M1 = self.messenger()
+        M2 = self.messenger()
 
-        M1.timeout = 1.0
-        M2.timeout = 1.0
+        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+        M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
 
-        M1.start()
-        M2.start()
-        self.subscribe(M2, addr)
+        M2.subscribe(addr, flush=True)
 
         tm = Message()
         rm = Message()
 
-        wait_for_addr(M1, "pre_settled/1", 0, 1)
+        self.routers[0].wait_address("pre_settled/1", 0, 1, timeout=30)
 
         tm.address = addr
         for i in range(100):
@@ -190,15 +115,15 @@ class RouterTest(unittest.TestCase):
 
     def test_02_multicast(self):
         addr = "amqp:/pre_settled/multicast/1"
-        M1 = Messenger()
-        M2 = Messenger()
-        M3 = Messenger()
-        M4 = Messenger()
-
-        M1.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
-        M2.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
-        M3.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
-        M4.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
+        M1 = self.messenger()
+        M2 = self.messenger()
+        M3 = self.messenger()
+        M4 = self.messenger()
+
+        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+        M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
+        M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+        M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
 
         M1.timeout = 1.0
         M2.timeout = 1.0
@@ -209,10 +134,10 @@ class RouterTest(unittest.TestCase):
         M2.start()
         M3.start()
         M4.start()
-        self.subscribe(M2, addr)
-        self.subscribe(M3, addr)
-        self.subscribe(M4, addr)
-        wait_for_addr(M1, "pre_settled/multicast/1", 1, 1)
+        M2.subscribe(addr, flush=True)
+        M3.subscribe(addr, flush=True)
+        M4.subscribe(addr, flush=True)
+        self.routers[0].wait_address("pre_settled/multicast/1", 1, 1)
 
         tm = Message()
         rm = Message()
@@ -244,15 +169,15 @@ class RouterTest(unittest.TestCase):
 
     def test_02a_multicast_unsettled(self):
         addr = "amqp:/pre_settled/multicast/2"
-        M1 = Messenger()
-        M2 = Messenger()
-        M3 = Messenger()
-        M4 = Messenger()
-
-        M1.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
-        M2.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
-        M3.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
-        M4.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
+        M1 = self.messenger()
+        M2 = self.messenger()
+        M3 = self.messenger()
+        M4 = self.messenger()
+
+        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+        M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
+        M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+        M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
 
         M1.timeout = 1.0
         M2.timeout = 1.0
@@ -269,10 +194,10 @@ class RouterTest(unittest.TestCase):
         M3.start()
         M4.start()
 
-        self.subscribe(M2, addr)
-        self.subscribe(M3, addr)
-        self.subscribe(M4, addr)
-        wait_for_addr(M1, "pre_settled/multicast/2", 1, 1)
+        M2.subscribe(addr, flush=True)
+        M3.subscribe(addr, flush=True)
+        M4.subscribe(addr, flush=True)
+        self.routers[0].wait_address("pre_settled/multicast/2", 1, 1)
 
         tm = Message()
         rm = Message()
@@ -310,11 +235,11 @@ class RouterTest(unittest.TestCase):
 
     def test_02c_sender_settles_first(self):
         addr = "amqp:/settled/senderfirst/1"
-        M1 = Messenger()
-        M2 = Messenger()
+        M1 = self.messenger()
+        M2 = self.messenger()
 
-        M1.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
-        M2.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
+        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+        M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
 
         M1.timeout = 1.0
         M2.timeout = 1.0
@@ -324,8 +249,8 @@ class RouterTest(unittest.TestCase):
 
         M1.start()
         M2.start()
-        self.subscribe(M2, addr)
-        wait_for_addr(M1, "settled/senderfirst/1", 0, 1)
+        M2.subscribe(addr, flush=True)
+        self.routers[0].wait_address("settled/senderfirst/1", 0, 1)
 
         tm = Message()
         rm = Message()
@@ -336,8 +261,8 @@ class RouterTest(unittest.TestCase):
         M1.send(0)
 
         M1.settle(ttrk)
-        self.flush(M1)
-        self.flush(M2)
+        M1.flush()
+        M2.flush()
 
         M2.recv(1)
         rtrk = M2.get(rm)
@@ -345,8 +270,8 @@ class RouterTest(unittest.TestCase):
         M2.settle(rtrk)
         self.assertEqual(0, rm.body['number'])
 
-        self.flush(M1)
-        self.flush(M2)
+        M1.flush()
+        M2.flush()
 
         M1.stop()
         M2.stop()
@@ -354,11 +279,11 @@ class RouterTest(unittest.TestCase):
 
     def test_03_propagated_disposition(self):
         addr = "amqp:/unsettled/2"
-        M1 = Messenger()
-        M2 = Messenger()
+        M1 = self.messenger()
+        M2 = self.messenger()
 
-        M1.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
-        M2.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
+        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+        M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
 
         M1.timeout = 1.0
         M2.timeout = 1.0
@@ -368,8 +293,8 @@ class RouterTest(unittest.TestCase):
 
         M1.start()
         M2.start()
-        self.subscribe(M2, addr)
-        wait_for_addr(M1, "unsettled/2", 0, 1)
+        M2.subscribe(addr, flush=True)
+        self.routers[0].wait_address("unsettled/2", 0, 1)
 
         tm = Message()
         rm = Message()
@@ -390,8 +315,8 @@ class RouterTest(unittest.TestCase):
         M2.accept(rx_tracker)
         M2.settle(rx_tracker)
 
-        self.flush(M2)
-        self.flush(M1)
+        M2.flush()
+        M1.flush()
 
         self.assertEqual(ACCEPTED, M1.status(tx_tracker))
 
@@ -408,8 +333,8 @@ class RouterTest(unittest.TestCase):
         M2.reject(rx_tracker)
         M2.settle(rx_tracker)
 
-        self.flush(M2)
-        self.flush(M1)
+        M2.flush()
+        M1.flush()
 
         self.assertEqual(REJECTED, M1.status(tx_tracker))
 
@@ -418,8 +343,8 @@ class RouterTest(unittest.TestCase):
 
 
     def test_04_unsettled_undeliverable(self):
-        addr = "amqp://0.0.0.0:20100/unsettled_undeliverable/1"
-        M1 = Messenger()
+        addr = self.routers[0].addresses[0]+"/unsettled_undeliverable/1"
+        M1 = self.messenger()
 
         M1.timeout = 1.0
         M1.outgoing_window = 5
@@ -431,7 +356,7 @@ class RouterTest(unittest.TestCase):
 
         tx_tracker = M1.put(tm)
         M1.send(0)
-        self.flush(M1)
+        M1.flush()
         self.assertEqual(RELEASED, M1.status(tx_tracker))
 
         M1.stop()
@@ -439,11 +364,11 @@ class RouterTest(unittest.TestCase):
 
     def test_05_three_ack(self):
         addr = "amqp:/three_ack/1"
-        M1 = Messenger()
-        M2 = Messenger()
+        M1 = self.messenger()
+        M2 = self.messenger()
 
-        M1.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
-        M2.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
+        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+        M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
 
         M1.timeout = 1.0
         M2.timeout = 1.0
@@ -453,8 +378,8 @@ class RouterTest(unittest.TestCase):
 
         M1.start()
         M2.start()
-        self.subscribe(M2, addr)
-        wait_for_addr(M1, "three_ack/1", 0, 1)
+        M2.subscribe(addr, flush=True)
+        self.routers[0].wait_address("three_ack/1", 0, 1)
 
         tm = Message()
         rm = Message()
@@ -471,15 +396,15 @@ class RouterTest(unittest.TestCase):
 
         M2.accept(rx_tracker)
 
-        self.flush(M2)
-        self.flush(M1)
+        M2.flush()
+        M1.flush()
 
         self.assertEqual(ACCEPTED, M1.status(tx_tracker))
 
         M1.settle(tx_tracker)
 
-        self.flush(M1)
-        self.flush(M2)
+        M1.flush()
+        M2.flush()
 
         ##
         ## We need a way to verify on M2 (receiver) that the tracker has been
@@ -488,35 +413,35 @@ class RouterTest(unittest.TestCase):
 
         M2.settle(rx_tracker)
 
-        self.flush(M2)
-        self.flush(M1)
+        M2.flush()
+        M1.flush()
 
         M1.stop()
         M2.stop()
 
 
     def notest_06_link_route_sender(self):
-        pass 
+        pass
 
     def notest_07_link_route_receiver(self):
-        pass 
+        pass
 
 
     def test_08_delivery_annotations(self):
         addr = "amqp:/ma/1"
-        M1 = Messenger()
-        M2 = Messenger()
+        M1 = self.messenger()
+        M2 = self.messenger()
 
-        M1.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
-        M2.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
+        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+        M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
 
         M1.timeout = 1.0
         M2.timeout = 1.0
 
         M1.start()
         M2.start()
-        self.subscribe(M2, addr)
-        wait_for_addr(M1, "ma/1", 0, 1)
+        M2.subscribe(addr, flush=True)
+        self.routers[0].wait_address("ma/1", 0, 1)
 
         tm = Message()
         rm = Message()
@@ -545,11 +470,11 @@ class RouterTest(unittest.TestCase):
 
 
     def test_09_management(self):
-        M = Messenger()
+        M = self.messenger()
         M.timeout = 2.0
         M.start()
-        M.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
-        sub = self.subscribe(M, "amqp:/#")
+        M.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+        sub = M.subscribe("amqp:/#")
         reply = sub.address
 
         request  = Message()
@@ -583,15 +508,15 @@ class RouterTest(unittest.TestCase):
 
     def test_10_semantics_multicast(self):
         addr = "amqp:/multicast/1"
-        M1 = Messenger()
-        M2 = Messenger()
-        M3 = Messenger()
-        M4 = Messenger()
-
-        M1.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
-        M2.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
-        M3.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
-        M4.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
+        M1 = self.messenger()
+        M2 = self.messenger()
+        M3 = self.messenger()
+        M4 = self.messenger()
+
+        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+        M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
+        M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+        M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
 
         M1.timeout = 1.0
         M2.timeout = 1.0
@@ -603,10 +528,10 @@ class RouterTest(unittest.TestCase):
         M3.start()
         M4.start()
 
-        self.subscribe(M2, addr)
-        self.subscribe(M3, addr)
-        self.subscribe(M4, addr)
-        wait_for_addr(M1, "multicast/1", 1, 1)
+        M2.subscribe(addr, flush=True)
+        M3.subscribe(addr, flush=True)
+        M4.subscribe(addr, flush=True)
+        self.routers[0].wait_address("multicast/1", 1, 1)
 
         tm = Message()
         rm = Message()
@@ -646,15 +571,15 @@ class RouterTest(unittest.TestCase):
 
     def test_11a_semantics_closest_is_local(self):
         addr = "amqp:/closest/1"
-        M1 = Messenger()
-        M2 = Messenger()
-        M3 = Messenger()
-        M4 = Messenger()
-
-        M1.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
-        M2.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
-        M3.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
-        M4.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
+        M1 = self.messenger()
+        M2 = self.messenger()
+        M3 = self.messenger()
+        M4 = self.messenger()
+
+        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+        M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
+        M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+        M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
 
         M1.timeout = 1.0
         M2.timeout = 1.0
@@ -666,10 +591,10 @@ class RouterTest(unittest.TestCase):
         M3.start()
         M4.start()
 
-        self.subscribe(M2, addr)
-        self.subscribe(M3, addr)
-        self.subscribe(M4, addr)
-        wait_for_addr(M1, "closest/1", 1, 1)
+        M2.subscribe(addr, flush=True)
+        M3.subscribe(addr, flush=True)
+        M4.subscribe(addr, flush=True)
+        self.routers[0].wait_address("closest/1", 1, 1)
 
         tm = Message()
         rm = Message()
@@ -699,15 +624,15 @@ class RouterTest(unittest.TestCase):
 
     def test_11b_semantics_closest_is_remote(self):
         addr = "amqp:/closest/2"
-        M1 = Messenger()
-        M2 = Messenger()
-        M3 = Messenger()
-        M4 = Messenger()
-
-        M1.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
-        M2.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
-        M3.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
-        M4.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
+        M1 = self.messenger()
+        M2 = self.messenger()
+        M3 = self.messenger()
+        M4 = self.messenger()
+
+        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+        M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
+        M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+        M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
 
         M1.timeout = 1.0
         M2.timeout = 1.0
@@ -719,9 +644,9 @@ class RouterTest(unittest.TestCase):
         M3.start()
         M4.start()
 
-        self.subscribe(M2, addr)
-        self.subscribe(M4, addr)
-        wait_for_addr(M1, "closest/2", 0, 1)
+        M2.subscribe(addr, flush=True)
+        M4.subscribe(addr, flush=True)
+        self.routers[0].wait_address("closest/2", 0, 1)
 
         tm = Message()
         rm = Message()
@@ -755,15 +680,15 @@ class RouterTest(unittest.TestCase):
 
     def test_12_semantics_spread(self):
         addr = "amqp:/spread/1"
-        M1 = Messenger()
-        M2 = Messenger()
-        M3 = Messenger()
-        M4 = Messenger()
-
-        M1.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
-        M2.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
-        M3.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
-        M4.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
+        M1 = self.messenger()
+        M2 = self.messenger()
+        M3 = self.messenger()
+        M4 = self.messenger()
+
+        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+        M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
+        M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+        M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
 
         M1.timeout = 1.0
         M2.timeout = 0.2
@@ -774,10 +699,10 @@ class RouterTest(unittest.TestCase):
         M2.start()
         M3.start()
         M4.start()
-        self.subscribe(M2, addr)
-        self.subscribe(M3, addr)
-        self.subscribe(M4, addr)
-        wait_for_addr(M1, "spread/1", 1, 1)
+        M2.subscribe(addr, flush=True)
+        M3.subscribe(addr, flush=True)
+        M4.subscribe(addr, flush=True)
+        self.routers[0].wait_address("spread/1", 1, 1)
 
         tm = Message()
         rm = Message()
@@ -834,19 +759,19 @@ class RouterTest(unittest.TestCase):
 
     def test_13_to_override(self):
         addr = "amqp:/toov/1"
-        M1 = Messenger()
-        M2 = Messenger()
+        M1 = self.messenger()
+        M2 = self.messenger()
 
-        M1.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
-        M2.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
+        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+        M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
 
         M1.timeout = 1.0
         M2.timeout = 1.0
 
         M1.start()
         M2.start()
-        self.subscribe(M2, addr)
-        wait_for_addr(M1, "toov/1", 0, 1)
+        M2.subscribe(addr, flush=True)
+        self.routers[0].wait_address("toov/1", 0, 1)
 
         tm = Message()
         rm = Message()
@@ -879,7 +804,4 @@ if __name__ == '__main__':
         if '--ssl' in sys.argv:
             sys.argv.remove('--ssl')
             RouterTest.ssl_option = "ssl"
-            print "...Using SSL configuration"
-        else:
-            print "...Using non-SSL configuration"
     unittest.main()



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to