This is an automated email from the ASF dual-hosted git repository.

jlmonteiro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 1357cd81e9 Multi broker setup hardening (#1627)
1357cd81e9 is described below

commit 1357cd81e99859fa7dee8632d6b691875c20509b
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Wed Feb 4 15:20:43 2026 +0100

    Multi broker setup hardening (#1627)
    
    * Use ephemeral ports for broker creation in 
DurableFiveBrokerNetworkBridgeTest to prevent port conflicts during parallel 
test execution
    
    Enhance broker startup reliability by replacing Thread.sleep with 
Wait.waitFor for transport connector readiness
    
    * Improve broker readiness check in JmsMultipleBrokersTestSupport to skip 
VM-only brokers and enhance reliability
    
    * Fix startAllBrokers() transport connector check for VM-only and secured 
brokers
    - Skip readiness check for VM transport connectors (no network to verify)
    - Treat JMSSecurityException as ready (broker accepts connections, just 
enforces auth)
    - Fix AdvisoryViaNetworkTest race conditions with waitForConsumerOnBroker()
---
 .../activemq/JmsMultipleBrokersTestSupport.java    | 38 ++++++++++++++++++++--
 .../DurableFiveBrokerNetworkBridgeTest.java        | 31 +++++++-----------
 .../activemq/usecases/AdvisoryViaNetworkTest.java  | 27 +++++++++++++++
 3 files changed, 74 insertions(+), 22 deletions(-)

diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
index 2ebbae9845..25c7749a09 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
@@ -271,14 +271,46 @@ public class JmsMultipleBrokersTestSupport extends 
CombinationTestSupport {
     }
 
     protected void startAllBrokers() throws Exception {
-        Collection<BrokerItem> brokerList = brokers.values();
+        final Collection<BrokerItem> brokerList = brokers.values();
         for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
-            BrokerService broker = i.next().broker;
+            final BrokerService broker = i.next().broker;
             broker.start();
             broker.waitUntilStarted();
         }
 
-        Thread.sleep(maxSetupTime);
+        // Wait for all brokers with TCP transport connectors to be ready to 
accept connections
+        // instead of using Thread.sleep which is unreliable across different 
machines.
+        // Skip this check for VM-only brokers since VM transport doesn't 
require network readiness
+        // and some tests intentionally configure brokers to reject 
connections.
+        for (final BrokerItem brokerItem : brokerList) {
+            final BrokerService broker = brokerItem.broker;
+            // Only check transport connector readiness for TCP/network 
connectors
+            // (skip VM-only brokers which don't require network readiness 
checks)
+            final boolean hasTcpConnector = 
broker.getTransportConnectors().stream()
+                .anyMatch(tc -> {
+                    final String scheme = tc.getUri().getScheme();
+                    return scheme != null && !scheme.equals("vm");
+                });
+            if (hasTcpConnector) {
+                assertTrue("Broker " + broker.getBrokerName() + " transport 
connectors ready",
+                    Wait.waitFor(() -> {
+                        if (!broker.isStarted()) {
+                            return false;
+                        }
+                        // Try to create a test connection to verify transport 
is accepting connections
+                        try (final Connection testConn = 
brokerItem.createConnection()) {
+                            return true;
+                        } catch (final jakarta.jms.JMSSecurityException e) {
+                            // Security exception means the broker IS 
accepting connections,
+                            // it's just enforcing authentication - consider 
it ready
+                            return true;
+                        } catch (final Exception e) {
+                            LOG.debug("Broker {} not ready yet: {}", 
broker.getBrokerName(), e.getMessage());
+                            return false;
+                        }
+                    }, TimeUnit.SECONDS.toMillis(30), 
TimeUnit.SECONDS.toMillis(1)));
+            }
+        }
     }
 
     protected BrokerService createBroker(String brokerName) throws Exception {
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
index 6d3cc5bc00..723c854027 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
@@ -313,9 +313,8 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         // Setup consumers
         Session ses = createSession("Broker_A_A");
         MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
-        Thread.sleep(1000);
 
-        // let consumers propagate around the network
+        // let consumers propagate around the network 
(assertNCDurableSubsCount waits internally)
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
         assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
         assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
@@ -328,9 +327,8 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         //bring online a consumer on the other side
         Session ses2 = createSession("Broker_E_E");
         MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE");
-        Thread.sleep(1000);
 
-        //there will be 2 network durables, 1 for each direction of the bridge
+        //there will be 2 network durables, 1 for each direction of the bridge 
(assertNCDurableSubsCount waits internally)
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
         assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 2);
         assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 2);
@@ -384,16 +382,14 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
 
         MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
         MessageConsumer clientAB = ses.createDurableSubscriber(dest, "subAB");
-        Thread.sleep(1000);
 
-        // let consumers propagate around the network
+        // let consumers propagate around the network 
(assertNCDurableSubsCount waits internally)
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
         assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
         assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
         assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
 
         MessageConsumer clientD = ses4.createDurableSubscriber(dest, "subD");
-        Thread.sleep(1000);
 
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
         assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
@@ -759,7 +755,6 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         Session ses2 = createSession("Broker_E_E");
         MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
         MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE");
-        Thread.sleep(1000);
 
         assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
@@ -768,9 +763,8 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 0);
 
         startNetworkConnectors(nc1, nc2, nc3, nc4);
