This is an automated email from the ASF dual-hosted git repository.

jlmonteiro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 902b85cc5b Support ephemeral ports in network config and Spring (#1674)
902b85cc5b is described below

commit 902b85cc5b19e9e295813fc118f04fc239b6ff36
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Thu Feb 19 16:59:34 2026 +0100

    Support ephemeral ports in network config and Spring (#1674)
---
 .../apache/activemq/network/BaseNetworkTest.java   | 101 +++++++++++++++--
 .../apache/activemq/network/DuplexNetworkTest.java |   7 +-
 ...callyIncludedDestinationsDuplexNetworkTest.java |   7 +-
 .../activemq/network/MulticastNetworkTest.java     |   5 +
 .../activemq/network/NetworkFailoverTest.java      | 102 +++++++++++-------
 .../apache/activemq/perf/NetworkedSyncTest.java    | 119 ++++++++++-----------
 .../org/apache/activemq/broker/spring.xml          |  13 +--
 .../activemq/network/localBroker-ephemeral.xml     |  35 ++++++
 .../activemq/network/remoteBroker-ephemeral.xml    |  36 +++++++
 .../org/apache/activemq/perf/networkSync.xml       |  12 +--
 10 files changed, 307 insertions(+), 130 deletions(-)

diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java
index 1af6636e92..d72dc28177 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java
@@ -16,13 +16,22 @@
  */
 package org.apache.activemq.network;
 
+import static org.junit.Assert.assertTrue;
+
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import jakarta.jms.Connection;
 import jakarta.jms.Session;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
 import org.apache.activemq.xbean.BrokerFactoryBean;
 import org.junit.After;
 import org.junit.Before;
@@ -59,11 +68,15 @@ public class BaseNetworkTest {
         if(remoteConnection != null)
             remoteConnection.close();
 
-        if(localBroker != null)
+        if(localBroker != null) {
             localBroker.stop();
+            localBroker.waitUntilStopped();
+        }
 
-        if(remoteBroker != null)
+        if(remoteBroker != null) {
             remoteBroker.stop();
+            remoteBroker.waitUntilStopped();
+        }
     }
 
     protected void doSetUp(boolean deleteAllMessages) throws Exception {
@@ -75,14 +88,25 @@ public class BaseNetworkTest {
         localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
         localBroker.start();
         localBroker.waitUntilStarted();
-        URI localURI = localBroker.getVmConnectorURI();
+
+        // Programmatically add network connectors using the actual assigned ephemeral ports.
+        // Use startNetworkConnector() instead of connector.start() to ensure proper JMX MBean registration.
+        addNetworkConnectors();
+
+        // Wait for both network bridges to be FULLY started (advisory consumers registered).
+        // activeBridges().isEmpty() is NOT sufficient because bridges are added to the map
+        // before start() completes asynchronously. We must wait for the startedLatch.
+        waitForBridgeFullyStarted(localBroker, "Local");
+        waitForBridgeFullyStarted(remoteBroker, "Remote");
+
+        final URI localURI = localBroker.getVmConnectorURI();
         ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
         fac.setAlwaysSyncSend(true);
         fac.setDispatchAsync(false);
         localConnection = fac.createConnection();
         localConnection.setClientID("clientId");
         localConnection.start();
-        URI remoteURI = remoteBroker.getVmConnectorURI();
+        final URI remoteURI = remoteBroker.getVmConnectorURI();
         fac = new ActiveMQConnectionFactory(remoteURI);
         remoteConnection = fac.createConnection();
         remoteConnection.setClientID("clientId");
@@ -91,21 +115,76 @@ public class BaseNetworkTest {
         remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
     }
 
+    /**
+     * Programmatically adds network connectors between the local and remote brokers
+     * using the actual assigned ephemeral ports. This avoids hardcoding ports in XML
+     * config files which causes port conflicts on CI.
+     */
+    protected void addNetworkConnectors() throws Exception {
+        final URI remoteConnectURI = remoteBroker.getTransportConnectors().get(0).getConnectUri();
+        final URI localConnectURI = localBroker.getTransportConnectors().get(0).getConnectUri();
+
+        // Local -> Remote network connector (matches the original localBroker.xml config)
+        final DiscoveryNetworkConnector localToRemote = new DiscoveryNetworkConnector(
+                new URI("static:(" + remoteConnectURI + ")"));
+        localToRemote.setName("networkConnector");
+        localToRemote.setDynamicOnly(false);
+        localToRemote.setConduitSubscriptions(true);
+        localToRemote.setDecreaseNetworkConsumerPriority(false);
+
+        final List<ActiveMQDestination> dynamicallyIncluded = new ArrayList<>();
+        dynamicallyIncluded.add(new ActiveMQQueue("include.test.foo"));
+        dynamicallyIncluded.add(new ActiveMQTopic("include.test.bar"));
+        localToRemote.setDynamicallyIncludedDestinations(dynamicallyIncluded);
+
+        final List<ActiveMQDestination> excluded = new ArrayList<>();
+        excluded.add(new ActiveMQQueue("exclude.test.foo"));
+        excluded.add(new ActiveMQTopic("exclude.test.bar"));
+        localToRemote.setExcludedDestinations(excluded);
+
+        localBroker.addNetworkConnector(localToRemote);
+        // startNetworkConnector handles JMX MBean registration and connector startup
+        localBroker.startNetworkConnector(localToRemote, null);
+
+        // Remote -> Local network connector (matches the original remoteBroker.xml config)
+        final DiscoveryNetworkConnector remoteToLocal = new DiscoveryNetworkConnector(
+                new URI("static:(" + localConnectURI + ")"));
+        remoteToLocal.setName("networkConnector");
+        remoteBroker.addNetworkConnector(remoteToLocal);
+        remoteBroker.startNetworkConnector(remoteToLocal, null);
+    }
+
+    protected void waitForBridgeFullyStarted(final BrokerService broker, final String label) throws Exception {
+        // Skip if broker has no network connectors (e.g., duplex target broker receives
+        // bridge connections but doesn't initiate them)
+        if (broker.getNetworkConnectors().isEmpty()) {
+            return;
+        }
+        assertTrue(label + " broker bridge should be fully started", Wait.waitFor(() -> {
+            if (broker.getNetworkConnectors().get(0).activeBridges().isEmpty()) {
+                return false;
+            }
+            final NetworkBridge bridge = broker.getNetworkConnectors().get(0).activeBridges().iterator().next();
+            if (bridge instanceof DemandForwardingBridgeSupport) {
+                return ((DemandForwardingBridgeSupport) bridge).startedLatch.getCount() == 0;
+            }
+            return true;
+        }, TimeUnit.SECONDS.toMillis(10), 100));
+    }
+
     protected String getRemoteBrokerURI() {
-        return "org/apache/activemq/network/remoteBroker.xml";
+        return "org/apache/activemq/network/remoteBroker-ephemeral.xml";
     }
 
     protected String getLocalBrokerURI() {
-        return "org/apache/activemq/network/localBroker.xml";
+        return "org/apache/activemq/network/localBroker-ephemeral.xml";
     }
 
     protected BrokerService createBroker(String uri) throws Exception {
-        Resource resource = new ClassPathResource(uri);
-        BrokerFactoryBean factory = new BrokerFactoryBean(resource);
-        resource = new ClassPathResource(uri);
-        factory = new BrokerFactoryBean(resource);
+        final Resource resource = new ClassPathResource(uri);
+        final BrokerFactoryBean factory = new BrokerFactoryBean(resource);
         factory.afterPropertiesSet();
-        BrokerService result = factory.getBroker();
+        final BrokerService result = factory.getBroker();
         return result;
     }
 
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java
index e97d87b696..3006c9c9cc 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java
@@ -40,12 +40,17 @@ public class DuplexNetworkTest extends SimpleNetworkTest {
 
     @Override
     protected BrokerService createRemoteBroker() throws Exception {
-        BrokerService broker = new BrokerService();
+        final BrokerService broker = new BrokerService();
         broker.setBrokerName("remoteBroker");
         broker.addConnector("tcp://localhost:61617?transport.connectAttemptTimeout=2000");
         return broker;
     }
 
+    @Override
+    protected void addNetworkConnectors() throws Exception {
+        // No-op: duplex network connector is already defined in duplexLocalBroker.xml
+    }
+
     @Test
     public void testTempQueues() throws Exception {
         TemporaryQueue temp = localSession.createTemporaryQueue();
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
index 39018a18fc..d52836e4f6 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
@@ -48,12 +48,17 @@ public class DynamicallyIncludedDestinationsDuplexNetworkTest extends SimpleNetw
 
     @Override
     protected BrokerService createRemoteBroker() throws Exception {
-        BrokerService broker = new BrokerService();
+        final BrokerService broker = new BrokerService();
         broker.setBrokerName("remoteBroker");
         broker.addConnector("tcp://localhost:" + REMOTE_BROKER_TCP_PORT);
         return broker;
     }
 
+    @Override
+    protected void addNetworkConnectors() throws Exception {
+        // No-op: duplex network connector is already defined in duplexDynamicIncludedDestLocalBroker.xml
+    }
+
     // we have to override this, because with dynamicallyIncludedDestinations working properly
     // (see https://issues.apache.org/jira/browse/AMQ-4209) you can't get request/response
     // with temps working (there is no wild card like there is for staticallyIncludedDest)
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/MulticastNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/MulticastNetworkTest.java
index 7813b07635..4eb65880b5 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/MulticastNetworkTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/MulticastNetworkTest.java
@@ -21,6 +21,11 @@ package org.apache.activemq.network;
  */
 public class MulticastNetworkTest extends SimpleNetworkTest {
 
+    @Override
+    protected void addNetworkConnectors() throws Exception {
+        // No-op: multicast network connectors are already defined in the XML configs
+    }
+
     protected String getRemoteBrokerURI() {
         return "org/apache/activemq/network/multicast/remoteBroker.xml";
     }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java
index fa7efce898..f086753289 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.network;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import jakarta.jms.Connection;
@@ -25,7 +26,6 @@ import jakarta.jms.Destination;
 import jakarta.jms.JMSException;
 import jakarta.jms.Message;
 import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageListener;
 import jakarta.jms.MessageProducer;
 import jakarta.jms.Queue;
 import jakarta.jms.Session;
@@ -39,6 +39,7 @@ import org.apache.activemq.DestinationDoesNotExistException;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.transport.TransportFilter;
 import org.apache.activemq.transport.failover.FailoverTransport;
 import org.apache.activemq.xbean.BrokerFactoryBean;
@@ -68,32 +69,28 @@ public class NetworkFailoverTest extends TestCase {
     public void testRequestReply() throws Exception {
         final MessageProducer remoteProducer = remoteSession.createProducer(null);
         MessageConsumer remoteConsumer = remoteSession.createConsumer(included);
-        remoteConsumer.setMessageListener(new MessageListener() {
-            @Override
-            public void onMessage(Message msg) {
-                final TextMessage textMsg = (TextMessage)msg;
+        remoteConsumer.setMessageListener(msg -> {
+            final TextMessage textMsg = (TextMessage) msg;
+            try {
+                final String payload = "REPLY: " + textMsg.getText() + ", " + textMsg.getJMSMessageID();
+                final Destination replyTo = msg.getJMSReplyTo();
+                textMsg.clearBody();
+                textMsg.setText(payload);
+                LOG.info("*** Sending response: {}", textMsg.getText());
+                remoteProducer.send(replyTo, textMsg);
+                LOG.info("replied with: " + textMsg.getJMSMessageID());
+
+            } catch (DestinationDoesNotExistException expected) {
+                // been removed but not yet recreated
+                replyToNonExistDest.incrementAndGet();
                 try {
-                    String payload = "REPLY: " + textMsg.getText() + ", " + textMsg.getJMSMessageID();
-                    Destination replyTo;
-                    replyTo = msg.getJMSReplyTo();
-                    textMsg.clearBody();
-                    textMsg.setText(payload);
-                    LOG.info("*** Sending response: {}", textMsg.getText());
-                    remoteProducer.send(replyTo, textMsg);
-                    LOG.info("replied with: " + textMsg.getJMSMessageID());
-
-                } catch (DestinationDoesNotExistException expected) {
-                    // been removed but not yet recreated
-                    replyToNonExistDest.incrementAndGet();
-                    try {
-                        LOG.info("NED: " + textMsg.getJMSMessageID());
-                    } catch (JMSException e) {
-                        e.printStackTrace();
-                    };
-                } catch (Exception e) {
-                    LOG.warn("*** Responder listener caught exception: ", e);
+                    LOG.info("NED: " + textMsg.getJMSMessageID());
+                } catch (JMSException e) {
                     e.printStackTrace();
                 }
+            } catch (Exception e) {
+                LOG.warn("*** Responder listener caught exception: ", e);
+                e.printStackTrace();
             }
         });
 
@@ -104,16 +101,13 @@ public class NetworkFailoverTest extends TestCase {
 
         // track remote dlq for forward failures
         MessageConsumer dlqconsumer = remoteSession.createConsumer(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
-        dlqconsumer.setMessageListener(new MessageListener() {
-            @Override
-            public void onMessage(Message message) {
-                try {
-                    LOG.info("dlq " + message.getJMSMessageID());
-                } catch (JMSException e) {
-                    e.printStackTrace();
-                }
-                remoteDLQCount.incrementAndGet();
+        dlqconsumer.setMessageListener(message -> {
+            try {
+                LOG.info("dlq " + message.getJMSMessageID());
+            } catch (JMSException e) {
+                e.printStackTrace();
             }
+            remoteDLQCount.incrementAndGet();
         });
 
         // allow for consumer infos to perculate arround
@@ -176,25 +170,51 @@ public class NetworkFailoverTest extends TestCase {
         } catch(Exception ex) {}
     }
 
-    protected void doSetUp(boolean deleteAllMessages) throws Exception {
+    protected void doSetUp(final boolean deleteAllMessages) throws Exception {
 
         remoteBroker = createRemoteBroker();
         remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
         remoteBroker.setCacheTempDestinations(true);
         remoteBroker.start();
+        remoteBroker.waitUntilStarted();
 
         localBroker = createLocalBroker();
         localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
         localBroker.setCacheTempDestinations(true);
         localBroker.start();
-
-        String localURI = "tcp://localhost:61616";
-        String remoteURI = "tcp://localhost:61617";
-        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("failover:("+localURI+","+remoteURI+")?randomize=false&backup=false&trackMessages=true");
+        localBroker.waitUntilStarted();
+
+        // Get actual assigned ephemeral ports
+        final URI localConnectURI = localBroker.getTransportConnectors().get(0).getConnectUri();
+        final URI remoteConnectURI = remoteBroker.getTransportConnectors().get(0).getConnectUri();
+        final String localURI = localConnectURI.toString();
+        final String remoteURI = remoteConnectURI.toString();
+
+        // Add network connectors programmatically using actual ports
+        final DiscoveryNetworkConnector localToRemote = new DiscoveryNetworkConnector(
+                new URI("static://(" + remoteURI + ")"));
+        localToRemote.setName("networkConnector");
+        localToRemote.setDynamicOnly(false);
+        localToRemote.setConduitSubscriptions(true);
+        localToRemote.setDecreaseNetworkConsumerPriority(false);
+        localToRemote.setDynamicallyIncludedDestinations(
+                java.util.List.of(new ActiveMQQueue("include.test.foo"), new ActiveMQTopic("include.test.bar")));
+        localToRemote.setExcludedDestinations(
+                java.util.List.of(new ActiveMQQueue("exclude.test.foo"), new ActiveMQTopic("exclude.test.bar")));
+        localBroker.addNetworkConnector(localToRemote);
+        localBroker.startNetworkConnector(localToRemote, null);
+
+        final DiscoveryNetworkConnector remoteToLocal = new DiscoveryNetworkConnector(
+                new URI("static://(" + localURI + ")"));
+        remoteToLocal.setName("networkConnector");
+        remoteBroker.addNetworkConnector(remoteToLocal);
+        remoteBroker.startNetworkConnector(remoteToLocal, null);
+
+        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("failover:(" + localURI + "," + remoteURI + ")?randomize=false&backup=false&trackMessages=true");
         localConnection = fac.createConnection();
         localConnection.setClientID("local");
         localConnection.start();
-        fac = new ActiveMQConnectionFactory("failover:("+remoteURI + ","+localURI+")?randomize=false&backup=false&trackMessages=true");
+        fac = new ActiveMQConnectionFactory("failover:(" + remoteURI + "," + localURI + ")?randomize=false&backup=false&trackMessages=true");
         fac.setWatchTopicAdvisories(false);
         remoteConnection = fac.createConnection();
         remoteConnection.setClientID("remote");
@@ -205,11 +225,11 @@ public class NetworkFailoverTest extends TestCase {
     }
 
     protected String getRemoteBrokerURI() {
-        return "org/apache/activemq/network/remoteBroker.xml";
+        return "org/apache/activemq/network/remoteBroker-ephemeral.xml";
     }
 
     protected String getLocalBrokerURI() {
-        return "org/apache/activemq/network/localBroker.xml";
+        return "org/apache/activemq/network/localBroker-ephemeral.xml";
     }
 
     protected BrokerService createBroker(String uri) throws Exception {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/perf/NetworkedSyncTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/perf/NetworkedSyncTest.java
index db75e2e594..5b7fefd54d 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/perf/NetworkedSyncTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/perf/NetworkedSyncTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.perf;
 
+import java.net.URI;
+
 import jakarta.jms.Connection;
 import jakarta.jms.DeliveryMode;
 import jakarta.jms.Destination;
@@ -32,22 +34,22 @@ import junit.textui.TestRunner;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.network.NetworkConnector;
-import org.apache.activemq.xbean.BrokerFactoryBean;
 import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.core.io.ClassPathResource;
 
 
 public class NetworkedSyncTest extends TestCase {
 
     // constants
     public static final int MESSAGE_COUNT = 10000; //100000;
-    public final static String config = "org/apache/activemq/perf/networkSync.xml";
-    public final static String broker1URL = "tcp://localhost:61616";
-    public final static String broker2URL = "tcp://localhost:62616";
-    private final String networkConnectorURL = "static://(" + broker2URL + ")";
+    public static final String config = "org/apache/activemq/perf/networkSync.xml";
     private static final Logger LOG = LoggerFactory.getLogger(NetworkedSyncTest.class);
+
+    // Broker URLs resolved after startup from ephemeral ports
+    static String broker1URL;
+    static String broker2URL;
+
     BrokerService broker1 = null;
     BrokerService broker2 = null;
     NetworkConnector connector = null;
@@ -55,12 +57,12 @@ public class NetworkedSyncTest extends TestCase {
     /**
      * @param name
      */
-    public NetworkedSyncTest(String name) {
+    public NetworkedSyncTest(final String name) {
         super(name);
         LOG.info("Testcase started.");
     }
 
-   public static void main(String args[]) {
+   public static void main(final String[] args) {
        TestRunner.run(NetworkedSyncTest.class);
    }
 
@@ -70,53 +72,51 @@ public class NetworkedSyncTest extends TestCase {
     @Override
     protected void setUp() throws Exception {
         LOG.info("setUp() called.");
-        ClassPathXmlApplicationContext context1 = null;
-        BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(config));
-        assertNotNull(brokerFactory);
 
         /* start up first broker instance */
         try {
-            // resolve broker1
             Thread.currentThread().setContextClassLoader(
                     NetworkedSyncTest.class.getClassLoader());
-            context1 = new ClassPathXmlApplicationContext(config);
+            final ClassPathXmlApplicationContext context1 = new ClassPathXmlApplicationContext(config);
             broker1 = (BrokerService) context1.getBean("broker1");
 
-            // start the broker
             if (!broker1.isStarted()) {
                 LOG.info("Broker broker1 not yet started. Kicking it off now.");
                 broker1.start();
-            } else {
-                LOG.info("Broker broker1 already started. Not kicking it off a second time.");
-                broker1.waitUntilStopped();
+                broker1.waitUntilStarted();
             }
-        } catch (Exception e) {
+        } catch (final Exception e) {
             LOG.error("Error: " + e.getMessage());
             throw e;
-            // brokerService.stop();
         }
 
         /* start up second broker instance */
         try {
             Thread.currentThread().setContextClassLoader(
                     NetworkedSyncTest.class.getClassLoader());
-            context1 = new ClassPathXmlApplicationContext(config);
-            broker2 = (BrokerService) context1.getBean("broker2");
+            final ClassPathXmlApplicationContext context2 = new ClassPathXmlApplicationContext(config);
+            broker2 = (BrokerService) context2.getBean("broker2");
 
-            // start the broker
             if (!broker2.isStarted()) {
                 LOG.info("Broker broker2 not yet started. Kicking it off now.");
                 broker2.start();
-            } else {
-                LOG.info("Broker broker2 already started. Not kicking it off a second time.");
-                broker2.waitUntilStopped();
+                broker2.waitUntilStarted();
             }
-        } catch (Exception e) {
+        } catch (final Exception e) {
             LOG.error("Error: " + e.getMessage());
             throw e;
         }
 
-        // setup network connector from broker1 to broker2
+        // Resolve actual broker URLs from ephemeral ports assigned after startup
+        final URI broker1ConnectURI = broker1.getTransportConnectors().get(0).getConnectUri();
+        final URI broker2ConnectURI = broker2.getTransportConnectors().get(0).getConnectUri();
+        broker1URL = broker1ConnectURI.toString();
+        broker2URL = broker2ConnectURI.toString();
+        LOG.info("Broker1 URL: " + broker1URL);
+        LOG.info("Broker2 URL: " + broker2URL);
+
+        // setup network connector from broker1 to broker2 using actual assigned port
+        final String networkConnectorURL = "static://(" + broker2URL + ")";
         connector = broker1.addNetworkConnector(networkConnectorURL);
         connector.setBrokerName(broker1.getBrokerName());
         connector.setDuplex(true);
@@ -132,41 +132,45 @@ public class NetworkedSyncTest extends TestCase {
 
         LOG.info("tearDown() called.");
 
+        if (connector != null) {
+            connector.stop();
+        }
+
         if (broker1 != null && broker1.isStarted()) {
             LOG.info("Broker1 still running, stopping it now.");
             broker1.stop();
+            broker1.waitUntilStopped();
         } else {
             LOG.info("Broker1 not running, nothing to shutdown.");
         }
         if (broker2 != null && broker2.isStarted()) {
             LOG.info("Broker2 still running, stopping it now.");
             broker2.stop();
+            broker2.waitUntilStopped();
         } else {
             LOG.info("Broker2 not running, nothing to shutdown.");
         }
-
     }
 
     public void testMessageExchange() throws Exception {
         LOG.info("testMessageExchange() called.");
 
-        long start = System.currentTimeMillis();
+        final long start = System.currentTimeMillis();
 
         // create producer and consumer threads
-        Thread producer = new Thread(new Producer());
-        Thread consumer = new Thread(new Consumer());
+        final Thread producer = new Thread(new Producer());
+        final Thread consumer = new Thread(new Consumer());
         // start threads
         consumer.start();
         Thread.sleep(2000);
         producer.start();
 
-
         // wait for threads to finish
         producer.join();
         consumer.join();
-        long end = System.currentTimeMillis();
+        final long end = System.currentTimeMillis();
 
-        System.out.println("Duration: "+(end-start));
+        System.out.println("Duration: " + (end - start));
     }
 }
 
@@ -191,20 +195,15 @@ class Producer implements Runnable {
         MessageProducer producer = null;
 
         try {
-            ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(
+            final ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(
                     NetworkedSyncTest.broker1URL);
             connection = amq.createConnection();
 
-            connection.setExceptionListener(new jakarta.jms.ExceptionListener() {
-                @Override
-                public void onException(jakarta.jms.JMSException e) {
-                    e.printStackTrace();
-                }
-            });
+            connection.setExceptionListener(e -> e.printStackTrace());
 
             connection.start();
             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            Topic destination = session.createTopic("TEST.FOO");
+            final Topic destination = session.createTopic("TEST.FOO");
 
             producer = session.createProducer(destination);
             producer.setDeliveryMode(DeliveryMode.PERSISTENT);
@@ -214,10 +213,10 @@ class Producer implements Runnable {
             // Create and send message
             for (int i = 0; i < NetworkedSyncTest.MESSAGE_COUNT; i++) {
 
-                String text = "Hello world! From: "
+                final String text = "Hello world! From: "
                         + Thread.currentThread().getName() + " : "
                         + this.hashCode() + ":" + counter;
-                TextMessage message = session.createTextMessage(text);
+                final TextMessage message = session.createTextMessage(text);
                 producer.send(message);
                 counter++;
 
@@ -225,7 +224,7 @@ class Producer implements Runnable {
                     LOG.info("sent " + counter + " messages");
 
             }
-        } catch (Exception ex) {
+        } catch (final Exception ex) {
             LOG.error(ex.toString());
             return;
         } finally {
@@ -236,7 +235,7 @@ class Producer implements Runnable {
                     session.close();
                 if (connection != null)
                     connection.close();
-            } catch (Exception e) {
+            } catch (final Exception e) {
                 LOG.error("Problem closing down JMS objects: " + e);
             }
         }
@@ -250,8 +249,7 @@ class Producer implements Runnable {
  */
 class Consumer implements Runnable {
 
-    private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);;
-
+    private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);
 
     /**
      * connect to broker and receive messages
@@ -263,42 +261,35 @@ class Consumer implements Runnable {
         MessageConsumer consumer = null;
 
         try {
-            ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(
+            final ActiveMQConnectionFactory amq = new 
ActiveMQConnectionFactory(
                     NetworkedSyncTest.broker2URL);
             connection = amq.createConnection();
             // need to set clientID when using durable subscription.
             connection.setClientID("tmielke");
 
-            connection.setExceptionListener(new 
jakarta.jms.ExceptionListener() {
-                @Override
-                public void onException(jakarta.jms.JMSException e) {
-                    e.printStackTrace();
-                }
-            });
+            connection.setExceptionListener(e -> e.printStackTrace());
 
             connection.start();
             session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-            Destination destination = session.createTopic("TEST.FOO");
-            consumer = session.createDurableSubscriber((Topic) 
destination,"tmielke");
+            final Destination destination = session.createTopic("TEST.FOO");
+            consumer = session.createDurableSubscriber((Topic) destination, 
"tmielke");
 
             long counter = 0;
             // Wait for a message
             for (int i = 0; i < NetworkedSyncTest.MESSAGE_COUNT; i++) {
-                Message message2 = consumer.receive();
+                final Message message2 = consumer.receive();
                 if (message2 instanceof TextMessage) {
-                    TextMessage textMessage = (TextMessage) message2;
+                    final TextMessage textMessage = (TextMessage) message2;
                     textMessage.getText();
-                    // logger.info("Received: " + text);
                 } else {
-                    LOG.error("Received message of unsupported type. Expecting 
TextMessage. "+ message2);
+                    LOG.error("Received message of unsupported type. Expecting 
TextMessage. " + message2);
                 }
                 counter++;
                 if ((counter % 1000) == 0)
                     LOG.info("received " + counter + " messages");
 
-
             }
-        } catch (Exception e) {
+        } catch (final Exception e) {
             LOG.error("Error in Consumer: " + e);
             return;
         } finally {
@@ -309,7 +300,7 @@ class Consumer implements Runnable {
                     session.close();
                 if (connection != null)
                     connection.close();
-            } catch (Exception ex) {
+            } catch (final Exception ex) {
                 LOG.error("Error closing down JMS objects: " + ex);
             }
         }
diff --git 
a/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/spring.xml 
b/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/spring.xml
index 7d37217f47..95a1554794 100644
--- 
a/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/spring.xml
+++ 
b/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/spring.xml
@@ -23,20 +23,21 @@
 
   <!-- an embedded broker -->
 
-  <bean id="broker" class="org.apache.activemq.broker.BrokerService" 
+  <bean id="broker" class="org.apache.activemq.broker.BrokerService"
     init-method="start">
+    <property name="brokerName" value="springTestBroker" />
+    <property name="persistent" value="false" />
     <property name="transportConnectorURIs">
       <list>
-        <value>tcp://localhost:61616</value>
-        <value>tcp://localhost:61636</value>
+        <value>tcp://localhost:0</value>
       </list>
     </property>
   </bean>
 
-  <!-- JMS ConnectionFactory to use -->
+  <!-- JMS ConnectionFactory to use - VM transport for embedded broker -->
   <bean id="jmsFactory"
     class="org.apache.activemq.ActiveMQConnectionFactory">
-    <property name="brokerURL" value="tcp://localhost:61636" />
+    <property name="brokerURL" value="vm://springTestBroker" />
   </bean>
 
   <!-- Spring JMS Template -->
@@ -47,7 +48,7 @@
       <bean
         class="org.springframework.jms.connection.SingleConnectionFactory">
         <property name="targetConnectionFactory">
-          <ref local="jmsFactory" />
+          <ref bean="jmsFactory" />
         </property>
       </bean>
     </property>
diff --git 
a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-ephemeral.xml
 
b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-ephemeral.xml
new file mode 100644
index 0000000000..c58962326e
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-ephemeral.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core 
http://activemq.apache.org/schema/core/activemq-core.xsd
+  http://camel.apache.org/schema/spring 
http://camel.apache.org/schema/spring/camel-spring.xsd">
+  <bean 
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+<!-- Network connectors are added programmatically in 
BaseNetworkTest.addNetworkConnectors()
+     to use the actual assigned ephemeral ports -->
+<broker brokerName="localBroker" start="false" persistent="true" 
useShutdownHook="false" monitorConnectionSplits="true" 
xmlns="http://activemq.apache.org/schema/core">
+
+    <transportConnectors>
+      <transportConnector uri="tcp://localhost:0"/>
+    </transportConnectors>
+    
+  </broker>
+</beans>
diff --git 
a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-ephemeral.xml
 
b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-ephemeral.xml
new file mode 100644
index 0000000000..dd7561e5b8
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-ephemeral.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans 
+  xmlns="http://www.springframework.org/schema/beans" 
+  xmlns:amq="http://activemq.apache.org/schema/core"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core 
http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+  <bean 
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <!-- Network connectors are added programmatically in 
BaseNetworkTest.addNetworkConnectors()
+       to use the actual assigned ephemeral ports -->
+  <broker brokerName="remoteBroker" start="false" useJmx="false" 
persistent="true" useShutdownHook="false" monitorConnectionSplits="false" 
xmlns="http://activemq.apache.org/schema/core">
+
+    <transportConnectors>
+      <transportConnector uri="tcp://localhost:0"/>
+    </transportConnectors>
+  </broker>
+
+</beans>
diff --git 
a/activemq-unit-tests/src/test/resources/org/apache/activemq/perf/networkSync.xml
 
b/activemq-unit-tests/src/test/resources/org/apache/activemq/perf/networkSync.xml
index 5c7c14c2d4..0b1f5eafdb 100644
--- 
a/activemq-unit-tests/src/test/resources/org/apache/activemq/perf/networkSync.xml
+++ 
b/activemq-unit-tests/src/test/resources/org/apache/activemq/perf/networkSync.xml
@@ -27,22 +27,22 @@
   <!-- Broker1 -->
   <broker brokerName="broker1" id="broker1" useJmx="true" persistent="true" 
deleteAllMessagesOnStartup="true" start="false" 
xmlns="http://activemq.apache.org/schema/core">
     <transportConnectors>
-      <transportConnector uri="tcp://localhost:61616" />
+      <transportConnector uri="tcp://localhost:0" />
     </transportConnectors>
  
     <persistenceAdapter>
-      <amqPersistenceAdapter directory="target/Broker1-data/activemq-data" 
syncOnWrite="true"  indexPageSize="16kb" indexBinSize="100" 
maxReferenceFileLength="8192"/>
+      <kahaDB directory="target/Broker1-data/activemq-data" 
journalMaxFileLength="32mb"/>
     </persistenceAdapter>
   </broker>
- 
- 
+
+
   <!-- Broker2 -->
   <broker brokerName="broker2" id="broker2" useJmx="true" persistent="false" 
deleteAllMessagesOnStartup="true" start="false" 
xmlns="http://activemq.apache.org/schema/core">
     <transportConnectors>
-      <transportConnector uri="tcp://localhost:62616" />
+      <transportConnector uri="tcp://localhost:0" />
     </transportConnectors>
      <persistenceAdapter>
-      <amqPersistenceAdapter directory="target/Broker2-data/activemq-data" 
syncOnWrite="true"  indexPageSize="16kb" indexBinSize="100" 
maxReferenceFileLength="8192"/>
+      <kahaDB directory="target/Broker2-data/activemq-data" 
journalMaxFileLength="32mb"/>
     </persistenceAdapter>
   </broker>
 </beans>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact



Reply via email to