This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch DISPATCH-1264 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit a5aab94a090792b2c47d13188058a00bfab32ef7 Author: Chuck Rolke <[email protected]> AuthorDate: Wed Feb 6 12:28:47 2019 -0500 DISPATCH-1246: Create a hacked version of test for research * Add logging with uS timestamp for correlation with router logs * Message content modified to indicate which message from self.program * Fix DISPATCH-1246 by allowing only one message in flight at a time --- tests/system_tests_delivery_abort.py | 80 +++++++++++++++++++++++++----------- 1 file changed, 55 insertions(+), 25 deletions(-) diff --git a/tests/system_tests_delivery_abort.py b/tests/system_tests_delivery_abort.py index 9b7418b..edf857c 100644 --- a/tests/system_tests_delivery_abort.py +++ b/tests/system_tests_delivery_abort.py @@ -22,6 +22,8 @@ from __future__ import division from __future__ import absolute_import from __future__ import print_function +import sys +import datetime import unittest2 as unittest from proton import Message, Timeout from system_test import TestCase, Qdrouterd, main_module @@ -68,7 +70,7 @@ class RouterTest(TestCase): cls.routers[1].wait_router_connected('A') - def test_01_message_route_truncated_one_router(self): + def txest_01_message_route_truncated_one_router(self): test = MessageRouteTruncateTest(self.routers[0].addresses[0], self.routers[0].addresses[0], "addr_01") @@ -76,7 +78,7 @@ class RouterTest(TestCase): self.assertEqual(None, test.error) - def test_02_message_route_truncated_two_routers(self): + def txest_02_message_route_truncated_two_routers(self): test = MessageRouteTruncateTest(self.routers[0].addresses[0], self.routers[1].addresses[0], "addr_02") @@ -84,7 +86,7 @@ class RouterTest(TestCase): self.assertEqual(None, test.error) - def test_03_link_route_truncated_one_router(self): + def txest_03_link_route_truncated_one_router(self): test = LinkRouteTruncateTest(self.routers[0].addresses[0], self.routers[0].addresses[1], "link.addr_03", @@ -93,7 +95,7 @@ class RouterTest(TestCase): self.assertEqual(None, test.error) - def test_04_link_route_truncated_two_routers(self): + def txest_04_link_route_truncated_two_routers(self): test = LinkRouteTruncateTest(self.routers[1].addresses[0], self.routers[0].addresses[1], "link.addr_04", @@ -102,7 +104,7 @@ class RouterTest(TestCase): self.assertEqual(None, test.error) - def test_05_message_route_abort_one_router(self): + def txest_05_message_route_abort_one_router(self): test = MessageRouteAbortTest(self.routers[0].addresses[0], self.routers[0].addresses[0], "addr_05") @@ -118,7 +120,7 @@ class RouterTest(TestCase): self.assertEqual(None, test.error) - def test_07_multicast_truncate_one_router(self): + def txest_07_multicast_truncate_one_router(self): test = MulticastTruncateTest(self.routers[0].addresses[0], self.routers[0].addresses[0], self.routers[0].addresses[0], @@ -403,21 +405,32 @@ class MessageRouteAbortTest(MessagingHandler): self.sender1 = None self.receiver = None self.delivery = None - - self.program = [('D', 10), ('D', 10), ('A', 10), ('A', 10), ('D', 10), ('D', 10), - ('A', 100), ('D', 100), - ('A', 1000), ('A', 1000), ('A', 1000), ('A', 1000), ('A', 1000), ('D', 1000), - ('A', 10000), ('A', 10000), ('A', 10000), ('A', 10000), ('A', 10000), ('D', 10000), - ('A', 100000), ('A', 100000), ('A', 100000), ('A', 100000), ('A', 100000), ('D', 100000), ('F', 10)] + self.delivery_info = "" + self.ok_to_send = True + self.log_enable = True + + self.program = [('D', 10), ('D', 20), ('A', 30), ('A', 40), ('D', 50), ('D', 60), + ('A', 100), ('D', 200), + ('A', 1000), ('A', 2000), ('A', 3000), ('A', 4000), ('A', 5000), ('D', 6000), + ('A', 10000), ('A', 20000), ('A', 30000), ('A', 40000), ('A', 50000), ('D', 60000), + ('A', 100000), ('A', 200000), ('A', 300000), ('A', 400000), ('A', 500000), ('D', 600000), ('F', 10)] self.result = [] - self.expected_result = [10, 10, 10, 10, 100, 1000, 10000, 100000] + self.expected_result = [10, 20, 50, 60, 200, 6000, 60000, 600000] + + def log(self, format, args=None): + user_log = format % args if args is not None else format + dt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f') + print(dt, user_log) + sys.stdout.flush() def timeout(self): self.error = "Timeout Expired - Unprocessed Ops: %r, Result: %r" % (self.program, self.result) + self.log("timeout --- self.program : %s, result : %s", (self.program, self.result)) self.sender_conn.close() self.receiver_conn.close() def on_start(self, event): + print() self.timer = event.reactor.schedule(10.0, Timeout(self)) self.sender_conn = event.container.connect(self.sender_host) self.receiver_conn = event.container.connect(self.receiver_host) @@ -426,23 +439,20 @@ class MessageRouteAbortTest(MessagingHandler): def send(self): op, size = self.program.pop(0) if len(self.program) > 0 else (None, None) - + self.log("send() op: %s, size: %d", (op, size)) if op == None: return - body = "" - if op == 'F': - body = "FINISH" - else: - for i in range(size // 10): - body += "0123456789" - msg = Message(body=body) - + msg = Message(body="FINISH" if op == 'F' else ("%s %06d " % (op, size)) * (size // 10)) + if op in 'DF': + self.log("self.sender1.send(msg) : '%s'", (msg.body[0:20])) delivery = self.sender1.send(msg) if op == 'A': self.delivery = self.sender1.delivery(self.sender1.delivery_tag()) + self.delivery_info = "To abort length %d" % size + self.log("Sending message to be aborted : %s", (self.delivery_info)) encoded = msg.encode() self.sender1.stream(encoded) @@ -454,19 +464,39 @@ class MessageRouteAbortTest(MessagingHandler): self.timer.cancel() def on_sendable(self, event): + self.log("on_sendable()") if event.sender == self.sender1: if self.delivery: - self.delivery.abort() + self.log("Aborting: %s", (self.delivery_info)) + if not self.delivery.settled: + self.delivery.abort() + self.ok_to_send = True + else: + self.log("Can't abort delivery because it is already settled") self.delivery = None + self.delivery_info = "" else: - self.send() + if self.ok_to_send: + self.send() + self.ok_to_send = False + else: + self.log(" Not sender1") def on_message(self, event): m = event.message + self.log("on_message(). len: %d, m.body: %s", (len(m.body), m.body[0:9])) if m.body == "FINISH": self.finish() else: - self.result.append(len(m.body)) + if m.body.startswith("D"): + # message expected + self.result.append(len(m.body)) + else: + # message unexpected: DISPATCH-1264 finds on sendable too slow + # and abort gets missed + self.log("Received message that should have been aborted. Delivery_info: %s, body: %s", (self.delivery_info, m.body[0:9])) + self.delivery = None + self.delivery_info = "" self.send() def run(self): --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
