This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push:
new e13d1ce DISPATCH-2036: do not log TCP deliveries as stuck
e13d1ce is described below
commit e13d1ce7473268c115048b4c473e3c0fa2da0b8a
Author: Kenneth Giusti <[email protected]>
AuthorDate: Fri Nov 12 10:55:49 2021 -0500
DISPATCH-2036: do not log TCP deliveries as stuck
This closes #1434
---
.../stuck_delivery_detection/delivery_tracker.c | 6 +
tests/system_tests_tcp_adaptor.py | 143 +++++++++++++++++++++
2 files changed, 149 insertions(+)
diff --git a/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c b/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c
index c9bea1e..a348011 100644
--- a/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c
+++ b/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c
@@ -46,6 +46,12 @@ struct tracker_t {
static void check_delivery_CT(qdr_core_t *core, qdr_link_t *link,
qdr_delivery_t *dlv)
{
+ // DISPATCH-2036: ignore "infinitely long" streaming messages (like TCP
+ // adaptor deliveries)
+ if (dlv->msg && qd_message_is_streaming(dlv->msg)) {
+ return;
+ }
+
if (!dlv->stuck && ((core->uptime_ticks - link->core_ticks) > stuck_age)) {
dlv->stuck = true;
link->deliveries_stuck++;
diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py
index 7db9510..1b0be7f 100644
--- a/tests/system_tests_tcp_adaptor.py
+++ b/tests/system_tests_tcp_adaptor.py
@@ -978,6 +978,149 @@ class TcpAdaptor(TestCase):
self.logger.log(tname + " SUCCESS")
+class TcpAdaptorStuckDeliveryTest(TestCase):
+    """
+    Verify that the router's stuck delivery detection is not applied to TCP
+    (streaming) deliveries. See DISPATCH-2036.
+    """
+    @classmethod
+    def setUpClass(cls):
+        super(TcpAdaptorStuckDeliveryTest, cls).setUpClass()
+
+        if DISABLE_SELECTOR_TESTS:
+            return
+
+        cls.test_name = 'TCPStuckDeliveryTest'
+
+        # Topology: linear.
+        # tcp-client->edge1->interior1->interior2->edge2->tcp-server
+
+        cls.edge1_tcp_listener_port = cls.tester.get_port()
+        cls.int1_edge_listener_port = cls.tester.get_port()
+        cls.int2_interior_listener_port = cls.tester.get_port()
+        cls.int2_edge_listener_port = cls.tester.get_port()
+        cls.edge2_tcp_connector_port = cls.tester.get_port()
+
+        def router(cls, name, mode, config):
+            base_config = [
+                ('router', {'mode': mode,
+                            'id': name}),
+                ('listener', {'role': 'normal',
+                              'port': cls.tester.get_port()}),
+                ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+                ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+            ]
+            config = base_config + config
+            config = Qdrouterd.Config(config)
+            return cls.tester.qdrouterd(name, config, wait=False,
+                                        # enable stuck delivery short timer
+                                        # (3 seconds)
+                                        cl_args=['-T'])
+
+        cls.i_router1 = router(cls,
+                               "%s_I1" % cls.test_name,
+                               "interior",
+                               [('listener', {'role': 'edge',
+                                              'host': '0.0.0.0',
+                                              'port':
+                                              cls.int1_edge_listener_port}),
+                                ('connector', {'role': 'inter-router',
+                                               'port':
+                                               cls.int2_interior_listener_port})])
+        cls.i_router2 = router(cls,
+                               "%s_I2" % cls.test_name,
+                               "interior",
+                               [('listener', {'role': 'edge',
+                                              'host': '0.0.0.0',
+                                              'port':
+                                              cls.int2_edge_listener_port}),
+                                ('listener', {'role': 'inter-router',
+                                              'host': '0.0.0.0',
+                                              'port':
+                                              cls.int2_interior_listener_port})])
+        cls.e_router1 = router(cls,
+                               "%s_E1" % cls.test_name,
+                               "edge",
+                               [('tcpListener', {'host': "0.0.0.0",
+                                                 'port':
+                                                 cls.edge1_tcp_listener_port,
+                                                 'address': 'nostuck'}),
+                                ('connector', {'role': 'edge',
+                                               'port':
+                                               cls.int1_edge_listener_port})])
+        cls.e_router2 = router(cls,
+                               "%s_E2" % cls.test_name,
+                               "edge",
+                               [('tcpConnector', {'host': "127.0.0.1",
+                                                  'port':
+                                                  cls.edge2_tcp_connector_port,
+                                                  'address': 'nostuck'}),
+                                ('connector', {'role': 'edge',
+                                               'port':  # edge2 -> interior2
+                                               cls.int2_edge_listener_port})])
+        cls.i_router1.wait_ready()
+        cls.i_router2.wait_ready()
+        cls.e_router1.wait_ready()
+        cls.e_router2.wait_ready()
+
+    @unittest.skipIf(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON)
+    def test_01_ignore_stuck_deliveries(self):
+        """
+        Hold a TCP delivery open past the stuck delivery threshold and verify
+        that no router logs it as a stuck delivery.
+        """
+
+        # step 1: create a TCP listener for edge2 to connect to
+        try:
+            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+            server.settimeout(TIMEOUT)
+            server.bind(("", self.edge2_tcp_connector_port))
+            server.listen(1)
+        except Exception as exc:
+            print("%s: failed creating tcp server: %s" % (self.test_name, exc),
+                  flush=True)
+            raise
+
+        # step 2: initiate client connection to edge1.
+        try:
+            client_conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            client_conn.settimeout(TIMEOUT)
+            client_conn.connect(('127.0.0.1', self.edge1_tcp_listener_port))
+        except Exception as exc:
+            print("%s: failed creating client connection: %s" % (self.test_name,
+                                                                 exc),
+                  flush=True)
+            raise
+
+        # step 3: accept the connection request from edge2:
+        try:
+            server_conn, _ = server.accept()
+            server.close()
+        except Exception as exc:
+            print("%s: failed to accept at server: %s" % (self.test_name,
+                                                          exc),
+                  flush=True)
+            raise
+
+        # step 4: hold the delivery open well past the short (-T) stuck
+        # delivery interval so the detection cycle runs on every router
+        time.sleep(12)
+        client_conn.close()
+        server_conn.close()
+
+        # step 5: expect no log messages for stuck delivery were issued
+        for router in [self.e_router1, self.i_router1, self.i_router2,
+                       self.e_router2]:
+            logfile = router.logfile_path
+            router.teardown()
+            with io.open(logfile) as f:
+                for line in f:
+                    self.assertNotIn("Stuck delivery: At least one delivery",
+                                     line,
+                                     "Stuck deliveries should not be logged!")
+
+
class TcpAdaptorManagementTest(TestCase):
"""
Test Creation and deletion of TCP management entities
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]