This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new e13d1ce  DISPATCH-2036: do not log TCP deliveries as stuck
e13d1ce is described below

commit e13d1ce7473268c115048b4c473e3c0fa2da0b8a
Author: Kenneth Giusti <[email protected]>
AuthorDate: Fri Nov 12 10:55:49 2021 -0500

    DISPATCH-2036: do not log TCP deliveries as stuck
    
    This closes #1434
---
 .../stuck_delivery_detection/delivery_tracker.c    |   6 +
 tests/system_tests_tcp_adaptor.py                  | 143 +++++++++++++++++++++
 2 files changed, 149 insertions(+)

diff --git a/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c b/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c
index c9bea1e..a348011 100644
--- a/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c
+++ b/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c
@@ -46,6 +46,12 @@ struct tracker_t {
 
 static void check_delivery_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv)
 {
+    // DISPATCH-2036: ignore "infinitely long" streaming messages (like TCP
+    // adaptor deliveries)
+    if (dlv->msg && qd_message_is_streaming(dlv->msg)) {
+        return;
+    }
+
     if (!dlv->stuck && ((core->uptime_ticks - link->core_ticks) > stuck_age)) {
         dlv->stuck = true;
         link->deliveries_stuck++;
diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py
index 7db9510..1b0be7f 100644
--- a/tests/system_tests_tcp_adaptor.py
+++ b/tests/system_tests_tcp_adaptor.py
@@ -978,6 +978,149 @@ class TcpAdaptor(TestCase):
         self.logger.log(tname + " SUCCESS")
 
 
+class TcpAdaptorStuckDeliveryTest(TestCase):
+    """
+    Verify that the routers stuck delivery detection is not applied to TCP
+    deliveries.  See Dispatch-2036.
+    """
+    @classmethod
+    def setUpClass(cls):
+        super(TcpAdaptorStuckDeliveryTest, cls).setUpClass()
+
+        if DISABLE_SELECTOR_TESTS:
+            return
+
+        cls.test_name = 'TCPStuckDeliveryTest'
+
+        # Topology: linear.
+        # tcp-client->edge1->interior1->interior2->edge2->tcp-server
+
+        cls.edge1_tcp_listener_port = cls.tester.get_port()
+        cls.int1_edge_listener_port = cls.tester.get_port()
+        cls.int2_interior_listener_port = cls.tester.get_port()
+        cls.int2_edge_listener_port = cls.tester.get_port()
+        cls.edge2_tcp_connector_port = cls.tester.get_port()
+
+        def router(cls, name, mode, config):
+            base_config = [
+                ('router', {'mode': mode,
+                            'id': name}),
+                ('listener', {'role': 'normal',
+                              'port': cls.tester.get_port()}),
+                ('address', {'prefix': 'closest',   'distribution': 'closest'}),
+                ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+            ]
+            config = base_config + config
+            config = Qdrouterd.Config(config)
+            return cls.tester.qdrouterd(name, config, wait=False,
+                                        # enable stuck delivery short timer
+                                        # (3 seconds)
+                                        cl_args=['-T'])
+
+        cls.i_router1 = router(cls,
+                               "%s_I1" % cls.test_name,
+                               "interior",
+                               [('listener', {'role': 'edge',
+                                              'host': '0.0.0.0',
+                                              'port':
+                                              cls.int1_edge_listener_port}),
+                                ('connector', {'role': 'inter-router',
+                                               'port':
+                                               cls.int2_interior_listener_port})])
+        cls.i_router2 = router(cls,
+                               "%s_I2" % cls.test_name,
+                               "interior",
+                               [('listener', {'role': 'edge',
+                                              'host': '0.0.0.0',
+                                              'port':
+                                              cls.int2_edge_listener_port}),
+                                ('listener', {'role': 'inter-router',
+                                              'host': '0.0.0.0',
+                                              'port':
+                                              cls.int2_interior_listener_port})])
+        cls.e_router1 = router(cls,
+                               "%s_E1" % cls.test_name,
+                               "edge",
+                               [('tcpListener', {'host': "0.0.0.0",
+                                                 'port':
+                                                 cls.edge1_tcp_listener_port,
+                                                 'address': 'nostuck'}),
+                                ('connector', {'role': 'edge',
+                                               'port':
+                                               cls.int1_edge_listener_port})])
+        cls.e_router2 = router(cls,
+                               "%s_E2" % cls.test_name,
+                               "edge",
+                               [('tcpConnector', {'host': "127.0.0.1",
+                                                  'port':
+                                                  cls.edge2_tcp_connector_port,
+                                                  'address': 'nostuck'}),
+                                ('connector', {'role': 'edge',
+                                               'port':
+                                               cls.int1_edge_listener_port})])
+        cls.i_router1.wait_ready()
+        cls.i_router2.wait_ready()
+        cls.e_router1.wait_ready()
+        cls.e_router2.wait_ready()
+
+    @unittest.skipIf(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON)
+    def test_01_ignore_stuck_deliveries(self):
+        """
+        Create a TCP delivery, wait for it to be classified as stuck and ensure
+        that no stuck delivery logs are posted
+        """
+
+        # step 1: create a TCP listener for edge2 to connect to
+        try:
+            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+            server.settimeout(TIMEOUT)
+            server.bind(("", self.edge2_tcp_connector_port))
+            server.listen(1)
+        except Exception as exc:
+            print("%s: failed creating tcp server: %s" % (self.test_name, exc),
+                  flush=True)
+            raise
+
+        # step 2: initiate client connection to edge1.
+        try:
+            client_conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            client_conn.settimeout(TIMEOUT)
+            client_conn.connect(('127.0.0.1', self.edge1_tcp_listener_port))
+        except Exception as exc:
+            print("%s: failed creating client connection: %s" % (self.test_name,
+                                                                 exc),
+                  flush=True)
+            raise
+
+        # step 3: accept the connection request from edge2:
+        try:
+            server_conn, addr = server.accept()
+            server.close()
+        except Exception as exc:
+            print("%s: failed to accept at server: %s" % (self.test_name,
+                                                          exc),
+                  flush=True)
+            raise
+
+        # step 4: now wait long enough for the stuck delivery detection cycle
+        # to run on each router (approx 5 seconds)
+        time.sleep(12)
+        client_conn.close()
+        server_conn.close()
+
+        # step 5: expect no log messages for stuck delivery were issued
+        for router in [self.e_router1, self.i_router1, self.i_router2,
+                       self.e_router2]:
+            logfile = router.logfile_path
+            router.teardown()
+            with io.open(logfile) as f:
+                for line in f:
+                    self.assertNotIn("Stuck delivery: At least one delivery",
+                                     line,
+                                     "Stuck deliveries should not be logged!")
+
+
 class TcpAdaptorManagementTest(TestCase):
     """
     Test Creation and deletion of TCP management entities

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to