Repository: qpid-dispatch Updated Branches: refs/heads/master 3406c55ed -> 7968e159e
DISPATCH-460 - Fixed problem with link-routed dynamic sources. Added a proper test. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/7968e159 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/7968e159 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/7968e159 Branch: refs/heads/master Commit: 7968e159e8ed45707a7b0bd6709d12ce9956c6e9 Parents: 3406c55 Author: Ted Ross <[email protected]> Authored: Mon Jul 25 12:11:41 2016 -0400 Committer: Ted Ross <[email protected]> Committed: Mon Jul 25 12:20:15 2016 -0400 ---------------------------------------------------------------------- src/router_core/connections.c | 11 +++++-- src/router_core/terminus.c | 2 +- tests/system_tests_link_routes.py | 58 +++++++++++++++++++++++++++++++++- 3 files changed, 67 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7968e159/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 701a91b..336b90f 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -779,6 +779,7 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core, // qd_field_iterator_t *dnp_address = qdr_terminus_dnp_address(terminus); if (dnp_address) { + qd_address_iterator_reset_view(dnp_address, ITER_VIEW_ADDRESS_HASH); qd_address_iterator_override_prefix(dnp_address, qdr_prefix_for_dir(dir)); qd_hash_retrieve_prefix(core->addr_hash, dnp_address, (void**) &addr); qd_field_iterator_free(dnp_address); @@ -1098,11 +1099,17 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act qdr_terminus_free(target); } - else if (link_route) + else if (link_route) { // // This is a link-routed destination, forward the attach to the next hop // - qdr_forward_attach_CT(core, addr, link, source, target); + bool success = qdr_forward_attach_CT(core, addr, link, source, target); + if (!success) { + qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION); + qdr_terminus_free(source); + qdr_terminus_free(target); + } + } else { // http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7968e159/src/router_core/terminus.c ---------------------------------------------------------------------- diff --git a/src/router_core/terminus.c b/src/router_core/terminus.c index 71b911d..4dbba75 100644 --- a/src/router_core/terminus.c +++ b/src/router_core/terminus.c @@ -169,7 +169,7 @@ qd_field_iterator_t *qdr_terminus_dnp_address(qdr_terminus_t *term) if (pn_data_next(props)) { pn_bytes_t val = pn_data_get_string(props); if (val.start && *val.start != '\0') - return qd_field_iterator_string(val.start); + return qd_field_iterator_binary(val.start, val.size); } } } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7968e159/tests/system_tests_link_routes.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py index 1c080cf..0156edd 100644 --- a/tests/system_tests_link_routes.py +++ b/tests/system_tests_link_routes.py @@ -25,7 +25,7 @@ from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, Process from proton import Message, Endpoint from proton.handlers import MessagingHandler -from proton.reactor import AtMostOnce, Container +from proton.reactor import AtMostOnce, Container, DynamicNodeProperties from proton.utils import BlockingConnection, LinkDetached from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler, DrainNoMessagesHandler, DrainNoMoreMessagesHandler @@ -505,6 +505,11 @@ class LinkRoutePatternTest(TestCase): drain_support.run() self.assertEqual(None, drain_support.error) + def test_dynamic_source(self): + test = DynamicSourceTest(self.routers[1].addresses[0], self.routers[1].addresses[1]) + test.run() + self.assertEqual(None, test.error) + class DeliveryTagsTest(MessagingHandler): def __init__(self, sender_address, listening_address, qdstat_address): @@ -630,6 +635,57 @@ class CloseWithUnsettledTest(MessagingHandler): def run(self): Container(self).run() + +class DynamicSourceTest(MessagingHandler): + ## + ## This test verifies that a dynamic source can be propagated via link-route to + ## a route-container. + ## + def __init__(self, normal_addr, route_addr): + super(DynamicSourceTest, self).__init__(prefetch=0, auto_accept=False) + self.normal_addr = normal_addr + self.route_addr = route_addr + self.dest = "pulp.task.DynamicSource" + self.address = "DynamicSourceAddress" + self.error = None + + def timeout(self): + self.error = "Timeout Expired - Check for cores" + self.conn_normal.close() + self.conn_route.close() + + def on_start(self, event): + self.timer = event.reactor.schedule(5, Timeout(self)) + self.conn_route = event.container.connect(self.route_addr) + + def on_connection_opened(self, event): + if event.connection == self.conn_route: + self.conn_normal = event.container.connect(self.normal_addr) + elif event.connection == self.conn_normal: + self.receiver = event.container.create_receiver(self.conn_normal, None, dynamic=True,options=DynamicNodeProperties({"x-opt-qd.address":u"pulp.task.abc"})) + + def on_link_opened(self, event): + if event.receiver == self.receiver: + if self.receiver.remote_source.address != self.address: + self.error = "Expected %s, got %s" % (self.address, self.receiver.remote_source.address) + self.conn_normal.close() + self.conn_route.close() + self.timer.cancel() + + def on_link_opening(self, event): + if event.sender: + self.sender = event.sender + if not self.sender.remote_source.dynamic: + self.error = "Expected sender with dynamic source" + self.conn_normal.close() + self.conn_route.close() + self.timer.cancel() + self.sender.source.address = self.address + self.sender.open() + + def run(self): + Container(self).run() + if __name__ == '__main__': unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