-        Thread.sleep(1000);
 
-        // Check that the correct network durables exist
+        // Check that the correct network durables exist 
(assertNCDurableSubsCount waits internally)
         assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 
expected.get(0));
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 
expected.get(1));
         assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 
expected.get(2));
@@ -795,12 +789,10 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
             // to test sync works ok. Things should work for all cases both 
dynamicOnly
             // false and true because TTL info still exits and consumers are 
online
             stopNetworkConnectors(nc1, nc2, nc3, nc4);
-            Thread.sleep(1000);
             startNetworkConnectors(nc1, nc2, nc3, nc4);
-            Thread.sleep(1000);
         }
 
-        // after restarting the bridges, check sync/demand are correct
+        // after restarting the bridges, check sync/demand are correct 
(assertNCDurableSubsCount waits internally)
         assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 
expected.get(0));
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 
expected.get(1));
         assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 
expected.get(2));
@@ -846,12 +838,13 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         super.setAutoFail(true);
         super.setUp();
         deletePersistentMessagesOnStartup = true;
-        String options = new String("?persistent=true&useJmx=false");
-        createBroker(new URI("broker:(tcp://localhost:61616)/Broker_A_A" + 
options));
-        createBroker(new URI("broker:(tcp://localhost:61617)/Broker_B_B" + 
options));
-        createBroker(new URI("broker:(tcp://localhost:61618)/Broker_C_C" + 
options));
-        createBroker(new URI("broker:(tcp://localhost:61619)/Broker_D_D" + 
options));
-        createBroker(new URI("broker:(tcp://localhost:61620)/Broker_E_E" + 
options));
+        final String options = "?persistent=true&useJmx=false";
+        // Use ephemeral ports (0) to avoid port conflicts when tests run in 
parallel
+        createBroker(new URI("broker:(tcp://localhost:0)/Broker_A_A" + 
options));
+        createBroker(new URI("broker:(tcp://localhost:0)/Broker_B_B" + 
options));
+        createBroker(new URI("broker:(tcp://localhost:0)/Broker_C_C" + 
options));
+        createBroker(new URI("broker:(tcp://localhost:0)/Broker_D_D" + 
options));
+        createBroker(new URI("broker:(tcp://localhost:0)/Broker_E_E" + 
options));
     }
 
     @Override
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
index 65d011bcd1..cc0a814b2f 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
@@ -104,6 +104,10 @@ public class AdvisoryViaNetworkTest extends 
JmsMultipleBrokersTestSupport {
         createConsumer("A", topic1);
         createConsumer("A", new ActiveMQTopic("A.FOO2"));
 
+        // Wait for network bridge to propagate consumers to broker B
+        waitForConsumerOnBroker(brokerB, advisoryTopic);
+        waitForConsumerOnBroker(brokerB, topic1);
+
         //verify that brokerB's advisory prefetch is 10 but normal topic 
prefetch is 1
         assertEquals(10, 
brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
         assertEquals(1, 
brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
@@ -137,6 +141,10 @@ public class AdvisoryViaNetworkTest extends 
JmsMultipleBrokersTestSupport {
         createConsumer("A", topic1);
         createConsumer("A", new ActiveMQTopic("A.FOO2"));
 
+        // Wait for network bridge to propagate consumers to broker B
+        waitForConsumerOnBroker(brokerB, advisoryTopic);
+        waitForConsumerOnBroker(brokerB, topic1);
+
         //verify that brokerB's advisory prefetch is 1 but normal topic 
prefetch is 10
         assertEquals(1, 
brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
         assertEquals(10, 
brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
@@ -168,6 +176,10 @@ public class AdvisoryViaNetworkTest extends 
JmsMultipleBrokersTestSupport {
         createConsumer("A", topic1);
         createConsumer("A", new ActiveMQTopic("A.FOO2"));
 
+        // Wait for network bridge to propagate consumers to broker B
+        waitForConsumerOnBroker(brokerB, advisoryTopic);
+        waitForConsumerOnBroker(brokerB, topic1);
+
         //verify that both consumers have a prefetch of 10
         assertEquals(10, 
brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
         assertEquals(10, 
brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
@@ -198,6 +210,10 @@ public class AdvisoryViaNetworkTest extends 
JmsMultipleBrokersTestSupport {
         createConsumer("A", topic1);
         createConsumer("A", new ActiveMQTopic("A.FOO2"));
 
+        // Wait for network bridge to propagate consumers to broker B
+        waitForConsumerOnBroker(brokerB, advisoryTopic);
+        waitForConsumerOnBroker(brokerB, topic1);
+
         //verify that both consumers have a prefetch of 1
         assertEquals(1, 
brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
         assertEquals(1, 
brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
@@ -392,4 +408,15 @@ public class AdvisoryViaNetworkTest extends 
JmsMultipleBrokersTestSupport {
         super.setUp();
     }
 
+    private void waitForConsumerOnBroker(final BrokerService broker, final 
ActiveMQTopic topic) throws Exception {
+        assertTrue("Consumer on " + broker.getBrokerName() + " for " + topic,
+            Wait.waitFor(() -> {
+                try {
+                    return 
!broker.getDestination(topic).getConsumers().isEmpty();
+                } catch (final Exception e) {
+                    return false;
+                }
+            }, 30000, 100));
+    }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to