This is an automated email from the ASF dual-hosted git repository.
jlmonteiro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new a8fa4b09b5 [AMQ-9860] [AMQ-9861] connection async exception handling
and activemq pool tests connection leaks to other tests (#1657)
a8fa4b09b5 is described below
commit a8fa4b09b54eaca06d106e754796798bd88e21e9
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Wed Feb 11 18:14:56 2026 +0100
[AMQ-9860] [AMQ-9861] connection async exception handling and activemq pool
tests connection leaks to other tests (#1657)
* [AMQ-9861] activemq-pool tests have resource leaks causing test
interference
activemq-pool tests use default broker name causing BrokerRegistry
collisions
Missing waitUntilStopped() after broker.stop() in tests
* [AMQ-9860] Refactor async exception handling to improve resource
management and prevent task execution on closed connections
* Increase timeout for exception event propagation in
ConnectionFailureEvictsFromPoolTest
* Refactor PooledConnectionFactory to remove AutoCloseable implementation
and improve resource management in tests
---
.../org/apache/activemq/ActiveMQConnection.java | 91 +++++++++++-----------
activemq-pool/pom.xml | 6 ++
.../apache/activemq/pool/ConfigFromPropsTest.java | 18 +++--
.../pool/ConnectionFailureEvictsFromPoolTest.java | 89 +++++++++++----------
.../activemq/pool/PooledConnectionFactoryTest.java | 16 ++--
.../apache/activemq/pool/PooledConsumerTest.java | 7 +-
.../apache/activemq/pool/XAConnectionPoolTest.java | 81 +++++++++++--------
7 files changed, 176 insertions(+), 132 deletions(-)
diff --git
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
index 74a25b728a..b9f0eebf9a 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@@ -1949,12 +1950,7 @@ public class ActiveMQConnection implements Connection,
TopicConnection, QueueCon
@Override
public Response processConnectionError(final
ConnectionError error) throws Exception {
- executor.execute(new Runnable() {
- @Override
- public void run() {
- onAsyncException(error.getException());
- }
- });
+ executeAsync(() ->
onAsyncException(error.getException()));
return null;
}
@@ -2018,18 +2014,12 @@ public class ActiveMQConnection implements Connection,
TopicConnection, QueueCon
* @param error the exception that the problem
*/
public void onClientInternalException(final Throwable error) {
- if ( !closed.get() && !closing.get() ) {
- if ( this.clientInternalExceptionListener != null ) {
- executor.execute(new Runnable() {
- @Override
- public void run() {
-
ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
- }
- });
- } else {
- LOG.debug("Async client internal exception occurred with no
exception listener registered: {}",
- error, error);
+ if (this.clientInternalExceptionListener != null) {
+ if (!executeAsync(() ->
clientInternalExceptionListener.onException(error))) {
+ LOG.debug("Async client internal exception occurred but
connection is closing: {}", error, error);
}
+ } else {
+ LOG.debug("Async client internal exception occurred with no
exception listener registered: {}", error, error);
}
}
@@ -2045,14 +2035,8 @@ public class ActiveMQConnection implements Connection,
TopicConnection, QueueCon
if (!(error instanceof JMSException)) {
error = JMSExceptionSupport.create(error);
}
- final JMSException e = (JMSException)error;
-
- executor.execute(new Runnable() {
- @Override
- public void run() {
-
ActiveMQConnection.this.exceptionListener.onException(e);
- }
- });
+ final JMSException e = (JMSException) error;
+ executeAsync(() -> exceptionListener.onException(e));
} else {
LOG.debug("Async exception with no exception listener: {}",
error, error);
@@ -2063,25 +2047,19 @@ public class ActiveMQConnection implements Connection,
TopicConnection, QueueCon
@Override
public void onException(final IOException error) {
onAsyncException(error);
- if (!closed.get() && !closing.get()) {
- executor.execute(new Runnable() {
- @Override
- public void run() {
- transportFailed(error);
- ServiceSupport.dispose(ActiveMQConnection.this.transport);
- brokerInfoReceived.countDown();
- try {
- doCleanup(true);
- } catch (JMSException e) {
- LOG.warn("Exception during connection cleanup, " + e,
e);
- }
- for (Iterator<TransportListener> iter =
transportListeners.iterator(); iter.hasNext();) {
- TransportListener listener = iter.next();
- listener.onException(error);
- }
- }
- });
- }
+ executeAsync(() -> {
+ transportFailed(error);
+ ServiceSupport.dispose(ActiveMQConnection.this.transport);
+ brokerInfoReceived.countDown();
+ try {
+ doCleanup(true);
+ } catch (JMSException e) {
+ LOG.warn("Exception during connection cleanup, " + e, e);
+ }
+ for (final TransportListener listener : transportListeners) {
+ listener.onException(error);
+ }
+ });
}
@Override
@@ -2490,6 +2468,31 @@ public class ActiveMQConnection implements Connection,
TopicConnection, QueueCon
return this.executor;
}
+ /**
+ * Safely executes a task on the connection's executor, handling the case
where
+ * the executor may be shutdown due to connection closure. See #close()
above.
+ * <p>
+ * We need to check if the connection is closed/closing and if the executor
+ * is shutdown before attempting to execute anything. We also need to catch
+ * {@link RejectedExecutionException} to handle check and call senario.
+ *
+ * @param task the task to execute
+ * @return true if the task was submitted successfully, false if the
executor
+ * was unavailable (connection closing or executor shutdown)
+ */
+ private boolean executeAsync(final Runnable task) {
+ if (closed.get() || closing.get() || executor.isShutdown()) {
+ return false;
+ }
+ try {
+ executor.execute(task);
+ return true;
+
+ } catch (final RejectedExecutionException e) {
+ return false; // connection already closing probably
+ }
+ }
+
protected CopyOnWriteArrayList<ActiveMQSession> getSessions() {
return sessions;
}
diff --git a/activemq-pool/pom.xml b/activemq-pool/pom.xml
index 677225b9bc..32942bbf50 100644
--- a/activemq-pool/pom.xml
+++ b/activemq-pool/pom.xml
@@ -126,6 +126,12 @@
<build>
<plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <reuseForks>false</reuseForks>
+ </configuration>
+ </plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
diff --git
a/activemq-pool/src/test/java/org/apache/activemq/pool/ConfigFromPropsTest.java
b/activemq-pool/src/test/java/org/apache/activemq/pool/ConfigFromPropsTest.java
index 61855c95a8..923f007230 100644
---
a/activemq-pool/src/test/java/org/apache/activemq/pool/ConfigFromPropsTest.java
+++
b/activemq-pool/src/test/java/org/apache/activemq/pool/ConfigFromPropsTest.java
@@ -24,15 +24,17 @@ import static junit.framework.Assert.assertNotNull;
public class ConfigFromPropsTest {
- XaPooledConnectionFactory underTest;
-
@Test
public void testBrokerUrlForRarAdminObject() throws Exception {
- underTest = new XaPooledConnectionFactory();
- underTest.setBrokerUrl("vm://localhost?broker.persistent=false");
- Connection connection = underTest.createConnection();
- assertNotNull(connection);
- connection.close();
- assertNotNull(underTest.getBrokerUrl());
+ final XaPooledConnectionFactory underTest = new
XaPooledConnectionFactory();
+ try {
+
underTest.setBrokerUrl("vm://configFromPropsTest?broker.persistent=false");
+ final Connection connection = underTest.createConnection();
+ assertNotNull(connection);
+ connection.close();
+ assertNotNull(underTest.getBrokerUrl());
+ } finally {
+ underTest.stop();
+ }
}
}
diff --git
a/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java
b/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java
index d8c23d7722..12c46be421 100644
---
a/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java
+++
b/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java
@@ -49,6 +49,7 @@ public class ConnectionFailureEvictsFromPoolTest extends
TestSupport {
protected void setUp() throws Exception {
broker = new BrokerService();
+ broker.setBrokerName("connectionFailureEvictsFromPoolTest");
broker.setUseJmx(false);
broker.setPersistent(false);
connector = broker.addConnector("tcp://localhost:0");
@@ -56,64 +57,73 @@ public class ConnectionFailureEvictsFromPoolTest extends
TestSupport {
}
public void testEnhancedConnection() throws Exception {
- XaPooledConnectionFactory pooledFactory =
+ final XaPooledConnectionFactory pooledFactory =
new XaPooledConnectionFactory(new
ActiveMQXAConnectionFactory("mock:" + connector.getConnectUri() +
"?closeAsync=false"));
-
- PooledConnection connection = (PooledConnection)
pooledFactory.createConnection();
- EnhancedConnection enhancedConnection =
(EnhancedConnection)connection.getConnection();
- DestinationSource destinationSource =
enhancedConnection.getDestinationSource();
- assertNotNull(destinationSource);
-
+ try {
+ try (final PooledConnection connection = (PooledConnection)
pooledFactory.createConnection()) {
+ final EnhancedConnection enhancedConnection =
(EnhancedConnection) connection.getConnection();
+ final DestinationSource destinationSource =
enhancedConnection.getDestinationSource();
+ assertNotNull(destinationSource);
+ }
+ } finally {
+ pooledFactory.stop();
+ }
}
public void testEvictionXA() throws Exception {
- XaPooledConnectionFactory pooledFactory =
+ final XaPooledConnectionFactory pooledFactory =
new XaPooledConnectionFactory(new
ActiveMQXAConnectionFactory("mock:(" + connector.getConnectUri() +
"?closeAsync=false)?jms.xaAckMode=1"));
-
- doTestEviction(pooledFactory);
+ try {
+ doTestEviction(pooledFactory);
+ } finally {
+ pooledFactory.stop();
+ }
}
public void testEviction() throws Exception {
- PooledConnectionFactory pooledFactory =
+ final PooledConnectionFactory pooledFactory =
new PooledConnectionFactory(new
ActiveMQConnectionFactory("mock:" + connector.getConnectUri() +
"?closeAsync=false"));
-
- doTestEviction(pooledFactory);
+ try {
+ doTestEviction(pooledFactory);
+ } finally {
+ pooledFactory.stop();
+ }
}
public void doTestEviction(ConnectionFactory pooledFactory) throws
Exception {
- PooledConnection connection = (PooledConnection)
pooledFactory.createConnection();
- ActiveMQConnection amqC = (ActiveMQConnection)
connection.getConnection();
final CountDownLatch gotExceptionEvent = new CountDownLatch(1);
- amqC.addTransportListener(new TransportListener() {
- public void onCommand(Object command) {
- }
- public void onException(IOException error) {
- // we know connection is dead...
- // listeners are fired async
- gotExceptionEvent.countDown();
- }
- public void transportInterupted() {
- }
- public void transportResumed() {
- }
- });
+ try (final PooledConnection connection = (PooledConnection)
pooledFactory.createConnection()) {
+ final ActiveMQConnection amqC = (ActiveMQConnection)
connection.getConnection();
+ amqC.addTransportListener(new TransportListener() {
+ public void onCommand(Object command) {
+ }
+ public void onException(IOException error) {
+ // we know connection is dead...
+ // listeners are fired async
+ gotExceptionEvent.countDown();
+ }
+ public void transportInterupted() {
+ }
+ public void transportResumed() {
+ }
+ });
- sendMessage(connection);
- LOG.info("sent one message worked fine");
- createConnectionFailure(connection);
- try {
sendMessage(connection);
- TestCase.fail("Expected Error");
- } catch (JMSException e) {
- } finally {
- connection.close();
+ LOG.info("sent one message worked fine");
+ createConnectionFailure(connection);
+ try {
+ sendMessage(connection);
+ TestCase.fail("Expected Error");
+ } catch (JMSException e) {
+ }
}
- TestCase.assertTrue("exception event propagated ok",
gotExceptionEvent.await(5, TimeUnit.SECONDS));
+ TestCase.assertTrue("exception event propagated ok",
gotExceptionEvent.await(15, TimeUnit.SECONDS));
// If we get another connection now it should be a new connection that
// works.
LOG.info("expect new connection after failure");
- Connection connection2 = pooledFactory.createConnection();
- sendMessage(connection2);
+ try (final Connection connection2 = pooledFactory.createConnection()) {
+ sendMessage(connection2);
+ }
}
private void createConnectionFailure(Connection connection) throws
Exception {
@@ -132,5 +142,6 @@ public class ConnectionFailureEvictsFromPoolTest extends
TestSupport {
protected void tearDown() throws Exception {
broker.stop();
+ broker.waitUntilStopped();
}
}
diff --git
a/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryTest.java
b/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryTest.java
index e2e33cd38a..be80764d63 100644
---
a/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryTest.java
+++
b/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryTest.java
@@ -29,18 +29,22 @@ import org.slf4j.LoggerFactory;
*/
public class PooledConnectionFactoryTest {
- private final Logger LOG =
LoggerFactory.getLogger(PooledConnectionFactoryTest.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(PooledConnectionFactoryTest.class);
@Test(timeout=240000)
public void testGetReference() throws Exception {
- PooledConnectionFactory factory = createPooledConnectionFactory();
- Reference ref = factory.getReference();
- assertNotNull(ref);
+ final PooledConnectionFactory factory =
createPooledConnectionFactory();
+ try {
+ final Reference ref = factory.getReference();
+ assertNotNull(ref);
+ } finally {
+ factory.stop();
+ }
}
protected PooledConnectionFactory createPooledConnectionFactory() {
- PooledConnectionFactory cf = new PooledConnectionFactory(
- "vm://localhost?broker.persistent=false");
+ final PooledConnectionFactory cf = new PooledConnectionFactory(
+ "vm://pooledConnectionFactoryTest?broker.persistent=false");
LOG.debug("ConnectionFactory initialized.");
return cf;
}
diff --git
a/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConsumerTest.java
b/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConsumerTest.java
index a15dace2e8..fa048f6a22 100644
---
a/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConsumerTest.java
+++
b/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConsumerTest.java
@@ -62,15 +62,14 @@ public class PooledConsumerTest {
class PooledConsumer implements MessageListener {
- private ConnectionFactory factory;
+ private final PooledConnectionFactory factory;
private Connection connection;
public boolean done = false;
public PooledConsumer(String url) throws JMSException {
- org.apache.activemq.pool.PooledConnectionFactory factory = new
org.apache.activemq.pool.PooledConnectionFactory(url);
+ factory = new PooledConnectionFactory(url);
factory.setMaxConnections(5);
factory.setIdleTimeout(0);
- this.factory = factory;
init();
}
@@ -140,11 +139,13 @@ public class PooledConsumerTest {
public void done() {
done = true;
close();
+ factory.stop();
}
}
public void startBroker(String group, String trasport) throws Exception {
brokerService = new BrokerService();
+ brokerService.setBrokerName("pooledConsumerTest");
brokerService.addConnector(trasport);
brokerService.setPersistent(false);
brokerService.setUseJmx(false);
diff --git
a/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java
b/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java
index 9d1a84a4b1..9dcd4b0fd1 100644
---
a/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java
+++
b/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java
@@ -58,7 +58,7 @@ public class XAConnectionPoolTest extends TestSupport {
final Vector<Synchronization> syncs = new Vector<Synchronization>();
ActiveMQTopic topic = new ActiveMQTopic("test");
XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
- pcf.setConnectionFactory(new
ActiveMQXAConnectionFactory("vm://test?broker.persistent=false"));
+ pcf.setConnectionFactory(new
ActiveMQXAConnectionFactory("vm://xaConnectionPoolTest?broker.persistent=false"));
final Xid xid = createXid();
// simple TM that is in a tx and will track syncs
@@ -157,6 +157,7 @@ public class XAConnectionPoolTest extends TestSupport {
sync.afterCompletion(1);
}
connection.close();
+ pcf.stop();
}
static long txGenerator = 22;
@@ -188,7 +189,7 @@ public class XAConnectionPoolTest extends TestSupport {
final Vector<Synchronization> syncs = new Vector<Synchronization>();
ActiveMQTopic topic = new ActiveMQTopic("test");
XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
- pcf.setConnectionFactory(new
ActiveMQXAConnectionFactory("vm://test?broker.persistent=false&jms.xaAckMode="
+ Session.CLIENT_ACKNOWLEDGE));
+ pcf.setConnectionFactory(new
ActiveMQXAConnectionFactory("vm://xaConnectionPoolTest?broker.persistent=false&jms.xaAckMode="
+ Session.CLIENT_ACKNOWLEDGE));
// simple TM that is in a tx and will track syncs
pcf.setTransactionManager(new TransactionManager() {
@@ -280,52 +281,67 @@ public class XAConnectionPoolTest extends TestSupport {
sync.afterCompletion(1);
}
connection.close();
+ pcf.stop();
}
public void testInstanceOf() throws Exception {
- XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
- assertTrue(pcf instanceof QueueConnectionFactory);
- assertTrue(pcf instanceof TopicConnectionFactory);
+ final XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
+ try {
+ assertTrue(pcf instanceof QueueConnectionFactory);
+ assertTrue(pcf instanceof TopicConnectionFactory);
+ } finally {
+ pcf.stop();
+ }
}
public void testBindable() throws Exception {
- XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
- assertTrue(pcf instanceof ObjectFactory);
- assertTrue(((ObjectFactory) pcf).getObjectInstance(null, null, null,
null) instanceof XaPooledConnectionFactory);
- assertTrue(pcf.isTmFromJndi());
+ final XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
+ try {
+ assertTrue(pcf instanceof ObjectFactory);
+ assertTrue(((ObjectFactory) pcf).getObjectInstance(null, null,
null, null) instanceof XaPooledConnectionFactory);
+ assertTrue(pcf.isTmFromJndi());
+ } finally {
+ pcf.stop();
+ }
}
public void testBindableEnvOverrides() throws Exception {
- XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
- assertTrue(pcf instanceof ObjectFactory);
- Hashtable<String, String> environment = new Hashtable<String,
String>();
- environment.put("tmFromJndi", String.valueOf(Boolean.FALSE));
- assertTrue(((ObjectFactory) pcf).getObjectInstance(null, null, null,
environment) instanceof XaPooledConnectionFactory);
- assertFalse(pcf.isTmFromJndi());
+ final XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
+ try {
+ assertTrue(pcf instanceof ObjectFactory);
+ final Hashtable<String, String> environment = new Hashtable<>();
+ environment.put("tmFromJndi", String.valueOf(Boolean.FALSE));
+ assertTrue(((ObjectFactory) pcf).getObjectInstance(null, null,
null, environment) instanceof XaPooledConnectionFactory);
+ assertFalse(pcf.isTmFromJndi());
+ } finally {
+ pcf.stop();
+ }
}
public void testSenderAndPublisherDest() throws Exception {
- XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
- pcf.setConnectionFactory(new
ActiveMQXAConnectionFactory("vm://test?broker.persistent=false"));
-
- QueueConnection connection = pcf.createQueueConnection();
- QueueSession session = connection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
- QueueSender sender = session.createSender(session.createQueue("AA"));
- assertNotNull(sender.getQueue().getQueueName());
-
- connection.close();
-
- TopicConnection topicConnection = pcf.createTopicConnection();
- TopicSession topicSession = topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
- TopicPublisher topicPublisher =
topicSession.createPublisher(topicSession.createTopic("AA"));
- assertNotNull(topicPublisher.getTopic().getTopicName());
+ final XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
+ try {
+ pcf.setConnectionFactory(new
ActiveMQXAConnectionFactory("vm://xaConnectionPoolTest?broker.persistent=false"));
+
+ try (final QueueConnection connection =
pcf.createQueueConnection()) {
+ final QueueSession session =
connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ final QueueSender sender =
session.createSender(session.createQueue("AA"));
+ assertNotNull(sender.getQueue().getQueueName());
+ }
- topicConnection.close();
+ try (final TopicConnection topicConnection =
pcf.createTopicConnection()) {
+ final TopicSession topicSession =
topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ final TopicPublisher topicPublisher =
topicSession.createPublisher(topicSession.createTopic("AA"));
+ assertNotNull(topicPublisher.getTopic().getTopicName());
+ }
+ } finally {
+ pcf.stop();
+ }
}
public void testSessionArgsIgnoredWithTm() throws Exception {
XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
- pcf.setConnectionFactory(new
ActiveMQXAConnectionFactory("vm://test?broker.persistent=false"));
+ pcf.setConnectionFactory(new
ActiveMQXAConnectionFactory("vm://xaConnectionPoolTest?broker.persistent=false"));
// simple TM that with no tx
pcf.setTransactionManager(new TransactionManager() {
@Override
@@ -374,10 +390,11 @@ public class XAConnectionPoolTest extends TestSupport {
}
});
- QueueConnection connection = pcf.createQueueConnection();
+ final QueueConnection connection = pcf.createQueueConnection();
// like ee tck
assertNotNull("can create session(false, 0)",
connection.createQueueSession(false, 0));
connection.close();
+ pcf.stop();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact