This is an automated email from the ASF dual-hosted git repository. kgiusti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new bc98248 DISPATCH-1757: Add error logging for system_tests_edge_router bc98248 is described below commit bc9824899e55af427771e7a57e85311aa41eee76 Author: Kenneth Giusti <kgiu...@apache.org> AuthorDate: Tue Jul 20 11:41:33 2021 -0400 DISPATCH-1757: Add error logging for system_tests_edge_router This closes #1313 --- tests/system_test.py | 31 ++++++++++++++++++++++++++ tests/system_tests_edge_router.py | 46 +++++++++++++++++++++++++++------------ tests/test_broker.py | 31 ++++++++++++++++++++++---- 3 files changed, 90 insertions(+), 18 deletions(-) diff --git a/tests/system_test.py b/tests/system_test.py index 01cb585..94a6613 100755 --- a/tests/system_test.py +++ b/tests/system_test.py @@ -868,11 +868,15 @@ class AsyncTestReceiver(MessagingHandler): def get(self, timeout=TIMEOUT): self._async_receiver.num_queue_gets += 1 msg = super(AsyncTestReceiver.MyQueue, self).get(timeout=timeout) + self._async_receiver._logger.log("message %d get" + % self._async_receiver.num_queue_gets) return msg def put(self, msg): self._async_receiver.num_queue_puts += 1 super(AsyncTestReceiver.MyQueue, self).put(msg) + self._async_receiver._logger.log("message %d put" + % self._async_receiver.num_queue_puts) def __init__(self, address, source, conn_args=None, container_id=None, wait=True, recover_link=False, msg_args=None): @@ -892,6 +896,7 @@ class AsyncTestReceiver(MessagingHandler): self._recover_count = 0 self._stop_thread = False self._thread = Thread(target=self._main) + self._logger = Logger(title="AsyncTestReceiver %s" % cid) self._thread.daemon = True self._thread.start() self.num_queue_puts = 0 @@ -906,16 +911,19 @@ class AsyncTestReceiver(MessagingHandler): def _main(self): self._container.timeout = 0.5 self._container.start() + self._logger.log("Starting reactor") while self._container.process(): if self._stop_thread: if self._conn: self._conn.close() self._conn = None + self._logger.log("reactor thread done") def stop(self, timeout=TIMEOUT): self._stop_thread = True self._container.wakeup() self._thread.join(timeout=TIMEOUT) + self._logger.log("thread done") if self._thread.is_alive(): raise Exception("AsyncTestReceiver did not exit") del self._conn @@ -928,13 +936,16 @@ class AsyncTestReceiver(MessagingHandler): self._conn = event.container.connect(**kwargs) def on_connection_opened(self, event): + self._logger.log("Connection opened") kwargs = {'source': self.source} event.container.create_receiver(event.connection, **kwargs) def on_link_opened(self, event): + self._logger.log("link opened") self._ready.set() def on_link_closing(self, event): + self._logger.log("link closing") event.link.close() if self._recover_link and not self._stop_thread: # lesson learned: the generated link name will be the same as the @@ -951,10 +962,14 @@ class AsyncTestReceiver(MessagingHandler): def on_disconnected(self, event): # if remote terminates the connection kill the thread else it will spin # on the cpu + self._logger.log("Disconnected") if self._conn: self._conn.close() self._conn = None + def dump_log(self): + self._logger.dump() + class AsyncTestSender(MessagingHandler): """ @@ -990,14 +1005,17 @@ class AsyncTestSender(MessagingHandler): self._link_name = "%s-%s" % (cid, "tx") self._thread = Thread(target=self._main) self._thread.daemon = True + self._logger = Logger(title="AsyncTestSender %s" % cid) self._thread.start() self.msg_stats = "self.sent=%d, self.accepted=%d, self.released=%d, self.modified=%d, self.rejected=%d" def _main(self): self._container.timeout = 0.5 self._container.start() + self._logger.log("Starting reactor") while self._container.process(): self._check_if_done() + self._logger.log("reactor thread done") def get_msg_stats(self): return self.msg_stats % (self.sent, self.accepted, self.released, self.modified, self.rejected) @@ -1005,6 +1023,7 @@ class AsyncTestSender(MessagingHandler): def wait(self): # don't stop it - wait until everything is sent self._thread.join(timeout=TIMEOUT) + self._logger.log("thread done") assert not self._thread.is_alive(), "sender did not complete" if self.error: raise AsyncTestSender.TestSenderException(self.error) @@ -1016,6 +1035,7 @@ class AsyncTestSender(MessagingHandler): self._conn = self._container.connect(self.address) def on_connection_opened(self, event): + self._logger.log("Connection opened") option = AtMostOnce if self.presettle else AtLeastOnce self._sender = self._container.create_sender(self._conn, target=self.target, @@ -1026,6 +1046,7 @@ class AsyncTestSender(MessagingHandler): if self.sent < self.total: self._sender.send(self._message) self.sent += 1 + self._logger.log("message %d sent" % self.sent) def _check_if_done(self): done = (self.sent == self.total @@ -1037,10 +1058,12 @@ class AsyncTestSender(MessagingHandler): self.address) self._conn.close() self._conn = None + self._logger.log("Connection closed") def on_accepted(self, event): self.accepted += 1 event.delivery.settle() + self._logger.log("message %d accepted" % self.accepted) def on_released(self, event): # for some reason Proton 'helpfully' calls on_released even though the @@ -1049,17 +1072,21 @@ class AsyncTestSender(MessagingHandler): return self.on_modified(event) self.released += 1 event.delivery.settle() + self._logger.log("message %d released" % self.released) def on_modified(self, event): self.modified += 1 event.delivery.settle() + self._logger.log("message %d modified" % self.modified) def on_rejected(self, event): self.rejected += 1 event.delivery.settle() + self._logger.log("message %d rejected" % self.rejected) def on_link_error(self, event): self.error = "link error:%s" % str(event.link.remote_condition) + self._logger.log(self.error) if self._conn: self._conn.close() self._conn = None @@ -1068,10 +1095,14 @@ class AsyncTestSender(MessagingHandler): # if remote terminates the connection kill the thread else it will spin # on the cpu self.error = "connection to remote dropped" + self._logger.log(self.error) if self._conn: self._conn.close() self._conn = None + def dump_log(self): + self._logger.dump() + class QdManager(object): """ diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py index c9a65a2..84ce7b4 100644 --- a/tests/system_tests_edge_router.py +++ b/tests/system_tests_edge_router.py @@ -1673,19 +1673,24 @@ class LinkRouteProxyTest(TestCase): while self._get_address(router, address): sleep(0.1) - def _test_traffic(self, sender, receiver, address, count=5): + def _test_traffic(self, sender, receiver, address, count=5, message=None): """Generate message traffic between two normal clients""" + error = None tr = AsyncTestReceiver(receiver, address) - ts = AsyncTestSender(sender, address, count) + ts = AsyncTestSender(sender, address, count, + message=message) ts.wait() # wait until all sent for i in range(count): try: tr.queue.get() except AsyncTestReceiver.Empty: - return "Sender Stats=" + ts.get_msg_stats() + " Receiver Queue Stats=" + tr.get_queue_stats() + error = "Sender Stats=" + ts.get_msg_stats() + "\n Receiver Queue Stats=" + tr.get_queue_stats() tr.stop() - return None + if error: + tr.dump_log() + ts.dump_log() + return error def test_01_immedate_detach_reattach(self): if self.skip['test_01'] : @@ -1850,10 +1855,15 @@ class LinkRouteProxyTest(TestCase): self.INT_B.listener, "Edge1/One", count=5) - self.assertIsNone(out, out) fs.join() - self.assertEqual(5, fs.in_count) - self.assertEqual(5, fs.out_count) + + try: + self.assertIsNone(out, out) + self.assertEqual(5, fs.in_count) + self.assertEqual(5, fs.out_count) + except AssertionError: + fs.dump_log() + raise er.teardown() self._wait_address_gone(self.INT_B, "Edge1/*") @@ -1933,6 +1943,7 @@ class LinkRouteProxyTest(TestCase): self.skipTest("Test skipped during development.") a_type = 'org.apache.qpid.dispatch.router.address' + test_msg = Message(body="test_51_link_route_proxy_configured") fs = FakeService(self.EA1.route_container) self.INT_B.wait_address("CfgLinkRoute1", count=2) @@ -1940,11 +1951,17 @@ class LinkRouteProxyTest(TestCase): out = self._test_traffic(self.INT_B.listener, self.INT_B.listener, "CfgLinkRoute1/hi", - count=5) - self.assertIsNone(out, out) + count=5, + message=test_msg) fs.join() - self.assertEqual(5, fs.in_count) - self.assertEqual(5, fs.out_count) + + try: + self.assertIsNone(out, out) + self.assertEqual(5, fs.in_count) + self.assertEqual(5, fs.out_count) + except AssertionError: + fs.dump_log() + raise # now that FakeService is gone, the link route should no longer be # active: @@ -1958,12 +1975,13 @@ class LinkRouteProxyTest(TestCase): out = self._test_traffic(self.INT_A.listener, self.INT_A.listener, "MATCH.cfg.pattern", - count=5) - self.assertIsNone(out, out) - + count=5, + message=test_msg) fs.join() + self.assertIsNone(out, out) self.assertEqual(5, fs.in_count) self.assertEqual(5, fs.out_count) + self._wait_address_gone(self.INT_A, "*.cfg.pattern.#") def test_52_conn_link_route_proxy(self): diff --git a/tests/test_broker.py b/tests/test_broker.py index 859cd93..61a15b7 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -29,7 +29,7 @@ from proton import Endpoint from proton.handlers import MessagingHandler from proton.reactor import Container -from system_test import TIMEOUT +from system_test import TIMEOUT, Logger class FakeBroker(MessagingHandler): @@ -37,10 +37,14 @@ class FakeBroker(MessagingHandler): A fake broker-like service that listens for client connections """ class _Queue(object): - def __init__(self, dynamic=False): + def __init__(self, name, logger, dynamic=False): self.dynamic = dynamic self.queue = collections.deque() self.consumers = [] + self.logger = logger + self.name = name + self.sent = 0 + self.recv = 0 def subscribe(self, consumer): self.consumers.append(consumer) @@ -51,6 +55,8 @@ class FakeBroker(MessagingHandler): return len(self.consumers) == 0 and (self.dynamic or len(self.queue) == 0) def publish(self, message): + self.recv += 1 + self.logger.log("Received message %d" % self.recv) self.queue.append(message) return self.dispatch() @@ -74,6 +80,9 @@ class FakeBroker(MessagingHandler): if c.credit: c.send(self.queue.popleft()) result += 1 + self.sent += 1 + self.logger.log("Sent message %d" % self.sent) + return result except IndexError: # no more messages return 0 @@ -90,6 +99,7 @@ class FakeBroker(MessagingHandler): self._error = None self._container = Container(self) self._container.container_id = container_id or 'FakeBroker' + self._logger = Logger(title=self._container.container_id) self._thread = Thread(target=self._main) self._thread.daemon = True self._stop_thread = False @@ -98,6 +108,7 @@ class FakeBroker(MessagingHandler): def _main(self): self._container.timeout = 1.0 self._container.start() + self._logger.log("Starting reactor thread") while self._container.process(): if self._stop_thread: @@ -107,11 +118,13 @@ class FakeBroker(MessagingHandler): for c in self._connections: c.close() self._connections = [] + self._logger.log("reactor thread done") def join(self): self._stop_thread = True self._container.wakeup() self._thread.join(timeout=TIMEOUT) + self._logger.log("thread done") if self._thread.is_alive(): raise Exception("FakeBroker did not exit") if self._error: @@ -122,7 +135,7 @@ class FakeBroker(MessagingHandler): def _queue(self, address): if address not in self.queues: - self.queues[address] = self._Queue() + self.queues[address] = self._Queue(address, self._logger) return self.queues[address] def on_link_opening(self, event): @@ -130,24 +143,29 @@ class FakeBroker(MessagingHandler): if event.link.remote_source.dynamic: address = str(uuid.uuid4()) event.link.source.address = address - q = self._Queue(True) + q = self._Queue(address, self._logger, True) self.queues[address] = q q.subscribe(event.link) + self._logger.log("dynamic sending link opened %s" % address) elif event.link.remote_source.address: event.link.source.address = event.link.remote_source.address self._queue(event.link.source.address).subscribe(event.link) + self._logger.log("sending link opened %s" % event.link.source.address) elif event.link.remote_target.address: event.link.target.address = event.link.remote_target.address + self._logger.log("receiving link opened %s" % event.link.target.address) def _unsubscribe(self, link): if link.source.address in self.queues and self.queues[link.source.address].unsubscribe(link): del self.queues[link.source.address] def on_link_error(self, event): + self._logger.log("link error") self.link_errors += 1 self.on_link_closing(event) def on_link_closing(self, event): + self._logger.log("link closing") if event.link.is_sender: self._unsubscribe(event.link) @@ -156,12 +174,14 @@ class FakeBroker(MessagingHandler): pn_conn.container = self._container.container_id def on_connection_opened(self, event): + self._logger.log("connection opened") self._connections.append(event.connection) def on_connection_closing(self, event): self.remove_stale_consumers(event.connection) def on_connection_closed(self, event): + self._logger.log("connection closed") try: self._connections.remove(event.connection) except ValueError: @@ -184,6 +204,9 @@ class FakeBroker(MessagingHandler): self.in_count += 1 self.out_count += self._queue(event.link.target.address).publish(event.message) + def dump_log(self): + self._logger.dump() + class FakeService(FakeBroker): """ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org