This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 59eb8315cb Batch of fixes (#1622)
59eb8315cb is described below
commit 59eb8315cb00ed694cc0f481064246e55dc9203d
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Thu Jan 22 13:12:18 2026 +0100
Batch of fixes (#1622)
---
.../store/kahadb/JournalFdRecoveryTest.java | 3 +
.../store/kahadb/scheduler/AMQ7086Test.java | 10 +--
.../activemq/ra/ActiveMQConnectionFactoryTest.java | 4 +-
.../apache/activemq/transport/stomp/StompTest.java | 7 +-
.../activemq/ActiveMQSslConnectionFactoryTest.java | 76 +++++++++++++---------
.../java/org/apache/activemq/JMSConsumerTest.java | 8 +++
.../FailoverConsumerOutstandingCommitTest.java | 22 ++++++-
.../failover/FailoverConsumerUnconsumedTest.java | 16 ++++-
.../transport/failover/FailoverDuplicateTest.java | 12 +++-
.../FailoverDurableSubTransactionTest.java | 2 +
.../failover/FailoverPrefetchZeroTest.java | 10 ++-
.../failover/FailoverTransactionTest.java | 65 +++++++++++++++++-
.../failover/FailoverXATransactionTest.java | 12 +++-
13 files changed, 202 insertions(+), 45 deletions(-)
diff --git
a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
index c26bb0e8cd..1c03df2a21 100644
---
a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
+++
b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
@@ -224,6 +224,9 @@ public class JournalFdRecoveryTest {
assertTrue("more than x files: " + numFiles, numFiles > 5);
assertEquals("Drain", 30, tryConsume(destination, 30));
+ // Force checkpoint to ensure all acknowledgments are persisted before stopping
+ adapter.getStore().checkpoint(true);
+
LOG.info("Num files after stopped: " + getNumberOfJournalFiles());
File dataDir = broker.getPersistenceAdapter().getDirectory();
diff --git
a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java
b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java
index 1335ca5d79..467f173124 100644
---
a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java
+++
b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java
@@ -82,16 +82,18 @@ public class AMQ7086Test {
produceWithScheduledDelayAndConsume();
LOG.info("job store: " + jobSchedulerStore);
- int numSchedulerFiles =
jobSchedulerStore.getJournal().getFileMap().size();
LOG.info("kahadb store: " + kahaDBPersistenceAdapter);
- int numKahadbFiles =
kahaDBPersistenceAdapter.getStore().getJournal().getFileMap().size();
-
- LOG.info("Num files, job store: {}, message store: {}",
numKahadbFiles, numKahadbFiles);
// pull the dirs before we stop
File jobDir = jobSchedulerStore.getJournal().getDirectory();
File kahaDir =
kahaDBPersistenceAdapter.getStore().getJournal().getDirectory();
+ // Count actual disk files before stop to avoid TOCTOU race with in-memory state
+ int numSchedulerFiles = verifyFilesOnDisk(jobDir);
+ int numKahadbFiles = verifyFilesOnDisk(kahaDir);
+
+ LOG.info("Num files, job store: {}, message store: {}",
numSchedulerFiles, numKahadbFiles);
+
brokerService.stop();
final int jobFilesOnDisk = verifyFilesOnDisk(jobDir);
diff --git
a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java
b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java
index a8762c867a..14370bfe7f 100644
---
a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java
+++
b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java
@@ -137,7 +137,7 @@ public class ActiveMQConnectionFactoryTest {
try {
final TransportConnector transportConnector =
brokerService.getTransportConnectors().get(0);
- String failoverUrl =
String.format("failover:(%s)?maxReconnectAttempts=10&initialReconnectDelay=100",
transportConnector.getConnectUri());
+ String failoverUrl =
String.format("failover:(%s)?startupMaxReconnectAttempts=10&maxReconnectAttempts=2&initialReconnectDelay=100&timeout=2000",
transportConnector.getConnectUri());
ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
ra.start(null);
@@ -168,7 +168,7 @@ public class ActiveMQConnectionFactoryTest {
// Wait for failover to reconnect and recover() to succeed
// The ReconnectingXAResource should handle reconnection
transparently
- // Timeout: 30s accounts for maxReconnectAttempts=10 with
exponential backoff
+ // Timeout: 30s accounts for startupMaxReconnectAttempts=10 with exponential backoff
// up to the default maxReconnectDelay (30s per attempt)
// Poll interval: 500ms balances responsiveness without
overwhelming the system
final XAResource resource = resources[0];
diff --git
a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
index eba3c6747f..218dda6524 100644
---
a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
+++
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
@@ -2299,7 +2299,8 @@ public class StompTest extends StompTestSupport {
// We only want one of them, to trigger the shutdown and potentially
// see a deadlock.
- while (!gotMessage && !gotReceipt) {
+ boolean gotError = false;
+ while (!gotMessage && !gotReceipt && !gotError) {
frame = stompConnection.receiveFrame();
LOG.debug("Received the frame: " + frame);
@@ -2308,6 +2309,10 @@ public class StompTest extends StompTestSupport {
gotReceipt = true;
} else if(frame.startsWith("MESSAGE")) {
gotMessage = true;
+ } else if(frame.startsWith("ERROR")) {
+ // ERROR can occur with SSL/NIO transports due to reconnection attempts
+ // It still indicates the connection is not deadlocked
+ gotError = true;
} else {
fail("Received a frame that we were not expecting.");
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQSslConnectionFactoryTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQSslConnectionFactoryTest.java
index 52757ce86f..d3e4d5814b 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQSslConnectionFactoryTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQSslConnectionFactoryTest.java
@@ -41,6 +41,9 @@ import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import java.util.Arrays;
+import java.util.List;
+
import static org.junit.Assert.assertArrayEquals;
public class ActiveMQSslConnectionFactoryTest extends CombinationTestSupport {
@@ -53,6 +56,7 @@ public class ActiveMQSslConnectionFactoryTest extends
CombinationTestSupport {
private ActiveMQConnection connection;
private BrokerService broker;
+ private String actualBrokerUri;
@Override
protected void tearDown() throws Exception {
@@ -70,10 +74,10 @@ public class ActiveMQSslConnectionFactoryTest extends
CombinationTestSupport {
public void testCreateTcpConnectionUsingKnownPort() throws Exception {
// Control case: check that the factory can create an ordinary
(non-ssl) connection.
- broker =
createBroker("tcp://localhost:61610?wireFormat.tcpNoDelayEnabled=true");
+ broker =
createBroker("tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true");
// This should create the connection.
- ActiveMQSslConnectionFactory cf =
getFactory("tcp://localhost:61610?wireFormat.tcpNoDelayEnabled=true");
+ ActiveMQSslConnectionFactory cf = getFactory(actualBrokerUri);
connection = (ActiveMQConnection)cf.createConnection();
assertNotNull(connection);
connection.start();
@@ -83,11 +87,13 @@ public class ActiveMQSslConnectionFactoryTest extends
CombinationTestSupport {
public void testCreateTcpConnectionWithSocketParameters() throws Exception
{
// Control case: check that the factory can create an ordinary
(non-ssl) connection.
- String tcpUri =
"tcp://localhost:61610?socket.OOBInline=true&socket.keepAlive=true&tcpNoDelay=true";
+ String tcpUri = "tcp://localhost:0";
broker = createBroker(tcpUri);
- // This should create the connection.
- ActiveMQSslConnectionFactory cf = getFactory(tcpUri);
+ // Socket parameters must be set on the CLIENT URI, not the server URI.
+ // The broker's publishable URI doesn't include socket.* parameters as those are server-side configs.
+ String clientUri = actualBrokerUri +
"?socket.OOBInline=true&socket.keepAlive=true&tcpNoDelay=true";
+ ActiveMQSslConnectionFactory cf = getFactory(clientUri);
connection = (ActiveMQConnection)cf.createConnection();
assertNotNull(connection);
@@ -103,10 +109,10 @@ public class ActiveMQSslConnectionFactoryTest extends
CombinationTestSupport {
public void testCreateFailoverTcpConnectionUsingKnownPort() throws
Exception {
// Control case: check that the factory can create an ordinary
(non-ssl) connection.
- broker =
createBroker("tcp://localhost:61610?wireFormat.tcpNoDelayEnabled=true");
+ broker =
createBroker("tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true");
- // This should create the connection.
- ActiveMQSslConnectionFactory cf =
getFactory("failover:(tcp://localhost:61610?wireFormat.tcpNoDelayEnabled=true)");
+ // This should create the connection using the actual bound URI.
+ ActiveMQSslConnectionFactory cf = getFactory("failover:(" +
actualBrokerUri + "?wireFormat.tcpNoDelayEnabled=true)");
connection = (ActiveMQConnection)cf.createConnection();
assertNotNull(connection);
connection.start();
@@ -116,12 +122,12 @@ public class ActiveMQSslConnectionFactoryTest extends
CombinationTestSupport {
public void testCreateSslConnection() throws Exception {
// Create SSL/TLS connection with trusted cert from truststore.
- String sslUri = "ssl://localhost:61611";
+ String sslUri = "ssl://localhost:0";
broker = createSslBroker(sslUri);
assertNotNull(broker);
- // This should create the connection.
- ActiveMQSslConnectionFactory cf = getFactory(sslUri);
+ // This should create the connection using the actual bound URI.
+ ActiveMQSslConnectionFactory cf = getFactory(actualBrokerUri);
cf.setTrustStore("server.keystore");
cf.setTrustStorePassword("password");
connection = (ActiveMQConnection)cf.createConnection();
@@ -134,12 +140,12 @@ public class ActiveMQSslConnectionFactoryTest extends
CombinationTestSupport {
public void testCreateSslConnectionWithSocketParameters() throws Exception
{
// Create SSL/TLS connection with trusted cert from truststore.
- String sslUri =
"ssl://localhost:61611?socket.enabledProtocols=TLSv1.3&socket.enableSessionCreation=true&socket.needClientAuth=true";
+ String sslUri =
"ssl://localhost:0?socket.enabledProtocols=TLSv1.3&socket.enableSessionCreation=true&socket.needClientAuth=true";
broker = createSslBroker(sslUri);
assertNotNull(broker);
- // This should create the connection.
- ActiveMQSslConnectionFactory cf = getFactory(sslUri);
+ // This should create the connection using the actual bound URI.
+ ActiveMQSslConnectionFactory cf = getFactory(actualBrokerUri +
"?socket.enabledProtocols=TLSv1.3&socket.enableSessionCreation=true&socket.needClientAuth=true");
cf.setTrustStore("server.keystore");
cf.setTrustStorePassword("password");
connection = (ActiveMQConnection)cf.createConnection();
@@ -158,12 +164,12 @@ public class ActiveMQSslConnectionFactoryTest extends
CombinationTestSupport {
public void testCreateSslConnectionKeyStore() throws Exception {
// Create SSL/TLS connection with trusted cert from truststore.
- String sslUri = "ssl://localhost:61611";
+ String sslUri = "ssl://localhost:0";
broker = createSslBroker(sslUri);
assertNotNull(broker);
- // This should create the connection.
- ActiveMQSslConnectionFactory cf = getFactory(sslUri);
+ // This should create the connection using the actual bound URI.
+ ActiveMQSslConnectionFactory cf = getFactory(actualBrokerUri);
cf.setKeyStore("server.keystore");
cf.setKeyStorePassword("password");
cf.setTrustStore("server.keystore");
@@ -178,12 +184,12 @@ public class ActiveMQSslConnectionFactoryTest extends
CombinationTestSupport {
public void testFailoverSslConnection() throws Exception {
// Create SSL/TLS connection with trusted cert from truststore.
- String sslUri = "ssl://localhost:61611";
+ String sslUri = "ssl://localhost:0";
broker = createSslBroker(sslUri);
assertNotNull(broker);
- // This should create the connection.
- ActiveMQSslConnectionFactory cf = getFactory("failover:(" + sslUri +
")?maxReconnectAttempts=4");
+ // This should create the connection using the actual bound URI.
+ ActiveMQSslConnectionFactory cf = getFactory("failover:(" +
actualBrokerUri + ")?maxReconnectAttempts=4");
cf.setTrustStore("server.keystore");
cf.setTrustStorePassword("password");
connection = (ActiveMQConnection)cf.createConnection();
@@ -196,11 +202,11 @@ public class ActiveMQSslConnectionFactoryTest extends
CombinationTestSupport {
}
public void testFailoverSslConnectionWithKeyAndTrustManagers() throws
Exception {
- String sslUri = "ssl://localhost:61611";
+ String sslUri = "ssl://localhost:0";
broker = createSslBroker(sslUri);
assertNotNull(broker);
- ActiveMQSslConnectionFactory cf = getFactory("failover:(" + sslUri +
")?maxReconnectAttempts=4");
+ ActiveMQSslConnectionFactory cf = getFactory("failover:(" +
actualBrokerUri + ")?maxReconnectAttempts=4");
cf.setKeyAndTrustManagers(getKeyManager(), getTrustManager(), new
SecureRandom());
connection = (ActiveMQConnection)cf.createConnection();
LOG.info("Created client connection");
@@ -213,12 +219,12 @@ public class ActiveMQSslConnectionFactoryTest extends
CombinationTestSupport {
public void testNegativeCreateSslConnectionWithWrongTrustStorePassword()
throws Exception {
// Create SSL/TLS connection with trusted cert from truststore.
- String sslUri = "ssl://localhost:61611";
+ String sslUri = "ssl://localhost:0";
broker = createSslBroker(sslUri);
assertNotNull(broker);
// This should FAIL to connect, due to wrong password.
- ActiveMQSslConnectionFactory cf = getFactory(sslUri);
+ ActiveMQSslConnectionFactory cf = getFactory(actualBrokerUri);
cf.setTrustStore("server.keystore");
cf.setTrustStorePassword("wrongPassword");
try {
@@ -237,12 +243,12 @@ public class ActiveMQSslConnectionFactoryTest extends
CombinationTestSupport {
public void testCreateSslConnectionWithNullTrustStorePassword() throws
Exception {
// Create SSL/TLS connection with trusted cert from truststore.
- String sslUri = "ssl://localhost:61611";
+ String sslUri = "ssl://localhost:0";
broker = createSslBroker(sslUri);
assertNotNull(broker);
// This should create the connection.
- ActiveMQSslConnectionFactory cf = getFactory(sslUri);
+ ActiveMQSslConnectionFactory cf = getFactory(actualBrokerUri);
cf.setTrustStore("server.keystore");
//don't set a truststore password so it's null, this caused an NPE
//before AMQ-8550. truststore password is used to protect the integrity
@@ -257,12 +263,12 @@ public class ActiveMQSslConnectionFactoryTest extends
CombinationTestSupport {
public void testNegativeCreateSslConnectionWithWrongKeyStorePassword()
throws Exception {
// Create SSL/TLS connection with keystore and trusted cert from
truststore.
- String sslUri = "ssl://localhost:61611";
+ String sslUri = "ssl://localhost:0";
broker = createSslBroker(sslUri);
assertNotNull(broker);
// This should FAIL to connect, due to wrong keystore password.
- ActiveMQSslConnectionFactory cf = getFactory(sslUri);
+ ActiveMQSslConnectionFactory cf = getFactory(actualBrokerUri);
cf.setKeyStore("server.keystore");
cf.setKeyStorePassword("badPassword");
cf.setTrustStore("server.keystore");
@@ -283,13 +289,13 @@ public class ActiveMQSslConnectionFactoryTest extends
CombinationTestSupport {
public void testNegativeCreateSslConnectionWithNullKeyStorePassword()
throws Exception {
// Create SSL/TLS connection with keystore and trusted cert from
truststore.
- String sslUri = "ssl://localhost:61611";
+ String sslUri = "ssl://localhost:0";
broker = createSslBroker(sslUri);
assertNotNull(broker);
// This should FAIL to connect, due to null password for keystore.
//Before AMQ-8550 this would fail with a NPE
- ActiveMQSslConnectionFactory cf = getFactory(sslUri);
+ ActiveMQSslConnectionFactory cf = getFactory(actualBrokerUri);
cf.setKeyStore("server.keystore");
//don't set keystore password so it's null
cf.setTrustStore("server.keystore");
@@ -311,12 +317,12 @@ public class ActiveMQSslConnectionFactoryTest extends
CombinationTestSupport {
public void testNegativeCreateSslConnectionWithWrongCert() throws
Exception {
// Create SSL/TLS connection with trusted cert from truststore.
- String sslUri = "ssl://localhost:61611";
+ String sslUri = "ssl://localhost:0";
broker = createSslBroker(sslUri);
assertNotNull(broker);
// This should FAIL to connect, due to wrong password.
- ActiveMQSslConnectionFactory cf = getFactory(sslUri);
+ ActiveMQSslConnectionFactory cf = getFactory(actualBrokerUri);
cf.setTrustStore("dummy.keystore");
cf.setTrustStorePassword("password");
try {
@@ -339,6 +345,9 @@ public class ActiveMQSslConnectionFactoryTest extends
CombinationTestSupport {
service.addConnector(uri);
service.start();
+ // Get the actual bound URI if ephemeral port was used
+ actualBrokerUri =
service.getTransportConnectors().get(0).getPublishableConnectString();
+
return service;
}
@@ -356,6 +365,9 @@ public class ActiveMQSslConnectionFactoryTest extends
CombinationTestSupport {
service.addSslConnector(uri, km, tm, null);
service.start();
+ // Get the actual bound URI if ephemeral port was used
+ actualBrokerUri =
service.getTransportConnectors().get(0).getPublishableConnectString();
+
return service;
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
index 605fb9f693..8008f6fb93 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
@@ -794,6 +794,14 @@ public class JMSConsumerTest extends JmsTestSupport {
Session session2 = connection2.createSession(true, 0);
MessageConsumer consumer2 = session2.createConsumer(destination);
+ // Wait for consumer2 to fully register with the broker
+ assertTrue("consumer2 registered", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getDestinationConsumers(broker, destination).size() ==
2;
+ }
+ }, 5000));
+
// Pick up the first message.
Message message1 = consumer.receive(1000);
assertNotNull(message1);
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
index 9466ee2ff1..66b48b2515 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
@@ -71,6 +71,8 @@ public class FailoverConsumerOutstandingCommitTest {
public void startBroker(boolean deleteAllMessagesOnStartup) throws
Exception {
broker = createBroker(deleteAllMessagesOnStartup);
broker.start();
+ // Get the actual bound URI after broker starts (important for
ephemeral ports)
+ url =
broker.getTransportConnectors().get(0).getPublishableConnectString();
}
public BrokerService createBroker(boolean deleteAllMessagesOnStartup)
throws Exception {
@@ -90,7 +92,7 @@ public class FailoverConsumerOutstandingCommitTest {
policyMap.setDefaultEntry(defaultEntry);
broker.setDestinationPolicy(policyMap);
- url =
broker.getTransportConnectors().get(0).getConnectUri().toString();
+ // Do not set url here - need to get it after broker starts when using
ephemeral ports
return broker;
}
@@ -127,6 +129,9 @@ public class FailoverConsumerOutstandingCommitTest {
});
broker.start();
+ // Get the actual bound URI after broker starts (important for
ephemeral ports)
+ url =
broker.getTransportConnectors().get(0).getPublishableConnectString();
+
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url + ")");
cf.setWatchTopicAdvisories(watchTopicAdvisories);
cf.setDispatchAsync(false);
@@ -181,6 +186,9 @@ public class FailoverConsumerOutstandingCommitTest {
broker = createBroker(false, url);
broker.start();
+ // Get the actual bound URI after broker starts (important for
ephemeral ports)
+ url =
broker.getTransportConnectors().get(0).getPublishableConnectString();
+
assertTrue("consumer added through failover",
commitDoneLatch.await(20, TimeUnit.SECONDS));
assertTrue("another message was recieved after failover",
messagesReceived.await(20, TimeUnit.SECONDS));
@@ -228,6 +236,9 @@ public class FailoverConsumerOutstandingCommitTest {
} });
broker.start();
+ // Get the actual bound URI after broker starts (important for
ephemeral ports)
+ url =
broker.getTransportConnectors().get(0).getPublishableConnectString();
+
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url + ")");
cf.setWatchTopicAdvisories(watchTopicAdvisories);
cf.setDispatchAsync(false);
@@ -288,6 +299,9 @@ public class FailoverConsumerOutstandingCommitTest {
broker = createBroker(false, url);
broker.start();
+ // Get the actual bound URI after broker starts (important for
ephemeral ports)
+ url =
broker.getTransportConnectors().get(0).getPublishableConnectString();
+
assertTrue("commit done through failover", commitDoneLatch.await(20,
TimeUnit.SECONDS));
assertTrue("commit failed", gotCommitException.get());
assertTrue("another message was received after failover",
messagesReceived.await(20, TimeUnit.SECONDS));
@@ -308,6 +322,9 @@ public class FailoverConsumerOutstandingCommitTest {
broker = createBroker(true);
broker.start();
+ // Get the actual bound URI after broker starts (important for
ephemeral ports)
+ url =
broker.getTransportConnectors().get(0).getPublishableConnectString();
+
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url + ")");
cf.setConsumerFailoverRedeliveryWaitPeriod(10000);
final ActiveMQConnection connection = (ActiveMQConnection)
cf.createConnection();
@@ -333,6 +350,9 @@ public class FailoverConsumerOutstandingCommitTest {
broker = createBroker(false, url);
broker.start();
+ // Get the actual bound URI after broker starts (important for
ephemeral ports)
+ url =
broker.getTransportConnectors().get(0).getPublishableConnectString();
+
consumerSession.rollback();
// receive again
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
index bc6e032534..913ca7fc60 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
@@ -71,6 +71,8 @@ public class FailoverConsumerUnconsumedTest {
public void startBroker(boolean deleteAllMessagesOnStartup) throws
Exception {
broker = createBroker(deleteAllMessagesOnStartup);
broker.start();
+ // Get the actual bound URI after broker starts (important for
ephemeral ports)
+ url =
broker.getTransportConnectors().get(0).getPublishableConnectString();
}
public BrokerService createBroker(boolean deleteAllMessagesOnStartup)
throws Exception {
@@ -82,7 +84,7 @@ public class FailoverConsumerUnconsumedTest {
broker.addConnector(bindAddress);
broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
- this.url =
broker.getTransportConnectors().get(0).getConnectUri().toString();
+ // Do not set url here - need to get it after broker starts when using
ephemeral ports
return broker;
}
@@ -131,6 +133,9 @@ public class FailoverConsumerUnconsumedTest {
});
broker.start();
+ // Get the actual bound URI after broker starts (important for
ephemeral ports)
+ url =
broker.getTransportConnectors().get(0).getPublishableConnectString();
+
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url + ")");
cf.setWatchTopicAdvisories(false);
@@ -201,6 +206,9 @@ public class FailoverConsumerUnconsumedTest {
broker = createBroker(false, this.url);
broker.start();
+ // Get the actual bound URI after broker starts (important for
ephemeral ports)
+ url =
broker.getTransportConnectors().get(0).getPublishableConnectString();
+
assertTrue("consumer added through failover",
shutdownConsumerAdded.await(30, TimeUnit.SECONDS));
// each should again get prefetch messages - all unacked deliveries
should be rolledback
@@ -265,6 +273,9 @@ public class FailoverConsumerUnconsumedTest {
});
broker.start();
+ // Get the actual bound URI after broker starts (important for
ephemeral ports)
+ url =
broker.getTransportConnectors().get(0).getPublishableConnectString();
+
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url + ")");
cf.setWatchTopicAdvisories(watchTopicAdvisories);
@@ -327,6 +338,9 @@ public class FailoverConsumerUnconsumedTest {
broker = createBroker(false, this.url);
broker.start();
+ // Get the actual bound URI after broker starts (important for
ephemeral ports)
+ url =
broker.getTransportConnectors().get(0).getPublishableConnectString();
+
assertTrue("consumer added through failover",
shutdownConsumerAdded.await(30, TimeUnit.SECONDS));
// each should again get prefetch messages - all unconsumed deliveries
should be rolledback
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java
index 2152d5fad3..8aea7f4ece 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java
@@ -65,11 +65,15 @@ public class FailoverDuplicateTest extends TestSupport {
public void startBroker(boolean deleteAllMessagesOnStartup) throws
Exception {
broker = createBroker(deleteAllMessagesOnStartup);
broker.start();
+ // Get the actual bound URI after broker starts (important for
ephemeral ports)
+ url =
broker.getTransportConnectors().get(0).getPublishableConnectString();
}
public void startBroker(boolean deleteAllMessagesOnStartup, String
bindAddress) throws Exception {
broker = createBroker(deleteAllMessagesOnStartup, bindAddress);
broker.start();
+ // Get the actual bound URI after broker starts (important for
ephemeral ports)
+ url =
broker.getTransportConnectors().get(0).getPublishableConnectString();
}
public BrokerService createBroker(boolean deleteAllMessagesOnStartup)
throws Exception {
@@ -83,7 +87,7 @@ public class FailoverDuplicateTest extends TestSupport {
broker.addConnector(bindAddress);
broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
- url =
broker.getTransportConnectors().get(0).getConnectUri().toString();
+ // Do not set url here - need to get it after broker starts when using
ephemeral ports
return broker;
}
@@ -132,6 +136,9 @@ public class FailoverDuplicateTest extends TestSupport {
});
broker.start();
+ // Get the actual bound URI after broker starts (important for
ephemeral ports)
+ url =
broker.getTransportConnectors().get(0).getPublishableConnectString();
+
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url +
")?jms.watchTopicAdvisories=false");
configureConnectionFactory(cf);
Connection sendConnection = cf.createConnection();
@@ -223,6 +230,9 @@ public class FailoverDuplicateTest extends TestSupport {
setDefaultPersistenceAdapter(broker);
broker.start();
+ // Get the actual bound URI after broker starts (important for
ephemeral ports)
+ url =
broker.getTransportConnectors().get(0).getPublishableConnectString();
+
// after restart, ensure no dangling messages
cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDurableSubTransactionTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDurableSubTransactionTest.java
index 6b57c6ae19..ad4db46912 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDurableSubTransactionTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDurableSubTransactionTest.java
@@ -178,6 +178,7 @@ public class FailoverDurableSubTransactionTest {
}
});
broker.start();
+
// Get the actual bound URI after broker starts (important for
ephemeral ports)
url =
broker.getTransportConnectors().get(0).getPublishableConnectString();
@@ -285,6 +286,7 @@ public class FailoverDurableSubTransactionTest {
});
broker.start();
+
// Get the actual bound URI after broker starts (important for
ephemeral ports)
url =
broker.getTransportConnectors().get(0).getPublishableConnectString();
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java
index 1020b66698..9a19bbf41a 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java
@@ -63,6 +63,8 @@ public class FailoverPrefetchZeroTest {
public void startBroker(boolean deleteAllMessagesOnStartup) throws
Exception {
broker = createBroker(deleteAllMessagesOnStartup);
broker.start();
+ // Get the actual bound URI after broker starts (important for
ephemeral ports)
+ url =
broker.getTransportConnectors().get(0).getPublishableConnectString();
}
public BrokerService createBroker(boolean deleteAllMessagesOnStartup)
throws Exception {
@@ -74,7 +76,7 @@ public class FailoverPrefetchZeroTest {
broker.addConnector(bindAddress);
broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
- url =
broker.getTransportConnectors().get(0).getConnectUri().toString();
+ // Do not set url here - need to get it after broker starts when using
ephemeral ports
return broker;
}
@@ -107,6 +109,9 @@ public class FailoverPrefetchZeroTest {
});
broker.start();
+ // Get the actual bound URI after broker starts (important for
ephemeral ports)
+ url =
broker.getTransportConnectors().get(0).getPublishableConnectString();
+
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url + ")");
cf.setWatchTopicAdvisories(false);
@@ -143,6 +148,9 @@ public class FailoverPrefetchZeroTest {
broker = createBroker(false, url);
broker.start();
+ // Get the actual bound URI after broker starts (important for
ephemeral ports)
+ url =
broker.getTransportConnectors().get(0).getPublishableConnectString();
+
assertTrue("receive completed through failover", receiveDone.await(30,
TimeUnit.SECONDS));
assertTrue("we got our message:", !received.isEmpty());
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
index 25e413aadd..3d911e3f5d 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
@@ -108,6 +108,8 @@ public class FailoverTransactionTest extends TestSupport {
public void startBroker(boolean deleteAllMessagesOnStartup) throws
Exception {
broker = createBroker(deleteAllMessagesOnStartup);
broker.start();
+ // Get the actual bound URI after broker starts (important for ephemeral ports)
+ url = broker.getTransportConnectors().get(0).getPublishableConnectString();
}
public void startBroker(boolean deleteAllMessagesOnStartup, String
bindAddress) throws Exception {
@@ -132,7 +134,7 @@ public class FailoverTransactionTest extends TestSupport {
policyMap.setDefaultEntry(defaultEntry);
broker.setDestinationPolicy(policyMap);
- url = broker.getTransportConnectors().get(0).getConnectUri().toString();
+ // Do not set url here - need to get it after broker starts when using ephemeral ports
return broker;
}
@@ -203,6 +205,9 @@ public class FailoverTransactionTest extends TestSupport {
});
broker.start();
+ // Get the actual bound URI after broker starts (important for ephemeral ports)
+ url = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
@@ -235,6 +240,9 @@ public class FailoverTransactionTest extends TestSupport {
setDefaultPersistenceAdapter(broker);
broker.start();
+ // Get the actual bound URI after broker starts (important for ephemeral ports)
+ url = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
assertTrue("tx committed through failover", commitDoneLatch.await(30,
TimeUnit.SECONDS));
// new transaction
@@ -255,6 +263,9 @@ public class FailoverTransactionTest extends TestSupport {
setDefaultPersistenceAdapter(broker);
broker.start();
+ // Get the actual bound URI after broker starts (important for ephemeral ports)
+ url = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
// after restart, ensure no dangling messages
cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
@@ -298,6 +309,9 @@ public class FailoverTransactionTest extends TestSupport {
});
broker.start();
+ // Get the actual bound URI after broker starts (important for ephemeral ports)
+ url = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
@@ -331,6 +345,9 @@ public class FailoverTransactionTest extends TestSupport {
broker.setPlugins(new BrokerPlugin[]{new
DestinationPathSeparatorBroker()});
broker.start();
+ // Get the actual bound URI after broker starts (important for ephemeral ports)
+ url = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
assertTrue("tx committed trough failover", commitDoneLatch.await(30,
TimeUnit.SECONDS));
// new transaction
@@ -352,6 +369,9 @@ public class FailoverTransactionTest extends TestSupport {
broker.setPlugins(new BrokerPlugin[]{new
DestinationPathSeparatorBroker()});
broker.start();
+ // Get the actual bound URI after broker starts (important for ephemeral ports)
+ url = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
// after restart, ensure no dangling messages
cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
@@ -409,6 +429,9 @@ public class FailoverTransactionTest extends TestSupport {
});
broker.start();
+ // Get the actual bound URI after broker starts (important for ephemeral ports)
+ url = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url +
")?jms.watchTopicAdvisories=false");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
@@ -441,6 +464,9 @@ public class FailoverTransactionTest extends TestSupport {
LOG.info("restarting....");
broker.start();
+ // Get the actual bound URI after broker starts (important for ephemeral ports)
+ url = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
assertTrue("message sent through failover", sendDoneLatch.await(30,
TimeUnit.SECONDS));
// new transaction
@@ -464,6 +490,9 @@ public class FailoverTransactionTest extends TestSupport {
setDefaultPersistenceAdapter(broker);
broker.start();
+ // Get the actual bound URI after broker starts (important for ephemeral ports)
+ url = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
// after restart, ensure no dangling messages
cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
@@ -531,6 +560,9 @@ public class FailoverTransactionTest extends TestSupport {
});
broker.start();
+ // Get the actual bound URI after broker starts (important for ephemeral ports)
+ url = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
proxy.setTarget(new URI(url));
proxy.open();
@@ -584,6 +616,9 @@ public class FailoverTransactionTest extends TestSupport {
setDefaultPersistenceAdapter(broker);
broker.start();
+ // Get the actual bound URI after broker starts (important for ephemeral ports)
+ url = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
// after restart, ensure no dangling messages
cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
@@ -752,6 +787,9 @@ public class FailoverTransactionTest extends TestSupport {
});
broker.start();
+ // Get the actual bound URI after broker starts (important for ephemeral ports)
+ url = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
Vector<Connection> connections = new Vector<Connection>();
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
@@ -827,6 +865,9 @@ public class FailoverTransactionTest extends TestSupport {
setDefaultPersistenceAdapter(broker);
broker.start();
+ // Get the actual bound URI after broker starts (important for ephemeral ports)
+ url = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
assertTrue("tx committed through failover", commitDoneLatch.await(30,
TimeUnit.SECONDS));
LOG.info("received message count: " + receivedMessages.size());
@@ -857,6 +898,9 @@ public class FailoverTransactionTest extends TestSupport {
setDefaultPersistenceAdapter(broker);
broker.start();
+ // Get the actual bound URI after broker starts (important for ephemeral ports)
+ url = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
// after restart, ensure no dangling messages
cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
@@ -897,6 +941,9 @@ public class FailoverTransactionTest extends TestSupport {
});
broker.start();
+ // Get the actual bound URI after broker starts (important for ephemeral ports)
+ url = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
Vector<Connection> connections = new Vector<Connection>();
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
@@ -988,6 +1035,9 @@ public class FailoverTransactionTest extends TestSupport {
setDefaultPersistenceAdapter(broker);
broker.start();
+ // Get the actual bound URI after broker starts (important for ephemeral ports)
+ url = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
consumer = consumerSession.createConsumer(destination);
LOG.info("finally consuming message: " + ((ActiveMQMessageConsumer)
consumer).getConsumerId());
@@ -1007,6 +1057,10 @@ public class FailoverTransactionTest extends TestSupport {
public void testAutoRollbackWithMissingRedeliveries() throws Exception {
broker = createBroker(true);
broker.start();
+
+ // Get the actual bound URI after broker starts (important for ephemeral ports)
+ url = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
@@ -1037,6 +1091,9 @@ public class FailoverTransactionTest extends TestSupport {
broker = createBroker(false, url);
broker.start();
+ // Get the actual bound URI after broker starts (important for ephemeral ports)
+ url = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
assertNotNull("should get rolledback message from original restarted
broker", consumer.receive(20000));
connection.close();
}
@@ -1067,6 +1124,9 @@ public class FailoverTransactionTest extends TestSupport {
setPersistenceAdapter(broker, PersistenceAdapterChoice.JDBC);
broker.start();
+ // Get the actual bound URI after broker starts (important for ephemeral ports)
+ url = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
final CountDownLatch commitDone = new CountDownLatch(1);
final CountDownLatch gotException = new CountDownLatch(1);
// will block pending re-deliveries
@@ -1089,6 +1149,9 @@ public class FailoverTransactionTest extends TestSupport {
broker = createBroker(false, url);
broker.start();
+ // Get the actual bound URI after broker starts (important for ephemeral ports)
+ url = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
assertTrue("commit was successful", commitDone.await(30,
TimeUnit.SECONDS));
assertTrue("got exception on commit", gotException.await(30,
TimeUnit.SECONDS));
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverXATransactionTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverXATransactionTest.java
index 880366ec59..9125611ae2 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverXATransactionTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverXATransactionTest.java
@@ -62,11 +62,15 @@ public class FailoverXATransactionTest {
public void startBroker(boolean deleteAllMessagesOnStartup) throws
Exception {
broker = createBroker(deleteAllMessagesOnStartup);
broker.start();
+ // Get the actual bound URI after broker starts (important for ephemeral ports)
+ url = broker.getTransportConnectors().get(0).getPublishableConnectString();
}
public void startBroker(boolean deleteAllMessagesOnStartup, String
bindAddress) throws Exception {
broker = createBroker(deleteAllMessagesOnStartup, bindAddress);
broker.start();
+ // Get the actual bound URI after broker starts (important for ephemeral ports)
+ url = broker.getTransportConnectors().get(0).getPublishableConnectString();
}
public BrokerService createBroker(boolean deleteAllMessagesOnStartup)
throws Exception {
@@ -86,7 +90,7 @@ public class FailoverXATransactionTest {
policyMap.setDefaultEntry(defaultEntry);
broker.setDestinationPolicy(policyMap);
- url = broker.getTransportConnectors().get(0).getConnectUri().toString();
+ // Do not set url here - need to get it after broker starts when using ephemeral ports
return broker;
}
@@ -123,6 +127,9 @@ public class FailoverXATransactionTest {
});
broker.start();
+ // Get the actual bound URI after broker starts (important for ephemeral ports)
+ url = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
ActiveMQXAConnectionFactory cf = new
ActiveMQXAConnectionFactory("failover:(" + url + ")");
XAConnection connection = cf.createXAConnection();
connection.start();
@@ -181,6 +188,9 @@ public class FailoverXATransactionTest {
});
broker.start();
+ // Get the actual bound URI after broker starts (important for ephemeral ports)
+ url = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
ActiveMQXAConnectionFactory cf = new
ActiveMQXAConnectionFactory("failover:(" + url + ")");
XAConnection connection = cf.createXAConnection();
connection.start();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact