Repository: qpid-dispatch Updated Branches: refs/heads/master 7978579c8 -> e43f475f4
DISPATCH-237 - Added unit test to make sure that the delivery tags are preserved during link routing Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/e43f475f Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/e43f475f Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/e43f475f Branch: refs/heads/master Commit: e43f475f4b2cceea9b43a752c53e7a7937534f27 Parents: 7978579 Author: Ganesh Murthy <[email protected]> Authored: Fri Apr 15 10:04:52 2016 -0400 Committer: Ganesh Murthy <[email protected]> Committed: Fri Apr 15 10:04:52 2016 -0400 ---------------------------------------------------------------------- tests/system_tests_link_routes.py | 127 +++++++++++++++++++++++++++++---- 1 file changed, 112 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e43f475f/tests/system_tests_link_routes.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py index d7f673a..e0f9e1c 100644 --- a/tests/system_tests_link_routes.py +++ b/tests/system_tests_link_routes.py @@ -23,8 +23,9 @@ from subprocess import PIPE, STDOUT from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, Process -from proton import Message -from proton.reactor import AtMostOnce +from proton import Message, Endpoint +from proton.handlers import MessagingHandler +from proton.reactor import AtMostOnce, Container from proton.utils import BlockingConnection, LinkDetached from qpid_dispatch.management.client import Node @@ -75,6 +76,7 @@ class LinkRoutePatternTest(TestCase): a_listener_port = cls.tester.get_port() b_listener_port = cls.tester.get_port() c_listener_port = cls.tester.get_port() + test_tag_listener_port = cls.tester.get_port() router('A', [ @@ -83,12 +85,16 @@ class LinkRoutePatternTest(TestCase): router('B', [ ('listener', {'role': 'normal', 'addr': '0.0.0.0', 'port': b_listener_port, 'saslMechanisms': 'ANONYMOUS'}), + ('listener', {'name': 'test-tag', 'role': 'route-container', 'addr': '0.0.0.0', 'port': test_tag_listener_port, 'saslMechanisms': 'ANONYMOUS'}), + # This is an on-demand connection made from QDR.B's ephemeral port to a_listener_port - ('connector', {'name': 'broker', 'role': 'on-demand', 'addr': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}), + ('connector', {'name': 'broker', 'role': 'route-container', 'addr': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}), # Only inter router communication must happen on 'inter-router' connectors. This connector makes # a connection from the router B's ephemeral port to c_listener_port ('connector', {'role': 'inter-router', 'addr': '0.0.0.0', 'port': c_listener_port}), - ('linkRoutePattern', {'prefix': 'org.apache', 'connector': 'broker'}) + ('linkRoutePattern', {'prefix': 'org.apache', 'connector': 'broker'}), + ('linkRoute', {'prefix': 'pulp.task', 'connection': 'test-tag', 'dir': 'in'}), + ('linkRoute', {'prefix': 'pulp.task', 'connection': 'test-tag', 'dir': 'out'}) ] ) router('C', @@ -98,7 +104,9 @@ class LinkRoutePatternTest(TestCase): ('listener', {'addr': '0.0.0.0', 'role': 'normal', 'port': cls.tester.get_port(), 'saslMechanisms': 'ANONYMOUS'}), # Note here that the linkRoutePattern is set to org.apache. which makes it backward compatible. # The dot(.) at the end is ignored by the address hashing scheme. - ('linkRoutePattern', {'prefix': 'org.apache.'}) + ('linkRoutePattern', {'prefix': 'org.apache.'}), + ('linkRoute', {'prefix': 'pulp.task', 'dir': 'in'}), + ('linkRoute', {'prefix': 'pulp.task', 'dir': 'out'}) ] ) @@ -159,8 +167,8 @@ class LinkRoutePatternTest(TestCase): """ out = self.run_qdstat_linkRoute(self.routers[1].addresses[0]) out_list = out.split() - self.assertEqual(out_list.count('in'), 1) - self.assertEqual(out_list.count('out'), 1) + self.assertEqual(out_list.count('in'), 2) + self.assertEqual(out_list.count('out'), 2) def test_ccc_qdstat_link_routes_routerC(self): """ @@ -170,8 +178,8 @@ class LinkRoutePatternTest(TestCase): out = self.run_qdstat_linkRoute(self.routers[2].addresses[1]) out_list = out.split() - self.assertEqual(out_list.count('in'), 1) - self.assertEqual(out_list.count('out'), 1) + self.assertEqual(out_list.count('in'), 2) + self.assertEqual(out_list.count('out'), 2) def test_ddd_partial_link_route_match(self): """ @@ -223,7 +231,6 @@ class LinkRoutePatternTest(TestCase): # self.assertEqual(4, len() self.assertEquals(4, len(local_node.query(type='org.apache.qpid.dispatch.router.link').results)) - #blocking_receiver.close() blocking_connection.close() def test_partial_link_route_match_1(self): @@ -263,7 +270,6 @@ class LinkRoutePatternTest(TestCase): name='M0org.apache.dev').deliveriesIngress, "deliveriesIngress is wrong") - #blocking_receiver.close() blocking_connection.close() def test_full_link_route_match(self): @@ -307,7 +313,6 @@ class LinkRoutePatternTest(TestCase): name='M0org.apache').deliveriesIngress, "deliveriesIngress is wrong") - #blocking_receiver.close() blocking_connection.close() def test_full_link_route_match_1(self): @@ -327,6 +332,7 @@ class LinkRoutePatternTest(TestCase): # Sender to to org.apache blocking_sender = blocking_connection.create_sender(address="org.apache", options=apply_options) + msg = Message(body=hello_world_4) # Send a message blocking_sender.send(msg) @@ -347,7 +353,6 @@ class LinkRoutePatternTest(TestCase): name='M0org.apache').deliveriesIngress, "deliveriesIngress is wrong") - #blocking_receiver.close() blocking_connection.close() def test_zzz_qdmanage_delete_link_route(self): @@ -361,6 +366,8 @@ class LinkRoutePatternTest(TestCase): identity_1 = result_list[0][1] identity_2 = result_list[1][1] + identity_3 = result_list[2][1] + identity_4 = result_list[3][1] cmd = 'DELETE --type=linkRoute --identity=' + identity_1 self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0]) @@ -368,16 +375,21 @@ class LinkRoutePatternTest(TestCase): cmd = 'DELETE --type=linkRoute --identity=' + identity_2 self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0]) + cmd = 'DELETE --type=linkRoute --identity=' + identity_3 + self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0]) + + cmd = 'DELETE --type=linkRoute --identity=' + identity_4 + self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0]) + cmd = 'QUERY --type=linkRoute' out = self.run_qdmanage(cmd=cmd, address=self.routers[1].addresses[0]) self.assertEquals(out.rstrip(), '[]') - sleep(1) - # linkRoutes now gone on QDR.B but remember that it still exist on QDR.C # We will now try to create a receiver on address org.apache.dev on QDR.C. # Since the linkRoute on QDR.B is gone, QDR.C # will not allow a receiver to be created since there is no route to destination. + # Connects to listener #2 on QDR.C addr = self.routers[2].addresses[1] @@ -398,6 +410,8 @@ class LinkRoutePatternTest(TestCase): identity_1 = result_list[0][1] identity_2 = result_list[1][1] + identity_3 = result_list[2][1] + identity_4 = result_list[3][1] cmd = 'DELETE --type=linkRoute --identity=' + identity_1 self.run_qdmanage(cmd=cmd, address=addr) @@ -405,6 +419,12 @@ class LinkRoutePatternTest(TestCase): cmd = 'DELETE --type=linkRoute --identity=' + identity_2 self.run_qdmanage(cmd=cmd, address=addr) + cmd = 'DELETE --type=linkRoute --identity=' + identity_3 + self.run_qdmanage(cmd=cmd, address=addr) + + cmd = 'DELETE --type=linkRoute --identity=' + identity_4 + self.run_qdmanage(cmd=cmd, address=addr) + cmd = 'QUERY --type=linkRoute' out = self.run_qdmanage(cmd=cmd, address=addr) self.assertEquals(out.rstrip(), '[]') @@ -436,5 +456,82 @@ class LinkRoutePatternTest(TestCase): name='M0org.apache.dev').deliveriesEgress, "deliveriesEgress is wrong") + def test_yyy_delivery_tag(self): + """ + Tests that the router carries over the delivery tag on a link routed delivery + """ + listening_address = self.routers[1].addresses[1] + sender_address = self.routers[2].addresses[1] + qdstat_address = self.routers[2].addresses[1] + test = DeliveryTagsTest(sender_address, listening_address, qdstat_address) + test.run() + self.assertTrue(test.message_received) + self.assertTrue(test.delivery_tag_verified) + +class DeliveryTagsTest(MessagingHandler): + def __init__(self, sender_address, listening_address, qdstat_address): + super(DeliveryTagsTest, self).__init__() + self.sender_address = sender_address + self.listening_address = listening_address + self.sender = None + self.message_received = False + self.receiver_connection = None + self.sender_connection = None + self.qdstat_address = qdstat_address + self.id = '1235' + self.times = 1 + self.delivery_tag_verified = False + # The delivery tag we are going to send in the transfer frame + # We will later make sure that the same delivery tag shows up on the receiving end in the link routed case. + self.delivery_tag = '92319' + + def on_start(self, event): + self.receiver_connection = event.container.connect(self.listening_address) + + def on_connection_remote_open(self, event): + if event.connection == self.receiver_connection: + continue_loop = True + # Dont open the sender connection unless we can make sure that there is a remote receiver ready to + # accept the message. + # If there is no remote receiver, the router will throw a 'No route to destination' error when + # creating sender connection. + # The following loops introduces a wait before creating the sender connection. It gives time to the + # router so that the address Dpulp.task can show up on the remoteCount + i = 0 + while continue_loop: + if i > 100: # If we have run the read command for more than hundred times and we still do not have + # the remoteCount set to 1, there is a problem, just exit out of the function instead + # of looping to infinity. + self.receiver_connection.close() + return + local_node = Node.connect(self.qdstat_address, timeout=TIMEOUT) + out = local_node.read(type='org.apache.qpid.dispatch.router.address', name='Dpulp.task').remoteCount + if out == 1: + continue_loop = False + i+=1 + + self.sender_connection = event.container.connect(self.sender_address) + self.sender = event.container.create_sender(self.sender_connection, "pulp.task", options=AtMostOnce()) + + def on_sendable(self, event): + if self.times == 1: + msg = Message(body="Hello World") + self.sender.send(msg, tag=self.delivery_tag) + self.sender_connection.close() + self.times +=1 + + def on_message(self, event): + if "Hello World" == event.message.body: + self.message_received = True + + # If the tag on the delivery is the same as the tag we sent with the initial transfer, it means + # that the router has propagated the delivery tag successfully because of link routing. + if self.delivery_tag == event.delivery.tag: + self.delivery_tag_verified = True + self.receiver_connection.close() + + def run(self): + Container(self).run() + if __name__ == '__main__': unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
