This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch activemq-5.19.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.19.x by this push:
new c3272de289 Backport network flaky test improvements (#2046)
c3272de289 is described below
commit c3272de28948f423ca8c8b6933c1c199dd7e34b2
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Tue May 26 20:30:13 2026 -0400
Backport network flaky test improvements (#2046)
---
.../network/DurableSyncNetworkBridgeTest.java | 181 +++++++++++++++------
.../network/DynamicNetworkTestSupport.java | 49 +++---
2 files changed, 161 insertions(+), 69 deletions(-)
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
index aa36e26fba..b849f08366 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
@@ -16,22 +16,10 @@
*/
package org.apache.activemq.network;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
-
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.BrokerPlugin;
@@ -62,6 +50,18 @@ import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
@RunWith(Parameterized.class)
public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
@@ -83,7 +83,7 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
private final FLOW flow;
@Rule
- public Timeout globalTimeout = new Timeout(30, TimeUnit.SECONDS);
+ public Timeout globalTimeout = new Timeout(60, TimeUnit.SECONDS);
@Parameters
public static Collection<Object[]> data() {
@@ -138,6 +138,10 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
assertSubscriptionsCount(broker1, topic, 1);
assertNCDurableSubsCount(broker2, topic, 1);
+ // Wait for subscription to become inactive before attempting removal
+ // It's very important to wait here, otherwise the removal may not propagate
+ waitForSubscriptionInactive(broker1, topic, subName);
+
removeSubscription(broker1, subName);
assertSubscriptionsCount(broker1, topic, 0);
@@ -222,7 +226,12 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
//Test that on successful reconnection of the bridge that
//the NC sub will be removed
restartBroker(broker2, true);
- assertNCDurableSubsCount(broker2, topic, 1);
+ // In REVERSE flow, broker2=localBroker has the bridge and broker1 (remoteBroker)
+ // is already running, so the sync may have already cleaned up the NC durable sub.
+ // This "before sync" assertion is only valid in FORWARD flow.
+ if (flow == FLOW.FORWARD) {
+ assertNCDurableSubsCount(broker2, topic, 1);
+ }
restartBroker(broker1, true);
assertBridgeStarted();
assertNCDurableSubsCount(broker2, topic, 0);
@@ -249,7 +258,9 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
//the NC sub will be removed because even though the local subscription exists,
//it no longer matches the included filter
restartBroker(broker2, true);
- assertNCDurableSubsCount(broker2, topic, 1);
+ if (flow == FLOW.FORWARD) {
+ assertNCDurableSubsCount(broker2, topic, 1);
+ }
restartBroker(broker1, true);
assertBridgeStarted();
assertNCDurableSubsCount(broker2, topic, 0);
@@ -287,7 +298,9 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
//the NC sub will be removed because even though the local subscription exists,
//it no longer matches the included static filter
restartBroker(broker2, true);
- assertNCDurableSubsCount(broker2, topic, 1);
+ if (flow == FLOW.FORWARD) {
+ assertNCDurableSubsCount(broker2, topic, 1);
+ }
restartBroker(broker1, true);
assertBridgeStarted();
assertNCDurableSubsCount(broker2, topic, 0);
@@ -316,10 +329,13 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
//Test that on successful reconnection of the bridge that
//the NC sub will be removed for topic1 but will stay for topic2
- //before sync, the old NC should exist
+ //before sync, the old NC should exist (only verifiable in FORWARD flow;
+ //in REVERSE, broker2=localBroker has the bridge and sync may already have run)
restartBroker(broker2, true);
- assertNCDurableSubsCount(broker2, topic, 1);
- assertNCDurableSubsCount(broker2, topic2, 0);
+ if (flow == FLOW.FORWARD) {
+ assertNCDurableSubsCount(broker2, topic, 1);
+ assertNCDurableSubsCount(broker2, topic2, 0);
+ }
//After sync, remove old NC and create one for topic 2
restartBroker(broker1, true);
@@ -527,7 +543,6 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
session1.createDurableSubscriber(topic, "sub3");
session1.createDurableSubscriber(excludeTopic, "sub-exclude");
- Thread.sleep(1000);
assertNCDurableSubsCount(broker2, topic, 1);
assertNCDurableSubsCount(broker2, excludeTopic, 0);
@@ -566,13 +581,10 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
secondConnector.start();
//Make sure both bridges are connected
- assertTrue(Wait.waitFor(new Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1 &&
-
localBroker.getNetworkConnectors().get(1).activeBridges().size() == 1;
- }
- }, 10000, 500));
+ assertTrue(Wait.waitFor(() ->
+
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1 &&
+
localBroker.getNetworkConnectors().get(1).activeBridges().size() == 1,
+ TimeUnit.SECONDS.toMillis(15), 500));
//Make sure NC durables exist for both bridges
assertNCDurableSubsCount(broker2, topic2, 1);
@@ -633,13 +645,7 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
final DestinationStatistics remoteDestStatistics2 =
remoteBroker.getDestination(
new
ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();
- assertTrue(Wait.waitFor(new Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- return remoteDestStatistics2.getMessages().getCount() == 501;
- }
- }));
+ assertTrue(Wait.waitFor(() ->
remoteDestStatistics2.getMessages().getCount() == 501));
}
@@ -719,8 +725,36 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
included = new ActiveMQTopic(testTopicName);
doSetUpRemoteBroker(deleteAllMessages, remoteDataDir, 0);
doSetUpLocalBroker(deleteAllMessages, startNetworkConnector,
localDataDir);
- //Give time for advisories to propagate
- Thread.sleep(1000);
+ //Wait for the bridge to be fully started (advisory consumers registered).
+ //Note: activeBridges().size() == 1 is NOT sufficient because bridges are added
+ //to the map before start() completes asynchronously. We must wait for the
+ //startedLatch which counts down after advisory consumers are registered.
+ if (startNetworkConnector) {
+ waitForBridgeFullyStarted();
+ }
+ }
+
+ private void waitForBridgeFullyStarted() throws Exception {
+ // Wait for the local bridge to be fully started (advisory consumers registered)
+ assertTrue("Local bridge should be fully started", Wait.waitFor(() -> {
+ if
(localBroker.getNetworkConnectors().get(0).activeBridges().isEmpty()) {
+ return false;
+ }
+ final NetworkBridge bridge =
localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
+ if (bridge instanceof DemandForwardingBridgeSupport) {
+ return ((DemandForwardingBridgeSupport)
bridge).startedLatch.getCount() == 0;
+ }
+ return true;
+ }, TimeUnit.SECONDS.toMillis(15), 100));
+
+ // Also wait for the duplex bridge on the remote broker to be fully started.
+ // The duplex connector creates a separate DemandForwardingBridge on the remote side
+ // that also needs its advisory consumers registered before it can process events.
+ assertTrue("Duplex bridge should be fully started", Wait.waitFor(() ->
{
+ final DemandForwardingBridge duplexBridge = findDuplexBridge(
+ remoteBroker.getTransportConnectors().get(0));
+ return duplexBridge != null &&
duplexBridge.startedLatch.getCount() == 0;
+ }, TimeUnit.SECONDS.toMillis(15), 100));
}
protected void restartLocalBroker(boolean startNetworkConnector) throws
Exception {
@@ -729,13 +763,42 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
}
protected void restartRemoteBroker() throws Exception {
- int port = 0;
- if (remoteBroker != null) {
- List<TransportConnector> transportConnectors =
remoteBroker.getTransportConnectors();
- port = transportConnectors.get(0).getConnectUri().getPort();
- }
+ final int previousPort =
remoteBroker.getTransportConnectors().get(0).getConnectUri().getPort();
+ final File dataDir = remoteBroker.getDataDirectoryFile();
stopRemoteBroker();
- doSetUpRemoteBroker(false, remoteBroker.getDataDirectoryFile(), port);
+ try {
+ doSetUpRemoteBroker(false, dataDir, previousPort);
+ } catch (final IOException e) {
+ if (e.getCause() instanceof java.net.BindException) {
+ // Previous port still in TIME_WAIT — use a new ephemeral port
+ doSetUpRemoteBroker(false, dataDir, 0);
+ // Update the local broker's network connector to point to the new port
+ updateLocalNetworkConnectorUri();
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * When the remote broker restarts on a new ephemeral port (BindException fallback),
+ * any existing network connector on the local broker still points to the old port.
+ * This method stops the old connector and replaces it with one targeting the new URI.
+ */
+ private void updateLocalNetworkConnectorUri() throws Exception {
+ if (localBroker == null) {
+ return;
+ }
+ final List<NetworkConnector> connectors =
localBroker.getNetworkConnectors();
+ if (connectors.isEmpty()) {
+ return;
+ }
+ final NetworkConnector oldConnector = connectors.get(0);
+ oldConnector.stop();
+ localBroker.removeNetworkConnector(oldConnector);
+ final NetworkConnector newConnector = configureLocalNetworkConnector();
+ localBroker.addNetworkConnector(newConnector);
+ newConnector.start();
}
protected void doSetUpLocalBroker(boolean deleteAllMessages, boolean
startNetworkConnector,
@@ -753,12 +816,14 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
localConnection.start();
if (startNetworkConnector) {
- Wait.waitFor(new Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1;
- }
- }, 5000, 500);
+ // Best-effort wait for the bridge to appear. Do NOT use assertTrue here
+ // because some tests restart localBroker before remoteBroker is running,
+ // relying on the bridge connecting later when remoteBroker restarts.
+ // Tests that need the bridge to be fully started call assertBridgeStarted() explicitly.
+ // Keep timeout short (5s) to avoid growing the NC reconnect backoff too much,
+ // which would delay bridge formation when the remote broker starts later.
+ Wait.waitFor(() ->
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1,
+ TimeUnit.SECONDS.toMillis(5), 500);
}
localSession = localConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
@@ -869,4 +934,24 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
return brokerService;
}
+ /**
+ * Wait for a durable subscription to become inactive before attempting removal.
+ * This prevents "Durable consumer is in use" errors when consumer close operations
+ * complete asynchronously (especially visible with Java 25's different thread scheduling).
+ */
+ protected void waitForSubscriptionInactive(final BrokerService
brokerService,
+ final ActiveMQTopic topic,
+ final String subName) throws Exception {
+ assertTrue("Subscription should become inactive", Wait.waitFor(() -> {
+ final
List<org.apache.activemq.broker.region.DurableTopicSubscription> subs =
getSubscriptions(brokerService, topic);
+ for (final
org.apache.activemq.broker.region.DurableTopicSubscription sub : subs) {
+ if
(sub.getSubscriptionKey().getSubscriptionName().equals(subName)) {
+ return !sub.isActive();
+ }
+ }
+ // If subscription doesn't exist, it's considered inactive
+ return true;
+ }, TimeUnit.SECONDS.toMillis(15), 100));
+ }
+
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
index 2d83fb71b7..4e0ecf7480 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
@@ -24,6 +24,7 @@ import java.io.File;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.JMSException;
@@ -45,7 +46,6 @@ import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.activemq.util.Wait;
-import org.apache.activemq.util.Wait.Condition;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
@@ -95,9 +95,16 @@ public abstract class DynamicNetworkTestSupport {
}
protected void assertBridgeStarted() throws Exception {
- assertTrue(Wait.waitFor(
- () ->
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1,
- 10000, 500));
+ assertTrue("Bridge should be fully started", Wait.waitFor(() -> {
+ if
(localBroker.getNetworkConnectors().get(0).activeBridges().size() != 1) {
+ return false;
+ }
+ final NetworkBridge bridge =
localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
+ if (bridge instanceof DemandForwardingBridgeSupport) {
+ return ((DemandForwardingBridgeSupport)
bridge).startedLatch.getCount() == 0;
+ }
+ return true;
+ }, TimeUnit.SECONDS.toMillis(10), 500));
}
protected RemoveSubscriptionInfo getRemoveSubscriptionInfo(final
ConnectionContext context,
@@ -119,8 +126,8 @@ public abstract class DynamicNetworkTestSupport {
protected void waitForDispatchFromLocalBroker(final DestinationStatistics
destinationStatistics, final int count) throws Exception {
assertTrue(Wait.waitFor(() -> count ==
destinationStatistics.getDequeues().getCount() &&
- count == destinationStatistics.getDispatched().getCount() &&
- count == destinationStatistics.getForwards().getCount()));
+ count == destinationStatistics.getDispatched().getCount() &&
+ count == destinationStatistics.getForwards().getCount()));
}
protected void assertLocalBrokerStatistics(final DestinationStatistics
localStatistics, final int count) {
@@ -135,17 +142,19 @@ public abstract class DynamicNetworkTestSupport {
protected void assertNCDurableSubsCount(final BrokerService brokerService,
final ActiveMQTopic dest, final int count) throws Exception {
- assertTrue(Wait.waitFor(() -> count == getNCDurableSubs(brokerService,
dest).size(),
- 10000, 500));
+ assertTrue("Expected " + count + " NC durable subs on " + dest,
+ Wait.waitFor(() -> count == getNCDurableSubs(brokerService,
dest).size(),
+ TimeUnit.SECONDS.toMillis(30), 500));
}
protected void assertConsumersCount(final BrokerService brokerService,
final ActiveMQDestination dest, final int count) throws Exception {
assertTrue(Wait.waitFor(() -> count == getConsumers(brokerService,
dest).size(),
- 10000, 500));
- Thread.sleep(1000);
- // Check one more time after a short pause to make sure the count didn't increase past what we wanted
- assertEquals(count, getConsumers(brokerService, dest).size());
+ 10000, 500));
+ // Wait a bit longer and verify the count is stable (didn't increase past what we wanted)
+ assertTrue("Consumer count should remain stable at " + count,
+ Wait.waitFor(() -> count == getConsumers(brokerService,
dest).size(),
+ TimeUnit.SECONDS.toMillis(5), 500));
}
protected List<Subscription> getConsumers(final BrokerService
brokerService,
@@ -194,7 +203,7 @@ public abstract class DynamicNetworkTestSupport {
}
protected void removeSubscription(final BrokerService brokerService,
- final String subName) throws Exception {
+ final String subName) throws Exception {
final RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
info.setClientId(clientId);
info.setSubscriptionName(subName);
@@ -208,12 +217,9 @@ public abstract class DynamicNetworkTestSupport {
protected void assertSubscriptionsCount(final BrokerService brokerService,
final ActiveMQTopic dest, final int count) throws Exception {
- assertTrue(Wait.waitFor(new Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return count == getSubscriptions(brokerService, dest).size();
- }
- }, 10000, 500));
+ assertTrue("Expected " + count + " subscriptions on " + dest,
+ Wait.waitFor(() -> count == getSubscriptions(brokerService,
dest).size(),
+ TimeUnit.SECONDS.toMillis(30), 500));
}
protected void assertSubscriptionMapCounts(NetworkBridge networkBridge,
final int count) {
@@ -226,8 +232,9 @@ public abstract class DynamicNetworkTestSupport {
protected DemandForwardingBridge findDuplexBridge(final TransportConnector
connector) throws Exception {
assertNotNull(connector);
- for (TransportConnection tc : connector.getConnections()) {
- if (tc.getConnectionId().startsWith("networkConnector_")) {
+ for (final TransportConnection tc : connector.getConnections()) {
+ final String connectionId = tc.getConnectionId();
+ if (connectionId != null &&
connectionId.startsWith("networkConnector_")) {
final Field bridgeField =
TransportConnection.class.getDeclaredField("duplexBridge");
bridgeField.setAccessible(true);
return (DemandForwardingBridge) bridgeField.get(tc);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact