This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.19.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.19.x by this push:
     new c3272de289 Backport network flaky test improvements (#2046)
c3272de289 is described below

commit c3272de28948f423ca8c8b6933c1c199dd7e34b2
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Tue May 26 20:30:13 2026 -0400

    Backport network flaky test improvements (#2046)
---
 .../network/DurableSyncNetworkBridgeTest.java      | 181 +++++++++++++++------
 .../network/DynamicNetworkTestSupport.java         |  49 +++---
 2 files changed, 161 insertions(+), 69 deletions(-)

diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
index aa36e26fba..b849f08366 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
@@ -16,22 +16,10 @@
  */
 package org.apache.activemq.network;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.advisory.AdvisoryBroker;
 import org.apache.activemq.broker.BrokerPlugin;
@@ -62,6 +50,18 @@ import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 @RunWith(Parameterized.class)
 public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
 
@@ -83,7 +83,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
     private final FLOW flow;
 
     @Rule
-    public Timeout globalTimeout = new Timeout(30, TimeUnit.SECONDS);
+    public Timeout globalTimeout = new Timeout(60, TimeUnit.SECONDS);
 
     @Parameters
     public static Collection<Object[]> data() {
@@ -138,6 +138,10 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         assertSubscriptionsCount(broker1, topic, 1);
         assertNCDurableSubsCount(broker2, topic, 1);
 
+        // Wait for subscription to become inactive before attempting removal
+        // It's very important to wait here, otherwise the removal may not propagate
+        waitForSubscriptionInactive(broker1, topic, subName);
+
         removeSubscription(broker1, subName);
 
         assertSubscriptionsCount(broker1, topic, 0);
@@ -222,7 +226,12 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         //Test that on successful reconnection of the bridge that
         //the NC sub will be removed
         restartBroker(broker2, true);
-        assertNCDurableSubsCount(broker2, topic, 1);
+        // In REVERSE flow, broker2=localBroker has the bridge and broker1 (remoteBroker)
+        // is already running, so the sync may have already cleaned up the NC durable sub.
+        // This "before sync" assertion is only valid in FORWARD flow.
+        if (flow == FLOW.FORWARD) {
+            assertNCDurableSubsCount(broker2, topic, 1);
+        }
         restartBroker(broker1, true);
         assertBridgeStarted();
         assertNCDurableSubsCount(broker2, topic, 0);
@@ -249,7 +258,9 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         //the NC sub will be removed because even though the local 
subscription exists,
         //it no longer matches the included filter
         restartBroker(broker2, true);
-        assertNCDurableSubsCount(broker2, topic, 1);
+        if (flow == FLOW.FORWARD) {
+            assertNCDurableSubsCount(broker2, topic, 1);
+        }
         restartBroker(broker1, true);
         assertBridgeStarted();
         assertNCDurableSubsCount(broker2, topic, 0);
@@ -287,7 +298,9 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         //the NC sub will be removed because even though the local 
subscription exists,
         //it no longer matches the included static filter
         restartBroker(broker2, true);
-        assertNCDurableSubsCount(broker2, topic, 1);
+        if (flow == FLOW.FORWARD) {
+            assertNCDurableSubsCount(broker2, topic, 1);
+        }
         restartBroker(broker1, true);
         assertBridgeStarted();
         assertNCDurableSubsCount(broker2, topic, 0);
@@ -316,10 +329,13 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         //Test that on successful reconnection of the bridge that
         //the NC sub will be removed for topic1 but will stay for topic2
 
-        //before sync, the old NC should exist
+        //before sync, the old NC should exist (only verifiable in FORWARD flow;
+        //in REVERSE, broker2=localBroker has the bridge and sync may already have run)
         restartBroker(broker2, true);
-        assertNCDurableSubsCount(broker2, topic, 1);
-        assertNCDurableSubsCount(broker2, topic2, 0);
+        if (flow == FLOW.FORWARD) {
+            assertNCDurableSubsCount(broker2, topic, 1);
+            assertNCDurableSubsCount(broker2, topic2, 0);
+        }
 
         //After sync, remove old NC and create one for topic 2
         restartBroker(broker1, true);
@@ -527,7 +543,6 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         session1.createDurableSubscriber(topic, "sub3");
         session1.createDurableSubscriber(excludeTopic, "sub-exclude");
 
-        Thread.sleep(1000);
         assertNCDurableSubsCount(broker2, topic, 1);
         assertNCDurableSubsCount(broker2, excludeTopic, 0);
 
@@ -566,13 +581,10 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         secondConnector.start();
 
         //Make sure both bridges are connected
-        assertTrue(Wait.waitFor(new Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return 
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1 &&
-                        
localBroker.getNetworkConnectors().get(1).activeBridges().size() == 1;
-            }
-        }, 10000, 500));
+        assertTrue(Wait.waitFor(() ->
+                        
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1 &&
+                                
localBroker.getNetworkConnectors().get(1).activeBridges().size() == 1,
+                TimeUnit.SECONDS.toMillis(15), 500));
 
         //Make sure NC durables exist for both bridges
         assertNCDurableSubsCount(broker2, topic2, 1);
@@ -633,13 +645,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         final DestinationStatistics remoteDestStatistics2 = 
remoteBroker.getDestination(
                 new 
ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();
 
-        assertTrue(Wait.waitFor(new Condition() {
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return remoteDestStatistics2.getMessages().getCount() == 501;
-            }
-        }));
+        assertTrue(Wait.waitFor(() -> 
remoteDestStatistics2.getMessages().getCount() == 501));
 
     }
 
@@ -719,8 +725,36 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         included = new ActiveMQTopic(testTopicName);
         doSetUpRemoteBroker(deleteAllMessages, remoteDataDir, 0);
         doSetUpLocalBroker(deleteAllMessages, startNetworkConnector, 
localDataDir);
-        //Give time for advisories to propagate
-        Thread.sleep(1000);
+        //Wait for the bridge to be fully started (advisory consumers registered).
+        //Note: activeBridges().size() == 1 is NOT sufficient because bridges are added
+        //to the map before start() completes asynchronously. We must wait for the
+        //startedLatch which counts down after advisory consumers are registered.
+        if (startNetworkConnector) {
+            waitForBridgeFullyStarted();
+        }
+    }
+
+    private void waitForBridgeFullyStarted() throws Exception {
+        // Wait for the local bridge to be fully started (advisory consumers 
registered)
+        assertTrue("Local bridge should be fully started", Wait.waitFor(() -> {
+            if 
(localBroker.getNetworkConnectors().get(0).activeBridges().isEmpty()) {
+                return false;
+            }
+            final NetworkBridge bridge = 
localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
+            if (bridge instanceof DemandForwardingBridgeSupport) {
+                return ((DemandForwardingBridgeSupport) 
bridge).startedLatch.getCount() == 0;
+            }
+            return true;
+        }, TimeUnit.SECONDS.toMillis(15), 100));
+
+        // Also wait for the duplex bridge on the remote broker to be fully 
started.
+        // The duplex connector creates a separate DemandForwardingBridge on 
the remote side
+        // that also needs its advisory consumers registered before it can 
process events.
+        assertTrue("Duplex bridge should be fully started", Wait.waitFor(() -> 
{
+            final DemandForwardingBridge duplexBridge = findDuplexBridge(
+                    remoteBroker.getTransportConnectors().get(0));
+            return duplexBridge != null && 
duplexBridge.startedLatch.getCount() == 0;
+        }, TimeUnit.SECONDS.toMillis(15), 100));
     }
 
     protected void restartLocalBroker(boolean startNetworkConnector) throws 
Exception {
@@ -729,13 +763,42 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
     }
 
     protected void restartRemoteBroker() throws Exception {
-        int port = 0;
-        if (remoteBroker != null) {
-            List<TransportConnector> transportConnectors = 
remoteBroker.getTransportConnectors();
-            port = transportConnectors.get(0).getConnectUri().getPort();
-        }
+        final int previousPort = 
remoteBroker.getTransportConnectors().get(0).getConnectUri().getPort();
+        final File dataDir = remoteBroker.getDataDirectoryFile();
         stopRemoteBroker();
-        doSetUpRemoteBroker(false, remoteBroker.getDataDirectoryFile(), port);
+        try {
+            doSetUpRemoteBroker(false, dataDir, previousPort);
+        } catch (final IOException e) {
+            if (e.getCause() instanceof java.net.BindException) {
+                // Previous port still in TIME_WAIT — use a new ephemeral port
+                doSetUpRemoteBroker(false, dataDir, 0);
+                // Update the local broker's network connector to point to the 
new port
+                updateLocalNetworkConnectorUri();
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * When the remote broker restarts on a new ephemeral port (BindException fallback),
+     * any existing network connector on the local broker still points to the old port.
+     * This method stops the old connector and replaces it with one targeting the new URI.
+     */
+    private void updateLocalNetworkConnectorUri() throws Exception {
+        if (localBroker == null) {
+            return;
+        }
+        final List<NetworkConnector> connectors = 
localBroker.getNetworkConnectors();
+        if (connectors.isEmpty()) {
+            return;
+        }
+        final NetworkConnector oldConnector = connectors.get(0);
+        oldConnector.stop();
+        localBroker.removeNetworkConnector(oldConnector);
+        final NetworkConnector newConnector = configureLocalNetworkConnector();
+        localBroker.addNetworkConnector(newConnector);
+        newConnector.start();
     }
 
     protected void doSetUpLocalBroker(boolean deleteAllMessages, boolean 
startNetworkConnector,
@@ -753,12 +816,14 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         localConnection.start();
 
         if (startNetworkConnector) {
-            Wait.waitFor(new Condition() {
-                @Override
-                public boolean isSatisified() throws Exception {
-                    return 
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1;
-                }
-            }, 5000, 500);
+            // Best-effort wait for the bridge to appear. Do NOT use assertTrue here
+            // because some tests restart localBroker before remoteBroker is running,
+            // relying on the bridge connecting later when remoteBroker restarts.
+            // Tests that need the bridge to be fully started call assertBridgeStarted() explicitly.
+            // Keep timeout short (5s) to avoid growing the NC reconnect backoff too much,
+            // which would delay bridge formation when the remote broker starts later.
+            Wait.waitFor(() -> 
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1,
+                    TimeUnit.SECONDS.toMillis(5), 500);
         }
         localSession = localConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
 
@@ -869,4 +934,24 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         return brokerService;
     }
 
+    /**
+     * Wait for a durable subscription to become inactive before attempting removal.
+     * This prevents "Durable consumer is in use" errors when consumer close operations
+     * complete asynchronously (especially visible with Java 25's different thread scheduling).
+     */
+    protected void waitForSubscriptionInactive(final BrokerService 
brokerService,
+            final ActiveMQTopic topic,
+            final String subName) throws Exception {
+        assertTrue("Subscription should become inactive", Wait.waitFor(() -> {
+            final 
List<org.apache.activemq.broker.region.DurableTopicSubscription> subs = 
getSubscriptions(brokerService, topic);
+            for (final 
org.apache.activemq.broker.region.DurableTopicSubscription sub : subs) {
+                if 
(sub.getSubscriptionKey().getSubscriptionName().equals(subName)) {
+                    return !sub.isActive();
+                }
+            }
+            // If subscription doesn't exist, it's considered inactive
+            return true;
+        }, TimeUnit.SECONDS.toMillis(15), 100));
+    }
+
 }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
index 2d83fb71b7..4e0ecf7480 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
@@ -24,6 +24,7 @@ import java.io.File;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
@@ -45,7 +46,6 @@ import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.util.SubscriptionKey;
 import org.apache.activemq.util.Wait;
-import org.apache.activemq.util.Wait.Condition;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
@@ -95,9 +95,16 @@ public abstract class DynamicNetworkTestSupport {
     }
 
     protected void assertBridgeStarted() throws Exception {
-        assertTrue(Wait.waitFor(
-            () -> 
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1,
-            10000, 500));
+        assertTrue("Bridge should be fully started", Wait.waitFor(() -> {
+            if 
(localBroker.getNetworkConnectors().get(0).activeBridges().size() != 1) {
+                return false;
+            }
+            final NetworkBridge bridge = 
localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
+            if (bridge instanceof DemandForwardingBridgeSupport) {
+                return ((DemandForwardingBridgeSupport) 
bridge).startedLatch.getCount() == 0;
+            }
+            return true;
+        }, TimeUnit.SECONDS.toMillis(10), 500));
     }
 
     protected RemoveSubscriptionInfo getRemoveSubscriptionInfo(final 
ConnectionContext context,
@@ -119,8 +126,8 @@ public abstract class DynamicNetworkTestSupport {
 
     protected void waitForDispatchFromLocalBroker(final DestinationStatistics 
destinationStatistics, final int count) throws Exception {
         assertTrue(Wait.waitFor(() -> count == 
destinationStatistics.getDequeues().getCount() &&
-               count == destinationStatistics.getDispatched().getCount() &&
-               count == destinationStatistics.getForwards().getCount()));
+                count == destinationStatistics.getDispatched().getCount() &&
+                count == destinationStatistics.getForwards().getCount()));
     }
 
     protected void assertLocalBrokerStatistics(final DestinationStatistics 
localStatistics, final int count) {
@@ -135,17 +142,19 @@ public abstract class DynamicNetworkTestSupport {
 
     protected void assertNCDurableSubsCount(final BrokerService brokerService,
             final ActiveMQTopic dest, final int count) throws Exception {
-        assertTrue(Wait.waitFor(() -> count == getNCDurableSubs(brokerService, 
dest).size(),
-            10000, 500));
+        assertTrue("Expected " + count + " NC durable subs on " + dest,
+                Wait.waitFor(() -> count == getNCDurableSubs(brokerService, 
dest).size(),
+                        TimeUnit.SECONDS.toMillis(30), 500));
     }
 
     protected void assertConsumersCount(final BrokerService brokerService,
             final ActiveMQDestination dest, final int count) throws Exception {
         assertTrue(Wait.waitFor(() -> count == getConsumers(brokerService, 
dest).size(),
-            10000, 500));
-        Thread.sleep(1000);
-        // Check one more time after a short pause to make sure the count 
didn't increase past what we wanted
-        assertEquals(count, getConsumers(brokerService, dest).size());
+                10000, 500));
+        // Wait a bit longer and verify the count is stable (didn't increase 
past what we wanted)
+        assertTrue("Consumer count should remain stable at " + count,
+                Wait.waitFor(() -> count == getConsumers(brokerService, 
dest).size(),
+                        TimeUnit.SECONDS.toMillis(5), 500));
     }
 
     protected List<Subscription> getConsumers(final BrokerService 
brokerService,
@@ -194,7 +203,7 @@ public abstract class DynamicNetworkTestSupport {
     }
 
     protected void removeSubscription(final BrokerService brokerService,
-        final String subName) throws Exception {
+            final String subName) throws Exception {
         final RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
         info.setClientId(clientId);
         info.setSubscriptionName(subName);
@@ -208,12 +217,9 @@ public abstract class DynamicNetworkTestSupport {
 
     protected void assertSubscriptionsCount(final BrokerService brokerService,
             final ActiveMQTopic dest, final int count) throws Exception {
-        assertTrue(Wait.waitFor(new Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return count == getSubscriptions(brokerService, dest).size();
-            }
-        }, 10000, 500));
+        assertTrue("Expected " + count + " subscriptions on " + dest,
+                Wait.waitFor(() -> count == getSubscriptions(brokerService, 
dest).size(),
+                        TimeUnit.SECONDS.toMillis(30), 500));
     }
 
     protected void assertSubscriptionMapCounts(NetworkBridge networkBridge, 
final int count) {
@@ -226,8 +232,9 @@ public abstract class DynamicNetworkTestSupport {
     protected DemandForwardingBridge findDuplexBridge(final TransportConnector 
connector) throws Exception {
         assertNotNull(connector);
 
-        for (TransportConnection tc : connector.getConnections()) {
-            if (tc.getConnectionId().startsWith("networkConnector_")) {
+        for (final TransportConnection tc : connector.getConnections()) {
+            final String connectionId = tc.getConnectionId();
+            if (connectionId != null && 
connectionId.startsWith("networkConnector_")) {
                 final Field bridgeField = 
TransportConnection.class.getDeclaredField("duplexBridge");
                 bridgeField.setAccessible(true);
                 return (DemandForwardingBridge) bridgeField.get(tc);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to