Repository: qpid-dispatch Updated Branches: refs/heads/master fafd1d618 -> 15b70c433
NO-JIRA: add more tests for the edge router Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/15b70c43 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/15b70c43 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/15b70c43 Branch: refs/heads/master Commit: 15b70c43379c4fe730922d10425ef96d76ce9eeb Parents: fafd1d6 Author: Kenneth Giusti <[email protected]> Authored: Thu Dec 6 15:37:23 2018 -0500 Committer: Kenneth Giusti <[email protected]> Committed: Wed Dec 12 11:44:14 2018 -0500 ---------------------------------------------------------------------- tests/system_test.py | 9 ++-- tests/system_tests_edge_router.py | 93 +++++++++++++++++++++++++++++++++- tests/test_broker.py | 30 ++++++++--- 3 files changed, 119 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/15b70c43/tests/system_test.py ---------------------------------------------------------------------- diff --git a/tests/system_test.py b/tests/system_test.py index ef15959..8820cc2 100755 --- a/tests/system_test.py +++ b/tests/system_test.py @@ -717,11 +717,14 @@ def main_module(): class AsyncTestReceiver(MessagingHandler): """ A simple receiver that runs in the background and queues any received - messages. Messages can be retrieved from this thread via the queue member + messages. Messages can be retrieved from this thread via the queue member. + :param wait: block the constructor until the link has been fully + established. """ Empty = Queue.Empty - def __init__(self, address, source, conn_args=None, container_id=None): + def __init__(self, address, source, conn_args=None, container_id=None, + wait=True): super(AsyncTestReceiver, self).__init__() self.address = address self.source = source @@ -736,7 +739,7 @@ class AsyncTestReceiver(MessagingHandler): self._thread = Thread(target=self._main) self._thread.daemon = True self._thread.start() - if self._ready.wait(timeout=TIMEOUT) is False: + if wait and self._ready.wait(timeout=TIMEOUT) is False: raise Exception("Timed out waiting for receiver start") def _main(self): http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/15b70c43/tests/system_tests_edge_router.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py index b84e214..3e2ead5 100644 --- a/tests/system_tests_edge_router.py +++ b/tests/system_tests_edge_router.py @@ -23,6 +23,7 @@ from __future__ import absolute_import from __future__ import print_function from time import sleep +from threading import Event from threading import Timer import unittest2 as unittest @@ -30,10 +31,13 @@ from proton import Message, Timeout from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, MgmtMsgProxy from system_test import AsyncTestReceiver from system_test import AsyncTestSender +from system_test import QdManager from system_tests_link_routes import ConnLinkRouteService from test_broker import FakeService +from test_broker import FakeBrokerStop from proton.handlers import MessagingHandler from proton.reactor import Container, DynamicNodeProperties +from proton.utils import BlockingConnection from qpid_dispatch.management.client import Node from subprocess import PIPE, STDOUT import re @@ -736,6 +740,91 @@ class LinkRouteProxyTest(TestCase): tr.queue.get(timeout=TIMEOUT) tr.stop() + def _validate_topology(self, router, expected_links, address): + """ + query existing links and verify they are set up as expected + """ + mgmt = QdManager(self, address=router) + # fetch all the connections + cl = mgmt.query('org.apache.qpid.dispatch.connection') + # map them by their identity + conns = dict([(c['identity'], c) for c in cl]) + + # now fetch all links for the address + ll = mgmt.query('org.apache.qpid.dispatch.router.link') + test_links = [l for l in ll if + l.get('owningAddr', '').find(address) != -1] + self.assertEqual(len(expected_links), len(test_links)) + + for elink in expected_links: + matches = filter(lambda l: (l['linkDir'] == elink[0] + and + conns[l['connectionId']]['container'] == elink[1] + and + conns[l['connectionId']]['role'] == elink[2]), + test_links) + self.assertTrue(len(matches) == 1) + + def test_link_topology(self): + """ + Verify that the link topology that results from activating a link route + and sending traffic is correct + """ + fs = FakeService(self.EA1.route_container) + self.INT_B.wait_address("CfgLinkRoute1") + + # create a sender on one edge and the receiver on another + bc_b = BlockingConnection(self.EB1.listener, timeout=TIMEOUT) + erx = bc_b.create_receiver(address="CfgLinkRoute1/buhbye", credit=10) + bc_a = BlockingConnection(self.EA1.listener, timeout=TIMEOUT) + etx = bc_a.create_sender(address="CfgLinkRoute1/buhbye") + + etx.send(Message(body="HI THERE"), timeout=TIMEOUT) + self.assertEqual("HI THERE", erx.receive(timeout=TIMEOUT).body) + erx.accept() + + # expect the following links have been established for the + # "CfgLinkRoute1/buhbye" address: + + # EA1 + # 1 out link to INT.A (connection role: edge) + # 1 in link from bc_a (normal) + # 1 in link from FakeBroker (route-container) + # 1 out link to FakeBroker (route-container) + # INT.A + # 1 in link from EA1 (edge) + # 1 out link to INT.B (inter-router) + # INT.B + # 1 out link to EB1 (edge) + # 1 in link from INT.A (inter-router) + # EB1 + # 1 out link to bc_b (normal) + # 1 in link from INT.B (edge) + + expect = { + self.EA1.listener: [ + ('in', bc_a.container.container_id, 'normal'), + ('in', 'FakeBroker', 'route-container'), + ('out', 'FakeBroker', 'route-container'), + ('out', 'INT.A', 'edge')], + self.INT_A.listener: [ + ('in', 'EA1', 'edge'), + ('out', 'INT.B', 'inter-router')], + self.INT_B.listener: [ + ('in', 'INT.A', 'inter-router'), + ('out', 'EB1', 'edge')], + self.EB1.listener: [ + ('in', 'INT.B', 'edge'), + ('out', bc_b.container.container_id, 'normal')] + } + for router, expected_links in expect.items(): + self._validate_topology(router, expected_links, + 'CfgLinkRoute1/buhbye') + + fs.join() + self.assertEqual(1, fs.in_count) + self.assertEqual(1, fs.out_count) + def test_link_route_proxy_configured(self): """ Activate the configured link routes via a FakeService, verify proxies @@ -777,7 +866,7 @@ class LinkRouteProxyTest(TestCase): configured some link routes. Then have clients on the interior exchange messages via the fake service. """ - fs = ConnLinkRouteService(self.EA1.addresses[1], + fs = ConnLinkRouteService(self.EA1.route_container, container_id="FakeService", config = [("ConnLinkRoute1", {"pattern": "Conn/*/One", @@ -866,7 +955,7 @@ class LinkRouteProxyTest(TestCase): # activate the pre-configured link routes ea1_mgmt = self.EA1.management - fs = FakeService(self.EA1.addresses[1]) + fs = FakeService(self.EA1.route_container) self.INT_B.wait_address("CfgLinkRoute1") for i in range(10): http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/15b70c43/tests/test_broker.py ---------------------------------------------------------------------- diff --git a/tests/test_broker.py b/tests/test_broker.py index 0b9bdfa..a05df02 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -39,6 +39,11 @@ from proton.reactor import AtMostOnce from system_test import TIMEOUT +class FakeBrokerStop(Exception): + """stop the broker from a handler callback""" + pass + + class FakeBroker(MessagingHandler): """ A fake broker-like service that listens for client connections @@ -104,14 +109,23 @@ class FakeBroker(MessagingHandler): def _main(self): self._container.timeout = 1.0 self._container.start() - while self._container.process(): - if self._stop_thread: - if self.acceptor: - self.acceptor.close() - self.acceptor = None - for c in self._connections: - c.close() - self._connections = [] + + try: + while self._container.process(): + if self._stop_thread: + break + + if self.acceptor: + self.acceptor.close() + self.acceptor = None + for c in self._connections: + c.close() + self._connections = [] + self._container.process() + except FakeBrokerStop: + # this abruptly kills the broker useful to test how dispatch deals + # with hung/stopped containers + pass def join(self): self._stop_thread = True --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
