DISPATCH-495 - Improved stability of autolinks and added a set of autolink tests.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/bba79f3c Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/bba79f3c Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/bba79f3c Branch: refs/heads/master Commit: bba79f3c68d771624a9b1b54d36ae96f368205b4 Parents: 41b7407 Author: Ted Ross <[email protected]> Authored: Fri Sep 2 17:15:48 2016 -0400 Committer: Ted Ross <[email protected]> Committed: Fri Sep 2 17:15:48 2016 -0400 ---------------------------------------------------------------------- src/container.c | 6 +- src/server.c | 2 +- tests/CMakeLists.txt | 1 + tests/system_tests_autolinks.py | 196 +++++++++++++++++++++++++++++++++++ 4 files changed, 202 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bba79f3c/src/container.c ---------------------------------------------------------------------- diff --git a/src/container.c b/src/container.c index 1c9fec8..92c0e0e 100644 --- a/src/container.c +++ b/src/container.c @@ -309,13 +309,14 @@ static int close_handler(qd_container_t *container, void* conn_context, pn_conne pn_link_t *pn_link = pn_link_head(conn, 0); while (pn_link) { qd_link_t *link = (qd_link_t*) pn_link_get_context(pn_link); + pn_link_t *next = pn_link_next(pn_link, 0); if (link) { qd_node_t *node = link->node; if (node) { node->ntype->link_detach_handler(node->context, link, QD_LOST); } } - pn_link = pn_link_next(pn_link, 0); + pn_link = next; } // close the connection @@ -779,9 +780,10 @@ qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, c link->close_sess_with_link = true; // - // Keep the borrowed link reference + // Keep the borrowed references // pn_incref(link->pn_link); + pn_incref(link->pn_sess); pn_link_set_context(link->pn_link, link); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bba79f3c/src/server.c ---------------------------------------------------------------------- diff --git a/src/server.c b/src/server.c index e74b879..aa08ae6 100644 --- a/src/server.c +++ b/src/server.c @@ -798,7 +798,7 @@ static int process_connector(qd_server_t *qd_server, qdpn_connector_t *cxtr) if (ctx->connector) { ce = QD_CONN_EVENT_CONNECTOR_OPEN; - ctx->connector->delay = 0; + ctx->connector->delay = 2000; // Delay on re-connect in case there is a recurring error } else assert(ctx->listener); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bba79f3c/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 1c1be1c..9d133a7 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -72,6 +72,7 @@ add_test(router_policy_test ${TEST_WRAP} -m unittest -v router_policy_test) foreach(py_test_module # system_tests_broker system_tests_link_routes + system_tests_autolinks system_tests_drain system_tests_management system_tests_one_router http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bba79f3c/tests/system_tests_autolinks.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_autolinks.py b/tests/system_tests_autolinks.py new file mode 100644 index 0000000..cc79c64 --- /dev/null +++ b/tests/system_tests_autolinks.py @@ -0,0 +1,196 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import unittest +from proton import Message, Delivery, PENDING, ACCEPTED, REJECTED +from system_test import TestCase, Qdrouterd, main_module +from proton.handlers import MessagingHandler +from proton.reactor import Container, AtMostOnce, AtLeastOnce +from proton.utils import BlockingConnection, SyncRequestResponse +from qpid_dispatch.management.client import Node + +CONNECTION_PROPERTIES = {u'connection': u'properties', u'int_property': 6451} + +class AutolinkTest(TestCase): + """System tests involving a single router""" + @classmethod + def setUpClass(cls): + """Start a router and a messenger""" + super(AutolinkTest, cls).setUpClass() + name = "test-router" + config = Qdrouterd.Config([ + ('router', {'mode': 'standalone', 'id': 'QDR'}), + + # + # Create a general-purpose listener for sending and receiving deliveries + # + ('listener', {'port': cls.tester.get_port()}), + + # + # Create a route-container listener for the autolinks + # + ('listener', {'port': cls.tester.get_port(), 'role': 'route-container'}), + + # + # Create a pair of default auto-links for 'node.1' + # + ('autoLink', {'addr': 'node.1', 'containerId': 'container.1', 'dir': 'in'}), + ('autoLink', {'addr': 'node.1', 'containerId': 'container.1', 'dir': 'out'}), + ('address', {'prefix': 'node', 'waypoint': 'yes'}), + + # + # Create a pair of auto-links on non-default phases for container-to-container transfers + # + ('autoLink', {'addr': 'xfer.2', 'containerId': 'container.2', 'dir': 'in', 'phase': '4'}), + ('autoLink', {'addr': 'xfer.2', 'containerId': 'container.3', 'dir': 'out', 'phase': '4'}), + ]) + + cls.router = cls.tester.qdrouterd(name, config) + cls.router.wait_ready() + cls.normal_address = cls.router.addresses[0] + cls.route_address = cls.router.addresses[1] + + + def test_01_autolink_attach(self): + """ + Create the route-container connection and verify that the appropriate links are attached. + Disconnect, reconnect, and verify that the links are re-attached. + """ + test = AutolinkAttachTest(self.route_address) + test.run() + self.assertEqual(None, test.error) + + + def test_02_autolink_credit(self): + """ + Create a normal connection and a sender to the autolink address. Then create the route-container + connection and ensure that the on_sendable did not arrive until after the autolinks were created. + """ + test = AutolinkCreditTest(self.normal_address, self.route_address) + test.run() + self.assertEqual(None, test.error) + + +class Timeout(object): + def __init__(self, parent): + self.parent = parent + + def on_timer_task(self, event): + self.parent.timeout() + + +class AutolinkAttachTest(MessagingHandler): + def __init__(self, address): + super(AutolinkAttachTest, self).__init__(prefetch=0) + self.address = address + self.error = None + self.sender = None + self.receiver = None + + self.n_rx_attach = 0 + self.n_tx_attach = 0 + + def timeout(self): + self.error = "Timeout Expired: n_rx_attach=%d n_tx_attach=%d" % (self.n_rx_attach, self.n_tx_attach) + self.conn.close() + + def on_start(self, event): + self.timer = event.reactor.schedule(5, Timeout(self)) + self.conn = event.container.connect(self.address) + + def on_connection_closed(self, event): + if self.n_tx_attach == 1: + self.conn = event.container.connect(self.address) + + def on_link_opened(self, event): + if event.sender: + self.n_tx_attach += 1 + if event.sender.remote_source.address != 'node.1': + self.error = "Expected sender address 'node.1', got '%s'" % event.sender.remote_source.address + self.timer.cancel() + self.conn.close() + elif event.receiver: + self.n_rx_attach += 1 + if event.receiver.remote_target.address != 'node.1': + self.error = "Expected receiver address 'node.1', got '%s'" % event.receiver.remote_target.address + self.timer.cancel() + self.conn.close() + if self.n_tx_attach == 1 and self.n_rx_attach == 1: + self.conn.close() + if self.n_tx_attach == 2 and self.n_rx_attach == 2: + self.conn.close() + self.timer.cancel() + + def run(self): + container = Container(self) + container.container_id = 'container.1' + container.run() + + +class AutolinkCreditTest(MessagingHandler): + def __init__(self, normal_address, route_address): + super(AutolinkCreditTest, self).__init__(prefetch=0) + self.normal_address = normal_address + self.route_address = route_address + self.dest = 'node.1' + self.normal_conn = None + self.route_conn = None + self.error = None + self.last_action = "None" + + def timeout(self): + self.error = "Timeout Expired: last_action=%s" % self.last_action + if self.normal_conn: + self.normal_conn.close() + if self.route_conn: + self.route_conn.close() + + def on_start(self, event): + self.timer = event.reactor.schedule(5, Timeout(self)) + self.normal_conn = event.container.connect(self.normal_address) + self.sender = event.container.create_sender(self.normal_conn, self.dest) + self.last_action = "Attached normal sender" + + def on_link_opening(self, event): + if event.sender: + event.sender.source.address = event.sender.remote_source.address + if event.receiver: + event.receiver.target.address = event.receiver.remote_target.address + + def on_link_opened(self, event): + if event.sender == self.sender: + self.route_conn = event.container.connect(self.route_address) + self.last_action = "Opened route connection" + + def on_sendable(self, event): + if event.sender == self.sender: + if self.last_action != "Opened route connection": + self.error = "Events out of sequence: last_action=%s" % self.last_action + self.timer.cancel() + self.route_conn.close() + self.normal_conn.close() + + def run(self): + container = Container(self) + container.container_id = 'container.1' + container.run() + + +if __name__ == '__main__': + unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
