This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 53cff646b9 Revert "[AMQ-9392] Prevent InactivityMonitor read check Timer leak when TCP c…" (#1528)
53cff646b9 is described below

commit 53cff646b993ca02889174a6b9a49a0fc91bc27c
Author: JB Onofré <[email protected]>
AuthorDate: Tue Nov 4 20:48:35 2025 +0100

    Revert "[AMQ-9392] Prevent InactivityMonitor read check Timer leak when TCP c…" (#1528)
    
    This reverts commit b6037c720bcdbea1821982d78794313a3cf50150.
---
 .../transport/AbstractInactivityMonitor.java       |  23 +---
 .../transport/tcp/InactivityMonitorTest.java       | 119 +++++++++------------
 2 files changed, 56 insertions(+), 86 deletions(-)

diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java b/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
index 9db5fa1462..6182be7c42 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
@@ -437,17 +437,6 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
             synchronized (AbstractInactivityMonitor.class) {
                 READ_CHECK_TIMER.purge();
                 CHECKER_COUNTER--;
-                if (CHECKER_COUNTER == 0) {
-                    if (READ_CHECK_TIMER != null) {
-                        READ_CHECK_TIMER.cancel();
-                        READ_CHECK_TIMER = null;
-                    }
-                    try {
-                        ThreadPoolUtils.shutdownGraceful(ASYNC_TASKS, 0);
-                    } finally {
-                        ASYNC_TASKS = null;
-                    }
-                }
             }
         }
     }
@@ -508,14 +497,10 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
                 READ_CHECK_TIMER.purge();
                 CHECKER_COUNTER--;
                 if (CHECKER_COUNTER == 0) {
-                    if (WRITE_CHECK_TIMER != null) {
-                        WRITE_CHECK_TIMER.cancel();
-                        WRITE_CHECK_TIMER = null;
-                    }
-                    if (READ_CHECK_TIMER != null) {
-                        READ_CHECK_TIMER.cancel();
-                        READ_CHECK_TIMER = null;
-                    }
+                    WRITE_CHECK_TIMER.cancel();
+                    READ_CHECK_TIMER.cancel();
+                    WRITE_CHECK_TIMER = null;
+                    READ_CHECK_TIMER = null;
                     try {
                         ThreadPoolUtils.shutdownGraceful(ASYNC_TASKS, 0);
                     } finally {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
index e8d07202b7..4e5415a91d 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
@@ -16,18 +16,9 @@
  */
 package org.apache.activemq.transport.tcp;
 
-import static java.lang.Thread.getAllStackTraces;
-import static java.util.stream.Collectors.toList;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsCollectionContaining.hasItem;
-import static org.hamcrest.core.IsNot.not;
-
 import java.io.IOException;
-import java.net.SocketException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -42,7 +33,6 @@ import org.apache.activemq.transport.TransportAcceptListener;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.transport.TransportServer;
-import org.hamcrest.Matcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,7 +73,32 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
      */
     private void startClient() throws Exception, URISyntaxException {
         clientTransport = TransportFactory.connect(new URI("tcp://localhost:" 
+ serverPort + "?trace=true&wireFormat.maxInactivityDuration=1000"));
-        clientTransport.setTransportListener(new 
TestClientTransportListener());
+        clientTransport.setTransportListener(new TransportListener() {
+            @Override
+            public void onCommand(Object command) {
+                clientReceiveCount.incrementAndGet();
+                if (clientRunOnCommand != null) {
+                    clientRunOnCommand.run();
+                }
+            }
+
+            @Override
+            public void onException(IOException error) {
+                if (!ignoreClientError.get()) {
+                    LOG.info("Client transport error:");
+                    error.printStackTrace();
+                    clientErrorCount.incrementAndGet();
+                }
+            }
+
+            @Override
+            public void transportInterupted() {
+            }
+
+            @Override
+            public void transportResumed() {
+            }
+        });
 
         clientTransport.start();
     }
@@ -166,7 +181,32 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
         // Manually create a client transport so that it does not send 
KeepAlive
         // packets.  this should simulate a client hang.
         clientTransport = new TcpTransport(new OpenWireFormat(), 
SocketFactory.getDefault(), new URI("tcp://localhost:" + serverPort), null);
-        clientTransport.setTransportListener(new 
TestClientTransportListener());
+        clientTransport.setTransportListener(new TransportListener() {
+            @Override
+            public void onCommand(Object command) {
+                clientReceiveCount.incrementAndGet();
+                if (clientRunOnCommand != null) {
+                    clientRunOnCommand.run();
+                }
+            }
+
+            @Override
+            public void onException(IOException error) {
+                if (!ignoreClientError.get()) {
+                    LOG.info("Client transport error:");
+                    error.printStackTrace();
+                    clientErrorCount.incrementAndGet();
+                }
+            }
+
+            @Override
+            public void transportInterupted() {
+            }
+
+            @Override
+            public void transportResumed() {
+            }
+        });
 
         clientTransport.start();
         WireFormatInfo info = new WireFormatInfo();
@@ -197,34 +237,6 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
         assertEquals(0, serverErrorCount.get());
     }
 
-    public void testReadCheckTimerIsNotLeakedOnError() throws Exception {
-        // Intentionally picks a port that is not the listening port to 
generate a failure
-        clientTransport = TransportFactory.connect(new URI("tcp://localhost:" 
+ (serverPort ^ 1)));
-        clientTransport.setTransportListener(new 
TestClientTransportListener());
-
-        // Control test to verify there was no timer from a previous test
-        assertThat(getCurrentThreadNames(), not(hasReadCheckTimer()));
-
-        try {
-            clientTransport.start();
-            fail("A ConnectionException was expected");
-        } catch (SocketException e) {
-            // A SocketException is expected.
-        }
-
-        // If there is any read check timer at this point, calling stop should 
clean it up (because CHECKER_COUNTER becomes 0)
-        clientTransport.stop();
-        assertThat(getCurrentThreadNames(), not(hasReadCheckTimer()));
-    }
-
-    private static Matcher<Iterable<? super String>> hasReadCheckTimer() {
-        return hasItem("ActiveMQ InactivityMonitor ReadCheckTimer");
-    }
-
-    private static List<String> getCurrentThreadNames() {
-        return 
getAllStackTraces().keySet().stream().map(Thread::getName).collect(toList());
-    }
-
     /**
      * Used to test when a operation blocks. This should not cause transport to
      * get disconnected.
@@ -260,31 +272,4 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
         assertEquals(0, clientErrorCount.get());
         assertEquals(0, serverErrorCount.get());
     }
-
-    private class TestClientTransportListener implements TransportListener {
-        @Override
-        public void onCommand(Object command) {
-            clientReceiveCount.incrementAndGet();
-            if (clientRunOnCommand != null) {
-                clientRunOnCommand.run();
-            }
-        }
-
-        @Override
-        public void onException(IOException error) {
-            if (!ignoreClientError.get()) {
-                LOG.info("Client transport error:");
-                error.printStackTrace();
-                clientErrorCount.incrementAndGet();
-            }
-        }
-
-        @Override
-        public void transportInterupted() {
-        }
-
-        @Override
-        public void transportResumed() {
-        }
-    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to