This is an automated email from the ASF dual-hosted git repository. gmurthy pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new 9275c1e DISPATCH-2164: Removed timers and subscriber count checks in favor of sending test messages. This closes #1332. 9275c1e is described below commit 9275c1e72251340953747ae80b7d50e752377312 Author: Ganesh Murthy <gmur...@apache.org> AuthorDate: Wed Aug 4 12:45:23 2021 -0400 DISPATCH-2164: Removed timers and subscriber count checks in favor of sending test messages. This closes #1332. --- tests/system_tests_edge_router.py | 321 ++++++++++++++------------------------ 1 file changed, 115 insertions(+), 206 deletions(-) diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py index 84ce7b4..628f72b 100644 --- a/tests/system_tests_edge_router.py +++ b/tests/system_tests_edge_router.py @@ -619,9 +619,7 @@ class RouterTest(TestCase): self.routers[2].addresses[0], self.routers[2].addresses[0], self.routers[2].addresses[0], - "multicast.24", - self.routers[2].addresses[0], - subscriber_count=3) + "multicast.24") test.run() self.assertIsNone(test.error) @@ -635,9 +633,7 @@ class RouterTest(TestCase): self.routers[2].addresses[0], self.routers[3].addresses[0], self.routers[3].addresses[0], - "multicast.25", - self.routers[0].addresses[0], - subscriber_count=2) + "multicast.25") test.run() self.assertIsNone(test.error) @@ -651,9 +647,7 @@ class RouterTest(TestCase): self.routers[3].addresses[0], self.routers[0].addresses[0], self.routers[2].addresses[0], - "multicast.26", - self.routers[0].addresses[0], - subscriber_count=3) + "multicast.26") test.run() self.assertIsNone(test.error) @@ -666,9 +660,7 @@ class RouterTest(TestCase): self.routers[2].addresses[0], self.routers[3].addresses[0], self.routers[0].addresses[0], - "multicast.27", - self.routers[0].addresses[0], - subscriber_count=2) + "multicast.27") test.run() self.assertIsNone(test.error) @@ -682,9 +674,7 @@ class RouterTest(TestCase): self.routers[2].addresses[0], self.routers[3].addresses[0], 
self.routers[1].addresses[0], - "multicast.28", - self.routers[0].addresses[0], - subscriber_count=2) + "multicast.28") test.run() self.assertIsNone(test.error) @@ -697,9 +687,7 @@ class RouterTest(TestCase): self.routers[3].addresses[0], self.routers[4].addresses[0], self.routers[0].addresses[0], - "multicast.29", - self.routers[0].addresses[0], - subscriber_count=3) + "multicast.29") test.run() self.assertIsNone(test.error) @@ -711,9 +699,7 @@ class RouterTest(TestCase): self.routers[3].addresses[0], self.routers[4].addresses[0], self.routers[5].addresses[0], - "multicast.30", - self.routers[0].addresses[0], - subscriber_count=3) + "multicast.30") test.run() self.assertIsNone(test.error) @@ -730,9 +716,7 @@ class RouterTest(TestCase): self.routers[2].addresses[0], self.routers[2].addresses[0], "multicast.31", - self.routers[2].addresses[0], - large_msg=True, - subscriber_count=3) + large_msg=True) test.run() self.assertIsNone(test.error) @@ -747,9 +731,7 @@ class RouterTest(TestCase): self.routers[3].addresses[0], self.routers[3].addresses[0], "multicast.32", - self.routers[0].addresses[0], - large_msg=True, - subscriber_count=2) + large_msg=True) test.run() self.assertIsNone(test.error) @@ -764,9 +746,7 @@ class RouterTest(TestCase): self.routers[0].addresses[0], self.routers[2].addresses[0], "multicast.33", - self.routers[0].addresses[0], - large_msg=True, - subscriber_count=3) + large_msg=True) test.run() self.assertIsNone(test.error) @@ -780,9 +760,7 @@ class RouterTest(TestCase): self.routers[3].addresses[0], self.routers[0].addresses[0], "multicast.34", - self.routers[0].addresses[0], - large_msg=True, - subscriber_count=2) + large_msg=True) test.run() self.assertIsNone(test.error) @@ -797,9 +775,7 @@ class RouterTest(TestCase): self.routers[3].addresses[0], self.routers[1].addresses[0], "multicast.35", - self.routers[0].addresses[0], - large_msg=True, - subscriber_count=2) + large_msg=True) test.run() self.assertIsNone(test.error) @@ -813,9 +789,7 @@ class 
RouterTest(TestCase): self.routers[4].addresses[0], self.routers[0].addresses[0], "multicast.36", - self.routers[0].addresses[0], - large_msg=True, - subscriber_count=3) + large_msg=True) test.run() self.assertIsNone(test.error) @@ -828,9 +802,7 @@ class RouterTest(TestCase): self.routers[4].addresses[0], self.routers[5].addresses[0], "multicast.37", - self.routers[0].addresses[0], - large_msg=True, - subscriber_count=3) + large_msg=True) test.run() self.assertIsNone(test.error) @@ -1001,9 +973,7 @@ class RouterTest(TestCase): self.routers[2].addresses[0], self.routers[2].addresses[0], "multicast.52", - self.routers[2].addresses[0], - anon_sender=True, - subscriber_count=3) + anon_sender=True) test.run() self.assertIsNone(test.error) @@ -1018,9 +988,7 @@ class RouterTest(TestCase): self.routers[3].addresses[0], self.routers[3].addresses[0], "multicast.53", - self.routers[0].addresses[0], - anon_sender=True, - subscriber_count=2) + anon_sender=True) test.run() self.assertIsNone(test.error) @@ -1035,9 +1003,7 @@ class RouterTest(TestCase): self.routers[0].addresses[0], self.routers[2].addresses[0], "multicast.54", - self.routers[0].addresses[0], - anon_sender=True, - subscriber_count=3) + anon_sender=True) test.run() self.assertIsNone(test.error) @@ -1051,9 +1017,7 @@ class RouterTest(TestCase): self.routers[3].addresses[0], self.routers[0].addresses[0], "multicast.55", - self.routers[0].addresses[0], - anon_sender=True, - subscriber_count=2) + anon_sender=True) test.run() self.assertIsNone(test.error) @@ -1068,9 +1032,7 @@ class RouterTest(TestCase): self.routers[3].addresses[0], self.routers[1].addresses[0], "multicast.56", - self.routers[0].addresses[0], - anon_sender=True, - subscriber_count=2) + anon_sender=True) test.run() self.assertIsNone(test.error) @@ -1084,9 +1046,7 @@ class RouterTest(TestCase): self.routers[4].addresses[0], self.routers[0].addresses[0], "multicast.57", - self.routers[0].addresses[0], - anon_sender=True, - subscriber_count=3) + 
anon_sender=True) test.run() self.assertIsNone(test.error) @@ -1099,9 +1059,7 @@ class RouterTest(TestCase): self.routers[4].addresses[0], self.routers[5].addresses[0], "multicast.58", - self.routers[0].addresses[0], - anon_sender=True, - subscriber_count=3) + anon_sender=True) test.run() self.assertIsNone(test.error) @@ -1118,10 +1076,8 @@ class RouterTest(TestCase): self.routers[2].addresses[0], self.routers[2].addresses[0], "multicast.59", - self.routers[2].addresses[0], large_msg=True, - anon_sender=True, - subscriber_count=3) + anon_sender=True) test.run() self.assertIsNone(test.error) @@ -1136,10 +1092,8 @@ class RouterTest(TestCase): self.routers[3].addresses[0], self.routers[3].addresses[0], "multicast.60", - self.routers[0].addresses[0], large_msg=True, - anon_sender=True, - subscriber_count=2) + anon_sender=True) test.run() self.assertIsNone(test.error) @@ -1154,10 +1108,8 @@ class RouterTest(TestCase): self.routers[0].addresses[0], self.routers[2].addresses[0], "multicast.61", - self.routers[0].addresses[0], large_msg=True, - anon_sender=True, - subscriber_count=3) + anon_sender=True) test.run() self.assertIsNone(test.error) @@ -1171,10 +1123,8 @@ class RouterTest(TestCase): self.routers[3].addresses[0], self.routers[0].addresses[0], "multicast.62", - self.routers[0].addresses[0], large_msg=True, - anon_sender=True, - subscriber_count=2) + anon_sender=True) test.run() self.assertIsNone(test.error) @@ -1189,10 +1139,8 @@ class RouterTest(TestCase): self.routers[3].addresses[0], self.routers[1].addresses[0], "multicast.63", - self.routers[0].addresses[0], large_msg=True, - anon_sender=True, - subscriber_count=2) + anon_sender=True) test.run() self.assertIsNone(test.error) @@ -1206,10 +1154,8 @@ class RouterTest(TestCase): self.routers[4].addresses[0], self.routers[0].addresses[0], "multicast.64", - self.routers[0].addresses[0], large_msg=True, - anon_sender=True, - subscriber_count=3) + anon_sender=True) test.run() self.assertIsNone(test.error) @@ -1222,10 
+1168,8 @@ class RouterTest(TestCase): self.routers[4].addresses[0], self.routers[5].addresses[0], "multicast.65", - self.routers[0].addresses[0], large_msg=True, - anon_sender=True, - subscriber_count=3) + anon_sender=True) test.run() self.assertIsNone(test.error) @@ -2499,8 +2443,8 @@ class MobileAddressOneSenderTwoReceiversTest(MessagingHandler): class MobileAddressMulticastTest(MessagingHandler): def __init__(self, receiver1_host, receiver2_host, receiver3_host, - sender_host, address, check_addr_host=None, large_msg=False, - anon_sender=False, subscriber_count=0): + sender_host, address, large_msg=False, + anon_sender=False): super(MobileAddressMulticastTest, self).__init__() self.receiver1_host = receiver1_host self.receiver2_host = receiver2_host @@ -2508,7 +2452,6 @@ class MobileAddressMulticastTest(MessagingHandler): self.sender_host = sender_host self.address = address self.anon_sender = anon_sender - self.subscriber_count = subscriber_count # One sender connection and two receiver connections self.receiver1_conn = None @@ -2547,11 +2490,12 @@ class MobileAddressMulticastTest(MessagingHandler): # address has propagated. 
self.max_attempts = 5 self.num_attempts = 0 - self.num_attempts = 0 self.container = None - self.check_addr_host = check_addr_host - if not self.check_addr_host: - self.check_addr_host = self.sender_host + self.test_msg_received_r1 = False + self.test_msg_received_r2 = False + self.test_msg_received_r3 = False + self.initial_msg_sent = False + self.n_accepted = 0 if self.large_msg: self.body = "0123456789101112131415" * 5000 @@ -2576,105 +2520,95 @@ class MobileAddressMulticastTest(MessagingHandler): if self.sender_conn: self.sender_conn.close() - def create_sndr(self): - self.sender_conn = self.container.connect(self.sender_host) - if self.anon_sender: - self.sender = self.container.create_sender(self.sender_conn) - else: - self.sender = self.container.create_sender(self.sender_conn, - self.address) - - def check_address(self): - local_node = Node.connect(self.check_addr_host, timeout=TIMEOUT) - outs = local_node.query(type='org.apache.qpid.dispatch.router.address') - found = False - - subscriber_count_index = outs.attribute_names.index("subscriberCount") - remote_count_index = outs.attribute_names.index("remoteCount") - - self.num_attempts += 1 - for result in outs.results: - if self.address in result[0]: - # We are good if the sum of subscriberCount and remoteCount - # equals the total subscriber_count - if self.subscriber_count == 0 or (result[subscriber_count_index] + result[remote_count_index] == self.subscriber_count): - # The address is in the address table and the subscriber count is as expected. - # subscriberCount match means that both edge routers have - # told the interior router about the existence of the address - # If this has not happened yet, we will try again. 
- found = True - self.create_sndr() - local_node.close() - self.addr_timer.cancel() - break - - if not found: - if self.num_attempts < self.max_attempts: - self.addr_timer = self.reactor.schedule(1.0, AddrTimer(self)) - else: - self.error = "Unable to create sender because of " \ - "absence of address in the address table" - self.timeout() - local_node.close() - def on_start(self, event): self.reactor = event.reactor self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) - # Create two receivers self.receiver1_conn = event.container.connect(self.receiver1_host) self.receiver2_conn = event.container.connect(self.receiver2_host) self.receiver3_conn = event.container.connect(self.receiver3_host) + + # Create receivers and sender all in one shot, no need to check for any address table + # before creating sender self.receiver1 = event.container.create_receiver(self.receiver1_conn, self.address) self.receiver2 = event.container.create_receiver(self.receiver2_conn, self.address) self.receiver3 = event.container.create_receiver(self.receiver3_conn, self.address) - self.container = event.container + self.sender_conn = event.container.connect(self.sender_host) + if self.anon_sender: + self.sender = event.container.create_sender(self.sender_conn) + else: + self.sender = event.container.create_sender(self.sender_conn, + self.address) - def on_link_opened(self, event): - if event.receiver == self.receiver1 or \ - event.receiver == self.receiver2 or \ - event.receiver == self.receiver3: - self.r_attaches += 1 - if self.r_attaches == 3: - self.addr_timer = self.reactor.schedule(1.0, AddrTimer(self)) + def send_test_message(self): + msg = Message(body="Test Message") + if self.anon_sender: + msg.address = self.address + self.sender.send(msg) + + def send(self): + if self.large_msg: + msg = Message(body=self.body, properties=self.properties) + else: + msg = Message(body="Message %d" % self.n_sent) + if self.anon_sender: + msg.address = self.address + msg.correlation_id = 
self.n_sent + self.sender.send(msg) + + def on_accepted(self, event): + if self.test_msg_received_r1 and self.test_msg_received_r2 and self.test_msg_received_r3: + # All receivers have received the test message. + # Now fire off 100 messages to see if the message was multicasted to all + # receivers. + self.n_accepted += 1 + while self.n_sent < self.count: + self.send() + self.n_sent += 1 + else: + self.send_test_message() def on_sendable(self, event): - while self.n_sent < self.count: - msg = None - if self.large_msg: - msg = Message(body=self.body, properties=self.properties) - else: - msg = Message(body="Message %d" % self.n_sent) - if self.anon_sender: - msg.address = self.address - msg.correlation_id = self.n_sent - self.sender.send(msg) - self.n_sent += 1 + if not self.initial_msg_sent: + # First send a single test message. This message + # could be accepted or released based on if + # some receiver is already online to receive the message + self.send_test_message() + self.initial_msg_sent = True def on_message(self, event): if event.receiver == self.receiver1: - if self.recvd1_msgs.get(event.message.correlation_id): - self.dup_msg = event.message.correlation_id - self.receiver_name = "Receiver 1" - self.timeout() - self.n_rcvd1 += 1 - self.recvd1_msgs[event.message.correlation_id] = event.message.correlation_id + if event.message.body == "Test Message": + self.test_msg_received_r1 = True + else: + if self.recvd1_msgs.get(event.message.correlation_id): + self.dup_msg = event.message.correlation_id + self.receiver_name = "Receiver 1" + self.timeout() + self.n_rcvd1 += 1 + self.recvd1_msgs[event.message.correlation_id] = event.message.correlation_id if event.receiver == self.receiver2: - if self.recvd2_msgs.get(event.message.correlation_id): - self.dup_msg = event.message.correlation_id - self.receiver_name = "Receiver 2" - self.timeout() - self.n_rcvd2 += 1 - self.recvd2_msgs[event.message.correlation_id] = event.message.correlation_id + if event.message.body 
== "Test Message": + self.test_msg_received_r2 = True + else: + if self.recvd2_msgs.get(event.message.correlation_id): + self.dup_msg = event.message.correlation_id + self.receiver_name = "Receiver 2" + self.timeout() + self.n_rcvd2 += 1 + self.recvd2_msgs[event.message.correlation_id] = event.message.correlation_id if event.receiver == self.receiver3: - if self.recvd3_msgs.get(event.message.correlation_id): - self.dup_msg = event.message.correlation_id - self.receiver_name = "Receiver 3" - self.timeout() - self.n_rcvd3 += 1 - self.recvd3_msgs[event.message.correlation_id] = event.message.correlation_id + if event.message.body == "Test Message": + self.test_msg_received_r3 = True + else: + if self.recvd3_msgs.get(event.message.correlation_id): + self.dup_msg = event.message.correlation_id + self.receiver_name = "Receiver 3" + self.timeout() + self.n_rcvd3 += 1 + self.recvd3_msgs[event.message.correlation_id] = event.message.correlation_id if self.n_rcvd1 == self.count and self.n_rcvd2 == self.count and \ self.n_rcvd3 == self.count: @@ -2684,6 +2618,9 @@ class MobileAddressMulticastTest(MessagingHandler): self.receiver3_conn.close() self.sender_conn.close() + def on_released(self, event): + self.send_test_message() + def run(self): Container(self).run() @@ -2692,15 +2629,15 @@ class MobileAddrMcastDroppedRxTest(MobileAddressMulticastTest): # failure scenario - cause some receiving clients to close while a large # message is in transit def __init__(self, receiver1_host, receiver2_host, receiver3_host, - sender_host, address, check_addr_host=None, large_msg=True): + sender_host, address, large_msg=True, anon_sender=False): super(MobileAddrMcastDroppedRxTest, self).__init__(receiver1_host, receiver2_host, receiver3_host, sender_host, address, - check_addr_host=check_addr_host, - large_msg=large_msg) - self.n_accepted = 0 + large_msg=large_msg, + anon_sender=anon_sender) + self.n_released = 0 self.recv1_closed = False self.recv2_closed = False @@ -2725,59 +2662,31 @@ 
class MobileAddrMcastDroppedRxTest(MobileAddressMulticastTest): self.receiver2_conn.close() def on_accepted(self, event): - self.n_accepted += 1 + super(MobileAddrMcastDroppedRxTest, self).on_accepted(event) self._check_done() def on_released(self, event): + super(MobileAddrMcastDroppedRxTest, self).on_released(event) self.n_released += 1 self._check_done() -class MobileAddrMcastAnonSenderDroppedRxTest(MobileAddressMulticastTest): +class MobileAddrMcastAnonSenderDroppedRxTest(MobileAddrMcastDroppedRxTest): # failure scenario - cause some receiving clients to close while a large # message is in transit def __init__(self, receiver1_host, receiver2_host, receiver3_host, - sender_host, address, check_addr_host=None, large_msg=True, anon_sender=True): + sender_host, address, large_msg=True, anon_sender=True): super(MobileAddrMcastAnonSenderDroppedRxTest, self).__init__(receiver1_host, receiver2_host, receiver3_host, sender_host, address, - check_addr_host=check_addr_host, large_msg=large_msg, anon_sender=anon_sender) - self.n_accepted = 0 self.n_released = 0 self.recv1_closed = False self.recv2_closed = False - def _check_done(self): - if self.n_accepted + self.n_released == self.count: - self.receiver3_conn.close() - self.sender_conn.close() - self.timer.cancel() - - def on_message(self, event): - super(MobileAddrMcastAnonSenderDroppedRxTest, self).on_message(event) - - # start closing receivers - if self.n_rcvd1 == 50: - if not self.recv1_closed: - self.receiver1_conn.close() - self.recv1_closed = True - if self.n_rcvd2 == 75: - if not self.recv2_closed: - self.recv2_closed = True - self.receiver2_conn.close() - - def on_accepted(self, event): - self.n_accepted += 1 - self._check_done() - - def on_released(self, event): - self.n_released += 1 - self._check_done() - class MobileAddressEventTest(MessagingHandler): def __init__(self, receiver1_host, receiver2_host, receiver3_host, --------------------------------------------------------------------- To unsubscribe, 
e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org