This is an automated email from the ASF dual-hosted git repository.
jlmonteiro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new f7993bcc6b test(unit-tests): improve reliability of message
consumption in tests (#1711)
f7993bcc6b is described below
commit f7993bcc6b9445b0b6695ab56a5d30f7e72fe0af
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Tue Mar 3 09:58:49 2026 +0100
test(unit-tests): improve reliability of message consumption in tests
(#1711)
* test(unit-tests): improve reliability of message consumption in tests
* [test] Fix ConnectionFailureEvictsFromPoolTest: eliminate flaky async
races
Two race conditions caused testEvictionXA to fail intermittently:
1. Exception event propagation: ActiveMQConnection.addTransportListener()
callbacks fire via executeAsync(), which silently drops tasks when the
pool's ExceptionListener closes the connection and shuts down the
executor first. Fixed by intercepting the exception at the MockTransport
level, where exception propagation is synchronous.
2. Pool eviction timing: The pool evicts broken connections asynchronously
via an ExceptionListener fired through executeAsync(), so the test could
request a new connection before eviction had completed. Fixed by using the
Wait.waitFor() retry pattern (consistent with other pool tests).
* test(MaxFrameSizeEnabled): increase timeouts to improve test reliability
* test(AMQ2149): enhance prefetch policy for transactional connections
---------
Co-authored-by: root <[email protected]>
---
.../pool/ConnectionFailureEvictsFromPoolTest.java | 40 +++++++++++++++-------
.../java/org/apache/activemq/bugs/AMQ2149Test.java | 9 ++++-
.../org/apache/activemq/store/StoreOrderTest.java | 2 ++
.../activemq/store/jdbc/XACompletionTest.java | 7 +---
.../transport/MaxFrameSizeEnabledTest.java | 17 +++++----
.../RestrictedThreadPoolInactivityTimeoutTest.java | 2 ++
.../QueueZeroPrefetchLazyDispatchPriorityTest.java | 22 ++++++++----
7 files changed, 66 insertions(+), 33 deletions(-)
diff --git
a/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java
b/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java
index 596eb00fe3..3a0fc67943 100644
---
a/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java
+++
b/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java
@@ -39,6 +39,7 @@ import org.apache.activemq.jms.pool.PooledConnection;
import org.apache.activemq.test.TestSupport;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.mock.MockTransport;
+import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,17 +95,27 @@ public class ConnectionFailureEvictsFromPoolTest extends
TestSupport {
final CountDownLatch gotExceptionEvent = new CountDownLatch(1);
try (final PooledConnection connection = (PooledConnection)
pooledFactory.createConnection()) {
final ActiveMQConnection amqC = (ActiveMQConnection)
connection.getConnection();
- amqC.addTransportListener(new TransportListener() {
+ // Intercept exception propagation at the MockTransport level
where it fires
+ // synchronously. ActiveMQConnection.addTransportListener()
callbacks fire via
+ // executeAsync(), which silently drops the task if the pool's
ExceptionListener
+ // closes the connection and shuts down the executor first (race
condition that
+ // affects the XA path).
+ final MockTransport mockTransport = (MockTransport)
amqC.getTransportChannel().narrow(MockTransport.class);
+ final TransportListener originalListener =
mockTransport.getTransportListener();
+ mockTransport.setTransportListener(new TransportListener() {
public void onCommand(Object command) {
+ originalListener.onCommand(command);
}
public void onException(IOException error) {
- // we know connection is dead...
- // listeners are fired async
+ // fires synchronously when MockTransport.onException() is
called
gotExceptionEvent.countDown();
+ originalListener.onException(error);
}
public void transportInterupted() {
+ originalListener.transportInterupted();
}
public void transportResumed() {
+ originalListener.transportResumed();
}
});
@@ -116,18 +127,21 @@ public class ConnectionFailureEvictsFromPoolTest extends
TestSupport {
TestCase.fail("Expected Error");
} catch (JMSException e) {
}
- // Wait for async exception event BEFORE the try-with-resources
closes the connection.
- // ActiveMQConnection.onException() fires TransportListener
callbacks via executeAsync(),
- // so the callback runs in a separate thread. If we wait after
connection.close(), the
- // async executor may already be shut down and the callback never
fires.
- TestCase.assertTrue("exception event propagated ok",
gotExceptionEvent.await(15, TimeUnit.SECONDS));
+ TestCase.assertTrue("exception event propagated ok",
gotExceptionEvent.await(5, TimeUnit.SECONDS));
}
- // If we get another connection now it should be a new connection that
- // works.
+ // After the failure, a new connection from the pool should work.
+ // The pool eviction is async (ExceptionListener fires via
executeAsync),
+ // so retry until the pool returns a working connection.
LOG.info("expect new connection after failure");
- try (final Connection connection2 = pooledFactory.createConnection()) {
- sendMessage(connection2);
- }
+ assertTrue("pool should provide working connection after eviction",
+ Wait.waitFor(() -> {
+ try (final Connection connection2 =
pooledFactory.createConnection()) {
+ sendMessage(connection2);
+ return true;
+ } catch (Exception e) {
+ return false;
+ }
+ }, 5000, 100));
}
private void createConnectionFailure(Connection connection) throws
Exception {
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
index 37b6cdeb26..4ec8c59231 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
@@ -41,6 +41,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
@@ -171,6 +172,12 @@ public class AMQ2149Test {
this.transactional = transactional;
ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(brokerURL);
connectionFactory.setWatchTopicAdvisories(false);
+ if (transactional) {
+ final ActiveMQPrefetchPolicy policy =
connectionFactory.getPrefetchPolicy();
+ policy.setQueuePrefetch(1);
+ policy.setTopicPrefetch(1);
+ policy.setDurableTopicPrefetch(1);
+ }
connection = connectionFactory.createConnection();
connection.setClientID(dest.toString());
session = connection.createSession(transactional, transactional ?
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
@@ -194,7 +201,7 @@ public class AMQ2149Test {
final int TRANSACITON_BATCH = 500;
boolean resumeOnNextOrPreviousIsOk = false;
- public void onMessage(Message message) {
+ public synchronized void onMessage(Message message) {
try {
final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
if ((seqNum % TRANSACITON_BATCH) == 0) {
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java
index 58c853e1ba..8dfe3da99b 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java
@@ -120,6 +120,7 @@ public abstract class StoreOrderTest {
}
if (broker != null) {
broker.stop();
+ broker.waitUntilStopped();
}
}
@@ -257,6 +258,7 @@ public abstract class StoreOrderTest {
configureBroker(newBroker);
newBroker.setDeleteAllMessagesOnStartup(deleteMessagesOnStartup);
newBroker.start();
+ newBroker.waitUntilStarted();
return newBroker;
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
index 5b7a3a417f..26e5784087 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
@@ -275,12 +275,7 @@ public class XACompletionTest extends TestSupport {
dumpMessages();
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return proxy.getInFlightCount() == 0l;
- }
- });
+ Wait.waitFor(() -> proxy.getInFlightCount() == 0L &&
proxy.cursorSize() == 0);
assertEquals("prefetch", 0, proxy.getInFlightCount());
assertEquals("size", 10, proxy.getQueueSize());
assertEquals("cursor size", 0, proxy.cursorSize());
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java
index ed6635480d..77aafacbab 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java
@@ -55,6 +55,9 @@ public class MaxFrameSizeEnabledTest {
private static final int CONNECTION_COUNT = 3;
private static final int MESSAGE_ATTEMPTS = 3;
private static final int BODY_SIZE = 20000; // large enough to trip 2k
limit, compressible enough for 60k
+ private static final long BROKER_START_TIMEOUT_MS = 30_000;
+ private static final long BROKER_STOP_TIMEOUT_MS = 30_000;
+ private static final int TEST_TIMEOUT_MS = 120_000;
private BrokerService broker;
private final String transportType;
@@ -158,30 +161,32 @@ public class MaxFrameSizeEnabledTest {
}
public BrokerService createBroker(String connectorName, String
connectorString) throws Exception {
- BrokerService broker = new BrokerService();
+ final BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
- TransportConnector connector = broker.addConnector(connectorString);
+ final TransportConnector connector =
broker.addConnector(connectorString);
connector.setName(connectorName);
broker.start();
- broker.waitUntilStarted();
+ assertTrue("Broker should start within timeout",
+ Wait.waitFor(broker::isStarted, BROKER_START_TIMEOUT_MS, 100));
return broker;
}
public void stopBroker(BrokerService broker) throws Exception {
if (broker != null) {
broker.stop();
- broker.waitUntilStopped();
+ assertTrue("Broker should stop within timeout",
+ Wait.waitFor(broker::isStopped, BROKER_STOP_TIMEOUT_MS,
100));
}
}
- @Test
+ @Test(timeout = TEST_TIMEOUT_MS)
public void testMaxFrameSize() throws Exception {
broker = createBroker(transportType, transportType +
"://localhost:0?wireFormat.maxFrameSize=2048" + getServerParams());
testMaxFrameSize(transportType,
getClientUri(broker.getConnectorByName(transportType).getConnectUri().getPort()),
false);
}
- @Test
+ @Test(timeout = TEST_TIMEOUT_MS)
public void testMaxFrameSizeCompression() throws Exception {
// Test message body length is 99841 bytes. Compresses to ~ 48000
broker = createBroker(transportType, transportType +
"://localhost:0?wireFormat.maxFrameSize=60000" + getServerParams());
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java
index 7eaefd23bd..bf3ab74a9a 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
public class RestrictedThreadPoolInactivityTimeoutTest extends JmsTestSupport {
private static final Logger LOG =
LoggerFactory.getLogger(RestrictedThreadPoolInactivityTimeoutTest.class);
+ private static final int TEST_TIMEOUT_MS = 120_000;
public String brokerTransportScheme = "tcp";
public Boolean rejectWork = Boolean.FALSE;
@@ -86,6 +87,7 @@ public class RestrictedThreadPoolInactivityTimeoutTest
extends JmsTestSupport {
addCombinationValues("rejectWork", new Object[] {Boolean.TRUE,
Boolean.FALSE});
}
+ @org.junit.Test(timeout = TEST_TIMEOUT_MS)
public void testThreadsInvolvedInXInactivityTimeouts() throws Exception {
URI tcpBrokerUri =
URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri());
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
index 4c5b193741..7ebe48cc1d 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
@@ -199,15 +199,23 @@ public class QueueZeroPrefetchLazyDispatchPriorityTest {
assertNotNull(message);
assertEquals(5, message.getJMSPriority());
- // consume messages
- final ArrayList<Message> consumeList = consumeMessages("TestQ");
+ // Wait for remaining messages to be fully available after
consumeOneMessage closes its connection.
+ // With lazyDispatch=true + optimizedDispatch=true, messages may
briefly be in-flight
+ // during connection teardown and not yet re-queued for dispatch
to a new consumer.
+ final int remaining = numToSend - 1;
+ assertTrue("Remaining messages available for dispatch",
Wait.waitFor(() -> {
+ final Queue q = (Queue) broker.getDestination(destination);
+ return q != null
+ && q.getDestinationStatistics().getMessages().getCount()
== remaining
+ && q.getDestinationStatistics().getInflight().getCount()
== 0;
+ }, 5000, 100));
+
+ // consume messages (use timeout-based overload for reliable
dispatch on slow CI)
+ final ArrayList<Message> consumeList = consumeMessages("TestQ",
remaining, TimeUnit.SECONDS.toMillis(30));
LOG.info("Consumed list {}", consumeList.size());
- // compare lists
- // assertEquals("Iteration: " + i
- // +", message 1 should be priority high", 5,
- // consumeList.get(0).getJMSPriority());
- for (int j = 1; j < (numToSend - 1); j++) {
+ assertEquals("Iteration: " + i + ", all remaining messages
consumed", remaining, consumeList.size());
+ for (int j = 0; j < consumeList.size(); j++) {
assertEquals("Iteration: " + i + ", message " + j + " should
be priority medium", 4, consumeList.get(j).getJMSPriority());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact