This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new f9fb06ff85 Replace Thread.sleep with Wait.waitFor in
FailoverClusterTest (#1679)
f9fb06ff85 is described below
commit f9fb06ff859b8bbc3d305bba5e3b546ad2165a43
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Tue Feb 17 16:04:20 2026 +0100
Replace Thread.sleep with Wait.waitFor in FailoverClusterTest (#1679)
Thread.sleep(3000) is unreliable on slow CI machines for waiting on cluster
formation and client rebalancing. Replace all sleep-based synchronization with
Wait.waitFor polling:
- testClusterConnectedAfterClients: poll for clients on multiple brokers
- testClusterURIOptionsStrip: same polling for client rebalance
- testClusterConnectedBeforeClients: poll for bridge formation, then poll
for client failover to broker B after broker A is stopped
This also fixes AutoFailoverClusterTest, which inherits these methods.
---
.../transport/failover/FailoverClusterTest.java | 67 ++++++++++++++--------
1 file changed, 42 insertions(+), 25 deletions(-)
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
index e031308b89..48e3901bfa 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import jakarta.jms.Connection;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Queue;
@@ -31,6 +32,7 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.Wait;
public class FailoverClusterTest extends TestCase {
@@ -53,12 +55,8 @@ public class FailoverClusterTest extends TestCase {
if (brokerB == null) {
brokerB = createBrokerB(getBindAddress());
}
- Thread.sleep(3000);
- Set<String> set = new HashSet<String>();
- for (ActiveMQConnection c : connections) {
- set.add(c.getTransportChannel().getRemoteAddress());
- }
- assertTrue(set.size() > 1);
+ assertTrue("clients should connect to multiple brokers",
+ waitForClientRebalance(connections, 2));
}
public void testClusterURIOptionsStrip() throws Exception {
@@ -67,12 +65,8 @@ public class FailoverClusterTest extends TestCase {
// add in server side only url param, should not be propagated
brokerB = createBrokerB(getBindAddress() +
"?transport.closeAsync=false");
}
- Thread.sleep(3000);
- Set<String> set = new HashSet<String>();
- for (ActiveMQConnection c : connections) {
- set.add(c.getTransportChannel().getRemoteAddress());
- }
- assertTrue(set.size() > 1);
+ assertTrue("clients should connect to multiple brokers",
+ waitForClientRebalance(connections, 2));
}
public void testClusterConnectedBeforeClients() throws Exception {
@@ -80,17 +74,26 @@ public class FailoverClusterTest extends TestCase {
if (brokerB == null) {
brokerB = createBrokerB(getBindAddress());
}
- Thread.sleep(5000);
+
+ assertTrue("bridge should form between brokers",
+ Wait.waitFor(() ->
!brokerB.getNetworkConnectors().get(0).activeBridges().isEmpty(),
+ TimeUnit.SECONDS.toMillis(15),
TimeUnit.MILLISECONDS.toMillis(500)));
+
createClients();
- Thread.sleep(2000);
+
+ final URI brokerBURI = new
URI(brokerB.getTransportConnectors().get(0).getPublishableConnectString());
brokerA.stop();
- Thread.sleep(2000);
- URI brokerBURI = new URI(
brokerB.getTransportConnectors().get(0).getPublishableConnectString());
- for (ActiveMQConnection c : connections) {
- String addr = c.getTransportChannel().getRemoteAddress();
- assertTrue(addr.indexOf("" + brokerBURI.getPort()) > 0);
- }
+ assertTrue("all clients should failover to broker B",
+ Wait.waitFor(() -> {
+ for (final ActiveMQConnection c : connections) {
+ final String addr =
c.getTransportChannel().getRemoteAddress();
+ if (addr == null || addr.indexOf("" +
brokerBURI.getPort()) <= 0) {
+ return false;
+ }
+ }
+ return true;
+ }, TimeUnit.SECONDS.toMillis(15),
TimeUnit.MILLISECONDS.toMillis(500)));
}
@Override
@@ -154,14 +157,28 @@ public class FailoverClusterTest extends TestCase {
@SuppressWarnings("unused")
protected void createClients() throws Exception {
- ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(clientUrl);
+ final ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(clientUrl);
for (int i = 0; i < NUMBER; i++) {
- ActiveMQConnection c = (ActiveMQConnection)
factory.createConnection();
+ final ActiveMQConnection c = (ActiveMQConnection)
factory.createConnection();
c.start();
- Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = s.createQueue(getClass().getName());
- MessageConsumer consumer = s.createConsumer(queue);
+ final Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Queue queue = s.createQueue(getClass().getName());
+ final MessageConsumer consumer = s.createConsumer(queue);
connections.add(c);
}
}
+
+ private static boolean waitForClientRebalance(final
List<ActiveMQConnection> connections,
+ final int minBrokerCount)
throws Exception {
+ return Wait.waitFor(() -> {
+ final Set<String> set = new HashSet<>();
+ for (final ActiveMQConnection c : connections) {
+ final String addr = c.getTransportChannel().getRemoteAddress();
+ if (addr != null) {
+ set.add(addr);
+ }
+ }
+ return set.size() >= minBrokerCount;
+ }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(500));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact