This is an automated email from the ASF dual-hosted git repository.
jlmonteiro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 902b85cc5b Support ephemeral ports in network config and Spring (#1674)
902b85cc5b is described below
commit 902b85cc5b19e9e295813fc118f04fc239b6ff36
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Thu Feb 19 16:59:34 2026 +0100
Support ephemeral ports in network config and Spring (#1674)
---
.../apache/activemq/network/BaseNetworkTest.java | 101 +++++++++++++++--
.../apache/activemq/network/DuplexNetworkTest.java | 7 +-
...callyIncludedDestinationsDuplexNetworkTest.java | 7 +-
.../activemq/network/MulticastNetworkTest.java | 5 +
.../activemq/network/NetworkFailoverTest.java | 102 +++++++++++-------
.../apache/activemq/perf/NetworkedSyncTest.java | 119 ++++++++++-----------
.../org/apache/activemq/broker/spring.xml | 13 +--
.../activemq/network/localBroker-ephemeral.xml | 35 ++++++
.../activemq/network/remoteBroker-ephemeral.xml | 36 +++++++
.../org/apache/activemq/perf/networkSync.xml | 12 +--
10 files changed, 307 insertions(+), 130 deletions(-)
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java
index 1af6636e92..d72dc28177 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java
@@ -16,13 +16,22 @@
*/
package org.apache.activemq.network;
+import static org.junit.Assert.assertTrue;
+
import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
import jakarta.jms.Connection;
import jakarta.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.junit.After;
import org.junit.Before;
@@ -59,11 +68,15 @@ public class BaseNetworkTest {
if(remoteConnection != null)
remoteConnection.close();
- if(localBroker != null)
+ if(localBroker != null) {
localBroker.stop();
+ localBroker.waitUntilStopped();
+ }
- if(remoteBroker != null)
+ if(remoteBroker != null) {
remoteBroker.stop();
+ remoteBroker.waitUntilStopped();
+ }
}
protected void doSetUp(boolean deleteAllMessages) throws Exception {
@@ -75,14 +88,25 @@ public class BaseNetworkTest {
localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
localBroker.start();
localBroker.waitUntilStarted();
- URI localURI = localBroker.getVmConnectorURI();
+
+ // Programmatically add network connectors using the actual assigned ephemeral ports.
+ // Use startNetworkConnector() instead of connector.start() to ensure proper JMX MBean registration.
+ addNetworkConnectors();
+
+ // Wait for both network bridges to be FULLY started (advisory consumers registered).
+ // activeBridges().isEmpty() is NOT sufficient because bridges are added to the map
+ // before start() completes asynchronously. We must wait for the startedLatch.
+ waitForBridgeFullyStarted(localBroker, "Local");
+ waitForBridgeFullyStarted(remoteBroker, "Remote");
+
+ final URI localURI = localBroker.getVmConnectorURI();
ActiveMQConnectionFactory fac = new
ActiveMQConnectionFactory(localURI);
fac.setAlwaysSyncSend(true);
fac.setDispatchAsync(false);
localConnection = fac.createConnection();
localConnection.setClientID("clientId");
localConnection.start();
- URI remoteURI = remoteBroker.getVmConnectorURI();
+ final URI remoteURI = remoteBroker.getVmConnectorURI();
fac = new ActiveMQConnectionFactory(remoteURI);
remoteConnection = fac.createConnection();
remoteConnection.setClientID("clientId");
@@ -91,21 +115,76 @@ public class BaseNetworkTest {
remoteSession = remoteConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
}
+ /**
+ * Programmatically adds network connectors between the local and remote brokers
+ * using the actual assigned ephemeral ports. This avoids hardcoding ports in XML
+ * config files which causes port conflicts on CI.
+ */
+ protected void addNetworkConnectors() throws Exception {
+ final URI remoteConnectURI =
remoteBroker.getTransportConnectors().get(0).getConnectUri();
+ final URI localConnectURI =
localBroker.getTransportConnectors().get(0).getConnectUri();
+
+ // Local -> Remote network connector (matches the original localBroker.xml config)
+ final DiscoveryNetworkConnector localToRemote = new
DiscoveryNetworkConnector(
+ new URI("static:(" + remoteConnectURI + ")"));
+ localToRemote.setName("networkConnector");
+ localToRemote.setDynamicOnly(false);
+ localToRemote.setConduitSubscriptions(true);
+ localToRemote.setDecreaseNetworkConsumerPriority(false);
+
+ final List<ActiveMQDestination> dynamicallyIncluded = new
ArrayList<>();
+ dynamicallyIncluded.add(new ActiveMQQueue("include.test.foo"));
+ dynamicallyIncluded.add(new ActiveMQTopic("include.test.bar"));
+ localToRemote.setDynamicallyIncludedDestinations(dynamicallyIncluded);
+
+ final List<ActiveMQDestination> excluded = new ArrayList<>();
+ excluded.add(new ActiveMQQueue("exclude.test.foo"));
+ excluded.add(new ActiveMQTopic("exclude.test.bar"));
+ localToRemote.setExcludedDestinations(excluded);
+
+ localBroker.addNetworkConnector(localToRemote);
+ // startNetworkConnector handles JMX MBean registration and connector startup
+ localBroker.startNetworkConnector(localToRemote, null);
+
+ // Remote -> Local network connector (matches the original remoteBroker.xml config)
+ final DiscoveryNetworkConnector remoteToLocal = new
DiscoveryNetworkConnector(
+ new URI("static:(" + localConnectURI + ")"));
+ remoteToLocal.setName("networkConnector");
+ remoteBroker.addNetworkConnector(remoteToLocal);
+ remoteBroker.startNetworkConnector(remoteToLocal, null);
+ }
+
+ protected void waitForBridgeFullyStarted(final BrokerService broker, final
String label) throws Exception {
+ // Skip if broker has no network connectors (e.g., duplex target broker receives
+ // bridge connections but doesn't initiate them)
+ if (broker.getNetworkConnectors().isEmpty()) {
+ return;
+ }
+ assertTrue(label + " broker bridge should be fully started",
Wait.waitFor(() -> {
+ if
(broker.getNetworkConnectors().get(0).activeBridges().isEmpty()) {
+ return false;
+ }
+ final NetworkBridge bridge =
broker.getNetworkConnectors().get(0).activeBridges().iterator().next();
+ if (bridge instanceof DemandForwardingBridgeSupport) {
+ return ((DemandForwardingBridgeSupport)
bridge).startedLatch.getCount() == 0;
+ }
+ return true;
+ }, TimeUnit.SECONDS.toMillis(10), 100));
+ }
+
protected String getRemoteBrokerURI() {
- return "org/apache/activemq/network/remoteBroker.xml";
+ return "org/apache/activemq/network/remoteBroker-ephemeral.xml";
}
protected String getLocalBrokerURI() {
- return "org/apache/activemq/network/localBroker.xml";
+ return "org/apache/activemq/network/localBroker-ephemeral.xml";
}
protected BrokerService createBroker(String uri) throws Exception {
- Resource resource = new ClassPathResource(uri);
- BrokerFactoryBean factory = new BrokerFactoryBean(resource);
- resource = new ClassPathResource(uri);
- factory = new BrokerFactoryBean(resource);
+ final Resource resource = new ClassPathResource(uri);
+ final BrokerFactoryBean factory = new BrokerFactoryBean(resource);
factory.afterPropertiesSet();
- BrokerService result = factory.getBroker();
+ final BrokerService result = factory.getBroker();
return result;
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java
index e97d87b696..3006c9c9cc 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java
@@ -40,12 +40,17 @@ public class DuplexNetworkTest extends SimpleNetworkTest {
@Override
protected BrokerService createRemoteBroker() throws Exception {
- BrokerService broker = new BrokerService();
+ final BrokerService broker = new BrokerService();
broker.setBrokerName("remoteBroker");
broker.addConnector("tcp://localhost:61617?transport.connectAttemptTimeout=2000");
return broker;
}
+ @Override
+ protected void addNetworkConnectors() throws Exception {
+ // No-op: duplex network connector is already defined in duplexLocalBroker.xml
+ }
+
@Test
public void testTempQueues() throws Exception {
TemporaryQueue temp = localSession.createTemporaryQueue();
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
index 39018a18fc..d52836e4f6 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
@@ -48,12 +48,17 @@ public class DynamicallyIncludedDestinationsDuplexNetworkTest extends SimpleNetw
@Override
protected BrokerService createRemoteBroker() throws Exception {
- BrokerService broker = new BrokerService();
+ final BrokerService broker = new BrokerService();
broker.setBrokerName("remoteBroker");
broker.addConnector("tcp://localhost:" + REMOTE_BROKER_TCP_PORT);
return broker;
}
+ @Override
+ protected void addNetworkConnectors() throws Exception {
+ // No-op: duplex network connector is already defined in duplexDynamicIncludedDestLocalBroker.xml
+ }
+
// we have to override this, because with dynamicallyIncludedDestinations working properly
// (see https://issues.apache.org/jira/browse/AMQ-4209) you can't get request/response
// with temps working (there is no wild card like there is for staticallyIncludedDest)
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/MulticastNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/MulticastNetworkTest.java
index 7813b07635..4eb65880b5 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/MulticastNetworkTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/MulticastNetworkTest.java
@@ -21,6 +21,11 @@ package org.apache.activemq.network;
*/
public class MulticastNetworkTest extends SimpleNetworkTest {
+ @Override
+ protected void addNetworkConnectors() throws Exception {
+ // No-op: multicast network connectors are already defined in the XML configs
+ }
+
protected String getRemoteBrokerURI() {
return "org/apache/activemq/network/multicast/remoteBroker.xml";
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java
index fa7efce898..f086753289 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java
@@ -17,6 +17,7 @@
package org.apache.activemq.network;
import java.io.IOException;
+import java.net.URI;
import java.util.concurrent.atomic.AtomicInteger;
import jakarta.jms.Connection;
@@ -25,7 +26,6 @@ import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
@@ -39,6 +39,7 @@ import org.apache.activemq.DestinationDoesNotExistException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.apache.activemq.xbean.BrokerFactoryBean;
@@ -68,32 +69,28 @@ public class NetworkFailoverTest extends TestCase {
public void testRequestReply() throws Exception {
final MessageProducer remoteProducer =
remoteSession.createProducer(null);
MessageConsumer remoteConsumer =
remoteSession.createConsumer(included);
- remoteConsumer.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message msg) {
- final TextMessage textMsg = (TextMessage)msg;
+ remoteConsumer.setMessageListener(msg -> {
+ final TextMessage textMsg = (TextMessage) msg;
+ try {
+ final String payload = "REPLY: " + textMsg.getText() + ", " +
textMsg.getJMSMessageID();
+ final Destination replyTo = msg.getJMSReplyTo();
+ textMsg.clearBody();
+ textMsg.setText(payload);
+ LOG.info("*** Sending response: {}", textMsg.getText());
+ remoteProducer.send(replyTo, textMsg);
+ LOG.info("replied with: " + textMsg.getJMSMessageID());
+
+ } catch (DestinationDoesNotExistException expected) {
+ // been removed but not yet recreated
+ replyToNonExistDest.incrementAndGet();
try {
- String payload = "REPLY: " + textMsg.getText() + ", " +
textMsg.getJMSMessageID();
- Destination replyTo;
- replyTo = msg.getJMSReplyTo();
- textMsg.clearBody();
- textMsg.setText(payload);
- LOG.info("*** Sending response: {}", textMsg.getText());
- remoteProducer.send(replyTo, textMsg);
- LOG.info("replied with: " + textMsg.getJMSMessageID());
-
- } catch (DestinationDoesNotExistException expected) {
- // been removed but not yet recreated
- replyToNonExistDest.incrementAndGet();
- try {
- LOG.info("NED: " + textMsg.getJMSMessageID());
- } catch (JMSException e) {
- e.printStackTrace();
- };
- } catch (Exception e) {
- LOG.warn("*** Responder listener caught exception: ", e);
+ LOG.info("NED: " + textMsg.getJMSMessageID());
+ } catch (JMSException e) {
e.printStackTrace();
}
+ } catch (Exception e) {
+ LOG.warn("*** Responder listener caught exception: ", e);
+ e.printStackTrace();
}
});
@@ -104,16 +101,13 @@ public class NetworkFailoverTest extends TestCase {
// track remote dlq for forward failures
MessageConsumer dlqconsumer = remoteSession.createConsumer(new
ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
- dlqconsumer.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- try {
- LOG.info("dlq " + message.getJMSMessageID());
- } catch (JMSException e) {
- e.printStackTrace();
- }
- remoteDLQCount.incrementAndGet();
+ dlqconsumer.setMessageListener(message -> {
+ try {
+ LOG.info("dlq " + message.getJMSMessageID());
+ } catch (JMSException e) {
+ e.printStackTrace();
}
+ remoteDLQCount.incrementAndGet();
});
// allow for consumer infos to perculate arround
@@ -176,25 +170,51 @@ public class NetworkFailoverTest extends TestCase {
} catch(Exception ex) {}
}
- protected void doSetUp(boolean deleteAllMessages) throws Exception {
+ protected void doSetUp(final boolean deleteAllMessages) throws Exception {
remoteBroker = createRemoteBroker();
remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
remoteBroker.setCacheTempDestinations(true);
remoteBroker.start();
+ remoteBroker.waitUntilStarted();
localBroker = createLocalBroker();
localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
localBroker.setCacheTempDestinations(true);
localBroker.start();
-
- String localURI = "tcp://localhost:61616";
- String remoteURI = "tcp://localhost:61617";
- ActiveMQConnectionFactory fac = new
ActiveMQConnectionFactory("failover:("+localURI+","+remoteURI+")?randomize=false&backup=false&trackMessages=true");
+ localBroker.waitUntilStarted();
+
+ // Get actual assigned ephemeral ports
+ final URI localConnectURI =
localBroker.getTransportConnectors().get(0).getConnectUri();
+ final URI remoteConnectURI =
remoteBroker.getTransportConnectors().get(0).getConnectUri();
+ final String localURI = localConnectURI.toString();
+ final String remoteURI = remoteConnectURI.toString();
+
+ // Add network connectors programmatically using actual ports
+ final DiscoveryNetworkConnector localToRemote = new
DiscoveryNetworkConnector(
+ new URI("static://(" + remoteURI + ")"));
+ localToRemote.setName("networkConnector");
+ localToRemote.setDynamicOnly(false);
+ localToRemote.setConduitSubscriptions(true);
+ localToRemote.setDecreaseNetworkConsumerPriority(false);
+ localToRemote.setDynamicallyIncludedDestinations(
+ java.util.List.of(new ActiveMQQueue("include.test.foo"), new
ActiveMQTopic("include.test.bar")));
+ localToRemote.setExcludedDestinations(
+ java.util.List.of(new ActiveMQQueue("exclude.test.foo"), new
ActiveMQTopic("exclude.test.bar")));
+ localBroker.addNetworkConnector(localToRemote);
+ localBroker.startNetworkConnector(localToRemote, null);
+
+ final DiscoveryNetworkConnector remoteToLocal = new
DiscoveryNetworkConnector(
+ new URI("static://(" + localURI + ")"));
+ remoteToLocal.setName("networkConnector");
+ remoteBroker.addNetworkConnector(remoteToLocal);
+ remoteBroker.startNetworkConnector(remoteToLocal, null);
+
+ ActiveMQConnectionFactory fac = new
ActiveMQConnectionFactory("failover:(" + localURI + "," + remoteURI +
")?randomize=false&backup=false&trackMessages=true");
localConnection = fac.createConnection();
localConnection.setClientID("local");
localConnection.start();
- fac = new ActiveMQConnectionFactory("failover:("+remoteURI +
","+localURI+")?randomize=false&backup=false&trackMessages=true");
+ fac = new ActiveMQConnectionFactory("failover:(" + remoteURI + "," +
localURI + ")?randomize=false&backup=false&trackMessages=true");
fac.setWatchTopicAdvisories(false);
remoteConnection = fac.createConnection();
remoteConnection.setClientID("remote");
@@ -205,11 +225,11 @@ public class NetworkFailoverTest extends TestCase {
}
protected String getRemoteBrokerURI() {
- return "org/apache/activemq/network/remoteBroker.xml";
+ return "org/apache/activemq/network/remoteBroker-ephemeral.xml";
}
protected String getLocalBrokerURI() {
- return "org/apache/activemq/network/localBroker.xml";
+ return "org/apache/activemq/network/localBroker-ephemeral.xml";
}
protected BrokerService createBroker(String uri) throws Exception {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/perf/NetworkedSyncTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/perf/NetworkedSyncTest.java
index db75e2e594..5b7fefd54d 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/perf/NetworkedSyncTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/perf/NetworkedSyncTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.perf;
+import java.net.URI;
+
import jakarta.jms.Connection;
import jakarta.jms.DeliveryMode;
import jakarta.jms.Destination;
@@ -32,22 +34,22 @@ import junit.textui.TestRunner;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.network.NetworkConnector;
-import org.apache.activemq.xbean.BrokerFactoryBean;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.core.io.ClassPathResource;
public class NetworkedSyncTest extends TestCase {
// constants
public static final int MESSAGE_COUNT = 10000; //100000;
- public final static String config =
"org/apache/activemq/perf/networkSync.xml";
- public final static String broker1URL = "tcp://localhost:61616";
- public final static String broker2URL = "tcp://localhost:62616";
- private final String networkConnectorURL = "static://(" + broker2URL + ")";
+ public static final String config =
"org/apache/activemq/perf/networkSync.xml";
private static final Logger LOG =
LoggerFactory.getLogger(NetworkedSyncTest.class);
+
+ // Broker URLs resolved after startup from ephemeral ports
+ static String broker1URL;
+ static String broker2URL;
+
BrokerService broker1 = null;
BrokerService broker2 = null;
NetworkConnector connector = null;
@@ -55,12 +57,12 @@ public class NetworkedSyncTest extends TestCase {
/**
* @param name
*/
- public NetworkedSyncTest(String name) {
+ public NetworkedSyncTest(final String name) {
super(name);
LOG.info("Testcase started.");
}
- public static void main(String args[]) {
+ public static void main(final String[] args) {
TestRunner.run(NetworkedSyncTest.class);
}
@@ -70,53 +72,51 @@ public class NetworkedSyncTest extends TestCase {
@Override
protected void setUp() throws Exception {
LOG.info("setUp() called.");
- ClassPathXmlApplicationContext context1 = null;
- BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new
ClassPathResource(config));
- assertNotNull(brokerFactory);
/* start up first broker instance */
try {
- // resolve broker1
Thread.currentThread().setContextClassLoader(
NetworkedSyncTest.class.getClassLoader());
- context1 = new ClassPathXmlApplicationContext(config);
+ final ClassPathXmlApplicationContext context1 = new
ClassPathXmlApplicationContext(config);
broker1 = (BrokerService) context1.getBean("broker1");
- // start the broker
if (!broker1.isStarted()) {
LOG.info("Broker broker1 not yet started. Kicking it off
now.");
broker1.start();
- } else {
- LOG.info("Broker broker1 already started. Not kicking it off a
second time.");
- broker1.waitUntilStopped();
+ broker1.waitUntilStarted();
}
- } catch (Exception e) {
+ } catch (final Exception e) {
LOG.error("Error: " + e.getMessage());
throw e;
- // brokerService.stop();
}
/* start up second broker instance */
try {
Thread.currentThread().setContextClassLoader(
NetworkedSyncTest.class.getClassLoader());
- context1 = new ClassPathXmlApplicationContext(config);
- broker2 = (BrokerService) context1.getBean("broker2");
+ final ClassPathXmlApplicationContext context2 = new
ClassPathXmlApplicationContext(config);
+ broker2 = (BrokerService) context2.getBean("broker2");
- // start the broker
if (!broker2.isStarted()) {
LOG.info("Broker broker2 not yet started. Kicking it off
now.");
broker2.start();
- } else {
- LOG.info("Broker broker2 already started. Not kicking it off a
second time.");
- broker2.waitUntilStopped();
+ broker2.waitUntilStarted();
}
- } catch (Exception e) {
+ } catch (final Exception e) {
LOG.error("Error: " + e.getMessage());
throw e;
}
- // setup network connector from broker1 to broker2
+ // Resolve actual broker URLs from ephemeral ports assigned after startup
+ final URI broker1ConnectURI =
broker1.getTransportConnectors().get(0).getConnectUri();
+ final URI broker2ConnectURI =
broker2.getTransportConnectors().get(0).getConnectUri();
+ broker1URL = broker1ConnectURI.toString();
+ broker2URL = broker2ConnectURI.toString();
+ LOG.info("Broker1 URL: " + broker1URL);
+ LOG.info("Broker2 URL: " + broker2URL);
+
+ // setup network connector from broker1 to broker2 using actual assigned port
+ final String networkConnectorURL = "static://(" + broker2URL + ")";
connector = broker1.addNetworkConnector(networkConnectorURL);
connector.setBrokerName(broker1.getBrokerName());
connector.setDuplex(true);
@@ -132,41 +132,45 @@ public class NetworkedSyncTest extends TestCase {
LOG.info("tearDown() called.");
+ if (connector != null) {
+ connector.stop();
+ }
+
if (broker1 != null && broker1.isStarted()) {
LOG.info("Broker1 still running, stopping it now.");
broker1.stop();
+ broker1.waitUntilStopped();
} else {
LOG.info("Broker1 not running, nothing to shutdown.");
}
if (broker2 != null && broker2.isStarted()) {
LOG.info("Broker2 still running, stopping it now.");
broker2.stop();
+ broker2.waitUntilStopped();
} else {
LOG.info("Broker2 not running, nothing to shutdown.");
}
-
}
public void testMessageExchange() throws Exception {
LOG.info("testMessageExchange() called.");
- long start = System.currentTimeMillis();
+ final long start = System.currentTimeMillis();
// create producer and consumer threads
- Thread producer = new Thread(new Producer());
- Thread consumer = new Thread(new Consumer());
+ final Thread producer = new Thread(new Producer());
+ final Thread consumer = new Thread(new Consumer());
// start threads
consumer.start();
Thread.sleep(2000);
producer.start();
-
// wait for threads to finish
producer.join();
consumer.join();
- long end = System.currentTimeMillis();
+ final long end = System.currentTimeMillis();
- System.out.println("Duration: "+(end-start));
+ System.out.println("Duration: " + (end - start));
}
}
@@ -191,20 +195,15 @@ class Producer implements Runnable {
MessageProducer producer = null;
try {
- ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(
+ final ActiveMQConnectionFactory amq = new
ActiveMQConnectionFactory(
NetworkedSyncTest.broker1URL);
connection = amq.createConnection();
- connection.setExceptionListener(new
jakarta.jms.ExceptionListener() {
- @Override
- public void onException(jakarta.jms.JMSException e) {
- e.printStackTrace();
- }
- });
+ connection.setExceptionListener(e -> e.printStackTrace());
connection.start();
session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
- Topic destination = session.createTopic("TEST.FOO");
+ final Topic destination = session.createTopic("TEST.FOO");
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
@@ -214,10 +213,10 @@ class Producer implements Runnable {
// Create and send message
for (int i = 0; i < NetworkedSyncTest.MESSAGE_COUNT; i++) {
- String text = "Hello world! From: "
+ final String text = "Hello world! From: "
+ Thread.currentThread().getName() + " : "
+ this.hashCode() + ":" + counter;
- TextMessage message = session.createTextMessage(text);
+ final TextMessage message = session.createTextMessage(text);
producer.send(message);
counter++;
@@ -225,7 +224,7 @@ class Producer implements Runnable {
LOG.info("sent " + counter + " messages");
}
- } catch (Exception ex) {
+ } catch (final Exception ex) {
LOG.error(ex.toString());
return;
} finally {
@@ -236,7 +235,7 @@ class Producer implements Runnable {
session.close();
if (connection != null)
connection.close();
- } catch (Exception e) {
+ } catch (final Exception e) {
LOG.error("Problem closing down JMS objects: " + e);
}
}
@@ -250,8 +249,7 @@ class Producer implements Runnable {
*/
class Consumer implements Runnable {
- private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);;
-
+ private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);
/**
* connect to broker and receive messages
@@ -263,42 +261,35 @@ class Consumer implements Runnable {
MessageConsumer consumer = null;
try {
- ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(
+ final ActiveMQConnectionFactory amq = new
ActiveMQConnectionFactory(
NetworkedSyncTest.broker2URL);
connection = amq.createConnection();
// need to set clientID when using durable subscription.
connection.setClientID("tmielke");
- connection.setExceptionListener(new
jakarta.jms.ExceptionListener() {
- @Override
- public void onException(jakarta.jms.JMSException e) {
- e.printStackTrace();
- }
- });
+ connection.setExceptionListener(e -> e.printStackTrace());
connection.start();
session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createTopic("TEST.FOO");
- consumer = session.createDurableSubscriber((Topic)
destination,"tmielke");
+ final Destination destination = session.createTopic("TEST.FOO");
+ consumer = session.createDurableSubscriber((Topic) destination,
"tmielke");
long counter = 0;
// Wait for a message
for (int i = 0; i < NetworkedSyncTest.MESSAGE_COUNT; i++) {
- Message message2 = consumer.receive();
+ final Message message2 = consumer.receive();
if (message2 instanceof TextMessage) {
- TextMessage textMessage = (TextMessage) message2;
+ final TextMessage textMessage = (TextMessage) message2;
textMessage.getText();
- // logger.info("Received: " + text);
} else {
- LOG.error("Received message of unsupported type. Expecting
TextMessage. "+ message2);
+ LOG.error("Received message of unsupported type. Expecting
TextMessage. " + message2);
}
counter++;
if ((counter % 1000) == 0)
LOG.info("received " + counter + " messages");
-
}
- } catch (Exception e) {
+ } catch (final Exception e) {
LOG.error("Error in Consumer: " + e);
return;
} finally {
@@ -309,7 +300,7 @@ class Consumer implements Runnable {
session.close();
if (connection != null)
connection.close();
- } catch (Exception ex) {
+ } catch (final Exception ex) {
LOG.error("Error closing down JMS objects: " + ex);
}
}
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/spring.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/spring.xml
index 7d37217f47..95a1554794 100644
--- a/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/spring.xml
+++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/spring.xml
@@ -23,20 +23,21 @@
<!-- an embedded broker -->
- <bean id="broker" class="org.apache.activemq.broker.BrokerService"
+ <bean id="broker" class="org.apache.activemq.broker.BrokerService"
init-method="start">
+ <property name="brokerName" value="springTestBroker" />
+ <property name="persistent" value="false" />
<property name="transportConnectorURIs">
<list>
- <value>tcp://localhost:61616</value>
- <value>tcp://localhost:61636</value>
+ <value>tcp://localhost:0</value>
</list>
</property>
</bean>
- <!-- JMS ConnectionFactory to use -->
+ <!-- JMS ConnectionFactory to use - VM transport for embedded broker -->
<bean id="jmsFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
- <property name="brokerURL" value="tcp://localhost:61636" />
+ <property name="brokerURL" value="vm://springTestBroker" />
</bean>
<!-- Spring JMS Template -->
@@ -47,7 +48,7 @@
<bean
class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory">
- <ref local="jmsFactory" />
+ <ref bean="jmsFactory" />
</property>
</bean>
</property>
diff --git
a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-ephemeral.xml
b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-ephemeral.xml
new file mode 100644
index 0000000000..c58962326e
--- /dev/null
+++
b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-ephemeral.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+ http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd
+ http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">
+ <bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+<!-- Network connectors are added programmatically in BaseNetworkTest.addNetworkConnectors()
+ to use the actual assigned ephemeral ports -->
+<broker brokerName="localBroker" start="false" persistent="true"
useShutdownHook="false" monitorConnectionSplits="true"
xmlns="http://activemq.apache.org/schema/core">
+
+ <transportConnectors>
+ <transportConnector uri="tcp://localhost:0"/>
+ </transportConnectors>
+
+ </broker>
+</beans>
diff --git
a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-ephemeral.xml
b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-ephemeral.xml
new file mode 100644
index 0000000000..dd7561e5b8
--- /dev/null
+++
b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-ephemeral.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:amq="http://activemq.apache.org/schema/core"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+ <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+ <!-- Network connectors are added programmatically in BaseNetworkTest.addNetworkConnectors()
+ to use the actual assigned ephemeral ports -->
+ <broker brokerName="remoteBroker" start="false" useJmx="false" persistent="true" useShutdownHook="false" monitorConnectionSplits="false" xmlns="http://activemq.apache.org/schema/core">
+
+ <transportConnectors>
+ <transportConnector uri="tcp://localhost:0"/>
+ </transportConnectors>
+ </broker>
+
+</beans>
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/perf/networkSync.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/perf/networkSync.xml
index 5c7c14c2d4..0b1f5eafdb 100644
--- a/activemq-unit-tests/src/test/resources/org/apache/activemq/perf/networkSync.xml
+++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/perf/networkSync.xml
@@ -27,22 +27,22 @@
<!-- Broker1 -->
<broker brokerName="broker1" id="broker1" useJmx="true" persistent="true" deleteAllMessagesOnStartup="true" start="false" xmlns="http://activemq.apache.org/schema/core">
<transportConnectors>
- <transportConnector uri="tcp://localhost:61616" />
+ <transportConnector uri="tcp://localhost:0" />
</transportConnectors>
<persistenceAdapter>
- <amqPersistenceAdapter directory="target/Broker1-data/activemq-data" syncOnWrite="true" indexPageSize="16kb" indexBinSize="100" maxReferenceFileLength="8192"/>
+ <kahaDB directory="target/Broker1-data/activemq-data" journalMaxFileLength="32mb"/>
</persistenceAdapter>
</broker>
-
-
+
+
<!-- Broker2 -->
<broker brokerName="broker2" id="broker2" useJmx="true" persistent="false" deleteAllMessagesOnStartup="true" start="false" xmlns="http://activemq.apache.org/schema/core">
<transportConnectors>
- <transportConnector uri="tcp://localhost:62616" />
+ <transportConnector uri="tcp://localhost:0" />
</transportConnectors>
<persistenceAdapter>
- <amqPersistenceAdapter directory="target/Broker2-data/activemq-data" syncOnWrite="true" indexPageSize="16kb" indexBinSize="100" maxReferenceFileLength="8192"/>
+ <kahaDB directory="target/Broker2-data/activemq-data" journalMaxFileLength="32mb"/>
</persistenceAdapter>
</broker>
</beans>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact