This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new bc98248  DISPATCH-1757: Add error logging for system_tests_edge_router
bc98248 is described below

commit bc9824899e55af427771e7a57e85311aa41eee76
Author: Kenneth Giusti <kgiu...@apache.org>
AuthorDate: Tue Jul 20 11:41:33 2021 -0400

    DISPATCH-1757: Add error logging for system_tests_edge_router
    
    This closes #1313
---
 tests/system_test.py              | 31 ++++++++++++++++++++++++++
 tests/system_tests_edge_router.py | 46 +++++++++++++++++++++++++++------------
 tests/test_broker.py              | 31 ++++++++++++++++++++++----
 3 files changed, 90 insertions(+), 18 deletions(-)

diff --git a/tests/system_test.py b/tests/system_test.py
index 01cb585..94a6613 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -868,11 +868,15 @@ class AsyncTestReceiver(MessagingHandler):
         def get(self, timeout=TIMEOUT):
             self._async_receiver.num_queue_gets += 1
             msg = super(AsyncTestReceiver.MyQueue, self).get(timeout=timeout)
+            self._async_receiver._logger.log("message %d get"
+                                             % self._async_receiver.num_queue_gets)
             return msg
 
         def put(self, msg):
             self._async_receiver.num_queue_puts += 1
             super(AsyncTestReceiver.MyQueue, self).put(msg)
+            self._async_receiver._logger.log("message %d put"
+                                             % self._async_receiver.num_queue_puts)
 
     def __init__(self, address, source, conn_args=None, container_id=None,
                  wait=True, recover_link=False, msg_args=None):
@@ -892,6 +896,7 @@ class AsyncTestReceiver(MessagingHandler):
         self._recover_count = 0
         self._stop_thread = False
         self._thread = Thread(target=self._main)
+        self._logger = Logger(title="AsyncTestReceiver %s" % cid)
         self._thread.daemon = True
         self._thread.start()
         self.num_queue_puts = 0
@@ -906,16 +911,19 @@ class AsyncTestReceiver(MessagingHandler):
     def _main(self):
         self._container.timeout = 0.5
         self._container.start()
+        self._logger.log("Starting reactor")
         while self._container.process():
             if self._stop_thread:
                 if self._conn:
                     self._conn.close()
                     self._conn = None
+        self._logger.log("reactor thread done")
 
     def stop(self, timeout=TIMEOUT):
         self._stop_thread = True
         self._container.wakeup()
         self._thread.join(timeout=TIMEOUT)
+        self._logger.log("thread done")
         if self._thread.is_alive():
             raise Exception("AsyncTestReceiver did not exit")
         del self._conn
@@ -928,13 +936,16 @@ class AsyncTestReceiver(MessagingHandler):
         self._conn = event.container.connect(**kwargs)
 
     def on_connection_opened(self, event):
+        self._logger.log("Connection opened")
         kwargs = {'source': self.source}
         event.container.create_receiver(event.connection, **kwargs)
 
     def on_link_opened(self, event):
+        self._logger.log("link opened")
         self._ready.set()
 
     def on_link_closing(self, event):
+        self._logger.log("link closing")
         event.link.close()
         if self._recover_link and not self._stop_thread:
             # lesson learned: the generated link name will be the same as the
@@ -951,10 +962,14 @@ class AsyncTestReceiver(MessagingHandler):
     def on_disconnected(self, event):
         # if remote terminates the connection kill the thread else it will spin
         # on the cpu
+        self._logger.log("Disconnected")
         if self._conn:
             self._conn.close()
             self._conn = None
 
+    def dump_log(self):
+        self._logger.dump()
+
 
 class AsyncTestSender(MessagingHandler):
     """
@@ -990,14 +1005,17 @@ class AsyncTestSender(MessagingHandler):
         self._link_name = "%s-%s" % (cid, "tx")
         self._thread = Thread(target=self._main)
         self._thread.daemon = True
+        self._logger = Logger(title="AsyncTestSender %s" % cid)
         self._thread.start()
         self.msg_stats = "self.sent=%d, self.accepted=%d, self.released=%d, self.modified=%d, self.rejected=%d"
 
     def _main(self):
         self._container.timeout = 0.5
         self._container.start()
+        self._logger.log("Starting reactor")
         while self._container.process():
             self._check_if_done()
+        self._logger.log("reactor thread done")
 
     def get_msg_stats(self):
         return self.msg_stats % (self.sent, self.accepted, self.released, self.modified, self.rejected)
@@ -1005,6 +1023,7 @@ class AsyncTestSender(MessagingHandler):
     def wait(self):
         # don't stop it - wait until everything is sent
         self._thread.join(timeout=TIMEOUT)
+        self._logger.log("thread done")
         assert not self._thread.is_alive(), "sender did not complete"
         if self.error:
             raise AsyncTestSender.TestSenderException(self.error)
@@ -1016,6 +1035,7 @@ class AsyncTestSender(MessagingHandler):
         self._conn = self._container.connect(self.address)
 
     def on_connection_opened(self, event):
+        self._logger.log("Connection opened")
         option = AtMostOnce if self.presettle else AtLeastOnce
         self._sender = self._container.create_sender(self._conn,
                                                      target=self.target,
@@ -1026,6 +1046,7 @@ class AsyncTestSender(MessagingHandler):
         if self.sent < self.total:
             self._sender.send(self._message)
             self.sent += 1
+            self._logger.log("message %d sent" % self.sent)
 
     def _check_if_done(self):
         done = (self.sent == self.total
@@ -1037,10 +1058,12 @@ class AsyncTestSender(MessagingHandler):
                                             self.address)
             self._conn.close()
             self._conn = None
+            self._logger.log("Connection closed")
 
     def on_accepted(self, event):
         self.accepted += 1
         event.delivery.settle()
+        self._logger.log("message %d accepted" % self.accepted)
 
     def on_released(self, event):
         # for some reason Proton 'helpfully' calls on_released even though the
@@ -1049,17 +1072,21 @@ class AsyncTestSender(MessagingHandler):
             return self.on_modified(event)
         self.released += 1
         event.delivery.settle()
+        self._logger.log("message %d released" % self.released)
 
     def on_modified(self, event):
         self.modified += 1
         event.delivery.settle()
+        self._logger.log("message %d modified" % self.modified)
 
     def on_rejected(self, event):
         self.rejected += 1
         event.delivery.settle()
+        self._logger.log("message %d rejected" % self.rejected)
 
     def on_link_error(self, event):
         self.error = "link error:%s" % str(event.link.remote_condition)
+        self._logger.log(self.error)
         if self._conn:
             self._conn.close()
             self._conn = None
@@ -1068,10 +1095,14 @@ class AsyncTestSender(MessagingHandler):
         # if remote terminates the connection kill the thread else it will spin
         # on the cpu
         self.error = "connection to remote dropped"
+        self._logger.log(self.error)
         if self._conn:
             self._conn.close()
             self._conn = None
 
+    def dump_log(self):
+        self._logger.dump()
+
 
 class QdManager(object):
     """
diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py
index c9a65a2..84ce7b4 100644
--- a/tests/system_tests_edge_router.py
+++ b/tests/system_tests_edge_router.py
@@ -1673,19 +1673,24 @@ class LinkRouteProxyTest(TestCase):
         while self._get_address(router, address):
             sleep(0.1)
 
-    def _test_traffic(self, sender, receiver, address, count=5):
+    def _test_traffic(self, sender, receiver, address, count=5, message=None):
         """Generate message traffic between two normal clients"""
+        error = None
         tr = AsyncTestReceiver(receiver, address)
-        ts = AsyncTestSender(sender, address, count)
+        ts = AsyncTestSender(sender, address, count,
+                             message=message)
         ts.wait()  # wait until all sent
         for i in range(count):
             try:
                 tr.queue.get()
             except AsyncTestReceiver.Empty:
-                return "Sender Stats=" + ts.get_msg_stats() + " Receiver Queue Stats=" + tr.get_queue_stats()
+                error = "Sender Stats=" + ts.get_msg_stats() + "\n Receiver Queue Stats=" + tr.get_queue_stats()
 
         tr.stop()
-        return None
+        if error:
+            tr.dump_log()
+            ts.dump_log()
+        return error
 
     def test_01_immedate_detach_reattach(self):
         if self.skip['test_01'] :
@@ -1850,10 +1855,15 @@ class LinkRouteProxyTest(TestCase):
                                  self.INT_B.listener,
                                  "Edge1/One",
                                  count=5)
-        self.assertIsNone(out, out)
         fs.join()
-        self.assertEqual(5, fs.in_count)
-        self.assertEqual(5, fs.out_count)
+
+        try:
+            self.assertIsNone(out, out)
+            self.assertEqual(5, fs.in_count)
+            self.assertEqual(5, fs.out_count)
+        except AssertionError:
+            fs.dump_log()
+            raise
 
         er.teardown()
         self._wait_address_gone(self.INT_B, "Edge1/*")
@@ -1933,6 +1943,7 @@ class LinkRouteProxyTest(TestCase):
             self.skipTest("Test skipped during development.")
 
         a_type = 'org.apache.qpid.dispatch.router.address'
+        test_msg = Message(body="test_51_link_route_proxy_configured")
 
         fs = FakeService(self.EA1.route_container)
         self.INT_B.wait_address("CfgLinkRoute1", count=2)
@@ -1940,11 +1951,17 @@ class LinkRouteProxyTest(TestCase):
         out = self._test_traffic(self.INT_B.listener,
                                  self.INT_B.listener,
                                  "CfgLinkRoute1/hi",
-                                 count=5)
-        self.assertIsNone(out, out)
+                                 count=5,
+                                 message=test_msg)
         fs.join()
-        self.assertEqual(5, fs.in_count)
-        self.assertEqual(5, fs.out_count)
+
+        try:
+            self.assertIsNone(out, out)
+            self.assertEqual(5, fs.in_count)
+            self.assertEqual(5, fs.out_count)
+        except AssertionError:
+            fs.dump_log()
+            raise
 
         # now that FakeService is gone, the link route should no longer be
         # active:
@@ -1958,12 +1975,13 @@ class LinkRouteProxyTest(TestCase):
         out = self._test_traffic(self.INT_A.listener,
                                  self.INT_A.listener,
                                  "MATCH.cfg.pattern",
-                                 count=5)
-        self.assertIsNone(out, out)
-
+                                 count=5,
+                                 message=test_msg)
         fs.join()
+        self.assertIsNone(out, out)
         self.assertEqual(5, fs.in_count)
         self.assertEqual(5, fs.out_count)
+
         self._wait_address_gone(self.INT_A, "*.cfg.pattern.#")
 
     def test_52_conn_link_route_proxy(self):
diff --git a/tests/test_broker.py b/tests/test_broker.py
index 859cd93..61a15b7 100644
--- a/tests/test_broker.py
+++ b/tests/test_broker.py
@@ -29,7 +29,7 @@ from proton import Endpoint
 from proton.handlers import MessagingHandler
 from proton.reactor import Container
 
-from system_test import TIMEOUT
+from system_test import TIMEOUT, Logger
 
 
 class FakeBroker(MessagingHandler):
