This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new f9fb06ff85 Replace Thread.sleep with Wait.waitFor in 
FailoverClusterTest (#1679)
f9fb06ff85 is described below

commit f9fb06ff859b8bbc3d305bba5e3b546ad2165a43
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Tue Feb 17 16:04:20 2026 +0100

    Replace Thread.sleep with Wait.waitFor in FailoverClusterTest (#1679)
    
    Thread.sleep(3000) is unreliable on slow CI machines for waiting on cluster 
formation and client rebalancing. Replace all sleep-based synchronization with 
Wait.waitFor polling:
    - testClusterConnectedAfterClients: poll for clients on multiple brokers
    - testClusterURIOptionsStrip: same polling for client rebalance
    - testClusterConnectedBeforeClients: poll for bridge formation, then poll 
for client failover after broker stop
    
    This also fixes AutoFailoverClusterTest which inherits these methods.
---
 .../transport/failover/FailoverClusterTest.java    | 67 ++++++++++++++--------
 1 file changed, 42 insertions(+), 25 deletions(-)

diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
index e031308b89..48e3901bfa 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import jakarta.jms.Connection;
 import jakarta.jms.MessageConsumer;
 import jakarta.jms.Queue;
@@ -31,6 +32,7 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.Wait;
 
 public class FailoverClusterTest extends TestCase {
 
@@ -53,12 +55,8 @@ public class FailoverClusterTest extends TestCase {
         if (brokerB == null) {
             brokerB = createBrokerB(getBindAddress());
         }
-        Thread.sleep(3000);
-        Set<String> set = new HashSet<String>();
-        for (ActiveMQConnection c : connections) {
-            set.add(c.getTransportChannel().getRemoteAddress());
-        }
-        assertTrue(set.size() > 1);
+        assertTrue("clients should connect to multiple brokers",
+            waitForClientRebalance(connections, 2));
     }
 
     public void testClusterURIOptionsStrip() throws Exception {
@@ -67,12 +65,8 @@ public class FailoverClusterTest extends TestCase {
             // add in server side only url param, should not be propagated
             brokerB = createBrokerB(getBindAddress() + 
"?transport.closeAsync=false");
         }
-        Thread.sleep(3000);
-        Set<String> set = new HashSet<String>();
-        for (ActiveMQConnection c : connections) {
-            set.add(c.getTransportChannel().getRemoteAddress());
-        }
-        assertTrue(set.size() > 1);
+        assertTrue("clients should connect to multiple brokers",
+            waitForClientRebalance(connections, 2));
     }
 
     public void testClusterConnectedBeforeClients() throws Exception {
@@ -80,17 +74,26 @@ public class FailoverClusterTest extends TestCase {
         if (brokerB == null) {
             brokerB = createBrokerB(getBindAddress());
         }
-        Thread.sleep(5000);
+
+        assertTrue("bridge should form between brokers",
+            Wait.waitFor(() -> 
!brokerB.getNetworkConnectors().get(0).activeBridges().isEmpty(),
+                TimeUnit.SECONDS.toMillis(15), 
TimeUnit.MILLISECONDS.toMillis(500)));
+
         createClients();
-        Thread.sleep(2000);
+
+        final URI brokerBURI = new 
URI(brokerB.getTransportConnectors().get(0).getPublishableConnectString());
         brokerA.stop();
-        Thread.sleep(2000);
 
-        URI brokerBURI = new URI( 
brokerB.getTransportConnectors().get(0).getPublishableConnectString());
-        for (ActiveMQConnection c : connections) {
-            String addr = c.getTransportChannel().getRemoteAddress();
-            assertTrue(addr.indexOf("" + brokerBURI.getPort()) > 0);
-        }
+        assertTrue("all clients should failover to broker B",
+            Wait.waitFor(() -> {
+                for (final ActiveMQConnection c : connections) {
+                    final String addr = 
c.getTransportChannel().getRemoteAddress();
+                    if (addr == null || addr.indexOf("" + 
brokerBURI.getPort()) <= 0) {
+                        return false;
+                    }
+                }
+                return true;
+            }, TimeUnit.SECONDS.toMillis(15), 
TimeUnit.MILLISECONDS.toMillis(500)));
     }
 
     @Override
@@ -154,14 +157,28 @@ public class FailoverClusterTest extends TestCase {
 
     @SuppressWarnings("unused")
     protected void createClients() throws Exception {
-        ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(clientUrl);
+        final ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(clientUrl);
         for (int i = 0; i < NUMBER; i++) {
-            ActiveMQConnection c = (ActiveMQConnection) 
factory.createConnection();
+            final ActiveMQConnection c = (ActiveMQConnection) 
factory.createConnection();
             c.start();
-            Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            Queue queue = s.createQueue(getClass().getName());
-            MessageConsumer consumer = s.createConsumer(queue);
+            final Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final Queue queue = s.createQueue(getClass().getName());
+            final MessageConsumer consumer = s.createConsumer(queue);
             connections.add(c);
         }
     }
+
+    private static boolean waitForClientRebalance(final 
List<ActiveMQConnection> connections,
+                                                  final int minBrokerCount) 
throws Exception {
+        return Wait.waitFor(() -> {
+            final Set<String> set = new HashSet<>();
+            for (final ActiveMQConnection c : connections) {
+                final String addr = c.getTransportChannel().getRemoteAddress();
+                if (addr != null) {
+                    set.add(addr);
+                }
+            }
+            return set.size() >= minBrokerCount;
+        }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(500));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to