Repository: qpid-dispatch Updated Branches: refs/heads/master 121e4065c -> c895d1c44
DISPATCH-209 : parallel waypoint test Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/c895d1c4 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/c895d1c4 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/c895d1c4 Branch: refs/heads/master Commit: c895d1c440e9ad24d1ae4415f27e5f26ee26357e Parents: 121e406 Author: mick goulish <[email protected]> Authored: Thu Oct 19 14:16:21 2017 -0400 Committer: mick goulish <[email protected]> Committed: Thu Oct 19 14:16:21 2017 -0400 ---------------------------------------------------------------------- tests/system_tests_distribution.py | 883 +++++++++++++++++++++++++++----- 1 file changed, 745 insertions(+), 138 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c895d1c4/tests/system_tests_distribution.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_distribution.py b/tests/system_tests_distribution.py index 8084344..0156afc 100644 --- a/tests/system_tests_distribution.py +++ b/tests/system_tests_distribution.py @@ -121,8 +121,21 @@ class DistributionTests ( TestCase ): cls.linkroute_prefix = "0.0.0.0/linkroute" - cls.waypoint_prefix_1 = "0.0.0.0/queue_1" - cls.waypoint_prefix_2 = "0.0.0.0/queue_2" + cls.waypoint_prefix_1 = "0.0.0.0/process_1" + cls.waypoint_prefix_2 = "0.0.0.0/process_2" + cls.waypoint_prefix_3 = "0.0.0.0/process_3" + + #----------------------------------------------------- + # Container IDs are what associate route containers + # with links -- for the linkroute tests and the + # waypoint tests. + #----------------------------------------------------- + cls.container_ids = [ 'ethics_gradient', + 'honest_mistake', + 'frank_exchange_of_views', + 'zero_gravitas', + 'yawning_angel' + ] #----------------------------------------------------- # Here are some chunks of configuration that will be @@ -134,87 +147,160 @@ class DistributionTests ( TestCase ): ( 'linkRoute', { 'prefix': cls.linkroute_prefix, 'dir': 'in', - 'containerId': 'LinkRouteTest' + 'containerId': cls.container_ids[0] } ), ( 'linkRoute', { 'prefix': cls.linkroute_prefix, 'dir': 'out', - 'containerId': 'LinkRouteTest' + 'containerId': cls.container_ids[0] } ) ] - waypoint_configuration_1 = \ + single_waypoint_configuration = \ [ - ( 'address', - { 'prefix': cls.waypoint_prefix_1, + ( 'address', + { 'prefix': cls.waypoint_prefix_1, 'waypoint': 'yes' } ), - ( 'autoLink', - { 'addr': cls.waypoint_prefix_1 + '.waypoint', - 'containerId': 'WaypointTest', + ( 'autoLink', + { 'addr': cls.waypoint_prefix_1 + '.waypoint', + 'containerId': cls.container_ids[1], 'dir': 'in' } ), - ( 'autoLink', + ( 'autoLink', { 'addr': cls.waypoint_prefix_1 + '.waypoint', - 'containerId': 'WaypointTest', + 'containerId': cls.container_ids[1], 'dir': 'out' } ) ] - waypoint_configuration_2 = \ + #------------------------------------------------------------------- + # The phase-number is used by the router as an addition + # to the address for the link. To chain these two waypoints + # together in a serial fashion, we explicitly declare their + # phase numbers: + # Waypoint 1 + # out of router to process: phase 0 + # back from process to router: phase 1 + # Waypoint 2 + # out of router to process: phase 1 + # back from process to router: phase 2 + # + # Because of those two "phase 1" markings, messages coming back + # into the router from Waypoint 1 get routed back outbound to + # Waypoint 2. + # + # Because the address configuration specifies that phase 2 is + # the egress phase, messages coming into the router from that + # autolink are finally routed to the client receiver. + #------------------------------------------------------------------- + serial_waypoint_configuration = \ [ - ( 'address', - { 'prefix': cls.waypoint_prefix_2, + ( 'address', + { 'prefix': cls.waypoint_prefix_2, 'ingressPhase' : 0, # into the waypoint-process 'egressPhase' : 2, # out of the waypoint process } ), - ( 'autoLink', - { 'addr': cls.waypoint_prefix_2 + '.waypoint', - 'phase' : 0, - 'containerId': 'WaypointTest2', + + # Waypoint 1 configuration -------------------------- + ( 'autoLink', + { 'addr': cls.waypoint_prefix_2 + '.waypoint', + 'phase' : 0, + 'containerId': cls.container_ids[2], 'dir': 'out' # out-of-router } ), - ( 'autoLink', + ( 'autoLink', { 'addr': cls.waypoint_prefix_2 + '.waypoint', - 'phase' : 1, - 'containerId': 'WaypointTest2', + 'phase' : 1, + 'containerId': cls.container_ids[2], 'dir': 'in' # into-router } ), - ( 'autoLink', + + # Waypoint 2 configuration -------------------------- + ( 'autoLink', { 'addr': cls.waypoint_prefix_2 + '.waypoint', 'phase' : 1, # out-of-router - 'containerId': 'WaypointTest2', + 'containerId': cls.container_ids[2], 'dir': 'out' } ), - ( 'autoLink', + ( 'autoLink', { 'addr': cls.waypoint_prefix_2 + '.waypoint', 'phase' : 2, # into-router - 'containerId': 'WaypointTest2', + 'containerId': cls.container_ids[2], + 'dir': 'in' + } + ) + ] + + + #------------------------------------------------------------- + # In a parallel waypoint configuration, we use the default + # phase numbers: toward the waypoint is phase 0, back from + # the waypoint into the router is phase 1. + # The address configuration, by saying 'waypoint: yes' is + # shorthand for "ingress is phase 0, egress is phase 1" + # By making two identical waypoints, they will be used in + # parallel rather than serial. + #------------------------------------------------------------- + parallel_waypoint_configuration = \ + [ + ( 'address', + { 'prefix': cls.waypoint_prefix_3, + 'waypoint': 'yes' + } + ), + + # Waypoint 1 configuration ---------------------- + ( 'autoLink', + { 'addr': cls.waypoint_prefix_3 + '.waypoint', + 'containerId': cls.container_ids[3], + 'dir': 'in' + } + ), + ( 'autoLink', + { 'addr': cls.waypoint_prefix_3 + '.waypoint', + 'containerId': cls.container_ids[3], + 'dir': 'out' + } + ), + + # Waypoint 2 configuration ---------------------- + ( 'autoLink', + { 'addr': cls.waypoint_prefix_3 + '.waypoint', + 'containerId': cls.container_ids[3], 'dir': 'in' } + ), + ( 'autoLink', + { 'addr': cls.waypoint_prefix_3 + '.waypoint', + 'containerId': cls.container_ids[3], + 'dir': 'out' + } ) ] + def router(name, more_config): config = [ ('router', {'mode': 'interior', 'id': name}), ('address', {'prefix': 'closest', 'distribution': 'closest'}), ('address', {'prefix': 'balanced', 'distribution': 'balanced'}), ('address', {'prefix': 'multicast', 'distribution': 'multicast'}) - ] \ - + linkroute_configuration \ - + waypoint_configuration_1 \ - + waypoint_configuration_2 \ + ] \ + + linkroute_configuration \ + + single_waypoint_configuration \ + + serial_waypoint_configuration \ + + parallel_waypoint_configuration \ + more_config config = Qdrouterd.Config(config) @@ -240,7 +326,7 @@ class DistributionTests ( TestCase ): # Note: in the above picture, an arrow from, i.e., B to A # means that B initiates the connection from itself to A. # So if you see "B ----> A" in the picture, you should also - # see a connector block in the configuration of B that + # see a connector block in the configuration of B that # connects to an inter-router port on A. # @@ -416,46 +502,97 @@ class DistributionTests ( TestCase ): cls.C_addr = router_C.addresses[0] cls.D_addr = router_D.addresses[0] + # 1 means skip that test. + cls.skip = { 'test_01' : 0, + 'test_02' : 0, + 'test_03' : 0, + 'test_04' : 0, + 'test_05' : 0, + 'test_06' : 0, + 'test_07' : 0, + 'test_08' : 0, + 'test_09' : 0, + 'test_10' : 0, + 'test_11' : 0, + 'test_12' : 0, + 'test_13' : 0, + 'test_14' : 0, + 'test_15' : 0, + 'test_16' : 0, + 'test_17' : 0, + 'test_18' : 0, + 'test_19' : 0, + 'test_20' : 0, + 'test_21' : 0, + 'test_22' : 0, + 'test_23' : 0, + 'test_24' : 0, + 'test_25' : 0 + } + def test_01_targeted_sender_AC ( self ): - test = TargetedSenderTest ( self.A_addr, self.C_addr, "closest/01" ) + name = 'test_01' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) + test = TargetedSenderTest ( name, self.A_addr, self.C_addr, "closest/01" ) test.run() self.assertEqual ( None, test.error ) def test_02_targeted_sender_DC ( self ): - test = TargetedSenderTest ( self.D_addr, self.C_addr, "closest/02" ) + name = 'test_02' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) + test = TargetedSenderTest ( name, self.D_addr, self.C_addr, "closest/02" ) test.run() self.assertEqual ( None, test.error ) def test_03_anonymous_sender_AC ( self ): - test = AnonymousSenderTest ( self.A_addr, self.C_addr ) + name = 'test_03' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) + test = AnonymousSenderTest ( name, self.A_addr, self.C_addr ) test.run() self.assertEqual ( None, test.error ) def test_04_anonymous_sender_DC ( self ): - test = AnonymousSenderTest ( self.D_addr, self.C_addr ) + name = 'test_04' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) + test = AnonymousSenderTest ( name, self.D_addr, self.C_addr ) test.run() self.assertEqual ( None, test.error ) def test_05_dynamic_reply_to_AC ( self ): - test = DynamicReplyTo ( self.A_addr, self.C_addr ) + name = 'test_05' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) + test = DynamicReplyTo ( name, self.A_addr, self.C_addr ) test.run() self.assertEqual ( None, test.error ) def test_06_dynamic_reply_to_DC ( self ): - test = DynamicReplyTo ( self.D_addr, self.C_addr ) + name = 'test_06' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) + test = DynamicReplyTo ( name, self.D_addr, self.C_addr ) test.run() self.assertEqual ( None, test.error ) def test_07_linkroute ( self ): - test = LinkAttachRouting ( self.C_addr, + name = 'test_07' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) + test = LinkAttachRouting ( name, + self.container_ids[0], + self.C_addr, self.A_route_container_addr, self.linkroute_prefix, "addr_07" @@ -465,7 +602,12 @@ class DistributionTests ( TestCase ): def test_08_linkroute_check_only ( self ): - test = LinkAttachRoutingCheckOnly ( self.C_addr, + name = 'test_08' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) + test = LinkAttachRoutingCheckOnly ( name, + self.container_ids[0], + self.C_addr, self.A_route_container_addr, self.linkroute_prefix, "addr_08" @@ -475,7 +617,11 @@ class DistributionTests ( TestCase ): def test_09_closest_linear ( self ): - test = ClosestTest ( self.A_addr, + name = 'test_09' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) + test = ClosestTest ( name, + self.A_addr, self.B_addr, self.C_addr, "addr_09" @@ -485,7 +631,11 @@ class DistributionTests ( TestCase ): def test_10_closest_mesh ( self ): - test = ClosestTest ( self.A_addr, + name = 'test_10' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) + test = ClosestTest ( name, + self.A_addr, self.B_addr, self.D_addr, "addr_10" @@ -562,6 +712,9 @@ class DistributionTests ( TestCase ): # def test_11_balanced_linear ( self ): + name = 'test_11' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) # slop is how much the second two values may diverge from # the expected. But they still must sum to total - A. total = 100 @@ -575,7 +728,8 @@ class DistributionTests ( TestCase ): slop = 1 omit_middle_receiver = False - test = BalancedTest ( self.A_addr, + test = BalancedTest ( name, + self.A_addr, self.B_addr, self.C_addr, "addr_11", @@ -591,6 +745,9 @@ class DistributionTests ( TestCase ): def test_12_balanced_linear_omit_middle_receiver ( self ): + name = 'test_12' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) # If we omit the middle receiver, then router A will count # up to cost ( A, B ) and the keep counting up a further # cost ( B, C ) before it starts to spill over. @@ -612,7 +769,8 @@ class DistributionTests ( TestCase ): slop = 1 omit_middle_receiver = True - test = BalancedTest ( self.A_addr, + test = BalancedTest ( name, + self.A_addr, self.B_addr, self.C_addr, "addr_12", @@ -684,13 +842,17 @@ class DistributionTests ( TestCase ): # 3. B and D are both with 1 of their expected values. # def test_13_balanced_mesh ( self ): + name = 'test_13' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) total = 100 expected_A = 54 expected_B = 43 expected_D = 3 slop = 1 omit_middle_receiver = False - test = BalancedTest ( self.A_addr, + test = BalancedTest ( name, + self.A_addr, self.B_addr, self.D_addr, "addr_13", @@ -706,7 +868,11 @@ class DistributionTests ( TestCase ): def test_14_multicast_linear ( self ): - test = MulticastTest ( self.A_addr, + name = 'test_14' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) + test = MulticastTest ( name, + self.A_addr, self.B_addr, self.C_addr, "addr_14" @@ -716,7 +882,11 @@ class DistributionTests ( TestCase ): def test_15_multicast_mesh ( self ): - test = MulticastTest ( self.A_addr, + name = 'test_15' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) + test = MulticastTest ( name, + self.A_addr, self.B_addr, self.D_addr, "addr_15" @@ -726,6 +896,9 @@ class DistributionTests ( TestCase ): def test_16_linkroute_linear_all_local ( self ) : + name = 'test_16' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) """ This test should route all senders' link-attaches to the local containers on router A. @@ -796,15 +969,16 @@ class DistributionTests ( TestCase ): } ] - test = RoutingTest ( self.A_addr, # all senders are attached here + test = RoutingTest ( name, + self.container_ids[0], + self.A_addr, # all senders are attached here routers, self.linkroute_prefix, addr_suffix, instructions, where_to_make_connections, n_local_containers, - n_remote_routers, - "Test 16" + n_remote_routers ) test.run ( ) self.assertEqual ( None, test.error ) @@ -812,6 +986,9 @@ class DistributionTests ( TestCase ): def test_17_linkroute_linear_all_B ( self ) : + name = 'test_17' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) """ This test should route all senders' link-attaches to the remote connections on router B. @@ -882,15 +1059,16 @@ class DistributionTests ( TestCase ): } ] - test = RoutingTest ( self.A_addr, # all senders are attached here + test = RoutingTest ( name, + self.container_ids[0], + self.A_addr, # all senders are attached here routers, self.linkroute_prefix, addr_suffix, instructions, where_to_make_connections, n_local_containers, - n_remote_routers, - "Test 17" + n_remote_routers ) test.run ( ) self.assertEqual ( None, test.error ) @@ -898,6 +1076,9 @@ class DistributionTests ( TestCase ): def test_18_linkroute_linear_all_C ( self ) : + name = 'test_18' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) """ This test should route all senders' link-attaches to the remote connections on router C. @@ -968,21 +1149,25 @@ class DistributionTests ( TestCase ): } ] - test = RoutingTest ( self.A_addr, # all senders are attached here + test = RoutingTest ( name, + self.container_ids[0], + self.A_addr, # all senders are attached here routers, self.linkroute_prefix, addr_suffix, instructions, where_to_make_connections, n_local_containers, - n_remote_routers, - "Test 18" + n_remote_routers ) test.run ( ) self.assertEqual ( None, test.error ) def test_19_linkroute_linear_kill ( self ) : + name = 'test_19' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) """ Start out as usual, making four senders and seeing their link-attaches routed to router A (local). But then kill the two route-container @@ -1113,15 +1298,16 @@ class DistributionTests ( TestCase ): } ] - test = RoutingTest ( self.A_addr, # all senders are attached here + test = RoutingTest ( name, + self.container_ids[0], + self.A_addr, # all senders are attached here routers, self.linkroute_prefix, addr_suffix, instructions, where_to_make_connections, n_local_containers, - n_remote_routers, - "Test 19" + n_remote_routers ) test.run ( ) self.assertEqual ( None, test.error ) @@ -1129,6 +1315,9 @@ class DistributionTests ( TestCase ): def test_20_linkroute_mesh_all_local ( self ) : + name = 'test_20' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) """ c c senders ---> A --------- B @@ -1213,21 +1402,25 @@ class DistributionTests ( TestCase ): } ] - test = RoutingTest ( self.A_addr, # all senders are attached here + test = RoutingTest ( name, + self.container_ids[0], + self.A_addr, # all senders are attached here routers, self.linkroute_prefix, addr_suffix, instructions, where_to_make_connections, n_local_containers, - n_remote_routers, - "Test 20" + n_remote_routers ) test.run ( ) self.assertEqual ( None, test.error ) def test_21_linkroute_mesh_nonlocal ( self ) : + name = 'test_21' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) """ c senders ---> A --------- B @@ -1312,15 +1505,16 @@ class DistributionTests ( TestCase ): } ] - test = RoutingTest ( self.A_addr, # all senders are attached here + test = RoutingTest ( name, + self.container_ids[0], + self.A_addr, # all senders are attached here routers, self.linkroute_prefix, addr_suffix, instructions, where_to_make_connections, n_local_containers, - n_remote_routers, - "Test 21" + n_remote_routers ) test.run ( ) self.assertEqual ( None, test.error ) @@ -1329,6 +1523,9 @@ class DistributionTests ( TestCase ): def test_22_linkroute_mesh_kill ( self ) : + name = 'test_22' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) """ c c senders ---> A --------- B @@ -1468,22 +1665,28 @@ class DistributionTests ( TestCase ): } ] - test = RoutingTest ( self.A_addr, # all senders are attached here + test = RoutingTest ( name, + self.container_ids[0], + self.A_addr, # all senders are attached here routers, self.linkroute_prefix, addr_suffix, instructions, where_to_make_connections, n_local_containers, - n_remote_routers, - "Test 22" + n_remote_routers ) test.run ( ) self.assertEqual ( None, test.error ) def test_23_waypoint ( self ) : - test = WaypointTest ( self.A_addr, + name = 'test_23' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) + test = WaypointTest ( name, + self.container_ids[1], + self.A_addr, self.B_addr, self.C_route_container_addr, self.waypoint_prefix_1 + '.waypoint' @@ -1493,7 +1696,12 @@ class DistributionTests ( TestCase ): def test_24_serial_waypoint_test ( self ): - test = SerialWaypointTest ( self.A_addr, + name = 'test_24' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) + test = SerialWaypointTest ( name, + self.container_ids[2], + self.A_addr, self.B_addr, self.C_route_container_addr, self.waypoint_prefix_2 + '.waypoint' @@ -1501,7 +1709,19 @@ class DistributionTests ( TestCase ): test.run() self.assertEqual(None, test.error) - + def test_25_parallel_waypoint_test ( self ): + name = 'test_25' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) + test = ParallelWaypointTest ( name, + self.container_ids[3], + self.A_addr, + self.B_addr, + self.C_route_container_addr, + self.waypoint_prefix_3 + '.waypoint' + ) + test.run() + self.assertEqual(None, test.error) @@ -1517,7 +1737,7 @@ class TargetedSenderTest ( MessagingHandler ): address we want to send to. (As opposed to letting the router pass back an address to us.) """ - def __init__ ( self, send_addr, recv_addr, destination ): + def __init__ ( self, test_name, send_addr, recv_addr, destination ): super(TargetedSenderTest, self).__init__(prefetch=0) self.send_addr = send_addr self.recv_addr = recv_addr @@ -1529,6 +1749,7 @@ class TargetedSenderTest ( MessagingHandler ): self.n_sent = 0 self.n_received = 0 self.n_accepted = 0 + self.test_name = test_name def timeout(self): @@ -1592,7 +1813,7 @@ class AnonymousSenderTest ( MessagingHandler ): information to us when we get the on_link_opened event. """ - def __init__(self, send_addr, recv_addr): + def __init__(self, test_name, send_addr, recv_addr): super(AnonymousSenderTest, self).__init__() self.send_addr = send_addr self.recv_addr = recv_addr @@ -1608,6 +1829,7 @@ class AnonymousSenderTest ( MessagingHandler ): self.n_sent = 0 self.n_received = 0 self.n_accepted = 0 + self.test_name = test_name def timeout ( self ): @@ -1673,7 +1895,7 @@ class DynamicReplyTo(MessagingHandler): the expected number of replies, or with failure if we time out before that happens. """ - def __init__(self, client_addr, server_addr): + def __init__(self, test_name, client_addr, server_addr): super(DynamicReplyTo, self).__init__(prefetch=10) self.client_addr = client_addr self.server_addr = server_addr @@ -1687,6 +1909,7 @@ class DynamicReplyTo(MessagingHandler): self.n_sent = 0 self.received_by_server = 0 self.received_by_client = 0 + self.test_name = test_name def timeout(self): @@ -1752,7 +1975,14 @@ class DynamicReplyTo(MessagingHandler): class LinkAttachRoutingCheckOnly ( MessagingHandler ): """ """ - def __init__ ( self, client_host, linkroute_container_host, linkroute_prefix, addr_suffix ): + def __init__ ( self, + test_name, + container_id, + client_host, + linkroute_container_host, + linkroute_prefix, + addr_suffix + ): super ( LinkAttachRoutingCheckOnly, self ).__init__(prefetch=0) self.client_host = client_host self.linkroute_container_host = linkroute_container_host @@ -1766,6 +1996,8 @@ class LinkAttachRoutingCheckOnly ( MessagingHandler ): self.linkroute_check_timer = None self.linkroute_check_receiver = None self.linkroute_check_sender = None + self.test_name = test_name + self.container_id = container_id self.debug = False @@ -1776,8 +2008,7 @@ class LinkAttachRoutingCheckOnly ( MessagingHandler ): def timeout ( self ): - self.bail ( "Timeout Expired: n_sent=%d n_rcvd=%d n_settled=%d" % - (self.n_sent, self.n_rcvd, self.n_settled) ) + self.bail ( "Timeout Expired" ) def address_check_timeout(self): @@ -1816,7 +2047,7 @@ class LinkAttachRoutingCheckOnly ( MessagingHandler ): if event.receiver == self.linkroute_check_receiver: event.receiver.flow(30) # Because we created the linkroute_check_receiver 'dynamic', when it opens - # it will have its address filled in. That is the address we want our + # it will have its address filled in. That is the address we want our # AddressChecker replies to go to. self.linkroute_checker = AddressChecker ( self.linkroute_check_receiver.remote_source.address ) self.linkroute_check() @@ -1855,7 +2086,7 @@ class LinkAttachRoutingCheckOnly ( MessagingHandler ): def run(self): container = Container(self) - container.container_id = 'LinkRouteTest' + container.container_id = self.container_id container.run() @@ -1867,7 +2098,14 @@ class LinkAttachRouting ( MessagingHandler ): the route container will connect to, and it will receive our messages. The near host is what our sender will attach to. """ - def __init__ ( self, nearside_host, farside_host, linkroute_prefix, addr_suffix ): + def __init__ ( self, + test_name, + container_id, + nearside_host, + farside_host, + linkroute_prefix, + addr_suffix + ): super ( LinkAttachRouting, self ).__init__(prefetch=0) self.nearside_host = nearside_host self.farside_host = farside_host @@ -1883,10 +2121,12 @@ class LinkAttachRouting ( MessagingHandler ): self.linkroute_check_receiver = None self.linkroute_check_sender = None - self.count = 10 - self.n_sent = 0 - self.n_rcvd = 0 - self.n_settled = 0 + self.count = 10 + self.n_sent = 0 + self.n_rcvd = 0 + self.n_settled = 0 + self.test_name = test_name + self.container_id = container_id def timeout ( self ): @@ -2003,11 +2243,13 @@ class LinkAttachRouting ( MessagingHandler ): def run(self): container = Container(self) - container.container_id = 'LinkRouteTest' + container.container_id = self.container_id container.run() + + class ClosestTest ( MessagingHandler ): """ Test whether distance-based message routing works in a @@ -2026,7 +2268,7 @@ class ClosestTest ( MessagingHandler ): router_1, and then 2 receivers each on all 3 routers. """ - def __init__ ( self, router_1, router_2, router_3, addr_suffix ): + def __init__ ( self, test_name, router_1, router_2, router_3, addr_suffix ): super ( ClosestTest, self ).__init__(prefetch=0) self.error = None self.router_1 = router_1 @@ -2053,7 +2295,8 @@ class ClosestTest ( MessagingHandler ): self.addr_check_timer = None self.addr_check_receiver = None self.addr_check_sender = None - self.bailed = False + self.bailed = False + self.test_name = test_name def timeout ( self ): self.bail ( "Timeout Expired " ) @@ -2224,7 +2467,19 @@ class BalancedTest ( MessagingHandler ): ( Slop can happen in some topologies when you can't tell whether spillover will happen first to node 2, or to node 3. """ - def __init__ ( self, router_1, router_2, router_3, addr_suffix, total_messages, expected_1, expected_2, expected_3, slop, omit_middle_receiver ): + def __init__ ( self, + test_name, + router_1, + router_2, + router_3, + addr_suffix, + total_messages, + expected_1, + expected_2, + expected_3, + slop, + omit_middle_receiver + ): super ( BalancedTest, self ).__init__(prefetch=0, auto_accept=False) self.error = None self.router_3 = router_3 @@ -2256,6 +2511,7 @@ class BalancedTest ( MessagingHandler ): self.address_check_sender = None self.payload_sender = None + self.test_name = test_name def timeout ( self ): @@ -2384,7 +2640,13 @@ class MulticastTest ( MessagingHandler ): Using multicast, we should see all receivers get everything, whether the topology is linear or mesh. """ - def __init__ ( self, router_1, router_2, router_3, addr_suffix ): + def __init__ ( self, + test_name, + router_1, + router_2, + router_3, + addr_suffix + ): super ( MulticastTest, self ).__init__(prefetch=0) self.error = None self.router_1 = router_1 @@ -2409,7 +2671,8 @@ class MulticastTest ( MessagingHandler ): self.addr_check_receiver = None self.addr_check_sender = None self.sender = None - self.bailed = False + self.bailed = False + self.test_name = test_name def timeout ( self ): self.bail ( "Timeout Expired " ) @@ -2567,9 +2830,9 @@ class RoutingTest ( MessagingHandler ): """ # NOTE that no payload messages are sent in this test! I send some - # management messages to see when the router network is ready for - # me, but other than that, all I care about is the link-attaches - # that happen each time I make a sender -- and where they are + # management messages to see when the router network is ready for + # me, but other than that, all I care about is the link-attaches + # that happen each time I make a sender -- and where they are # routed to. # NOTE about STEP comments: take a look at comments marked with the @@ -2578,6 +2841,8 @@ class RoutingTest ( MessagingHandler ): # list that is passed in from the caller. def __init__ ( self, + test_name, + container_id, sender_host, route_container_addrs, linkroute_prefix, @@ -2585,8 +2850,7 @@ class RoutingTest ( MessagingHandler ): instructions, where_to_make_connections, n_local_containers, - n_remote_routers, - test_name + n_remote_routers ): super ( RoutingTest, self ).__init__(prefetch=0) @@ -2643,6 +2907,7 @@ class RoutingTest ( MessagingHandler ): self.sent_address_ready = False self.status = 'start up' + self.container_id = container_id def debug_print ( self, message ) : @@ -3063,7 +3328,7 @@ class RoutingTest ( MessagingHandler ): def run(self): container = Container(self) - container.container_id = 'LinkRouteTest' + container.container_id = self.container_id container.run() @@ -3073,14 +3338,21 @@ class RoutingTest ( MessagingHandler ): class WaypointTest ( MessagingHandler ): """ - Messages from a client sender to a client receiver are first + Messages from a client sender to a client receiver are first diverted out of the router into a separate waypoint receiver, - which stores them in a fifo. This simulates reception by a + which stores them in a fifo. This simulates reception by a broker or some other arbitrary process. - The message then returns from the waypoint sender back to the + The message then returns from the waypoint sender back to the router, and then arrives at the client receiver. """ - def __init__ ( self, client_host_1, client_host_2, route_container_host, destination ): + def __init__ ( self, + test_name, + container_id, + client_host_1, + client_host_2, + route_container_host, + destination + ): super(WaypointTest, self).__init__() self.client_host_1 = client_host_1 self.client_host_2 = client_host_2 @@ -3093,10 +3365,11 @@ class WaypointTest ( MessagingHandler ): self.waypoint_sender = None self.waypoint_receiver = None self.waypoint_queue = [] + self.container_id = container_id self.messages_per_sender = 10 - self.senders = [ + self.senders = [ { 'sender' : None, 'to_send' : 0, @@ -3109,7 +3382,7 @@ class WaypointTest ( MessagingHandler ): 'n_sent' : 0 } ] - self.receivers = [ + self.receivers = [ { 'receiver' : None, 'n_received' : 0 @@ -3135,7 +3408,8 @@ class WaypointTest ( MessagingHandler ): # the waypoint. self.n_expected_transitions = self.messages_per_sender * self.n_senders * 4 - self.debug = False + self.debug = False + self.test_name = test_name def timeout(self): @@ -3191,9 +3465,11 @@ class WaypointTest ( MessagingHandler ): def on_start ( self, event ): self.timer = event.reactor.schedule ( TIMEOUT, Timeout(self) ) - self.client_connection = event.container.connect ( self.client_host_1 ) + self.client_connection = event.container.connect ( self.client_host_1 ) - # Creating this connection is what gets things started. + # Creating this connection is what gets things started. When we make this + # connection to a route container address, the router will look at our + # containerId, and will at that time instantiate any associated autolinks. self.route_container_connection = event.container.connect ( self.route_container_host ) self.debug_print ( " creating clients for connection" ) @@ -3201,14 +3477,14 @@ class WaypointTest ( MessagingHandler ): sender = self.senders[i] receiver = self.receivers[i] - sender['sender'] = event.container.create_sender ( self.client_connection, - self.destination, + sender['sender'] = event.container.create_sender ( self.client_connection, + self.destination, name="sender_%d" % i) sender['to_send'] = self.messages_per_sender sender['n_sent'] = 0 - receiver['receiver'] = event.container.create_receiver ( self.client_connection, - self.destination, + receiver['receiver'] = event.container.create_receiver ( self.client_connection, + self.destination, name="receiver_%d" % i) receiver['n_received'] = 0 @@ -3247,7 +3523,7 @@ class WaypointTest ( MessagingHandler ): return if event.sender == self.waypoint_sender : self.send_from_waypoint ( ) - + def on_message ( self, event ): self.debug_print ( "on_message ---------------------------- " ) @@ -3280,7 +3556,7 @@ class WaypointTest ( MessagingHandler ): def run(self): container = Container(self) - container.container_id = 'WaypointTest' + container.container_id = self.container_id container.run() @@ -3289,32 +3565,40 @@ class WaypointTest ( MessagingHandler ): class SerialWaypointTest ( MessagingHandler ): """ - Messages from a client sender on their way to a client receiver are + Messages from a client sender on their way to a client receiver are first re-routed to two separate waypoint 'processes', in serial. The waypoint processes are simulated in this test by separate 'waypoint' receivers that store the messages in fifo lists, and separate 'waypoint' - senders that pop them off the fifos and send them. This simulates + senders that pop them off the fifos and send them. This simulates either a broker, or some arbitrary processing on the message. """ - def __init__ ( self, client_host_1, client_host_2, route_container_host, destination ): + def __init__ ( self, + test_name, + container_id, + client_host_1, + client_host_2, + route_container_host, + destination + ): super(SerialWaypointTest, self).__init__() self.client_host_1 = client_host_1 self.client_host_2 = client_host_2 self.route_container_host = route_container_host self.destination = destination + self.sender_connections = [] + self.error = None + self.messages_per_sender = 100 + self.container_id = container_id - self.sender_connections = [] self.route_container_connection = None - self.error = None - self.messages_per_sender = 100 # There are 2 sending clients and 2 receiving clients - # only because I wanted to have more than 1, and 2 - # appeared to be the next available integer. - # This has nothing to do with the fact that there are + # only because I wanted to have more than 1, and 2 + # appeared to be the next available integer. + # This has nothing to do with the fact that there are # 2 waypoints. - self.senders = [ + self.senders = [ { 'sender' : None, 'to_send' : self.messages_per_sender, 'n_sent' : 0 @@ -3325,7 +3609,7 @@ class SerialWaypointTest ( MessagingHandler ): } ] - self.receivers = [ + self.receivers = [ { 'receiver' : None, 'n_received' : 0 }, @@ -3361,15 +3645,16 @@ class SerialWaypointTest ( MessagingHandler ): self.n_thru = 0 self.n_transitions = 0 self.n_expected_received = self.messages_per_sender * len(self.senders) - - # Each message is sent from one client sender, and finally received - # by one client receiver. In the meantime in goes into, and then + + # Each message is sent from one client sender, and finally received + # by one client receiver. In the meantime in goes into, and then # comes back out of, two separate waypoints. That's a total of # six links -- or 'transitions' -- for each message. n_links_per_message = 2 + 2 * len(self.waypoints) self.n_expected_transitions = len(self.senders) * self.messages_per_sender * n_links_per_message - self.debug = False + self.debug = False + self.test_name = test_name def timeout(self): @@ -3419,22 +3704,25 @@ class SerialWaypointTest ( MessagingHandler ): self.timer = event.reactor.schedule ( TIMEOUT, Timeout(self) ) self.sender_connections.append ( event.container.connect(self.client_host_1) ) self.sender_connections.append ( event.container.connect(self.client_host_2) ) - # Creating this connection is what gets things started. + # Creating this connection is what gets things started. When we make this + # connection to a route container address, the router will look at our + # containerId, and will at that time instantiate any associated autolinks. self.route_container_connection = event.container.connect ( self.route_container_host ) + for i in range(len(self.sender_connections)) : - cnx = self.sender_connections[i] + cnx = self.sender_connections[i] sender = self.senders[i] receiver = self.receivers[i] - sender['sender'] = event.container.create_sender ( cnx, - self.destination, + sender['sender'] = event.container.create_sender ( cnx, + self.destination, name="sender_%d" % i) sender['to_send'] = self.messages_per_sender sender['n_sent'] = 0 - - receiver['receiver'] = event.container.create_receiver ( cnx, - self.destination, + + receiver['receiver'] = event.container.create_receiver ( cnx, + self.destination, name="receiver_%d" % i) receiver['n_received'] = 0 @@ -3462,7 +3750,7 @@ class SerialWaypointTest ( MessagingHandler ): if self.n_waypoint_receivers < 2 : self.waypoints[self.n_waypoint_receivers]['receiver'] = event.receiver self.n_waypoint_receivers += 1 - + def on_sendable ( self, event ): @@ -3551,7 +3839,327 @@ class SerialWaypointTest ( MessagingHandler ): if total_actual_waypoint_receptions != total_expected_waypoint_receptions : self.bail ( "total waypoint receptions were %d, but %d were expected." % ( total_actual_waypoint_receptions, total_expected_waypoint_receptions) ) return - + + total_messages_received = 0 + for i in range(len(self.receivers)) : + this_receiver_got = self.receivers[i]['n_received'] + total_messages_received += this_receiver_got + + if total_messages_received != total_messages_sent : + self.bail ( "total_messages_received: %d but %d were expected." % (total_messages_received, total_messages_sent) ) + return + + self.debug_print ( "\nsuccess\n" ) + self.bail ( None ) + + + + def report ( self ) : + print "\n\n==========================================================\nreport\n" + + for i in range(len(self.senders)) : + print " client sender %d sent %d messages." % ( i, self.senders[i]['n_sent'] ) + + print "\n" + + for i in range(len(self.waypoints)) : + print " waypoint %d received %d messages." % ( i, self.waypoints[i]['n_received'] ) + print " waypoint %d sent %d messages." % ( i, self.waypoints[i]['n_sent'] ) + + print "\n" + + for i in range(len(self.receivers)) : + print " client receiver %d received %d messages." % ( i, self.receivers[i]['n_received'] ) + + print "\nend report\n=========================================================\n\n" + + + + def run(self): + container = Container(self) + container.container_id = self.container_id + container.run() + + + + +class ParallelWaypointTest ( MessagingHandler ): + """ + Messages from a client sender on their way to a client receiver are + first re-routed to one of two separate waypoint 'processes', in parallel. + The waypoint processes are simulated in this test by separate 'waypoint' + receivers that store the messages in fifo lists, and separate 'waypoint' + senders that pop them off the fifos and send them. This simulates + either a broker, or some arbitrary processing on the message. + """ + def __init__ ( self, + test_name, + container_id, + client_host_1, + client_host_2, + route_container_host, + destination + ): + super ( ParallelWaypointTest, self ). __init__() + self.client_host_1 = client_host_1 + self.client_host_2 = client_host_2 + self.route_container_host = route_container_host + self.destination = destination + self.sender_connections = [] + self.error = None + self.messages_per_sender = 100 + self.container_id = container_id + + self.route_container_connection = None + + self.senders = [ + { 'sender' : None, + 'to_send' : self.messages_per_sender, + 'n_sent' : 0 + }, + { 'sender' : None, + 'to_send' : self.messages_per_sender, + 'n_sent' : 0 + } + ] + + self.receivers = [ + { 'receiver' : None, + 'n_received' : 0 + }, + { 'receiver' : None, + 'n_received' : 0 + } + ] + + self.n_waypoint_senders = 0 + self.n_waypoint_receivers = 0 + + self.waypoints = [ + { 'sender' : None, + 'n_sent' : 0, + 'receiver' : None, + 'n_received' : 0, + 'queue' : [], + 'n_sent' : 0, + 'name' : '1' + }, + { 'sender' : None, + 'n_sent' : 0, + 'receiver' : None, + 'n_received' : 0, + 'queue' : [], + 'n_sent' : 0, + 'name' : '2' + } + ] + + self.n_sent = 0 + self.n_rcvd = 0 + self.n_thru = 0 + self.n_transitions = 0 + self.n_expected_received = self.messages_per_sender * len(self.senders) + + # Each message is sent from one client sender, and finally received + # by one client receiver. In the meantime in goes into, and then + # comes back out of, ONE waypoint. That's a total of + # four links -- or 'transitions' -- for each message. + n_links_per_message = 4 + self.n_expected_transitions = len(self.senders) * self.messages_per_sender * n_links_per_message + + self.debug = False + + self.test_name = test_name + + + def timeout(self): + self.bail ( "Timeout Expired: n_sent=%d n_rcvd=%d n_thru=%d" % (self.n_sent, self.n_rcvd, self.n_thru) ) + + + def bail ( self, text ): + self.error = text + self.route_container_connection.close() + for cnx in self.sender_connections : + cnx.close() + self.timer.cancel() + + + def debug_print ( self, message ) : + if self.debug : + print message + + + def send_from_client ( self, sender, n_messages, sender_index ): + n_sent = 0 + while sender.credit > 0 and n_sent < n_messages: + msg = Message ( body=n_sent ) + sender.send ( msg ) + n_sent += 1 + self.n_sent += 1 + self.n_transitions += 1 + self.debug_print ( "send_from_client -- sender: %d n_sent: %d" % ( sender_index, n_sent ) ) + + + + def send_from_waypoint ( self, waypoint ): + self.debug_print ( "send_from_waypoint ------------------------------" ) + + while waypoint['sender'].credit > 0 and len(waypoint['queue']) > 0: + m = waypoint['queue'].pop() + message_content_number = m.body + waypoint['sender'].send ( m ) + waypoint['n_sent'] += 1 + self.n_thru += 1 + self.n_transitions += 1 + self.debug_print ( "send_from_waypoint %s is %d " % ( waypoint['name'], message_content_number) ) + + + + def on_start ( self, event ): + self.timer = event.reactor.schedule ( TIMEOUT, Timeout(self) ) + self.sender_connections.append ( event.container.connect(self.client_host_1) ) + self.sender_connections.append ( event.container.connect(self.client_host_2) ) + # Creating this connection is what gets things started. When we make this + # connection to a route container address, the router will look at our + # containerId, and will at that time instantiate any associated autolinks. + # We will get an 'on_link_opening' for each of them. + self.route_container_connection = event.container.connect ( self.route_container_host ) + + for i in range(len(self.sender_connections)) : + cnx = self.sender_connections[i] + sender = self.senders[i] + receiver = self.receivers[i] + + sender['sender'] = event.container.create_sender ( cnx, + self.destination, + name="sender_%d" % i) + sender['to_send'] = self.messages_per_sender + sender['n_sent'] = 0 + receiver['receiver'] = event.container.create_receiver ( cnx, + self.destination, + name="receiver_%d" % i) + receiver['n_received'] = 0 + + + def on_link_opening ( self, event ): + + self.debug_print ( "on_link_opening -------------------------- " ) + + if event.sender: + self.debug_print ( " sender: %s" % event.sender.remote_source.address ) + event.sender.source.address = event.sender.remote_source.address + event.sender.open() + if event.sender.remote_source.address == self.destination: + if self.n_waypoint_senders < 2 : + self.debug_print ( " store this as one of my waypoint senders." ) + self.waypoints[self.n_waypoint_senders]['sender'] = event.sender + self.n_waypoint_senders += 1 + + elif event.receiver: + self.debug_print ( " receiver: %s" % event.receiver.remote_target.address ) + event.receiver.target.address = event.receiver.remote_target.address + event.receiver.open() + if event.receiver.remote_target.address == self.destination: + self.debug_print ( " store this as one of my waypoint receivers." ) + if self.n_waypoint_receivers < 2 : + self.waypoints[self.n_waypoint_receivers]['receiver'] = event.receiver + self.n_waypoint_receivers += 1 + + + + def on_sendable ( self, event ): + self.debug_print ( "on_sendable ------------------------------" ) + for index in range(len(self.senders)) : + sender = self.senders[index] + if event.sender == sender['sender'] : + self.debug_print ( " client sender %d" % index ) + if sender['n_sent'] < sender['to_send'] : + self.debug_print ( " sending %d" % sender['to_send'] ) + self.send_from_client ( sender['sender'], sender['to_send'], index ) + sender['n_sent'] = sender['to_send'] # n_sent = n_to_send + else : + self.debug_print ( " this sender is already finished." ) + return + + for j in range(len(self.waypoints)) : + sender = self.waypoints[j]['sender'] + if event.sender == sender : + self.debug_print ( " waypoint_sender %d" % j ) + self.send_from_waypoint ( self.waypoints[j] ) + return + + + def on_message(self, event): + + self.debug_print ( "on_message --------------------------- " ) + + # Is this one of our client receivers ? + for i in range(len(self.receivers)) : + receiver = self.receivers[i] + if event.receiver == receiver['receiver'] : + receiver['n_received'] += 1 + self.n_transitions += 1 + self.debug_print (" client receiver %d has %d messages." % ( i, receiver['n_received'] ) ) + message_content_number = event.message.body + self.n_rcvd += 1 + if self.n_rcvd >= self.n_expected_received and self.n_thru >= self.n_expected_received: + self.debug_print ( "DONE -- self.n_rcvd: %d self.n_thru: %d" % ( self.n_rcvd, self.n_thru ) ) + if self.debug : + self.report ( ) + self.check_results_and_bail ( ) + return + + # Is this one of our waypoint receivers ? + for j in range(len(self.waypoints)) : + waypoint = self.waypoints[j] + if event.receiver == waypoint['receiver'] : + m = Message ( body=event.message.body ) + waypoint [ 'queue' ].append ( m ) + waypoint [ 'n_received' ] += 1 + self.n_transitions += 1 + self.debug_print ( " message received at waypoint %d, queue depth is now %d" % (j, len(waypoint['queue']))) + self.send_from_waypoint ( waypoint ) + + + + def check_results_and_bail ( self ) : + + if self.n_expected_transitions != self.n_transitions : + self.bail ( "total transitions were %d, but %d were expected." % ( self.n_transitions, self.n_expected_transitions ) ) + return + + mps = self.messages_per_sender + n_senders = len(self.senders) + total_messages_sent = mps * n_senders + + # For total messages sent, the expected value and + # the actual value must be the same. The two receivers + # may receive different numbers (although the total should + # be correct) but each of the senders must send the expected + # number of messages or something is wrong. + for i in range(n_senders) : + sndr = self.senders[i] + if sndr['n_sent'] != mps : + self.bail ( "sender %d sent %d messages instead of %d" % ( i, sndr['n_sent'], mps ) ) + return + + n_waypoints = len(self.waypoints) + + # In this test, each message only hits one waypoint, not both. + # So the expected number of waypoint receptions is the same + # as the total number of messages sent. + total_expected_waypoint_receptions = total_messages_sent + total_actual_waypoint_receptions = 0 + + for i in range(n_waypoints) : + total_actual_waypoint_receptions += self.waypoints[i]['n_received'] + + if total_actual_waypoint_receptions != total_expected_waypoint_receptions : + self.bail ( "total waypoint receptions were %d, but %d were expected." % ( total_actual_waypoint_receptions, total_expected_waypoint_receptions) ) + return + + # Finally, our client receivers must receiv one message + # for every one that was originally sent out by the client senders. total_messages_received = 0 for i in range(len(self.receivers)) : this_receiver_got = self.receivers[i]['n_received'] @@ -3562,10 +4170,8 @@ class SerialWaypointTest ( MessagingHandler ): return self.debug_print ( "\nsuccess\n" ) - self.bail ( None ) - + self.bail ( None ) - def report ( self ) : @@ -3588,9 +4194,10 @@ class SerialWaypointTest ( MessagingHandler ): print "\nend report\n=========================================================\n\n" + def run(self): container = Container(self) - container.container_id = 'WaypointTest2' + container.container_id = self.container_id container.run() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
