Author: aconway
Date: Fri Jun 13 19:08:37 2014
New Revision: 1602498
URL: http://svn.apache.org/r1602498
Log:
NO-JIRA: convert system_tests_two_routers to system_test framework.
Modified:
qpid/dispatch/trunk/python/qpid_dispatch_internal/management/node.py
qpid/dispatch/trunk/tests/system_test.py
qpid/dispatch/trunk/tests/system_tests_two_routers.py
Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/management/node.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/management/node.py?rev=1602498&r1=1602497&r2=1602498&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/management/node.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/management/node.py Fri Jun 13 19:08:37 2014
@@ -137,7 +137,7 @@ class Node(object):
NODE_TYPE='org.amqp.management' # AMQP management node type
NODE_PROPERTIES={'name':SELF, 'type':NODE_TYPE}
- def __init__(self, address=None, router=None, locales=None):
+ def __init__(self, address=None, router=None, locales=None, timeout=10):
"""
@param address: AMQP address of the management node.
@param router: If address does not contain a path, use the management
node for this router ID.
@@ -155,7 +155,7 @@ class Node(object):
self.messenger = proton.Messenger()
self.messenger.start()
- self.messenger.timeout = 1 # FIXME aconway 2014-06-02: config
+ self.messenger.timeout = timeout
subscribe_address = Url(address)
subscribe_address.path = "#"
self.subscription = self.messenger.subscribe(str(subscribe_address))
@@ -173,7 +173,7 @@ class Node(object):
def _flush(self):
"""Call self.messenger.work() till there is no work left."""
- while self.messenger.work(0.01):
+ while self.messenger.work(0.1):
pass
CORRELATION_ID = 0
Modified: qpid/dispatch/trunk/tests/system_test.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_test.py?rev=1602498&r1=1602497&r2=1602498&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_test.py (original)
+++ qpid/dispatch/trunk/tests/system_test.py Fri Jun 13 19:08:37 2014
@@ -297,15 +297,17 @@ class Qdrouterd(Process):
self.defaults()
return "".join(["%s {\n%s}\n"%(n, props(p)) for n, p in self])
- def __init__(self, name, config=Config(), wait=True):
+ def __init__(self, name, config=Config(), pyinclude=None, wait=True):
"""
@param name: name used for for output files.
@param config: router configuration
@keyword wait: wait for router to be ready (call self.wait_ready())
"""
self.config = copy(config)
+ if not pyinclude and os.environ['QPID_DISPATCH_HOME']:
+ pyinclude = os.path.join(os.environ['QPID_DISPATCH_HOME'], 'python')
super(Qdrouterd, self).__init__(
- name, ['qdrouterd', '-c', config.write(name)], expect=Process.RUNNING)
+ name, ['qdrouterd', '-c', config.write(name), '-I', pyinclude], expect=Process.RUNNING)
self._agent = None
if wait:
self.wait_ready()
@@ -346,10 +348,31 @@ class Qdrouterd(Process):
return c
return None
- def wait_connectors(self):
- """Wait for all connectors to be connected"""
+ def wait_address(self, address, subscribers=0, remotes=0, **retry_kwargs):
+ """
+ Wait for an address to be visible on the router.
+ @keyword subscribers: Wait till subscriberCount >= subscribers
+ @keyword remotes: Wait till remoteCount >= remotes
+ @param retry_kwargs: keyword args for L{retry}
+ """
+ def check():
+ # FIXME aconway 2014-06-12: this should be a request by name, not a query.
+ addrs = self.agent.query(
+ entity_type='org.apache.qpid.dispatch.router.address',
+ attribute_names=['name', 'subscriberCount', 'remoteCount'])
+ # FIXME aconway 2014-06-12: endswith check is because of M0/L prefixes
+ addrs = [a for a in addrs if a.name.endswith(address)]
+ return addrs and addrs[0].subscriberCount >= subscribers and addrs[0].remoteCount >= remotes
+ assert retry(check, **retry_kwargs)
+
+
+ def wait_connectors(self, **retry_kwargs):
+ """
+ Wait for all connectors to be connected
+ @param retry_kwargs: keyword args for L{retry}
+ """
for c in self.config.sections('connector'):
- retry(lambda: self.is_connected(c['port']))
+ assert retry(lambda: self.is_connected(c['port']), **retry_kwargs)
def wait_ready(self):
"""Wait for ports and connectors to be ready"""
@@ -531,7 +554,8 @@ class TestCase(unittest.TestCase, Tester
@classmethod
def base_dir(cls):
if not cls._base_dir:
- cls._base_dir = os.path.abspath(os.path.join(__name__+'.dir', cls.__name__))
+ cls._base_dir = os.path.abspath(
+ os.path.join(__name__+'.dir', cls.__module__, cls.__name__))
return cls._base_dir
@classmethod
Modified: qpid/dispatch/trunk/tests/system_tests_two_routers.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_two_routers.py?rev=1602498&r1=1602497&r2=1602498&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_two_routers.py (original)
+++ qpid/dispatch/trunk/tests/system_tests_two_routers.py Fri Jun 13 19:08:37 2014
@@ -17,161 +17,86 @@
# under the License.
#
-import sys
-import os
-import time
-import unittest
-import subprocess
-from proton import Messenger, Message, PENDING, ACCEPTED, REJECTED, RELEASED
-
-def wait_for_addr(messenger, addr, local_count, remote_count):
- msub = messenger.subscribe("amqp:/#")
- reply = msub.address
- req = Message()
- rsp = Message()
-
- done = False
- while not done:
- req.address = "amqp:/_local/$management"
- req.reply_to = reply
- req.properties = {u'operation':u'QUERY', u'entityType':u'org.apache.qpid.dispatch.router.address'}
- req.body = {u'attributeNames': [u'name', u'subscriberCount', u'remoteCount']}
- messenger.put(req)
- messenger.send()
- messenger.recv()
- messenger.get(rsp)
- for item in rsp.body[u'results']:
- if item[0][2:] == addr and \
- local_count == item[1] and \
- remote_count == item[2]:
- done = True
- time.sleep(0.2)
-
-def wait_for_routethrough(messenger, addr):
- msub = messenger.subscribe("amqp:/#")
- reply = msub.address
- req = Message()
- rsp = Message()
-
- done = False
- while not done:
- req.address = "amqp:/_topo/0/%s/$management" % addr
- req.reply_to = reply
- req.properties = {u'operation':u'GET-OPERATIONS', u'type':u'org.amqp.management', u'name':u'self'}
- messenger.put(req)
- messenger.send()
- try:
- messenger.recv()
- done = True
- except Exception:
- pass
- time.sleep(0.2)
-
-def startRouter(obj):
- default_home = os.path.normpath('/usr/lib/qpid-dispatch')
- home = os.environ.get('QPID_DISPATCH_HOME', default_home)
- if obj.ssl_option == "ssl":
- configA_file = '%s/tests/config-2/A-ssl.conf' % home
- configB_file = '%s/tests/config-2/B-ssl.conf' % home
- else:
- configA_file = '%s/tests/config-2/A.conf' % home
- configB_file = '%s/tests/config-2/B.conf' % home
-
- obj.routerA = subprocess.Popen(['qdrouterd', '-c', configA_file],
- stderr=subprocess.PIPE,
- stdout=subprocess.PIPE)
- obj.routerB = subprocess.Popen(['qdrouterd', '-c', configB_file],
- stderr=subprocess.PIPE,
- stdout=subprocess.PIPE)
- time.sleep(1)
- print "Waiting for router topology to stabilize..."
-
- M1 = Messenger()
- M2 = Messenger()
-
- M1.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
- M2.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
-
- M1.start()
- M2.start()
-
- M1.timeout = 0.5
- M2.timeout = 0.5
-
- wait_for_routethrough(M1, "QDR.B")
- wait_for_routethrough(M2, "QDR.A")
-
- M1.stop()
- M2.stop()
-
-
-def stopRouter(obj):
- obj.routerA.terminate()
- obj.routerB.terminate()
- obj.routerA.wait()
- obj.routerB.wait()
+import unittest, sys, time, os
+from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED
+from system_test import TestCase, Messenger, Qdrouterd, retry_exception
+from qpid_dispatch_internal.management import Node
-
-class RouterTest(unittest.TestCase):
+class RouterTest(TestCase):
ssl_option = None
- if (sys.version_info[0] == 2) and (sys.version_info[1] < 7):
- def setUp(self):
- startRouter(self)
-
- def tearDown(self):
- stopRouter(self)
- else:
- @classmethod
- def setUpClass(cls):
- startRouter(cls)
-
- @classmethod
- def tearDownClass(cls):
- stopRouter(cls)
-
- def flush(self, messenger):
- while messenger.work(0.1):
- pass
-
- def subscribe(self, messenger, address):
- sub = messenger.subscribe(address)
- self.flush(messenger)
- return sub
+
+ @classmethod
+ def setUpClass(cls):
+ """Start a router and a messenger"""
+ super(RouterTest, cls).setUpClass()
+
+ def ssl_config(password):
+ if not cls.ssl_option: return []
+ def ssl_file(name):
+ return os.path.join(os.path.dirname(__file__), 'config-2', name)
+ return [
+ ('ssl-profile', {
+ 'name': 'ssl-profile-name',
+ 'cert-db': ssl_file('ca-certificate.pem'),
+ 'cert-file': ssl_file('server-certificate.pem'),
+ 'key-file': ssl_file('server-private-key.pem'),
+ 'password': password})]
+
+ def router(name, password, connection):
+ if cls.ssl_option:
+ connection[1]['ssl-profile'] = 'ssl-profile-name'
+ config = Qdrouterd.Config(ssl_config(password) + [
+ ('log', {'module':'DEFAULT', 'level':'trace', 'output':name+".log"}),
+ ('container', {'worker-threads': 4, 'container-name': 'Qpid.Dispatch.Router.%s'%name}),
+ ('router', {'mode': 'interior', 'router-id': 'QDR.%s'%name}),
+ ('listener', {'port': cls.tester.get_port()}),
+ ('fixed-address', {'prefix': '/closest/', 'fanout': 'single', 'bias': 'closest'}),
+ ('fixed-address', {'prefix': '/spread/', 'fanout': 'single', 'bias': 'spread'}),
+ ('fixed-address', {'prefix': '/multicast/', 'fanout': 'multiple'}),
+ ('fixed-address', {'prefix': '/', 'fanout': 'multiple'}),
+ connection
+ ])
+ cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+ cls.routers = []
+ router('A', 'server-password',
+ ('listener', {'role': 'inter-router', 'port': cls.tester.get_port()}))
+ router('B', 'client-password',
+ ('connector', {'role': 'inter-router', 'port': cls.routers[0].ports[1]}))
+
+ def query_through(address, router):
+ n = Node(address, router, timeout=0.2)
+ retry_exception(lambda: n.query('org.apache.qpid.dispatch.router'))
+ # Wait till we can query through each router to the other.
+ query_through(cls.routers[0].addresses[0], 'QDR.B')
+ query_through(cls.routers[1].addresses[0], 'QDR.A')
+
def test_00_discard(self):
- addr = "amqp://0.0.0.0:20100/discard/1"
- M1 = Messenger()
- M1.timeout = 1.0
- M1.start()
+ addr = self.routers[0].addresses[0]+"/discard/1"
+ M1 = self.messenger()
tm = Message()
tm.address = addr
for i in range(100):
tm.body = {'number': i}
M1.put(tm)
M1.send()
- M1.stop()
-
def test_01_pre_settled(self):
addr = "amqp:/pre_settled/1"
- M1 = Messenger()
- M2 = Messenger()
-
- M1.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
- M2.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
+ M1 = self.messenger()
+ M2 = self.messenger()
- M1.timeout = 1.0
- M2.timeout = 1.0
+ M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+ M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
- M1.start()
- M2.start()
- self.subscribe(M2, addr)
+ M2.subscribe(addr, flush=True)
tm = Message()
rm = Message()
- wait_for_addr(M1, "pre_settled/1", 0, 1)
+ self.routers[0].wait_address("pre_settled/1", 0, 1, timeout=30)
tm.address = addr
for i in range(100):
@@ -190,15 +115,15 @@ class RouterTest(unittest.TestCase):
def test_02_multicast(self):
addr = "amqp:/pre_settled/multicast/1"
- M1 = Messenger()
- M2 = Messenger()
- M3 = Messenger()
- M4 = Messenger()
-
- M1.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
- M2.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
- M3.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
- M4.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
+ M1 = self.messenger()
+ M2 = self.messenger()
+ M3 = self.messenger()
+ M4 = self.messenger()
+
+ M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+ M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
+ M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+ M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
M1.timeout = 1.0
M2.timeout = 1.0
@@ -209,10 +134,10 @@ class RouterTest(unittest.TestCase):
M2.start()
M3.start()
M4.start()
- self.subscribe(M2, addr)
- self.subscribe(M3, addr)
- self.subscribe(M4, addr)
- wait_for_addr(M1, "pre_settled/multicast/1", 1, 1)
+ M2.subscribe(addr, flush=True)
+ M3.subscribe(addr, flush=True)
+ M4.subscribe(addr, flush=True)
+ self.routers[0].wait_address("pre_settled/multicast/1", 1, 1)
tm = Message()
rm = Message()
@@ -244,15 +169,15 @@ class RouterTest(unittest.TestCase):
def test_02a_multicast_unsettled(self):
addr = "amqp:/pre_settled/multicast/2"
- M1 = Messenger()
- M2 = Messenger()
- M3 = Messenger()
- M4 = Messenger()
-
- M1.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
- M2.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
- M3.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
- M4.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
+ M1 = self.messenger()
+ M2 = self.messenger()
+ M3 = self.messenger()
+ M4 = self.messenger()
+
+ M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+ M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
+ M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+ M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
M1.timeout = 1.0
M2.timeout = 1.0
@@ -269,10 +194,10 @@ class RouterTest(unittest.TestCase):
M3.start()
M4.start()
- self.subscribe(M2, addr)
- self.subscribe(M3, addr)
- self.subscribe(M4, addr)
- wait_for_addr(M1, "pre_settled/multicast/2", 1, 1)
+ M2.subscribe(addr, flush=True)
+ M3.subscribe(addr, flush=True)
+ M4.subscribe(addr, flush=True)
+ self.routers[0].wait_address("pre_settled/multicast/2", 1, 1)
tm = Message()
rm = Message()
@@ -310,11 +235,11 @@ class RouterTest(unittest.TestCase):
def test_02c_sender_settles_first(self):
addr = "amqp:/settled/senderfirst/1"
- M1 = Messenger()
- M2 = Messenger()
+ M1 = self.messenger()
+ M2 = self.messenger()
- M1.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
- M2.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
+ M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+ M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
M1.timeout = 1.0
M2.timeout = 1.0
@@ -324,8 +249,8 @@ class RouterTest(unittest.TestCase):
M1.start()
M2.start()
- self.subscribe(M2, addr)
- wait_for_addr(M1, "settled/senderfirst/1", 0, 1)
+ M2.subscribe(addr, flush=True)
+ self.routers[0].wait_address("settled/senderfirst/1", 0, 1)
tm = Message()
rm = Message()
@@ -336,8 +261,8 @@ class RouterTest(unittest.TestCase):
M1.send(0)
M1.settle(ttrk)
- self.flush(M1)
- self.flush(M2)
+ M1.flush()
+ M2.flush()
M2.recv(1)
rtrk = M2.get(rm)
@@ -345,8 +270,8 @@ class RouterTest(unittest.TestCase):
M2.settle(rtrk)
self.assertEqual(0, rm.body['number'])
- self.flush(M1)
- self.flush(M2)
+ M1.flush()
+ M2.flush()
M1.stop()
M2.stop()
@@ -354,11 +279,11 @@ class RouterTest(unittest.TestCase):
def test_03_propagated_disposition(self):
addr = "amqp:/unsettled/2"
- M1 = Messenger()
- M2 = Messenger()
+ M1 = self.messenger()
+ M2 = self.messenger()
- M1.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
- M2.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
+ M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+ M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
M1.timeout = 1.0
M2.timeout = 1.0
@@ -368,8 +293,8 @@ class RouterTest(unittest.TestCase):
M1.start()
M2.start()
- self.subscribe(M2, addr)
- wait_for_addr(M1, "unsettled/2", 0, 1)
+ M2.subscribe(addr, flush=True)
+ self.routers[0].wait_address("unsettled/2", 0, 1)
tm = Message()
rm = Message()
@@ -390,8 +315,8 @@ class RouterTest(unittest.TestCase):
M2.accept(rx_tracker)
M2.settle(rx_tracker)
- self.flush(M2)
- self.flush(M1)
+ M2.flush()
+ M1.flush()
self.assertEqual(ACCEPTED, M1.status(tx_tracker))
@@ -408,8 +333,8 @@ class RouterTest(unittest.TestCase):
M2.reject(rx_tracker)
M2.settle(rx_tracker)
- self.flush(M2)
- self.flush(M1)
+ M2.flush()
+ M1.flush()
self.assertEqual(REJECTED, M1.status(tx_tracker))
@@ -418,8 +343,8 @@ class RouterTest(unittest.TestCase):
def test_04_unsettled_undeliverable(self):
- addr = "amqp://0.0.0.0:20100/unsettled_undeliverable/1"
- M1 = Messenger()
+ addr = self.routers[0].addresses[0]+"/unsettled_undeliverable/1"
+ M1 = self.messenger()
M1.timeout = 1.0
M1.outgoing_window = 5
@@ -431,7 +356,7 @@ class RouterTest(unittest.TestCase):
tx_tracker = M1.put(tm)
M1.send(0)
- self.flush(M1)
+ M1.flush()
self.assertEqual(RELEASED, M1.status(tx_tracker))
M1.stop()
@@ -439,11 +364,11 @@ class RouterTest(unittest.TestCase):
def test_05_three_ack(self):
addr = "amqp:/three_ack/1"
- M1 = Messenger()
- M2 = Messenger()
+ M1 = self.messenger()
+ M2 = self.messenger()
- M1.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
- M2.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
+ M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+ M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
M1.timeout = 1.0
M2.timeout = 1.0
@@ -453,8 +378,8 @@ class RouterTest(unittest.TestCase):
M1.start()
M2.start()
- self.subscribe(M2, addr)
- wait_for_addr(M1, "three_ack/1", 0, 1)
+ M2.subscribe(addr, flush=True)
+ self.routers[0].wait_address("three_ack/1", 0, 1)
tm = Message()
rm = Message()
@@ -471,15 +396,15 @@ class RouterTest(unittest.TestCase):
M2.accept(rx_tracker)
- self.flush(M2)
- self.flush(M1)
+ M2.flush()
+ M1.flush()
self.assertEqual(ACCEPTED, M1.status(tx_tracker))
M1.settle(tx_tracker)
- self.flush(M1)
- self.flush(M2)
+ M1.flush()
+ M2.flush()
##
## We need a way to verify on M2 (receiver) that the tracker has been
@@ -488,35 +413,35 @@ class RouterTest(unittest.TestCase):
M2.settle(rx_tracker)
- self.flush(M2)
- self.flush(M1)
+ M2.flush()
+ M1.flush()
M1.stop()
M2.stop()
def notest_06_link_route_sender(self):
- pass
+ pass
def notest_07_link_route_receiver(self):
- pass
+ pass
def test_08_delivery_annotations(self):
addr = "amqp:/ma/1"
- M1 = Messenger()
- M2 = Messenger()
+ M1 = self.messenger()
+ M2 = self.messenger()
- M1.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
- M2.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
+ M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+ M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
M1.timeout = 1.0
M2.timeout = 1.0
M1.start()
M2.start()
- self.subscribe(M2, addr)
- wait_for_addr(M1, "ma/1", 0, 1)
+ M2.subscribe(addr, flush=True)
+ self.routers[0].wait_address("ma/1", 0, 1)
tm = Message()
rm = Message()
@@ -545,11 +470,11 @@ class RouterTest(unittest.TestCase):
def test_09_management(self):
- M = Messenger()
+ M = self.messenger()
M.timeout = 2.0
M.start()
- M.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
- sub = self.subscribe(M, "amqp:/#")
+ M.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+ sub = M.subscribe("amqp:/#")
reply = sub.address
request = Message()
@@ -583,15 +508,15 @@ class RouterTest(unittest.TestCase):
def test_10_semantics_multicast(self):
addr = "amqp:/multicast/1"
- M1 = Messenger()
- M2 = Messenger()
- M3 = Messenger()
- M4 = Messenger()
-
- M1.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
- M2.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
- M3.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
- M4.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
+ M1 = self.messenger()
+ M2 = self.messenger()
+ M3 = self.messenger()
+ M4 = self.messenger()
+
+ M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+ M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
+ M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+ M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
M1.timeout = 1.0
M2.timeout = 1.0
@@ -603,10 +528,10 @@ class RouterTest(unittest.TestCase):
M3.start()
M4.start()
- self.subscribe(M2, addr)
- self.subscribe(M3, addr)
- self.subscribe(M4, addr)
- wait_for_addr(M1, "multicast/1", 1, 1)
+ M2.subscribe(addr, flush=True)
+ M3.subscribe(addr, flush=True)
+ M4.subscribe(addr, flush=True)
+ self.routers[0].wait_address("multicast/1", 1, 1)
tm = Message()
rm = Message()
@@ -646,15 +571,15 @@ class RouterTest(unittest.TestCase):
def test_11a_semantics_closest_is_local(self):
addr = "amqp:/closest/1"
- M1 = Messenger()
- M2 = Messenger()
- M3 = Messenger()
- M4 = Messenger()
-
- M1.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
- M2.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
- M3.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
- M4.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
+ M1 = self.messenger()
+ M2 = self.messenger()
+ M3 = self.messenger()
+ M4 = self.messenger()
+
+ M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+ M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
+ M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+ M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
M1.timeout = 1.0
M2.timeout = 1.0
@@ -666,10 +591,10 @@ class RouterTest(unittest.TestCase):
M3.start()
M4.start()
- self.subscribe(M2, addr)
- self.subscribe(M3, addr)
- self.subscribe(M4, addr)
- wait_for_addr(M1, "closest/1", 1, 1)
+ M2.subscribe(addr, flush=True)
+ M3.subscribe(addr, flush=True)
+ M4.subscribe(addr, flush=True)
+ self.routers[0].wait_address("closest/1", 1, 1)
tm = Message()
rm = Message()
@@ -699,15 +624,15 @@ class RouterTest(unittest.TestCase):
def test_11b_semantics_closest_is_remote(self):
addr = "amqp:/closest/2"
- M1 = Messenger()
- M2 = Messenger()
- M3 = Messenger()
- M4 = Messenger()
-
- M1.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
- M2.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
- M3.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
- M4.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
+ M1 = self.messenger()
+ M2 = self.messenger()
+ M3 = self.messenger()
+ M4 = self.messenger()
+
+ M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+ M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
+ M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+ M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
M1.timeout = 1.0
M2.timeout = 1.0
@@ -719,9 +644,9 @@ class RouterTest(unittest.TestCase):
M3.start()
M4.start()
- self.subscribe(M2, addr)
- self.subscribe(M4, addr)
- wait_for_addr(M1, "closest/2", 0, 1)
+ M2.subscribe(addr, flush=True)
+ M4.subscribe(addr, flush=True)
+ self.routers[0].wait_address("closest/2", 0, 1)
tm = Message()
rm = Message()
@@ -755,15 +680,15 @@ class RouterTest(unittest.TestCase):
def test_12_semantics_spread(self):
addr = "amqp:/spread/1"
- M1 = Messenger()
- M2 = Messenger()
- M3 = Messenger()
- M4 = Messenger()
-
- M1.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
- M2.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
- M3.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
- M4.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
+ M1 = self.messenger()
+ M2 = self.messenger()
+ M3 = self.messenger()
+ M4 = self.messenger()
+
+ M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+ M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
+ M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+ M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
M1.timeout = 1.0
M2.timeout = 0.2
@@ -774,10 +699,10 @@ class RouterTest(unittest.TestCase):
M2.start()
M3.start()
M4.start()
- self.subscribe(M2, addr)
- self.subscribe(M3, addr)
- self.subscribe(M4, addr)
- wait_for_addr(M1, "spread/1", 1, 1)
+ M2.subscribe(addr, flush=True)
+ M3.subscribe(addr, flush=True)
+ M4.subscribe(addr, flush=True)
+ self.routers[0].wait_address("spread/1", 1, 1)
tm = Message()
rm = Message()
@@ -834,19 +759,19 @@ class RouterTest(unittest.TestCase):
def test_13_to_override(self):
addr = "amqp:/toov/1"
- M1 = Messenger()
- M2 = Messenger()
+ M1 = self.messenger()
+ M2 = self.messenger()
- M1.route("amqp:/*", "amqp://0.0.0.0:20100/$1")
- M2.route("amqp:/*", "amqp://0.0.0.0:20101/$1")
+ M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+ M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
M1.timeout = 1.0
M2.timeout = 1.0
M1.start()
M2.start()
- self.subscribe(M2, addr)
- wait_for_addr(M1, "toov/1", 0, 1)
+ M2.subscribe(addr, flush=True)
+ self.routers[0].wait_address("toov/1", 0, 1)
tm = Message()
rm = Message()
@@ -879,7 +804,4 @@ if __name__ == '__main__':
if '--ssl' in sys.argv:
sys.argv.remove('--ssl')
RouterTest.ssl_option = "ssl"
- print "...Using SSL configuration"
- else:
- print "...Using non-SSL configuration"
unittest.main()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]