This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 53cff646b9 Revert "[AMQ-9392] Prevent InactivityMonitor read check
Timer leak when TCP c…" (#1528)
53cff646b9 is described below
commit 53cff646b993ca02889174a6b9a49a0fc91bc27c
Author: JB Onofré <[email protected]>
AuthorDate: Tue Nov 4 20:48:35 2025 +0100
Revert "[AMQ-9392] Prevent InactivityMonitor read check Timer leak when TCP
c…" (#1528)
This reverts commit b6037c720bcdbea1821982d78794313a3cf50150.
---
.../transport/AbstractInactivityMonitor.java | 23 +---
.../transport/tcp/InactivityMonitorTest.java | 119 +++++++++------------
2 files changed, 56 insertions(+), 86 deletions(-)
diff --git
a/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
b/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
index 9db5fa1462..6182be7c42 100644
---
a/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
+++
b/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
@@ -437,17 +437,6 @@ public abstract class AbstractInactivityMonitor extends
TransportFilter {
synchronized (AbstractInactivityMonitor.class) {
READ_CHECK_TIMER.purge();
CHECKER_COUNTER--;
- if (CHECKER_COUNTER == 0) {
- if (READ_CHECK_TIMER != null) {
- READ_CHECK_TIMER.cancel();
- READ_CHECK_TIMER = null;
- }
- try {
- ThreadPoolUtils.shutdownGraceful(ASYNC_TASKS, 0);
- } finally {
- ASYNC_TASKS = null;
- }
- }
}
}
}
@@ -508,14 +497,10 @@ public abstract class AbstractInactivityMonitor extends
TransportFilter {
READ_CHECK_TIMER.purge();
CHECKER_COUNTER--;
if (CHECKER_COUNTER == 0) {
- if (WRITE_CHECK_TIMER != null) {
- WRITE_CHECK_TIMER.cancel();
- WRITE_CHECK_TIMER = null;
- }
- if (READ_CHECK_TIMER != null) {
- READ_CHECK_TIMER.cancel();
- READ_CHECK_TIMER = null;
- }
+ WRITE_CHECK_TIMER.cancel();
+ READ_CHECK_TIMER.cancel();
+ WRITE_CHECK_TIMER = null;
+ READ_CHECK_TIMER = null;
try {
ThreadPoolUtils.shutdownGraceful(ASYNC_TASKS, 0);
} finally {
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
index e8d07202b7..4e5415a91d 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
@@ -16,18 +16,9 @@
*/
package org.apache.activemq.transport.tcp;
-import static java.lang.Thread.getAllStackTraces;
-import static java.util.stream.Collectors.toList;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsCollectionContaining.hasItem;
-import static org.hamcrest.core.IsNot.not;
-
import java.io.IOException;
-import java.net.SocketException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -42,7 +33,6 @@ import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.TransportServer;
-import org.hamcrest.Matcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,7 +73,32 @@ public class InactivityMonitorTest extends
CombinationTestSupport implements Tra
*/
private void startClient() throws Exception, URISyntaxException {
clientTransport = TransportFactory.connect(new URI("tcp://localhost:"
+ serverPort + "?trace=true&wireFormat.maxInactivityDuration=1000"));
- clientTransport.setTransportListener(new
TestClientTransportListener());
+ clientTransport.setTransportListener(new TransportListener() {
+ @Override
+ public void onCommand(Object command) {
+ clientReceiveCount.incrementAndGet();
+ if (clientRunOnCommand != null) {
+ clientRunOnCommand.run();
+ }
+ }
+
+ @Override
+ public void onException(IOException error) {
+ if (!ignoreClientError.get()) {
+ LOG.info("Client transport error:");
+ error.printStackTrace();
+ clientErrorCount.incrementAndGet();
+ }
+ }
+
+ @Override
+ public void transportInterupted() {
+ }
+
+ @Override
+ public void transportResumed() {
+ }
+ });
clientTransport.start();
}
@@ -166,7 +181,32 @@ public class InactivityMonitorTest extends
CombinationTestSupport implements Tra
// Manually create a client transport so that it does not send
KeepAlive
// packets. this should simulate a client hang.
clientTransport = new TcpTransport(new OpenWireFormat(),
SocketFactory.getDefault(), new URI("tcp://localhost:" + serverPort), null);
- clientTransport.setTransportListener(new
TestClientTransportListener());
+ clientTransport.setTransportListener(new TransportListener() {
+ @Override
+ public void onCommand(Object command) {
+ clientReceiveCount.incrementAndGet();
+ if (clientRunOnCommand != null) {
+ clientRunOnCommand.run();
+ }
+ }
+
+ @Override
+ public void onException(IOException error) {
+ if (!ignoreClientError.get()) {
+ LOG.info("Client transport error:");
+ error.printStackTrace();
+ clientErrorCount.incrementAndGet();
+ }
+ }
+
+ @Override
+ public void transportInterupted() {
+ }
+
+ @Override
+ public void transportResumed() {
+ }
+ });
clientTransport.start();
WireFormatInfo info = new WireFormatInfo();
@@ -197,34 +237,6 @@ public class InactivityMonitorTest extends
CombinationTestSupport implements Tra
assertEquals(0, serverErrorCount.get());
}
- public void testReadCheckTimerIsNotLeakedOnError() throws Exception {
- // Intentionally picks a port that is not the listening port to
generate a failure
- clientTransport = TransportFactory.connect(new URI("tcp://localhost:"
+ (serverPort ^ 1)));
- clientTransport.setTransportListener(new
TestClientTransportListener());
-
- // Control test to verify there was no timer from a previous test
- assertThat(getCurrentThreadNames(), not(hasReadCheckTimer()));
-
- try {
- clientTransport.start();
- fail("A ConnectionException was expected");
- } catch (SocketException e) {
- // A SocketException is expected.
- }
-
- // If there is any read check timer at this point, calling stop should
clean it up (because CHECKER_COUNTER becomes 0)
- clientTransport.stop();
- assertThat(getCurrentThreadNames(), not(hasReadCheckTimer()));
- }
-
- private static Matcher<Iterable<? super String>> hasReadCheckTimer() {
- return hasItem("ActiveMQ InactivityMonitor ReadCheckTimer");
- }
-
- private static List<String> getCurrentThreadNames() {
- return
getAllStackTraces().keySet().stream().map(Thread::getName).collect(toList());
- }
-
/**
* Used to test when a operation blocks. This should not cause transport to
* get disconnected.
@@ -260,31 +272,4 @@ public class InactivityMonitorTest extends
CombinationTestSupport implements Tra
assertEquals(0, clientErrorCount.get());
assertEquals(0, serverErrorCount.get());
}
-
- private class TestClientTransportListener implements TransportListener {
- @Override
- public void onCommand(Object command) {
- clientReceiveCount.incrementAndGet();
- if (clientRunOnCommand != null) {
- clientRunOnCommand.run();
- }
- }
-
- @Override
- public void onException(IOException error) {
- if (!ignoreClientError.get()) {
- LOG.info("Client transport error:");
- error.printStackTrace();
- clientErrorCount.incrementAndGet();
- }
- }
-
- @Override
- public void transportInterupted() {
- }
-
- @Override
- public void transportResumed() {
- }
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact