This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 09f1105cb4 fix(tests): fix flaky tests (#1753)
09f1105cb4 is described below
commit 09f1105cb41ef6fee112f3a3c6d31b4638a0dfe8
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Wed Mar 11 13:21:50 2026 +0100
fix(tests): fix flaky tests (#1753)
* fix(tests): fix flaky tests
* fix(test): improve flakiness in
AbstractCachedLDAPAuthorizationMapLegacyTest by using WaitFor
---
.../transport/amqp/interop/AmqpReceiverTest.java | 44 ++--
.../activemq/usage/MemoryUsageConcurrencyTest.java | 122 +++++------
.../apache/activemq/transport/mqtt/MQTTTest.java | 17 +-
.../DurableFiveBrokerNetworkBridgeTest.java | 226 ++++++++++-----------
...stractCachedLDAPAuthorizationMapLegacyTest.java | 25 ++-
.../security/SimpleAuthenticationPluginTest.java | 15 +-
.../transport/vm/VMTransportThreadSafeTest.java | 51 ++---
.../org/apache/activemq/usecases/AMQ6366Test.java | 56 +++--
8 files changed, 289 insertions(+), 267 deletions(-)
diff --git
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
index 78626d83f1..886b2a5f4a 100644
---
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
+++
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
@@ -453,14 +453,14 @@ public class AmqpReceiverTest extends
AmqpClientTestSupport {
@Test(timeout = 60000)
public void
testTwoQueueReceiversOnSameConnectionReadMessagesAcceptOnEach() throws
Exception {
- int MSG_COUNT = 4;
+ final int MSG_COUNT = 4;
sendMessages(getTestName(), MSG_COUNT, false);
- AmqpClient client = createAmqpClient();
- AmqpConnection connection = trackConnection(client.connect());
- AmqpSession session = connection.createSession();
+ final AmqpClient client = createAmqpClient();
+ final AmqpConnection connection = trackConnection(client.connect());
+ final AmqpSession session = connection.createSession();
- AmqpReceiver receiver1 = session.createReceiver("queue://" +
getTestName());
+ final AmqpReceiver receiver1 = session.createReceiver("queue://" +
getTestName());
final QueueViewMBean queueView = getProxyToQueue(getTestName());
assertEquals(MSG_COUNT, queueView.getQueueSize());
@@ -473,17 +473,15 @@ public class AmqpReceiverTest extends
AmqpClientTestSupport {
assertNotNull(message);
message.accept();
- assertTrue("Should have ack'd two", Wait.waitFor(new Wait.Condition() {
+ assertTrue("Should have ack'd two", Wait.waitFor(
+ () -> queueView.getDequeueCount() == 2,
+ TimeUnit.SECONDS.toMillis(5),
TimeUnit.MILLISECONDS.toMillis(50)));
- @Override
- public boolean isSatisified() throws Exception {
- return queueView.getDequeueCount() == 2;
- }
- }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50)));
+ final AmqpReceiver receiver2 = session.createReceiver("queue://" +
getTestName());
- AmqpReceiver receiver2 = session.createReceiver("queue://" +
getTestName());
-
- assertEquals(2,
brokerService.getAdminView().getQueueSubscribers().length);
+ assertTrue("Second receiver should be registered", Wait.waitFor(
+ () ->
brokerService.getAdminView().getQueueSubscribers().length == 2,
+ TimeUnit.SECONDS.toMillis(5),
TimeUnit.MILLISECONDS.toMillis(50)));
receiver2.flow(2);
message = receiver2.receive(5, TimeUnit.SECONDS);
@@ -493,19 +491,19 @@ public class AmqpReceiverTest extends
AmqpClientTestSupport {
assertNotNull(message);
message.accept();
- assertEquals(MSG_COUNT, queueView.getDispatchCount());
- assertTrue("Queue should be empty now", Wait.waitFor(new
Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- return queueView.getDequeueCount() == 4;
- }
- }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(10)));
+ assertTrue("All messages should be dispatched", Wait.waitFor(
+ () -> queueView.getDispatchCount() == MSG_COUNT,
+ TimeUnit.SECONDS.toMillis(5),
TimeUnit.MILLISECONDS.toMillis(50)));
+ assertTrue("Queue should be empty now", Wait.waitFor(
+ () -> queueView.getDequeueCount() == MSG_COUNT,
+ TimeUnit.SECONDS.toMillis(15),
TimeUnit.MILLISECONDS.toMillis(10)));
receiver1.close();
receiver2.close();
- assertEquals(0, queueView.getQueueSize());
+ assertTrue("Queue size should reach zero", Wait.waitFor(
+ () -> queueView.getQueueSize() == 0,
+ TimeUnit.SECONDS.toMillis(5),
TimeUnit.MILLISECONDS.toMillis(50)));
connection.close();
}
diff --git
a/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageConcurrencyTest.java
b/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageConcurrencyTest.java
index 503ebc4eac..56a4297522 100644
---
a/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageConcurrencyTest.java
+++
b/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageConcurrencyTest.java
@@ -39,26 +39,27 @@ public class MemoryUsageConcurrencyTest {
@Test
public void testCycle() throws Exception {
- Random r = new Random(0xb4a14);
+ final Random r = new Random(0xb4a14);
for (int i = 0; i < 30000; i++) {
checkPercentage(i, i, r.nextInt(100) + 10, i % 2 == 0, i % 5 == 0);
}
}
- private void checkPercentage(int attempt, int seed, int operations,
boolean useArrayBlocking, boolean useWaitForSpaceThread) throws
InterruptedException {
+ private void checkPercentage(final int attempt, final int seed, final int
operations,
+ final boolean useArrayBlocking, final boolean
useWaitForSpaceThread) throws InterruptedException {
final BlockingQueue<Integer> toAdd;
final BlockingQueue<Integer> toRemove;
final BlockingQueue<Integer> removed;
if (useArrayBlocking) {
- toAdd = new ArrayBlockingQueue<Integer>(operations);
- toRemove = new ArrayBlockingQueue<Integer>(operations);
- removed = new ArrayBlockingQueue<Integer>(operations);
+ toAdd = new ArrayBlockingQueue<>(operations);
+ toRemove = new ArrayBlockingQueue<>(operations);
+ removed = new ArrayBlockingQueue<>(operations);
} else {
- toAdd = new LinkedBlockingQueue<Integer>();
- toRemove = new LinkedBlockingQueue<Integer>();
- removed = new LinkedBlockingQueue<Integer>();
+ toAdd = new LinkedBlockingQueue<>();
+ toRemove = new LinkedBlockingQueue<>();
+ removed = new LinkedBlockingQueue<>();
}
final AtomicBoolean running = new AtomicBoolean(true);
@@ -68,14 +69,13 @@ public class MemoryUsageConcurrencyTest {
memUsage.setLimit(1000);
memUsage.start();
- Thread addThread = new Thread(new Runnable() {
- @Override
- public void run() {
+ try {
+ final Thread addThread = new Thread(() -> {
try {
startLatch.await();
while (true) {
- Integer add = toAdd.poll(1, TimeUnit.MILLISECONDS);
+ final Integer add = toAdd.poll(1,
TimeUnit.MILLISECONDS);
if (add == null) {
if (!running.get()) {
break;
@@ -89,17 +89,14 @@ public class MemoryUsageConcurrencyTest {
} catch (Exception e) {
e.printStackTrace();
}
- }
- });
+ });
- Thread removeThread = new Thread(new Runnable() {
- @Override
- public void run() {
+ final Thread removeThread = new Thread(() -> {
try {
startLatch.await();
while (true) {
- Integer remove = toRemove.poll(1,
TimeUnit.MILLISECONDS);
+ final Integer remove = toRemove.poll(1,
TimeUnit.MILLISECONDS);
if (remove == null) {
if (!running.get()) {
break;
@@ -112,65 +109,74 @@ public class MemoryUsageConcurrencyTest {
} catch (Exception e) {
e.printStackTrace();
}
- }
- });
+ });
- Thread waitForSpaceThread = new Thread(new Runnable() {
- @Override
- public void run() {
+ // Use waitForSpace(timeout) instead of unbounded waitForSpace()
to avoid
+ // indefinite blocking when usage is >= 100%. The bounded version
will return
+ // after the timeout, allowing the thread to check the running
flag and exit.
+ final Thread waitForSpaceThread = new Thread(() -> {
try {
startLatch.await();
while (running.get()) {
- memUsage.waitForSpace();
+ memUsage.waitForSpace(100);
}
} catch (Exception e) {
e.printStackTrace();
}
+ });
+
+ // Mark all threads as daemon so they cannot prevent JVM shutdown
+ // even if cleanup logic fails to stop them
+ addThread.setDaemon(true);
+ removeThread.setDaemon(true);
+ waitForSpaceThread.setDaemon(true);
+
+ removeThread.start();
+ addThread.start();
+ if (useWaitForSpaceThread) {
+ waitForSpaceThread.start();
}
- });
- removeThread.start();
- addThread.start();
- if (useWaitForSpaceThread) {
- waitForSpaceThread.start();
- }
+ final Random r = new Random(seed);
- Random r = new Random(seed);
+ startLatch.countDown();
- startLatch.countDown();
+ for (int i = 0; i < operations; i++) {
+ toAdd.add(r.nextInt(100) + 1);
+ }
- for (int i = 0; i < operations; i++) {
- toAdd.add(r.nextInt(100) + 1);
- }
+ // we expect the failure percentage to be related to the last
operation
+ final List<Integer> ops = new ArrayList<>(operations);
+ for (int i = 0; i < operations; i++) {
+ final Integer op = removed.poll(1000, TimeUnit.MILLISECONDS);
+ assertNotNull(op);
+ ops.add(op);
+ }
- // we expect the failure percentage to be related to the last operation
- List<Integer> ops = new ArrayList<Integer>(operations);
- for (int i = 0; i < operations; i++) {
- Integer op = removed.poll(1000, TimeUnit.MILLISECONDS);
- assertNotNull(op);
- ops.add(op);
- }
+ running.set(false);
- running.set(false);
+ addThread.join(5000);
+ removeThread.join(5000);
- if (useWaitForSpaceThread) {
- try {
- waitForSpaceThread.join(1000);
- } catch (InterruptedException e) {
- LOG.debug("Attempt: {} : {} waitForSpace never returned",
attempt, memUsage);
- waitForSpaceThread.interrupt();
- waitForSpaceThread.join();
+ if (useWaitForSpaceThread) {
+ waitForSpaceThread.join(5000);
+ if (waitForSpaceThread.isAlive()) {
+ LOG.debug("Attempt: {} : {} waitForSpace thread still
alive after join, interrupting", attempt, memUsage);
+ waitForSpaceThread.interrupt();
+ waitForSpaceThread.join(1000);
+ }
}
- }
- removeThread.join();
- addThread.join();
-
- if (memUsage.getPercentUsage() != 0 || memUsage.getUsage() !=
memUsage.getPercentUsage()) {
- LOG.debug("Attempt: {} : {}", attempt, memUsage);
- LOG.debug("Operations: {}", ops);
- assertEquals(0, memUsage.getPercentUsage());
+ if (memUsage.getPercentUsage() != 0 || memUsage.getUsage() !=
memUsage.getPercentUsage()) {
+ LOG.debug("Attempt: {} : {}", attempt, memUsage);
+ LOG.debug("Operations: {}", ops);
+ assertEquals(0, memUsage.getPercentUsage());
+ }
+ } finally {
+ // Stop the MemoryUsage to signal waitForSpaceCondition, which
unblocks
+ // any thread stuck in waitForSpace(). This is critical for
cleanup.
+ memUsage.stop();
}
}
}
diff --git
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 6729a968d7..2f22505c33 100644
---
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -1647,8 +1647,8 @@ public class MQTTTest extends MQTTTestSupport {
payload[i] = '2';
}
- int numberOfRuns = 10;
- int messagesPerRun = 2;
+ final int numberOfRuns = 10;
+ final int messagesPerRun = 2;
final MQTT mqttPub = createMQTTConnection("MQTT-Pub-Client", true);
final MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false);
@@ -1659,16 +1659,21 @@ public class MQTTTest extends MQTTTestSupport {
BlockingConnection connectionSub = mqttSub.blockingConnection();
connectionSub.connect();
- Topic[] topics = { new Topic("TopicA", QoS.EXACTLY_ONCE) };
+ final Topic[] topics = { new Topic("TopicA", QoS.EXACTLY_ONCE) };
connectionSub.subscribe(topics);
+ // Wait for subscription to become active before publishing
+ assertTrue("Subscription should become active",
+ Wait.waitFor(() -> isSubscriptionActive(topics[0],
mqttSub.getClientId().toString()),
+ TimeUnit.SECONDS.toMillis(5), 100));
+
for (int i = 0; i < messagesPerRun; ++i) {
connectionPub.publish(topics[0].name().toString(), payload,
QoS.AT_LEAST_ONCE, false);
}
int received = 0;
for (int i = 0; i < messagesPerRun; ++i) {
- Message message = connectionSub.receive(5, TimeUnit.SECONDS);
+ final Message message = connectionSub.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
received++;
assertTrue(Arrays.equals(payload, message.getPayload()));
@@ -1717,7 +1722,7 @@ public class MQTTTest extends MQTTTestSupport {
private boolean isSubscriptionInactive(Topic topic, String clientId)
throws Exception {
if (isVirtualTopicSubscriptionStrategy()) {
- String queueName = buildVirtualTopicQueueName(topic, clientId);
+ final String queueName = buildVirtualTopicQueueName(topic,
clientId);
try {
return getProxyToQueue(queueName).getConsumerCount() == 0;
} catch (Exception ignore) {
@@ -1738,7 +1743,7 @@ public class MQTTTest extends MQTTTestSupport {
return false;
}
} else {
- return
brokerService.getAdminView().getDurableTopicSubscribers().length == 1 &&
+ return
brokerService.getAdminView().getDurableTopicSubscribers().length >= 1 &&
brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 0;
}
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
index 723c854027..0265229fb4 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
@@ -20,12 +20,14 @@ import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import jakarta.jms.Connection;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Session;
import junit.framework.AssertionFailedError;
+import junit.framework.Test;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
@@ -35,9 +37,6 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.activemq.util.Wait;
-import org.apache.activemq.util.Wait.Condition;
-
-import junit.framework.Test;
/**
* Test to make sure durable subscriptions propagate properly throughout
network bridges
@@ -55,8 +54,8 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
protected NetworkConnector bridgeBrokers(String localBrokerName, String
remoteBrokerName,
boolean dynamicOnly, int
networkTTL) throws Exception {
- NetworkConnector connector = super.bridgeBrokers(localBrokerName,
remoteBrokerName);
- ArrayList<ActiveMQDestination> includedDestinations = new
ArrayList<>();
+ final NetworkConnector connector =
super.bridgeBrokers(localBrokerName, remoteBrokerName);
+ final ArrayList<ActiveMQDestination> includedDestinations = new
ArrayList<>();
includedDestinations.add(new
ActiveMQTopic("TEST.FOO?forceDurable=true"));
connector.setDynamicallyIncludedDestinations(includedDestinations);
connector.setDuplex(duplex);
@@ -79,17 +78,18 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
bridgeBrokers("Broker_D_D", "Broker_E_E");
startAllBrokers();
+ waitForBridgeFormation();
// Setup destination
- ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO",
true);
+ final ActiveMQTopic dest = (ActiveMQTopic)
createDestination("TEST.FOO", true);
// Setup consumers
Connection conn = brokers.get("Broker_A_A").factory.createConnection();
conn.setClientID("clientId1");
conn.start();
Session ses = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
- MessageConsumer clientA2 = ses.createDurableSubscriber(dest, "subA2");
+ final MessageConsumer clientA = ses.createDurableSubscriber(dest,
"subA");
+ final MessageConsumer clientA2 = ses.createDurableSubscriber(dest,
"subA2");
// let consumers propagate around the network
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
@@ -104,7 +104,7 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
conn2.start();
Session ses2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE");
- MessageConsumer clientE2 = ses2.createDurableSubscriber(dest, "subE2");
+ final MessageConsumer clientE2 = ses2.createDurableSubscriber(dest,
"subE2");
// let consumers propagate around the network
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
@@ -120,18 +120,19 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
this.destroyAllBrokers();
deletePersistentMessagesOnStartup = false;
- String options = new String("?persistent=true&useJmx=false");
- createBroker(new URI("broker:(tcp://localhost:61616)/Broker_A_A" +
options));
- createBroker(new URI("broker:(tcp://localhost:61617)/Broker_B_B" +
options));
- createBroker(new URI("broker:(tcp://localhost:61618)/Broker_C_C" +
options));
- createBroker(new URI("broker:(tcp://localhost:61619)/Broker_D_D" +
options));
- createBroker(new URI("broker:(tcp://localhost:61620)/Broker_E_E" +
options));
+ final String options = "?persistent=true&useJmx=false";
+ createBroker(new URI("broker:(tcp://localhost:0)/Broker_A_A" +
options));
+ createBroker(new URI("broker:(tcp://localhost:0)/Broker_B_B" +
options));
+ createBroker(new URI("broker:(tcp://localhost:0)/Broker_C_C" +
options));
+ createBroker(new URI("broker:(tcp://localhost:0)/Broker_D_D" +
options));
+ createBroker(new URI("broker:(tcp://localhost:0)/Broker_E_E" +
options));
bridgeBrokers("Broker_A_A", "Broker_B_B");
bridgeBrokers("Broker_B_B", "Broker_C_C");
bridgeBrokers("Broker_C_C", "Broker_D_D");
bridgeBrokers("Broker_D_D", "Broker_E_E");
startAllBrokers();
+ waitForBridgeFormation();
conn = brokers.get("Broker_A_A").factory.createConnection();
conn.setClientID("clientId1");
@@ -180,14 +181,15 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
}
startAllBrokers();
+ waitForBridgeFormation();
// Setup destination
- ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO",
true);
+ final ActiveMQTopic dest = (ActiveMQTopic)
createDestination("TEST.FOO", true);
// Setup consumers
- Session ses = createSession("Broker_A_A");
- MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
- MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
+ final Session ses = createSession("Broker_A_A");
+ final MessageConsumer clientA = ses.createDurableSubscriber(dest,
"subA");
+ final MessageConsumer clientB = ses.createDurableSubscriber(dest,
"subB");
// let consumers propagate around the network
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
@@ -199,8 +201,8 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
assertNotNull(clientB.receive(1000));
//bring online a consumer on the other side
- Session ses2 = createSession("Broker_C_C");
- MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
+ final Session ses2 = createSession("Broker_C_C");
+ final MessageConsumer clientC = ses2.createDurableSubscriber(dest,
"subC");
//there will be 2 network durables, 1 for each direction of the bridge
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
@@ -216,7 +218,6 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
-
}
public void testDurablePropagationConsumerAllBrokersDuplex() throws
Exception {
@@ -239,13 +240,14 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
}
startAllBrokers();
+ waitForBridgeFormation();
// Setup destination
- ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO",
true);
+ final ActiveMQTopic dest = (ActiveMQTopic)
createDestination("TEST.FOO", true);
// Setup consumers
- Session ses = createSession("Broker_A_A");
- MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+ final Session ses = createSession("Broker_A_A");
+ final MessageConsumer clientA = ses.createDurableSubscriber(dest,
"subA");
// let consumers propagate around the network
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
@@ -253,21 +255,20 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
//bring online a consumer on the other side
- Session ses2 = createSession("Broker_B_B");
- MessageConsumer clientB = ses2.createDurableSubscriber(dest, "subB");
+ final Session ses2 = createSession("Broker_B_B");
+ final MessageConsumer clientB = ses2.createDurableSubscriber(dest,
"subB");
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
- Session ses3 = createSession("Broker_C_C");
- MessageConsumer clientC = ses3.createDurableSubscriber(dest, "subC");
+ final Session ses3 = createSession("Broker_C_C");
+ final MessageConsumer clientC = ses3.createDurableSubscriber(dest,
"subC");
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
-
clientA.close();
clientB.close();
clientC.close();
@@ -275,11 +276,9 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
ses2.unsubscribe("subB");
ses3.unsubscribe("subC");
-
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
-
}
public void testDurablePropagation5BrokerDuplex() throws Exception {
@@ -306,15 +305,16 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
}
startAllBrokers();
+ waitForBridgeFormation();
// Setup destination
- ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO",
true);
+ final ActiveMQTopic dest = (ActiveMQTopic)
createDestination("TEST.FOO", true);
// Setup consumers
- Session ses = createSession("Broker_A_A");
- MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+ final Session ses = createSession("Broker_A_A");
+ final MessageConsumer clientA = ses.createDurableSubscriber(dest,
"subA");
- // let consumers propagate around the network
(assertNCDurableSubsCount waits internally)
+ // let consumers propagate around the network
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
@@ -325,10 +325,10 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
assertNotNull(clientA.receive(1000));
//bring online a consumer on the other side
- Session ses2 = createSession("Broker_E_E");
- MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE");
+ final Session ses2 = createSession("Broker_E_E");
+ final MessageConsumer clientE = ses2.createDurableSubscriber(dest,
"subE");
- //there will be 2 network durables, 1 for each direction of the bridge
(assertNCDurableSubsCount waits internally)
+ //there will be 2 network durables, 1 for each direction of the bridge
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 2);
@@ -345,7 +345,6 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 0);
-
}
public void testDurablePropagationSpokeDuplex() throws Exception {
@@ -370,26 +369,27 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
}
startAllBrokers();
+ waitForBridgeFormation();
// Setup destination
- ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO",
true);
+ final ActiveMQTopic dest = (ActiveMQTopic)
createDestination("TEST.FOO", true);
// Setup consumers
- Session ses = createSession("Broker_A_A");
- Session ses2 = createSession("Broker_B_B");
- Session ses3 = createSession("Broker_C_C");
- Session ses4 = createSession("Broker_D_D");
+ final Session ses = createSession("Broker_A_A");
+ final Session ses2 = createSession("Broker_B_B");
+ final Session ses3 = createSession("Broker_C_C");
+ final Session ses4 = createSession("Broker_D_D");
- MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
- MessageConsumer clientAB = ses.createDurableSubscriber(dest, "subAB");
+ final MessageConsumer clientA = ses.createDurableSubscriber(dest,
"subA");
+ final MessageConsumer clientAB = ses.createDurableSubscriber(dest,
"subAB");
- // let consumers propagate around the network
(assertNCDurableSubsCount waits internally)
+ // let consumers propagate around the network
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
- MessageConsumer clientD = ses4.createDurableSubscriber(dest, "subD");
+ final MessageConsumer clientD = ses4.createDurableSubscriber(dest,
"subD");
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
@@ -401,10 +401,10 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
sendMessages("Broker_C_C", dest, 1);
assertNotNull(clientD.receive(1000));
- MessageConsumer clientB = ses2.createDurableSubscriber(dest, "subB");
- MessageConsumer clientC = ses3.createDurableSubscriber(dest, "subC");
- Thread.sleep(1000);
+ final MessageConsumer clientB = ses2.createDurableSubscriber(dest,
"subB");
+ final MessageConsumer clientC = ses3.createDurableSubscriber(dest,
"subC");
+ // let consumers propagate around the network (Wait.waitFor polls
internally)
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 3);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
@@ -448,16 +448,16 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
}
startAllBrokers();
+ waitForBridgeFormation();
// Setup destination
- ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO",
true);
+ final ActiveMQTopic dest = (ActiveMQTopic)
createDestination("TEST.FOO", true);
// Setup consumers
- Session ses = createSession("Broker_A_A");
- MessageConsumer clientA = ses.createConsumer(dest);
- Thread.sleep(1000);
+ final Session ses = createSession("Broker_A_A");
+ final MessageConsumer clientA = ses.createConsumer(dest);
- // let consumers propagate around the network
+ // let consumers propagate around the network (Wait.waitFor polls
internally)
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
@@ -465,10 +465,10 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
sendMessages("Broker_C_C", dest, 1);
assertNotNull(clientA.receive(1000));
- Session ses2 = createSession("Broker_C_C");
- MessageConsumer clientC = ses2.createConsumer(dest);
- Thread.sleep(1000);
+ final Session ses2 = createSession("Broker_C_C");
+ final MessageConsumer clientC = ses2.createConsumer(dest);
+ // let consumers propagate around the network (Wait.waitFor polls
internally)
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
@@ -493,8 +493,8 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
protected void testDurablePropagationSync() throws Exception {
// Setup broker networks
- NetworkConnector nc1 = bridgeBrokers("Broker_A_A", "Broker_B_B");
- NetworkConnector nc2 = bridgeBrokers("Broker_B_B", "Broker_C_C");
+ final NetworkConnector nc1 = bridgeBrokers("Broker_A_A", "Broker_B_B");
+ final NetworkConnector nc2 = bridgeBrokers("Broker_B_B", "Broker_C_C");
NetworkConnector nc3 = null;
NetworkConnector nc4 = null;
@@ -514,16 +514,16 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
}
// Setup destination
- ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO",
true);
+ final ActiveMQTopic dest = (ActiveMQTopic)
createDestination("TEST.FOO", true);
- // Setup consumers
- Session ses = createSession("Broker_A_A");
- Session ses2 = createSession("Broker_C_C");
- MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
- MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
- MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
- Thread.sleep(1000);
+ // Setup consumers -- no bridges are running so no NC subs should be
created
+ final Session ses = createSession("Broker_A_A");
+ final Session ses2 = createSession("Broker_C_C");
+ final MessageConsumer clientA = ses.createDurableSubscriber(dest,
"subA");
+ final MessageConsumer clientB = ses.createDurableSubscriber(dest,
"subB");
+ final MessageConsumer clientC = ses2.createDurableSubscriber(dest,
"subC");
+ // No bridges running, so no NC durable subs should exist
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
@@ -553,31 +553,31 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
bridgeBrokers("Broker_B_B", "Broker_C_C");
//Duplicate the bridges with different included destinations - valid
use case
- NetworkConnector nc3 = bridgeBrokers("Broker_A_A", "Broker_B_B");
- NetworkConnector nc4 = bridgeBrokers("Broker_B_B", "Broker_C_C");
+ final NetworkConnector nc3 = bridgeBrokers("Broker_A_A", "Broker_B_B");
+ final NetworkConnector nc4 = bridgeBrokers("Broker_B_B", "Broker_C_C");
nc3.setName("nc_3_3");
nc4.setName("nc_4_4");
- ArrayList<ActiveMQDestination> includedDestinations = new
ArrayList<>();
+ final ArrayList<ActiveMQDestination> includedDestinations = new
ArrayList<>();
includedDestinations.add(new
ActiveMQTopic("TEST.FOO2?forceDurable=true"));
nc3.setDynamicallyIncludedDestinations(includedDestinations);
nc4.setDynamicallyIncludedDestinations(includedDestinations);
startAllBrokers();
+ waitForBridgeFormation();
// Setup destination
- ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO",
true);
- ActiveMQTopic dest2 = (ActiveMQTopic) createDestination("TEST.FOO2",
true);
+ final ActiveMQTopic dest = (ActiveMQTopic)
createDestination("TEST.FOO", true);
+ final ActiveMQTopic dest2 = (ActiveMQTopic)
createDestination("TEST.FOO2", true);
// Setup consumers
- Session ses = createSession("Broker_A_A");
- Session ses2 = createSession("Broker_C_C");
- MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
- MessageConsumer clientAa = ses.createDurableSubscriber(dest2, "subAa");
- MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
- MessageConsumer clientCc = ses2.createDurableSubscriber(dest2,
"subCc");
- Thread.sleep(1000);
-
- //make sure network durables are online
+ final Session ses = createSession("Broker_A_A");
+ final Session ses2 = createSession("Broker_C_C");
+ final MessageConsumer clientA = ses.createDurableSubscriber(dest,
"subA");
+ final MessageConsumer clientAa = ses.createDurableSubscriber(dest2,
"subAa");
+ final MessageConsumer clientC = ses2.createDurableSubscriber(dest,
"subC");
+ final MessageConsumer clientCc = ses2.createDurableSubscriber(dest2,
"subCc");
+
+ //make sure network durables are online (Wait.waitFor polls internally)
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
@@ -729,32 +729,32 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
testDurablePropagation(-1, true, List.of(1, 2, 2, 2, 1));
}
- private void testDurablePropagation(int ttl, boolean dynamicOnly,
- List<Integer> expected) throws
Exception {
+ private void testDurablePropagation(final int ttl, final boolean
dynamicOnly,
+ final List<Integer> expected) throws
Exception {
testDurablePropagation(ttl, dynamicOnly, false, expected);
}
- private void testDurablePropagation(int ttl, boolean dynamicOnly, boolean
restartBrokers,
- List<Integer> expected) throws
Exception {
+ private void testDurablePropagation(final int ttl, final boolean
dynamicOnly, final boolean restartBrokers,
+ final List<Integer> expected) throws
Exception {
duplex = true;
// Setup broker networks
- NetworkConnector nc1 = bridgeBrokers("Broker_A_A", "Broker_B_B",
dynamicOnly, ttl);
- NetworkConnector nc2 = bridgeBrokers("Broker_B_B", "Broker_C_C",
dynamicOnly, ttl);
- NetworkConnector nc3 = bridgeBrokers("Broker_C_C", "Broker_D_D",
dynamicOnly, ttl);
- NetworkConnector nc4 = bridgeBrokers("Broker_D_D", "Broker_E_E",
dynamicOnly, ttl);
+ final NetworkConnector nc1 = bridgeBrokers("Broker_A_A", "Broker_B_B",
dynamicOnly, ttl);
+ final NetworkConnector nc2 = bridgeBrokers("Broker_B_B", "Broker_C_C",
dynamicOnly, ttl);
+ final NetworkConnector nc3 = bridgeBrokers("Broker_C_C", "Broker_D_D",
dynamicOnly, ttl);
+ final NetworkConnector nc4 = bridgeBrokers("Broker_D_D", "Broker_E_E",
dynamicOnly, ttl);
startAllBrokers();
stopNetworkConnectors(nc1, nc2, nc3, nc4);
// Setup destination
- ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO",
true);
+ final ActiveMQTopic dest = (ActiveMQTopic)
createDestination("TEST.FOO", true);
// Setup consumers
- Session ses = createSession("Broker_A_A");
- Session ses2 = createSession("Broker_E_E");
- MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
- MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE");
+ final Session ses = createSession("Broker_A_A");
+ final Session ses2 = createSession("Broker_E_E");
+ final MessageConsumer clientA = ses.createDurableSubscriber(dest,
"subA");
+ final MessageConsumer clientE = ses2.createDurableSubscriber(dest,
"subE");
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
@@ -764,7 +764,7 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
startNetworkConnectors(nc1, nc2, nc3, nc4);
- // Check that the correct network durables exist
(assertNCDurableSubsCount waits internally)
+ // Check that the correct network durables exist
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest,
expected.get(0));
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest,
expected.get(1));
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest,
expected.get(2));
@@ -784,6 +784,7 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
bridgeBrokers("Broker_C_C", "Broker_D_D", dynamicOnly, ttl);
bridgeBrokers("Broker_D_D", "Broker_E_E", dynamicOnly, ttl);
startAllBrokers();
+ waitForBridgeFormation();
} else {
// restart just the network connectors but leave the consumers
online
// to test sync works ok. Things should work for all cases both
dynamicOnly
@@ -792,7 +793,7 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
startNetworkConnectors(nc1, nc2, nc3, nc4);
}
- // after restarting the bridges, check sync/demand are correct
(assertNCDurableSubsCount waits internally)
+ // after restarting the bridges, check sync/demand are correct
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest,
expected.get(0));
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest,
expected.get(1));
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest,
expected.get(2));
@@ -802,28 +803,27 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
protected void assertNCDurableSubsCount(final BrokerService brokerService,
final ActiveMQTopic dest,
final int count) throws Exception {
- assertTrue(Wait.waitFor(new Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return count == getNCDurableSubs(brokerService, dest).size();
- }
- }, 10000, 500));
+ assertTrue("Expected " + count + " NC durable sub(s) on " +
brokerService.getBrokerName()
+ + " for " + dest.getTopicName() + ", but got "
+ + getNCDurableSubs(brokerService, dest).size(),
+ Wait.waitFor(() -> count == getNCDurableSubs(brokerService,
dest).size(),
+ TimeUnit.SECONDS.toMillis(30), 500));
}
protected List<DurableTopicSubscription> getNCDurableSubs(final
BrokerService brokerService,
final ActiveMQTopic dest) throws Exception {
- List<DurableTopicSubscription> subs = new ArrayList<>();
- Destination d = brokerService.getDestination(dest);
- org.apache.activemq.broker.region.Topic destination = null;
+ final List<DurableTopicSubscription> subs = new ArrayList<>();
+ final Destination d = brokerService.getDestination(dest);
+ final org.apache.activemq.broker.region.Topic destination;
if (d instanceof DestinationFilter) {
destination = ((DestinationFilter)
d).getAdaptor(org.apache.activemq.broker.region.Topic.class);
} else {
destination = (org.apache.activemq.broker.region.Topic) d;
}
- for (SubscriptionKey key : destination.getDurableTopicSubs().keySet())
{
+ for (final SubscriptionKey key :
destination.getDurableTopicSubs().keySet()) {
if
(key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX))
{
- DurableTopicSubscription sub =
destination.getDurableTopicSubs().get(key);
+ final DurableTopicSubscription sub =
destination.getDurableTopicSubs().get(key);
if (sub != null && sub.isActive()) {
subs.add(sub);
}
@@ -855,19 +855,19 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
}
protected void startNetworkConnectors(NetworkConnector... connectors)
throws Exception {
- for (NetworkConnector connector : connectors) {
+ for (final NetworkConnector connector : connectors) {
connector.start();
}
}
protected void stopNetworkConnectors(NetworkConnector... connectors)
throws Exception {
- for (NetworkConnector connector : connectors) {
+ for (final NetworkConnector connector : connectors) {
connector.stop();
}
}
protected Session createSession(String broker) throws Exception {
- Connection con = createConnection(broker);
+ final Connection con = createConnection(broker);
con.start();
return con.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/security/AbstractCachedLDAPAuthorizationMapLegacyTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/security/AbstractCachedLDAPAuthorizationMapLegacyTest.java
index 36069630ea..8d7339bc42 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/security/AbstractCachedLDAPAuthorizationMapLegacyTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/security/AbstractCachedLDAPAuthorizationMapLegacyTest.java
@@ -202,16 +202,23 @@ public abstract class
AbstractCachedLDAPAuthorizationMapLegacyTest extends Abstr
reader.close();
- assertTrue("did not get expected size after remove", Wait.waitFor(new
Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return map.getReadACLs(new ActiveMQQueue("TEST.FOO")).size()
== 0;
- }
+ assertTrue("did not get expected size after remove", Wait.waitFor(
+ () -> map.getReadACLs(new ActiveMQQueue("TEST.FOO")).size() ==
0));
+
+ // Temp destination ACLs are removed by a separate LDAP listener
+ // (on the Temp subtree), so events may arrive after the Queue events.
+ assertTrue("Temp read ACLs not cleared after remove", Wait.waitFor(()
-> {
+ final Set<?> acls = map.getTempDestinationReadACLs();
+ return acls == null || acls.isEmpty();
+ }));
+ assertTrue("Temp write ACLs not cleared after remove", Wait.waitFor(()
-> {
+ final Set<?> acls = map.getTempDestinationWriteACLs();
+ return acls == null || acls.isEmpty();
+ }));
+ assertTrue("Temp admin ACLs not cleared after remove", Wait.waitFor(()
-> {
+ final Set<?> acls = map.getTempDestinationAdminACLs();
+ return acls == null || acls.isEmpty();
}));
-
- assertTrue(map.getTempDestinationReadACLs() == null ||
map.getTempDestinationReadACLs().isEmpty());
- assertTrue(map.getTempDestinationWriteACLs() == null ||
map.getTempDestinationWriteACLs().isEmpty());
- assertTrue(map.getTempDestinationAdminACLs() == null ||
map.getTempDestinationAdminACLs().isEmpty());
}
@Test
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/security/SimpleAuthenticationPluginTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/security/SimpleAuthenticationPluginTest.java
index 269a2ee056..215947280d 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/security/SimpleAuthenticationPluginTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/security/SimpleAuthenticationPluginTest.java
@@ -43,6 +43,7 @@ import java.net.URI;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.junit.experimental.categories.Category;
import org.apache.activemq.test.annotations.ParallelTest;
@@ -111,11 +112,12 @@ public class SimpleAuthenticationPluginTest extends
SecurityTestSupport {
public void testConnectionStartThrowsJMSSecurityException() throws
Exception {
final CountDownLatch exceptionLatch = new CountDownLatch(1);
+ final AtomicReference<JMSException> receivedException = new
AtomicReference<>();
try (final Connection connection = factory.createConnection("badUser",
"password")) {
connection.setExceptionListener(e -> {
LOG.info("Connection received exception: {}", e.getMessage());
- assertTrue(e instanceof JMSSecurityException);
+ receivedException.set(e);
exceptionLatch.countDown();
});
@@ -123,14 +125,13 @@ public class SimpleAuthenticationPluginTest extends
SecurityTestSupport {
connection.start();
// If start() doesn't throw synchronously, wait for async
exception
- assertTrue("Should receive security exception via listener",
exceptionLatch.await(5, TimeUnit.SECONDS));
-
+ assertTrue("Should receive security exception via listener",
+ exceptionLatch.await(5, TimeUnit.SECONDS));
+ assertNotNull("Exception should have been received",
receivedException.get());
+ assertTrue("Should be JMSSecurityException but was: " +
receivedException.get().getClass(),
+ receivedException.get() instanceof JMSSecurityException);
} catch (final JMSSecurityException jmsEx) {
// Synchronous security exception - expected
- } catch (final JMSException e) {
- // with the latch, we should always pass first into the
listener and assert the right exception
- LOG.info("Expected JMSSecurityException but was: {}",
e.getClass());
- fail("Should throw JMSSecurityException");
}
}
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
index e857de100e..9c49e19d2e 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
@@ -558,9 +558,10 @@ public class VMTransportThreadSafeTest {
@Test(timeout=60000)
public void testStopWhileStartingAsyncWithNoAsyncLimit() throws Exception {
// In the async case the iterate method should see that we are
stopping and
- // drop out before we dispatch all the messages but it should get at
least 49 since
- // the stop thread waits 500 mills and the listener is waiting 10
mills on each receive.
- doTestStopWhileStartingWithNoAsyncLimit(true, 49);
+ // drop out before we dispatch all the messages. We wait until the
TaskRunner
+ // has started processing (at least 1 message received), then stop
mid-stream.
+ // Some messages should be received but not all 100.
+ doTestStopWhileStartingWithNoAsyncLimit(true, 1);
}
@Test(timeout=60000)
@@ -569,7 +570,7 @@ public class VMTransportThreadSafeTest {
doTestStopWhileStartingWithNoAsyncLimit(false, 100);
}
- private void doTestStopWhileStartingWithNoAsyncLimit(boolean async, final
int expect) throws Exception {
+ private void doTestStopWhileStartingWithNoAsyncLimit(final boolean async,
final int expect) throws Exception {
final VMTransport local = new VMTransport(new URI(location1));
final VMTransport remote = new VMTransport(new URI(location2));
@@ -588,39 +589,31 @@ public class VMTransportThreadSafeTest {
local.oneway(new DummyCommand(i));
}
- Thread t = new Thread(new Runnable() {
-
- @Override
- public void run() {
- try {
- Thread.sleep(1000);
- remote.stop();
- } catch (Exception e) {
- }
- }
- });
-
remote.start();
- t.start();
+ if (async) {
+ // Wait until the TaskRunner has actually started processing
messages
+ // before stopping, so we don't race and stop before any delivery
occurs.
+ assertTrue("Remote should start receiving messages",
+ Wait.waitFor(() -> remoteReceived.size() > 0, 5000, 10));
- assertTrue("Remote should receive: " + expect + ", commands but got: "
+ remoteReceived.size(), Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return remoteReceived.size() >= expect;
- }
- }));
+ // Now stop mid-stream - some messages have been delivered,
transport should dispose cleanly
+ remote.stop();
+ } else {
+ // Non-async: start() dispatches all messages synchronously, so
they are
+ // already received. Just stop normally.
+ remote.stop();
+ }
+
+ assertTrue("Remote should receive at least " + expect + " commands but
got: " + remoteReceived.size(),
+ Wait.waitFor(() -> remoteReceived.size() >= expect));
LOG.debug("Remote listener received " + remoteReceived.size() + "
messages");
local.stop();
- assertTrue("Remote transport never was disposed.", Wait.waitFor(new
Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return remote.isDisposed();
- }
- }));
+ assertTrue("Remote transport never was disposed.",
+ Wait.waitFor(() -> remote.isDisposed()));
}
@Test(timeout=120000)
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java
index ec57b22303..afe382934c 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java
@@ -34,6 +34,7 @@ import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
@@ -63,20 +64,31 @@ public class AMQ6366Test extends
JmsMultipleBrokersTestSupport {
testNonDurableReceiveThrougRestart("BrokerB", "BrokerA");
}
- protected void testNonDurableReceiveThrougRestart(String pubBroker, String
conBroker) throws Exception {
- NetworkConnector networkConnector = bridgeBrokerPair("BrokerA",
"BrokerB");
+ protected void testNonDurableReceiveThrougRestart(final String pubBroker,
final String conBroker) throws Exception {
+ final NetworkConnector networkConnector = bridgeBrokerPair("BrokerA",
"BrokerB");
startAllBrokers();
waitForBridgeFormation();
- MessageConsumer client = createDurableSubscriber(conBroker, dest,
"sub1");
+ final MessageConsumer client = createDurableSubscriber(conBroker,
dest, "sub1");
client.close();
- Thread.sleep(1000);
+ // Wait for the durable subscription to become inactive after closing
the consumer
+ final Topic conBrokerDest = (Topic)
brokers.get(conBroker).broker.getDestination(dest);
+ assertTrue("Durable sub should become inactive after close",
+ Wait.waitFor(() -> {
+ final DurableTopicSubscription[] subs =
conBrokerDest.getDurableTopicSubs()
+ .values().toArray(new DurableTopicSubscription[0]);
+ return subs.length > 0 && !subs[0].isActive();
+ }, 5000, 100));
+
networkConnector.stop();
- Thread.sleep(1000);
- Set<ActiveMQDestination> durableDests = new HashSet<>();
+ // Wait for the network connector to fully stop
+ assertTrue("Network connector should stop",
+ Wait.waitFor(networkConnector::isStopped, 5000, 100));
+
+ final Set<ActiveMQDestination> durableDests = new HashSet<>();
durableDests.add(dest);
//Normally set on broker start from the persistence layer but
//simulate here since we just stopped and started the network connector
@@ -87,14 +99,15 @@ public class AMQ6366Test extends
JmsMultipleBrokersTestSupport {
// Send messages
sendMessages(pubBroker, dest, 1);
- Thread.sleep(1000);
-
- Topic destination = (Topic)
brokers.get(conBroker).broker.getDestination(dest);
- DurableTopicSubscription sub = destination.getDurableTopicSubs().
- values().toArray(new DurableTopicSubscription[0])[0];
- //Assert that the message made it to the other broker
- assertEquals(1,
sub.getSubscriptionStatistics().getEnqueues().getCount());
+ // Wait for the message to be enqueued through the network bridge
+ final Topic destination = (Topic)
brokers.get(conBroker).broker.getDestination(dest);
+ assertTrue("Message should be enqueued to durable subscription",
+ Wait.waitFor(() -> {
+ final DurableTopicSubscription sub =
destination.getDurableTopicSubs()
+ .values().toArray(new
DurableTopicSubscription[0])[0];
+ return
sub.getSubscriptionStatistics().getEnqueues().getCount() == 1;
+ }, 10000, 100));
}
@Override
@@ -103,16 +116,15 @@ public class AMQ6366Test extends
JmsMultipleBrokersTestSupport {
broker.setAdvisorySupport(true);
}
- protected NetworkConnector bridgeBrokerPair(String localBrokerName, String
remoteBrokerName) throws Exception {
- BrokerService localBroker = brokers.get(localBrokerName).broker;
- BrokerService remoteBroker = brokers.get(remoteBrokerName).broker;
+ protected NetworkConnector bridgeBrokerPair(final String localBrokerName,
final String remoteBrokerName) throws Exception {
+ final BrokerService localBroker = brokers.get(localBrokerName).broker;
+ final BrokerService remoteBroker =
brokers.get(remoteBrokerName).broker;
- List<TransportConnector> transportConnectors =
remoteBroker.getTransportConnectors();
- URI remoteURI;
+ final List<TransportConnector> transportConnectors =
remoteBroker.getTransportConnectors();
if (!transportConnectors.isEmpty()) {
- remoteURI = transportConnectors.get(0).getConnectUri();
- String uri = "static:(" + remoteURI + ")";
- NetworkConnector connector = new DiscoveryNetworkConnector(new
URI(uri));
+ final URI remoteURI = transportConnectors.get(0).getConnectUri();
+ final String uri = "static:(" + remoteURI + ")";
+ final NetworkConnector connector = new
DiscoveryNetworkConnector(new URI(uri));
connector.setDynamicOnly(false); // so matching durable subs are
loaded on start
connector.setStaticBridge(false);
connector.setDuplex(true);
@@ -126,7 +138,7 @@ public class AMQ6366Test extends
JmsMultipleBrokersTestSupport {
@Override
public void setUp() throws Exception {
- File dataDir = new File(IOHelper.getDefaultDataDirectory());
+ final File dataDir = new File(IOHelper.getDefaultDataDirectory());
LOG.info("Delete dataDir.." + dataDir.getCanonicalPath());
org.apache.activemq.TestSupport.recursiveDelete(dataDir);
super.setAutoFail(true);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact