Repository: qpid-dispatch Updated Branches: refs/heads/master c685d046d -> 78cbc5e67
DISPATCH-981: disambiguated routed link names at ingress Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/78cbc5e6 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/78cbc5e6 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/78cbc5e6 Branch: refs/heads/master Commit: 78cbc5e6799fc8ed65e8a869a8fcfa3291862bdc Parents: c685d04 Author: Gordon Sim <[email protected]> Authored: Fri Apr 27 22:51:29 2018 +0100 Committer: Gordon Sim <[email protected]> Committed: Tue May 1 14:11:56 2018 +0100 ---------------------------------------------------------------------- src/router_core/connections.c | 20 +++++ src/router_core/forwarder.c | 4 +- src/router_core/router_core_private.h | 1 + tests/system_tests_link_routes.py | 129 +++++++++++++++++++++++++++++ 4 files changed, 151 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/78cbc5e6/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 9608d79..0e655d3 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -840,6 +840,7 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li // Free the link's name and terminus_addr // free(link->name); + free(link->disambiguated_name); free(link->terminus_addr); free(link->ingress_histogram); link->name = 0; @@ -867,6 +868,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core, link->link_direction = dir; link->capacity = conn->link_capacity; link->name = (char*) malloc(QDR_DISCRIMINATOR_SIZE + 8); + link->disambiguated_name = 0; link->terminus_addr = 0; qdr_generate_link_name("qdlink", link->name, QDR_DISCRIMINATOR_SIZE + 8); link->admin_enabled = true; @@ -1346,6 +1348,18 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo qdr_connection_free(conn); } +static char* disambiguated_link_name(qdr_connection_info_t *conn, char *original) +{ + size_t olen = strlen(original); + size_t clen = strlen(conn->container); + char *name = (char*) malloc(olen + clen + 2); + memset(name, 0, olen + clen + 2); + strcat(name, original); + name[olen] = '@'; + strcat(name + olen + 1, conn->container); + return name; +} + static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard) { if (discard) @@ -1429,6 +1443,9 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act qdr_terminus_free(source); qdr_terminus_free(target); } else { + if (conn->role != QDR_ROLE_INTER_ROUTER && conn->connection_info) { + link->disambiguated_name = disambiguated_link_name(conn->connection_info, link->name); + } success = qdr_forward_attach_CT(core, addr, link, source, target); if (!success) { @@ -1522,6 +1539,9 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act qdr_terminus_free(source); qdr_terminus_free(target); } else { + if (conn->role != QDR_ROLE_INTER_ROUTER && conn->connection_info) { + link->disambiguated_name = disambiguated_link_name(conn->connection_info, link->name); + } bool success = qdr_forward_attach_CT(core, addr, link, source, target); if (!success) { qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/78cbc5e6/src/router_core/forwarder.c ---------------------------------------------------------------------- diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index b8f2291..7ab8a46 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -675,7 +675,6 @@ int qdr_forward_balanced_CT(qdr_core_t *core, return 0; } - bool qdr_forward_link_balanced_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *in_link, @@ -750,8 +749,7 @@ bool qdr_forward_link_balanced_CT(qdr_core_t *core, out_link->oper_status = QDR_LINK_OPER_DOWN; - out_link->name = (char*) malloc(strlen(in_link->name) + 1); - strcpy(out_link->name, in_link->name); + out_link->name = strdup(in_link->disambiguated_name ? in_link->disambiguated_name : in_link->name); out_link->connected_link = in_link; in_link->connected_link = out_link; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/78cbc5e6/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 0f22df5..b3c9798 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -384,6 +384,7 @@ struct qdr_link_t { qd_direction_t link_direction; qdr_link_work_list_t work_list; char *name; + char *disambiguated_name; char *terminus_addr; int detach_count; ///< 0, 1, or 2 depending on the state of the lifecycle qdr_address_t *owning_addr; ///< [ref] Address record that owns this link http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/78cbc5e6/tests/system_tests_link_routes.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py index 6134c0f..87fb448 100644 --- a/tests/system_tests_link_routes.py +++ b/tests/system_tests_link_routes.py @@ -713,6 +713,25 @@ class LinkRouteTest(TestCase): test.run() self.assertEqual(None, test.error) + def _multi_link_send_receive(self, send_host, receive_host, name): + senders = ["%s/%s" % (send_host, address) for address in ["org.apache.foo", "org.apache.bar"]] + receivers = ["%s/%s" % (receive_host, address) for address in ["org.apache.foo", "org.apache.bar"]] + test = MultiLinkSendReceive(senders, receivers, name) + test.run() + self.assertEqual(None, test.error) + + def test_same_name_route_receivers_through_B(self): + self._multi_link_send_receive(self.routers[0].addresses[0], self.routers[1].addresses[0], "recv_through_B") + + def test_same_name_route_senders_through_B(self): + self._multi_link_send_receive(self.routers[1].addresses[0], self.routers[0].addresses[0], "send_through_B") + + def test_same_name_route_receivers_through_C(self): + self._multi_link_send_receive(self.routers[0].addresses[0], self.routers[2].addresses[0], "recv_through_C") + + def test_same_name_route_senders_through_C(self): + self._multi_link_send_receive(self.routers[2].addresses[0], self.routers[0].addresses[0], "send_through_C") + class Timeout(object): def __init__(self, parent): @@ -1194,6 +1213,116 @@ class TerminusAddrTest(MessagingHandler): def run(self): Container(self).run() +class MultiLinkSendReceive(MessagingHandler): + class SendState(object): + def __init__(self, link): + self.link = link + self.sent = False + self.accepted = False + self.done = False + self.closed = False + + def send(self, subject, body): + if not self.sent: + self.link.send(Message(subject=subject,body=body,address=self.link.target.address)) + self.sent = True + + def on_accepted(self): + self.accepted = True + self.done = True + + def close(self): + if not self.closed: + self.closed = True + self.link.close() + self.link.connection.close() + + class RecvState(object): + def __init__(self, link): + self.link = link + self.received = False + self.done = False + self.closed = False + + def on_message(self): + self.received = True + self.done = True + + def close(self): + if not self.closed: + self.closed = True + self.link.close() + self.link.connection.close() + + def __init__(self, send_urls, recv_urls, name, message=None): + super(MultiLinkSendReceive, self).__init__() + self.send_urls = send_urls + self.recv_urls = recv_urls + self.senders = {} + self.receivers = {} + self.message = message or "SendReceiveTest" + self.sent = False + self.error = None + self.name = name + + def close(self): + for sender in self.senders.values(): + sender.close() + for receiver in self.receivers.values(): + receiver.close() + + def all_done(self): + for sender in self.senders.values(): + if not sender.done: + return False + for receiver in self.receivers.values(): + if not receiver.done: + return False + return True + + def timeout(self): + self.error = "Timeout Expired" + self.close() + + def stop_if_all_done(self): + if self.all_done(): + self.stop() + + def stop(self): + self.close() + self.timer.cancel() + + def on_start(self, event): + self.timer = event.reactor.schedule(TIMEOUT, Timeout(self)) + event.container.container_id = None + for u in self.send_urls: + s = self.SendState(event.container.create_sender(u, name=self.name)) + self.senders[s.link.connection.container] = s + for u in self.recv_urls: + r = self.RecvState(event.container.create_receiver(u, name=self.name)) + self.receivers[r.link.connection.container] = r + + def on_link_remote_open(self, event): + print("link opened: %s %s %s" % (event.link.source.address, event.link.target.address, event.connection.container)) + + def on_sendable(self, event): + print("sendable: %s %s" % (event.link.target.address, event.connection.container)) + self.senders[event.connection.container].send(self.name, self.message) + + def on_message(self, event): + print("message received: %s %s" % (event.link.source.address, event.connection.container)) + if self.message != event.message.body: + error = "Incorrect message. Got %s, expected %s" % (event.message.body, self.message.body) + self.receivers[event.connection.container].on_message() + self.stop_if_all_done() + + def on_accepted(self, event): + print("accepted: %s %s" % (event.link.target.address, event.connection.container)) + self.senders[event.connection.container].on_accepted() + self.stop_if_all_done() + + def run(self): + Container(self).run() if __name__ == '__main__': unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