@@ -37,10 +37,14 @@ class FakeBroker(MessagingHandler):
     A fake broker-like service that listens for client connections
     """
     class _Queue(object):
-        def __init__(self, dynamic=False):
+        def __init__(self, name, logger, dynamic=False):
             self.dynamic = dynamic
             self.queue = collections.deque()
             self.consumers = []
+            self.logger = logger
+            self.name = name
+            self.sent = 0
+            self.recv = 0
 
         def subscribe(self, consumer):
             self.consumers.append(consumer)
@@ -51,6 +55,8 @@ class FakeBroker(MessagingHandler):
             return len(self.consumers) == 0 and (self.dynamic or len(self.queue) == 0)
 
         def publish(self, message):
+            self.recv += 1
+            self.logger.log("Received message %d" % self.recv)
             self.queue.append(message)
             return self.dispatch()
 
@@ -74,6 +80,9 @@ class FakeBroker(MessagingHandler):
                     if c.credit:
                         c.send(self.queue.popleft())
                         result += 1
+                        self.sent += 1
+                        self.logger.log("Sent message %d" % self.sent)
+
                 return result
             except IndexError:  # no more messages
                 return 0
@@ -90,6 +99,7 @@ class FakeBroker(MessagingHandler):
         self._error = None
         self._container = Container(self)
         self._container.container_id = container_id or 'FakeBroker'
+        self._logger = Logger(title=self._container.container_id)
         self._thread = Thread(target=self._main)
         self._thread.daemon = True
         self._stop_thread = False
@@ -98,6 +108,7 @@ class FakeBroker(MessagingHandler):
     def _main(self):
         self._container.timeout = 1.0
         self._container.start()
+        self._logger.log("Starting reactor thread")
 
         while self._container.process():
             if self._stop_thread:
@@ -107,11 +118,13 @@ class FakeBroker(MessagingHandler):
                 for c in self._connections:
                     c.close()
                 self._connections = []
+        self._logger.log("reactor thread done")
 
     def join(self):
         self._stop_thread = True
         self._container.wakeup()
         self._thread.join(timeout=TIMEOUT)
+        self._logger.log("thread done")
         if self._thread.is_alive():
             raise Exception("FakeBroker did not exit")
         if self._error:
@@ -122,7 +135,7 @@ class FakeBroker(MessagingHandler):
 
     def _queue(self, address):
         if address not in self.queues:
-            self.queues[address] = self._Queue()
+            self.queues[address] = self._Queue(address, self._logger)
         return self.queues[address]
 
     def on_link_opening(self, event):
@@ -130,24 +143,29 @@ class FakeBroker(MessagingHandler):
             if event.link.remote_source.dynamic:
                 address = str(uuid.uuid4())
                 event.link.source.address = address
-                q = self._Queue(True)
+                q = self._Queue(address, self._logger, True)
                 self.queues[address] = q
                 q.subscribe(event.link)
+                self._logger.log("dynamic sending link opened %s" % address)
             elif event.link.remote_source.address:
                 event.link.source.address = event.link.remote_source.address
                 self._queue(event.link.source.address).subscribe(event.link)
+                self._logger.log("sending link opened %s" % event.link.source.address)
         elif event.link.remote_target.address:
             event.link.target.address = event.link.remote_target.address
+            self._logger.log("receiving link opened %s" % event.link.target.address)
 
     def _unsubscribe(self, link):
         if link.source.address in self.queues and self.queues[link.source.address].unsubscribe(link):
             del self.queues[link.source.address]
 
     def on_link_error(self, event):
+        self._logger.log("link error")
         self.link_errors += 1
         self.on_link_closing(event)
 
     def on_link_closing(self, event):
+        self._logger.log("link closing")
         if event.link.is_sender:
             self._unsubscribe(event.link)
 
@@ -156,12 +174,14 @@ class FakeBroker(MessagingHandler):
         pn_conn.container = self._container.container_id
 
     def on_connection_opened(self, event):
+        self._logger.log("connection opened")
         self._connections.append(event.connection)
 
     def on_connection_closing(self, event):
         self.remove_stale_consumers(event.connection)
 
     def on_connection_closed(self, event):
+        self._logger.log("connection closed")
         try:
             self._connections.remove(event.connection)
         except ValueError:
@@ -184,6 +204,9 @@ class FakeBroker(MessagingHandler):
         self.in_count += 1
         self.out_count += self._queue(event.link.target.address).publish(event.message)
 
+    def dump_log(self):
+        self._logger.dump()
+
 
 class FakeService(FakeBroker):
     """

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to