This is an automated email from the ASF dual-hosted git repository.
jlmonteiro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 1357cd81e9 Multi broker setup hardening (#1627)
1357cd81e9 is described below
commit 1357cd81e99859fa7dee8632d6b691875c20509b
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Wed Feb 4 15:20:43 2026 +0100
Multi broker setup hardening (#1627)
* Use ephemeral ports for broker creation in
DurableFiveBrokerNetworkBridgeTest to prevent port conflicts during parallel
test execution
Enhance broker startup reliability by replacing Thread.sleep with
Wait.waitFor for transport connector readiness
* Improve broker readiness check in JmsMultipleBrokersTestSupport to skip
VM-only brokers and enhance reliability
* Fix startAllBrokers() transport connector check for VM-only and secured
brokers
- Skip readiness check for VM transport connectors (no network to verify)
- Treat JMSSecurityException as ready (broker accepts connections, just
enforces auth)
- Fix AdvisoryViaNetworkTest race conditions with waitForConsumerOnBroker()
---
.../activemq/JmsMultipleBrokersTestSupport.java | 38 ++++++++++++++++++++--
.../DurableFiveBrokerNetworkBridgeTest.java | 31 +++++++-----------
.../activemq/usecases/AdvisoryViaNetworkTest.java | 27 +++++++++++++++
3 files changed, 74 insertions(+), 22 deletions(-)
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
index 2ebbae9845..25c7749a09 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
@@ -271,14 +271,46 @@ public class JmsMultipleBrokersTestSupport extends
CombinationTestSupport {
}
protected void startAllBrokers() throws Exception {
- Collection<BrokerItem> brokerList = brokers.values();
+ final Collection<BrokerItem> brokerList = brokers.values();
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
- BrokerService broker = i.next().broker;
+ final BrokerService broker = i.next().broker;
broker.start();
broker.waitUntilStarted();
}
- Thread.sleep(maxSetupTime);
+ // Wait for all brokers with TCP transport connectors to be ready to
accept connections
+ // instead of using Thread.sleep which is unreliable across different
machines.
+ // Skip this check for VM-only brokers since VM transport doesn't
require network readiness
+ // and some tests intentionally configure brokers to reject
connections.
+ for (final BrokerItem brokerItem : brokerList) {
+ final BrokerService broker = brokerItem.broker;
+ // Only check transport connector readiness for TCP/network
connectors
+ // (skip VM-only brokers which don't require network readiness
checks)
+ final boolean hasTcpConnector =
broker.getTransportConnectors().stream()
+ .anyMatch(tc -> {
+ final String scheme = tc.getUri().getScheme();
+ return scheme != null && !scheme.equals("vm");
+ });
+ if (hasTcpConnector) {
+ assertTrue("Broker " + broker.getBrokerName() + " transport
connectors ready",
+ Wait.waitFor(() -> {
+ if (!broker.isStarted()) {
+ return false;
+ }
+ // Try to create a test connection to verify transport
is accepting connections
+ try (final Connection testConn =
brokerItem.createConnection()) {
+ return true;
+ } catch (final jakarta.jms.JMSSecurityException e) {
+ // Security exception means the broker IS
accepting connections,
+ // it's just enforcing authentication - consider
it ready
+ return true;
+ } catch (final Exception e) {
+ LOG.debug("Broker {} not ready yet: {}",
broker.getBrokerName(), e.getMessage());
+ return false;
+ }
+ }, TimeUnit.SECONDS.toMillis(30),
TimeUnit.SECONDS.toMillis(1)));
+ }
+ }
}
protected BrokerService createBroker(String brokerName) throws Exception {
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
index 6d3cc5bc00..723c854027 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
@@ -313,9 +313,8 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
// Setup consumers
Session ses = createSession("Broker_A_A");
MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
- Thread.sleep(1000);
- // let consumers propagate around the network
+ // let consumers propagate around the network
(assertNCDurableSubsCount waits internally)
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
@@ -328,9 +327,8 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
//bring online a consumer on the other side
Session ses2 = createSession("Broker_E_E");
MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE");
- Thread.sleep(1000);
- //there will be 2 network durables, 1 for each direction of the bridge
+ //there will be 2 network durables, 1 for each direction of the bridge
(assertNCDurableSubsCount waits internally)
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 2);
@@ -384,16 +382,14 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
MessageConsumer clientAB = ses.createDurableSubscriber(dest, "subAB");
- Thread.sleep(1000);
- // let consumers propagate around the network
+ // let consumers propagate around the network
(assertNCDurableSubsCount waits internally)
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
MessageConsumer clientD = ses4.createDurableSubscriber(dest, "subD");
- Thread.sleep(1000);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
@@ -759,7 +755,6 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
Session ses2 = createSession("Broker_E_E");
MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE");
- Thread.sleep(1000);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
@@ -768,9 +763,8 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 0);
startNetworkConnectors(nc1, nc2, nc3, nc4);
- Thread.sleep(1000);
- // Check that the correct network durables exist
+ // Check that the correct network durables exist
(assertNCDurableSubsCount waits internally)
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest,
expected.get(0));
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest,
expected.get(1));
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest,
expected.get(2));
@@ -795,12 +789,10 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
// to test sync works ok. Things should work for all cases both
dynamicOnly
// false and true because TTL info still exits and consumers are
online
stopNetworkConnectors(nc1, nc2, nc3, nc4);
- Thread.sleep(1000);
startNetworkConnectors(nc1, nc2, nc3, nc4);
- Thread.sleep(1000);
}
- // after restarting the bridges, check sync/demand are correct
+ // after restarting the bridges, check sync/demand are correct
(assertNCDurableSubsCount waits internally)
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest,
expected.get(0));
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest,
expected.get(1));
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest,
expected.get(2));
@@ -846,12 +838,13 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
super.setAutoFail(true);
super.setUp();
deletePersistentMessagesOnStartup = true;
- String options = new String("?persistent=true&useJmx=false");
- createBroker(new URI("broker:(tcp://localhost:61616)/Broker_A_A" +
options));
- createBroker(new URI("broker:(tcp://localhost:61617)/Broker_B_B" +
options));
- createBroker(new URI("broker:(tcp://localhost:61618)/Broker_C_C" +
options));
- createBroker(new URI("broker:(tcp://localhost:61619)/Broker_D_D" +
options));
- createBroker(new URI("broker:(tcp://localhost:61620)/Broker_E_E" +
options));
+ final String options = "?persistent=true&useJmx=false";
+ // Use ephemeral ports (0) to avoid port conflicts when tests run in
parallel
+ createBroker(new URI("broker:(tcp://localhost:0)/Broker_A_A" +
options));
+ createBroker(new URI("broker:(tcp://localhost:0)/Broker_B_B" +
options));
+ createBroker(new URI("broker:(tcp://localhost:0)/Broker_C_C" +
options));
+ createBroker(new URI("broker:(tcp://localhost:0)/Broker_D_D" +
options));
+ createBroker(new URI("broker:(tcp://localhost:0)/Broker_E_E" +
options));
}
@Override
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
index 65d011bcd1..cc0a814b2f 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
@@ -104,6 +104,10 @@ public class AdvisoryViaNetworkTest extends
JmsMultipleBrokersTestSupport {
createConsumer("A", topic1);
createConsumer("A", new ActiveMQTopic("A.FOO2"));
+ // Wait for network bridge to propagate consumers to broker B
+ waitForConsumerOnBroker(brokerB, advisoryTopic);
+ waitForConsumerOnBroker(brokerB, topic1);
+
//verify that brokerB's advisory prefetch is 10 but normal topic
prefetch is 1
assertEquals(10,
brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
assertEquals(1,
brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
@@ -137,6 +141,10 @@ public class AdvisoryViaNetworkTest extends
JmsMultipleBrokersTestSupport {
createConsumer("A", topic1);
createConsumer("A", new ActiveMQTopic("A.FOO2"));
+ // Wait for network bridge to propagate consumers to broker B
+ waitForConsumerOnBroker(brokerB, advisoryTopic);
+ waitForConsumerOnBroker(brokerB, topic1);
+
//verify that brokerB's advisory prefetch is 1 but normal topic
prefetch is 10
assertEquals(1,
brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
assertEquals(10,
brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
@@ -168,6 +176,10 @@ public class AdvisoryViaNetworkTest extends
JmsMultipleBrokersTestSupport {
createConsumer("A", topic1);
createConsumer("A", new ActiveMQTopic("A.FOO2"));
+ // Wait for network bridge to propagate consumers to broker B
+ waitForConsumerOnBroker(brokerB, advisoryTopic);
+ waitForConsumerOnBroker(brokerB, topic1);
+
//verify that both consumers have a prefetch of 10
assertEquals(10,
brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
assertEquals(10,
brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
@@ -198,6 +210,10 @@ public class AdvisoryViaNetworkTest extends
JmsMultipleBrokersTestSupport {
createConsumer("A", topic1);
createConsumer("A", new ActiveMQTopic("A.FOO2"));
+ // Wait for network bridge to propagate consumers to broker B
+ waitForConsumerOnBroker(brokerB, advisoryTopic);
+ waitForConsumerOnBroker(brokerB, topic1);
+
//verify that both consumers have a prefetch of 1
assertEquals(1,
brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
assertEquals(1,
brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
@@ -392,4 +408,15 @@ public class AdvisoryViaNetworkTest extends
JmsMultipleBrokersTestSupport {
super.setUp();
}
+ private void waitForConsumerOnBroker(final BrokerService broker, final
ActiveMQTopic topic) throws Exception {
+ assertTrue("Consumer on " + broker.getBrokerName() + " for " + topic,
+ Wait.waitFor(() -> {
+ try {
+ return
!broker.getDestination(topic).getConsumers().isEmpty();
+ } catch (final Exception e) {
+ return false;
+ }
+ }, 30000, 100));
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact