Repository: qpid-dispatch Updated Branches: refs/heads/master 9ee07b1d6 -> adc3ca6a4
DISPATCH-357 - Added terminus_addr field on link to display the terminus addr of a link route Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/adc3ca6a Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/adc3ca6a Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/adc3ca6a Branch: refs/heads/master Commit: adc3ca6a4b98572a8f18f99fe75349ef0b84fb71 Parents: 9ee07b1 Author: Ganesh Murthy <[email protected]> Authored: Tue Mar 21 13:16:59 2017 -0400 Committer: Ganesh Murthy <[email protected]> Committed: Wed Mar 22 08:56:13 2017 -0400 ---------------------------------------------------------------------- include/qpid/dispatch/router_core.h | 5 +- src/router_core/agent_link.c | 6 +- src/router_core/connections.c | 16 +++- src/router_core/forwarder.c | 2 + src/router_core/router_core_private.h | 1 + src/router_node.c | 12 ++- tests/system_tests_link_routes.py | 129 +++++++++++++++++++++++++++++ 7 files changed, 165 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/adc3ca6a/include/qpid/dispatch/router_core.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 0d66d1c..2f749b7 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -481,13 +481,16 @@ const char *qdr_link_name(const qdr_link_t *link); * @param dir Direction of the new link, incoming or outgoing * @param source Source terminus of the attach * @param target Target terminus of the attach + * @param name - name of the link + * @param terminus_addr - terminus address if any * @return A pointer to a new qdr_link_t object to track the link */ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, qd_direction_t dir, qdr_terminus_t *source, qdr_terminus_t *target, - const char *name); + const char *name, + const char *terminus_addr); /** * qdr_link_second_attach http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/adc3ca6a/src/router_core/agent_link.c ---------------------------------------------------------------------- diff --git a/src/router_core/agent_link.c b/src/router_core/agent_link.c index fca63c0..692931f 100644 --- a/src/router_core/agent_link.c +++ b/src/router_core/agent_link.c @@ -119,7 +119,11 @@ static void qdr_agent_write_column_CT(qd_composed_field_t *body, int col, qdr_li break; case QDR_LINK_OWNING_ADDR: - if (link->owning_addr) + if(link->terminus_addr) + qd_compose_insert_string(body, link->terminus_addr); + else if (link->connected_link && link->connected_link->terminus_addr) + qd_compose_insert_string(body, link->connected_link->terminus_addr); + else if (link->owning_addr) qd_compose_insert_string(body, address_key(link->owning_addr)); else qd_compose_insert_null(body); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/adc3ca6a/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index abc6a4c..5d80bf1 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -377,7 +377,8 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, qd_direction_t dir, qdr_terminus_t *source, qdr_terminus_t *target, - const char *name) + const char *name, + const char *terminus_addr) { qdr_action_t *action = qdr_action(qdr_link_inbound_first_attach_CT, "link_first_attach"); qdr_link_t *link = new_qdr_link_t(); @@ -388,6 +389,15 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, link->identity = qdr_identifier(conn->core); link->conn = conn; link->name = (char*) malloc(strlen(name) + 1); + + if (terminus_addr) { + char *term_addr = malloc((strlen(terminus_addr) + 3) * sizeof(char)); + term_addr[0] = '\0'; + strcat(term_addr, "M0"); + strcat(term_addr, terminus_addr); + link->terminus_addr = term_addr; + } + strcpy(link->name, name); link->link_direction = dir; link->capacity = conn->link_capacity; @@ -729,9 +739,10 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li sys_mutex_unlock(conn->work_lock); // - // Free the link's name + // Free the link's name and terminus_addr // free(link->name); + free(link->terminus_addr); link->name = 0; } @@ -757,6 +768,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core, link->link_direction = dir; link->capacity = conn->link_capacity; link->name = (char*) malloc(QDR_DISCRIMINATOR_SIZE + 8); + link->terminus_addr = 0; qdr_generate_link_name("qdlink", link->name, QDR_DISCRIMINATOR_SIZE + 8); link->admin_enabled = true; link->oper_status = QDR_LINK_OPER_DOWN; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/adc3ca6a/src/router_core/forwarder.c ---------------------------------------------------------------------- diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index b37b4b8..407404f 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -704,6 +704,8 @@ bool qdr_forward_link_balanced_CT(qdr_core_t *core, out_link->link_type = QD_LINK_ENDPOINT; out_link->link_direction = qdr_link_direction(in_link) == QD_OUTGOING ? QD_INCOMING : QD_OUTGOING; out_link->admin_enabled = true; + out_link->terminus_addr = 0; + out_link->oper_status = QDR_LINK_OPER_DOWN; out_link->name = (char*) malloc(strlen(in_link->name) + 1); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/adc3ca6a/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 414647b..c3c81c5 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -362,6 +362,7 @@ struct qdr_link_t { qd_direction_t link_direction; qdr_link_work_list_t work_list; char *name; + char *terminus_addr; int detach_count; ///< 0, 1, or 2 depending on the state of the lifecycle qdr_address_t *owning_addr; ///< [ref] Address record that owns this link qdr_link_t *connected_link; ///< [ref] If this is a link-route, reference the connected link http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/adc3ca6a/src/router_node.c ---------------------------------------------------------------------- diff --git a/src/router_node.c b/src/router_node.c index 5868570..e16ae33 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -509,10 +509,14 @@ static int AMQP_incoming_link_handler(void* context, qd_link_t *link) { qd_connection_t *conn = qd_link_connection(link); qdr_connection_t *qdr_conn = (qdr_connection_t*) qd_connection_get_context(conn); + + char *terminus_addr = (char*)pn_terminus_get_address(pn_link_remote_target((pn_link_t *)qd_link_pn(link))); + qdr_link_t *qdr_link = qdr_link_first_attach(qdr_conn, QD_INCOMING, qdr_terminus(qd_link_remote_source(link)), qdr_terminus(qd_link_remote_target(link)), - pn_link_name(qd_link_pn(link))); + pn_link_name(qd_link_pn(link)), + terminus_addr); qdr_link_set_context(qdr_link, link); qd_link_set_context(link, qdr_link); @@ -527,10 +531,14 @@ static int AMQP_outgoing_link_handler(void* context, qd_link_t *link) { qd_connection_t *conn = qd_link_connection(link); qdr_connection_t *qdr_conn = (qdr_connection_t*) qd_connection_get_context(conn); + + char *terminus_addr = (char*)pn_terminus_get_address(pn_link_remote_source((pn_link_t *)qd_link_pn(link))); + qdr_link_t *qdr_link = qdr_link_first_attach(qdr_conn, QD_OUTGOING, qdr_terminus(qd_link_remote_source(link)), qdr_terminus(qd_link_remote_target(link)), - pn_link_name(qd_link_pn(link))); + pn_link_name(qd_link_pn(link)), + terminus_addr); qdr_link_set_context(qdr_link, link); qd_link_set_context(link, qdr_link); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/adc3ca6a/tests/system_tests_link_routes.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py index 5648920..7aed283 100644 --- a/tests/system_tests_link_routes.py +++ b/tests/system_tests_link_routes.py @@ -547,6 +547,24 @@ class LinkRouteTest(TestCase): drain_support.run() self.assertEqual(None, drain_support.error) + def test_link_route_terminus_address(self): + # The receiver is attaching to router B to a listener that has link route for address 'pulp.task' setup. + listening_address = self.routers[1].addresses[1] + # Run the query on a normal port + query_address_listening = self.routers[1].addresses[0] + + # Sender is attaching to router C + sender_address = self.routers[2].addresses[0] + query_address_sending = self.routers[2].addresses[0] + + test = TerminusAddrTest(sender_address, listening_address, query_address_sending, query_address_listening) + test.run() + + self.assertTrue(test.in_receiver_found) + self.assertTrue(test.out_receiver_found) + self.assertTrue(test.in_sender_found) + self.assertTrue(test.out_sender_found) + def test_dynamic_source(self): test = DynamicSourceTest(self.routers[1].addresses[0], self.routers[1].addresses[1]) test.run() @@ -937,6 +955,117 @@ class DetachMixedCloseTest(MessagingHandler): def run(self): Container(self).run() +class TerminusAddrTest(MessagingHandler): + """ + This tests makes sure that the link route address is visible in the output of qdstat -l command. + + Sets up a sender on address pulp.task.terminusTestSender and a receiver on pulp.task.terminusTestReceiver. + Connects to the router to which the sender is attached and makes sure that the pulp.task.terminusTestSender address + shows up with an 'in' and 'out' + Similarly connects to the router to which the receiver is attached and makes sure that the + pulp.task.terminusTestReceiver address shows up with an 'in' and 'out' + + """ + def __init__(self, sender_address, listening_address, query_address_sending, query_address_listening): + super(TerminusAddrTest, self).__init__() + self.sender_address = sender_address + self.listening_address = listening_address + self.sender = None + self.receiver = None + self.message_received = False + self.receiver_connection = None + self.sender_connection = None + # We will run a query on the same router where the sender is attached + self.query_address_sending = query_address_sending + + # We will run a query on the same router where the receiver is attached + self.query_address_listening = query_address_listening + self.count = 0 + + self.in_receiver_found = False + self.out_receiver_found = False + self.in_sender_found = False + self.out_sender_found = False + + self.receiver_link_opened = False + self.sender_link_opened = False + + def on_start(self, event): + self.receiver_connection = event.container.connect(self.listening_address) + + def on_connection_remote_open(self, event): + if event.connection == self.receiver_connection: + continue_loop = True + # The following loops introduces a wait. It gives time to the + # router so that the address Dpulp.task can show up on the remoteCount + i = 0 + while continue_loop: + if i > 100: # If we have run the read command for more than hundred times and we still do not have + # the remoteCount set to 1, there is a problem, just exit out of the function instead + # of looping to infinity. + self.receiver_connection.close() + return + local_node = Node.connect(self.query_address_sending, timeout=TIMEOUT) + out = local_node.read(type='org.apache.qpid.dispatch.router.address', name='Dpulp.task').remoteCount + if out == 1: + continue_loop = False + i += 1 + sleep(0.25) + + self.sender_connection = event.container.connect(self.sender_address) + + # Notice here that the receiver and sender are listening on different addresses. Receiver on + # pulp.task.terminusTestReceiver and the sender on pulp.task.terminusTestSender + self.receiver = event.container.create_receiver(self.receiver_connection, "pulp.task.terminusTestReceiver") + self.sender = event.container.create_sender(self.sender_connection, "pulp.task.terminusTestSender", options=AtMostOnce()) + + def on_link_opened(self, event): + if event.receiver == self.receiver: + self.receiver_link_opened = True + + local_node = Node.connect(self.query_address_listening, timeout=TIMEOUT) + out = local_node.query(type='org.apache.qpid.dispatch.router.link') + + link_dir_index = out.attribute_names.index("linkDir") + owning_addr_index = out.attribute_names.index("owningAddr") + + # Make sure that the owningAddr M0pulp.task.terminusTestReceiver shows up on both in and out. + # The 'out' link is on address M0pulp.task.terminusTestReceiver outgoing from the router B to the receiver + # The 'in' link is on address M0pulp.task.terminusTestReceiver incoming from router C to router B + for result in out.results: + if result[link_dir_index] == 'in' and result[owning_addr_index] == 'M0pulp.task.terminusTestReceiver': + self.in_receiver_found = True + if result[link_dir_index] == 'out' and result[owning_addr_index] == 'M0pulp.task.terminusTestReceiver': + self.out_receiver_found = True + + if event.sender == self.sender: + self.sender_link_opened = True + + local_node = Node.connect(self.query_address_sending, timeout=TIMEOUT) + out = local_node.query(type='org.apache.qpid.dispatch.router.link') + + link_dir_index = out.attribute_names.index("linkDir") + owning_addr_index = out.attribute_names.index("owningAddr") + + # Make sure that the owningAddr M0pulp.task.terminusTestSender shows up on both in and out. + # The 'in' link is on address M0pulp.task.terminusTestSender incoming from sender to router + # The 'out' link is on address M0pulp.task.terminusTestSender outgoing from router C to router B + for result in out.results: + if result[link_dir_index] == 'in' and result[owning_addr_index] == 'M0pulp.task.terminusTestSender': + self.in_sender_found = True + if result[link_dir_index] == 'out' and result[owning_addr_index] == 'M0pulp.task.terminusTestSender': + self.out_sender_found = True + + # Shutdown the connections only if the on_link_opened has been called for sender and receiver links. + if self.sender_link_opened and self.receiver_link_opened: + self.sender.close() + self.receiver.close() + self.sender_connection.close() + self.receiver_connection.close() + + def run(self): + Container(self).run() + if __name__ == '__main__': unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
