This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 09f1105cb4 fix(tests): fix flaky tests (#1753)
09f1105cb4 is described below

commit 09f1105cb41ef6fee112f3a3c6d31b4638a0dfe8
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Wed Mar 11 13:21:50 2026 +0100

    fix(tests): fix flaky tests (#1753)
    
    * fix(tests): fix flaky tests
    
    * fix(test): reduce flakiness in 
AbstractCachedLDAPAuthorizationMapLegacyTest by using WaitFor
---
 .../transport/amqp/interop/AmqpReceiverTest.java   |  44 ++--
 .../activemq/usage/MemoryUsageConcurrencyTest.java | 122 +++++------
 .../apache/activemq/transport/mqtt/MQTTTest.java   |  17 +-
 .../DurableFiveBrokerNetworkBridgeTest.java        | 226 ++++++++++-----------
 ...stractCachedLDAPAuthorizationMapLegacyTest.java |  25 ++-
 .../security/SimpleAuthenticationPluginTest.java   |  15 +-
 .../transport/vm/VMTransportThreadSafeTest.java    |  51 ++---
 .../org/apache/activemq/usecases/AMQ6366Test.java  |  56 +++--
 8 files changed, 289 insertions(+), 267 deletions(-)

diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
index 78626d83f1..886b2a5f4a 100644
--- 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
@@ -453,14 +453,14 @@ public class AmqpReceiverTest extends 
AmqpClientTestSupport {
 
     @Test(timeout = 60000)
     public void 
testTwoQueueReceiversOnSameConnectionReadMessagesAcceptOnEach() throws 
Exception {
-        int MSG_COUNT = 4;
+        final int MSG_COUNT = 4;
         sendMessages(getTestName(), MSG_COUNT, false);
 
-        AmqpClient client = createAmqpClient();
-        AmqpConnection connection = trackConnection(client.connect());
-        AmqpSession session = connection.createSession();
+        final AmqpClient client = createAmqpClient();
+        final AmqpConnection connection = trackConnection(client.connect());
+        final AmqpSession session = connection.createSession();
 
-        AmqpReceiver receiver1 = session.createReceiver("queue://" + 
getTestName());
+        final AmqpReceiver receiver1 = session.createReceiver("queue://" + 
getTestName());
 
         final QueueViewMBean queueView = getProxyToQueue(getTestName());
         assertEquals(MSG_COUNT, queueView.getQueueSize());
@@ -473,17 +473,15 @@ public class AmqpReceiverTest extends 
AmqpClientTestSupport {
         assertNotNull(message);
         message.accept();
 
-        assertTrue("Should have ack'd two", Wait.waitFor(new Wait.Condition() {
+        assertTrue("Should have ack'd two", Wait.waitFor(
+                () -> queueView.getDequeueCount() == 2,
+                TimeUnit.SECONDS.toMillis(5), 
TimeUnit.MILLISECONDS.toMillis(50)));
 
-            @Override
-            public boolean isSatisified() throws Exception {
-                return queueView.getDequeueCount() == 2;
-            }
-        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50)));
+        final AmqpReceiver receiver2 = session.createReceiver("queue://" + 
getTestName());
 
-        AmqpReceiver receiver2 = session.createReceiver("queue://" + 
getTestName());
-
-        assertEquals(2, 
brokerService.getAdminView().getQueueSubscribers().length);
+        assertTrue("Second receiver should be registered", Wait.waitFor(
+                () -> 
brokerService.getAdminView().getQueueSubscribers().length == 2,
+                TimeUnit.SECONDS.toMillis(5), 
TimeUnit.MILLISECONDS.toMillis(50)));
 
         receiver2.flow(2);
         message = receiver2.receive(5, TimeUnit.SECONDS);
@@ -493,19 +491,19 @@ public class AmqpReceiverTest extends 
AmqpClientTestSupport {
         assertNotNull(message);
         message.accept();
 
-        assertEquals(MSG_COUNT, queueView.getDispatchCount());
-        assertTrue("Queue should be empty now", Wait.waitFor(new 
Wait.Condition() {
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return queueView.getDequeueCount() == 4;
-            }
-        }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(10)));
+        assertTrue("All messages should be dispatched", Wait.waitFor(
+                () -> queueView.getDispatchCount() == MSG_COUNT,
+                TimeUnit.SECONDS.toMillis(5), 
TimeUnit.MILLISECONDS.toMillis(50)));
+        assertTrue("Queue should be empty now", Wait.waitFor(
+                () -> queueView.getDequeueCount() == MSG_COUNT,
+                TimeUnit.SECONDS.toMillis(15), 
TimeUnit.MILLISECONDS.toMillis(10)));
 
         receiver1.close();
         receiver2.close();
 
-        assertEquals(0, queueView.getQueueSize());
+        assertTrue("Queue size should reach zero", Wait.waitFor(
+                () -> queueView.getQueueSize() == 0,
+                TimeUnit.SECONDS.toMillis(5), 
TimeUnit.MILLISECONDS.toMillis(50)));
 
         connection.close();
     }
diff --git 
a/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageConcurrencyTest.java
 
b/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageConcurrencyTest.java
index 503ebc4eac..56a4297522 100644
--- 
a/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageConcurrencyTest.java
+++ 
b/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageConcurrencyTest.java
@@ -39,26 +39,27 @@ public class MemoryUsageConcurrencyTest {
 
     @Test
     public void testCycle() throws Exception {
-        Random r = new Random(0xb4a14);
+        final Random r = new Random(0xb4a14);
         for (int i = 0; i < 30000; i++) {
             checkPercentage(i, i, r.nextInt(100) + 10, i % 2 == 0, i % 5 == 0);
         }
     }
 
-    private void checkPercentage(int attempt, int seed, int operations, 
boolean useArrayBlocking, boolean useWaitForSpaceThread) throws 
InterruptedException {
+    private void checkPercentage(final int attempt, final int seed, final int 
operations,
+                                 final boolean useArrayBlocking, final boolean 
useWaitForSpaceThread) throws InterruptedException {
 
         final BlockingQueue<Integer> toAdd;
         final BlockingQueue<Integer> toRemove;
         final BlockingQueue<Integer> removed;
 
         if (useArrayBlocking) {
-            toAdd = new ArrayBlockingQueue<Integer>(operations);
-            toRemove = new ArrayBlockingQueue<Integer>(operations);
-            removed = new ArrayBlockingQueue<Integer>(operations);
+            toAdd = new ArrayBlockingQueue<>(operations);
+            toRemove = new ArrayBlockingQueue<>(operations);
+            removed = new ArrayBlockingQueue<>(operations);
         } else {
-            toAdd = new LinkedBlockingQueue<Integer>();
-            toRemove = new LinkedBlockingQueue<Integer>();
-            removed = new LinkedBlockingQueue<Integer>();
+            toAdd = new LinkedBlockingQueue<>();
+            toRemove = new LinkedBlockingQueue<>();
+            removed = new LinkedBlockingQueue<>();
         }
 
         final AtomicBoolean running = new AtomicBoolean(true);
@@ -68,14 +69,13 @@ public class MemoryUsageConcurrencyTest {
         memUsage.setLimit(1000);
         memUsage.start();
 
-        Thread addThread = new Thread(new Runnable() {
-            @Override
-            public void run() {
+        try {
+            final Thread addThread = new Thread(() -> {
                 try {
                     startLatch.await();
 
                     while (true) {
-                        Integer add = toAdd.poll(1, TimeUnit.MILLISECONDS);
+                        final Integer add = toAdd.poll(1, 
TimeUnit.MILLISECONDS);
                         if (add == null) {
                             if (!running.get()) {
                                 break;
@@ -89,17 +89,14 @@ public class MemoryUsageConcurrencyTest {
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
-            }
-        });
+            });
 
-        Thread removeThread = new Thread(new Runnable() {
-            @Override
-            public void run() {
+            final Thread removeThread = new Thread(() -> {
                 try {
                     startLatch.await();
 
                     while (true) {
-                        Integer remove = toRemove.poll(1, 
TimeUnit.MILLISECONDS);
+                        final Integer remove = toRemove.poll(1, 
TimeUnit.MILLISECONDS);
                         if (remove == null) {
                             if (!running.get()) {
                                 break;
@@ -112,65 +109,74 @@ public class MemoryUsageConcurrencyTest {
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
-            }
-        });
+            });
 
-        Thread waitForSpaceThread = new Thread(new Runnable() {
-            @Override
-            public void run() {
+            // Use waitForSpace(timeout) instead of unbounded waitForSpace() 
to avoid
+            // indefinite blocking when usage is >= 100%. The bounded version 
will return
+            // after the timeout, allowing the thread to check the running 
flag and exit.
+            final Thread waitForSpaceThread = new Thread(() -> {
                 try {
                     startLatch.await();
 
                     while (running.get()) {
-                        memUsage.waitForSpace();
+                        memUsage.waitForSpace(100);
                     }
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
+            });
+
+            // Mark all threads as daemon so they cannot prevent JVM shutdown
+            // even if cleanup logic fails to stop them
+            addThread.setDaemon(true);
+            removeThread.setDaemon(true);
+            waitForSpaceThread.setDaemon(true);
+
+            removeThread.start();
+            addThread.start();
+            if (useWaitForSpaceThread) {
+                waitForSpaceThread.start();
             }
-        });
 
-        removeThread.start();
-        addThread.start();
-        if (useWaitForSpaceThread) {
-            waitForSpaceThread.start();
-        }
+            final Random r = new Random(seed);
 
-        Random r = new Random(seed);
+            startLatch.countDown();
 
-        startLatch.countDown();
+            for (int i = 0; i < operations; i++) {
+                toAdd.add(r.nextInt(100) + 1);
+            }
 
-        for (int i = 0; i < operations; i++) {
-            toAdd.add(r.nextInt(100) + 1);
-        }
+            // we expect the failure percentage to be related to the last 
operation
+            final List<Integer> ops = new ArrayList<>(operations);
+            for (int i = 0; i < operations; i++) {
+                final Integer op = removed.poll(1000, TimeUnit.MILLISECONDS);
+                assertNotNull(op);
+                ops.add(op);
+            }
 
-        // we expect the failure percentage to be related to the last operation
-        List<Integer> ops = new ArrayList<Integer>(operations);
-        for (int i = 0; i < operations; i++) {
-            Integer op = removed.poll(1000, TimeUnit.MILLISECONDS);
-            assertNotNull(op);
-            ops.add(op);
-        }
+            running.set(false);
 
-        running.set(false);
+            addThread.join(5000);
+            removeThread.join(5000);
 
-        if (useWaitForSpaceThread) {
-            try {
-                waitForSpaceThread.join(1000);
-            } catch (InterruptedException e) {
-                LOG.debug("Attempt: {} : {} waitForSpace never returned", 
attempt, memUsage);
-                waitForSpaceThread.interrupt();
-                waitForSpaceThread.join();
+            if (useWaitForSpaceThread) {
+                waitForSpaceThread.join(5000);
+                if (waitForSpaceThread.isAlive()) {
+                    LOG.debug("Attempt: {} : {} waitForSpace thread still 
alive after join, interrupting", attempt, memUsage);
+                    waitForSpaceThread.interrupt();
+                    waitForSpaceThread.join(1000);
+                }
             }
-        }
 
-        removeThread.join();
-        addThread.join();
-
-        if (memUsage.getPercentUsage() != 0 || memUsage.getUsage() != 
memUsage.getPercentUsage()) {
-            LOG.debug("Attempt: {} : {}", attempt, memUsage);
-            LOG.debug("Operations: {}", ops);
-            assertEquals(0, memUsage.getPercentUsage());
+            if (memUsage.getPercentUsage() != 0 || memUsage.getUsage() != 
memUsage.getPercentUsage()) {
+                LOG.debug("Attempt: {} : {}", attempt, memUsage);
+                LOG.debug("Operations: {}", ops);
+                assertEquals(0, memUsage.getPercentUsage());
+            }
+        } finally {
+            // Stop the MemoryUsage to signal waitForSpaceCondition, which 
unblocks
+            // any thread stuck in waitForSpace(). This is critical for 
cleanup.
+            memUsage.stop();
         }
     }
 }
diff --git 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 6729a968d7..2f22505c33 100644
--- 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -1647,8 +1647,8 @@ public class MQTTTest extends MQTTTestSupport {
             payload[i] = '2';
         }
 
-        int numberOfRuns = 10;
-        int messagesPerRun = 2;
+        final int numberOfRuns = 10;
+        final int messagesPerRun = 2;
 
         final MQTT mqttPub = createMQTTConnection("MQTT-Pub-Client", true);
         final MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false);
@@ -1659,16 +1659,21 @@ public class MQTTTest extends MQTTTestSupport {
         BlockingConnection connectionSub = mqttSub.blockingConnection();
         connectionSub.connect();
 
-        Topic[] topics = { new Topic("TopicA", QoS.EXACTLY_ONCE) };
+        final Topic[] topics = { new Topic("TopicA", QoS.EXACTLY_ONCE) };
         connectionSub.subscribe(topics);
 
+        // Wait for subscription to become active before publishing
+        assertTrue("Subscription should become active",
+                Wait.waitFor(() -> isSubscriptionActive(topics[0], 
mqttSub.getClientId().toString()),
+                        TimeUnit.SECONDS.toMillis(5), 100));
+
         for (int i = 0; i < messagesPerRun; ++i) {
             connectionPub.publish(topics[0].name().toString(), payload, 
QoS.AT_LEAST_ONCE, false);
         }
 
         int received = 0;
         for (int i = 0; i < messagesPerRun; ++i) {
-            Message message = connectionSub.receive(5, TimeUnit.SECONDS);
+            final Message message = connectionSub.receive(5, TimeUnit.SECONDS);
             assertNotNull(message);
             received++;
             assertTrue(Arrays.equals(payload, message.getPayload()));
@@ -1717,7 +1722,7 @@ public class MQTTTest extends MQTTTestSupport {
 
     private boolean isSubscriptionInactive(Topic topic, String clientId) 
throws Exception {
         if (isVirtualTopicSubscriptionStrategy()) {
-            String queueName = buildVirtualTopicQueueName(topic, clientId);
+            final String queueName = buildVirtualTopicQueueName(topic, 
clientId);
             try {
                 return getProxyToQueue(queueName).getConsumerCount() == 0;
             } catch (Exception ignore) {
@@ -1738,7 +1743,7 @@ public class MQTTTest extends MQTTTestSupport {
                 return false;
             }
         } else {
-            return 
brokerService.getAdminView().getDurableTopicSubscribers().length == 1 &&
+            return 
brokerService.getAdminView().getDurableTopicSubscribers().length >= 1 &&
                    
brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 0;
         }
     }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
index 723c854027..0265229fb4 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
@@ -20,12 +20,14 @@ import java.io.File;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import jakarta.jms.Connection;
 import jakarta.jms.MessageConsumer;
 import jakarta.jms.Session;
 
 import junit.framework.AssertionFailedError;
+import junit.framework.Test;
 import org.apache.activemq.JmsMultipleBrokersTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.Destination;
@@ -35,9 +37,6 @@ import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.util.SubscriptionKey;
 import org.apache.activemq.util.Wait;
-import org.apache.activemq.util.Wait.Condition;
-
-import junit.framework.Test;
 
 /**
  * Test to make sure durable subscriptions propagate properly throughout 
network bridges
@@ -55,8 +54,8 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
 
     protected NetworkConnector bridgeBrokers(String localBrokerName, String 
remoteBrokerName,
                                              boolean dynamicOnly, int 
networkTTL) throws Exception {
-        NetworkConnector connector = super.bridgeBrokers(localBrokerName, 
remoteBrokerName);
-        ArrayList<ActiveMQDestination> includedDestinations = new 
ArrayList<>();
+        final NetworkConnector connector = 
super.bridgeBrokers(localBrokerName, remoteBrokerName);
+        final ArrayList<ActiveMQDestination> includedDestinations = new 
ArrayList<>();
         includedDestinations.add(new 
ActiveMQTopic("TEST.FOO?forceDurable=true"));
         connector.setDynamicallyIncludedDestinations(includedDestinations);
         connector.setDuplex(duplex);
@@ -79,17 +78,18 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         bridgeBrokers("Broker_D_D", "Broker_E_E");
 
         startAllBrokers();
+        waitForBridgeFormation();
 
         // Setup destination
-        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", 
true);
+        final ActiveMQTopic dest = (ActiveMQTopic) 
createDestination("TEST.FOO", true);
 
         // Setup consumers
         Connection conn = brokers.get("Broker_A_A").factory.createConnection();
         conn.setClientID("clientId1");
         conn.start();
         Session ses = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
-        MessageConsumer clientA2 = ses.createDurableSubscriber(dest, "subA2");
+        final MessageConsumer clientA = ses.createDurableSubscriber(dest, 
"subA");
+        final MessageConsumer clientA2 = ses.createDurableSubscriber(dest, 
"subA2");
 
         // let consumers propagate around the network
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
@@ -104,7 +104,7 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         conn2.start();
         Session ses2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE");
-        MessageConsumer clientE2 = ses2.createDurableSubscriber(dest, "subE2");
+        final MessageConsumer clientE2 = ses2.createDurableSubscriber(dest, 
"subE2");
 
         // let consumers propagate around the network
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
@@ -120,18 +120,19 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
 
         this.destroyAllBrokers();
         deletePersistentMessagesOnStartup = false;
-        String options = new String("?persistent=true&useJmx=false");
-        createBroker(new URI("broker:(tcp://localhost:61616)/Broker_A_A" + 
options));
-        createBroker(new URI("broker:(tcp://localhost:61617)/Broker_B_B" + 
options));
-        createBroker(new URI("broker:(tcp://localhost:61618)/Broker_C_C" + 
options));
-        createBroker(new URI("broker:(tcp://localhost:61619)/Broker_D_D" + 
options));
-        createBroker(new URI("broker:(tcp://localhost:61620)/Broker_E_E" + 
options));
+        final String options = "?persistent=true&useJmx=false";
+        createBroker(new URI("broker:(tcp://localhost:0)/Broker_A_A" + 
options));
+        createBroker(new URI("broker:(tcp://localhost:0)/Broker_B_B" + 
options));
+        createBroker(new URI("broker:(tcp://localhost:0)/Broker_C_C" + 
options));
+        createBroker(new URI("broker:(tcp://localhost:0)/Broker_D_D" + 
options));
+        createBroker(new URI("broker:(tcp://localhost:0)/Broker_E_E" + 
options));
         bridgeBrokers("Broker_A_A", "Broker_B_B");
         bridgeBrokers("Broker_B_B", "Broker_C_C");
         bridgeBrokers("Broker_C_C", "Broker_D_D");
         bridgeBrokers("Broker_D_D", "Broker_E_E");
 
         startAllBrokers();
+        waitForBridgeFormation();
 
         conn = brokers.get("Broker_A_A").factory.createConnection();
         conn.setClientID("clientId1");
@@ -180,14 +181,15 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         }
 
         startAllBrokers();
+        waitForBridgeFormation();
 
         // Setup destination
-        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", 
true);
+        final ActiveMQTopic dest = (ActiveMQTopic) 
createDestination("TEST.FOO", true);
 
         // Setup consumers
-        Session ses = createSession("Broker_A_A");
-        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
-        MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
+        final Session ses = createSession("Broker_A_A");
+        final MessageConsumer clientA = ses.createDurableSubscriber(dest, 
"subA");
+        final MessageConsumer clientB = ses.createDurableSubscriber(dest, 
"subB");
 
         // let consumers propagate around the network
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
@@ -199,8 +201,8 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         assertNotNull(clientB.receive(1000));
 
         //bring online a consumer on the other side
-        Session ses2 = createSession("Broker_C_C");
-        MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
+        final Session ses2 = createSession("Broker_C_C");
+        final MessageConsumer clientC = ses2.createDurableSubscriber(dest, 
"subC");
         //there will be 2 network durables, 1 for each direction of the bridge
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
         assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
@@ -216,7 +218,6 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
         assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
-
     }
 
     public void testDurablePropagationConsumerAllBrokersDuplex() throws 
Exception {
@@ -239,13 +240,14 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         }
 
         startAllBrokers();
+        waitForBridgeFormation();
 
         // Setup destination
-        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", 
true);
+        final ActiveMQTopic dest = (ActiveMQTopic) 
createDestination("TEST.FOO", true);
 
         // Setup consumers
-        Session ses = createSession("Broker_A_A");
-        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+        final Session ses = createSession("Broker_A_A");
+        final MessageConsumer clientA = ses.createDurableSubscriber(dest, 
"subA");
 
         // let consumers propagate around the network
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
@@ -253,21 +255,20 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
 
         //bring online a consumer on the other side
-        Session ses2 = createSession("Broker_B_B");
-        MessageConsumer clientB = ses2.createDurableSubscriber(dest, "subB");
+        final Session ses2 = createSession("Broker_B_B");
+        final MessageConsumer clientB = ses2.createDurableSubscriber(dest, 
"subB");
 
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
         assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
         assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
 
-        Session ses3 = createSession("Broker_C_C");
-        MessageConsumer clientC = ses3.createDurableSubscriber(dest, "subC");
+        final Session ses3 = createSession("Broker_C_C");
+        final MessageConsumer clientC = ses3.createDurableSubscriber(dest, 
"subC");
 
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
         assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
         assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
 
-
         clientA.close();
         clientB.close();
         clientC.close();
@@ -275,11 +276,9 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         ses2.unsubscribe("subB");
         ses3.unsubscribe("subC");
 
-
         assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
         assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
-
     }
 
     public void testDurablePropagation5BrokerDuplex() throws Exception {
@@ -306,15 +305,16 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         }
 
         startAllBrokers();
+        waitForBridgeFormation();
 
         // Setup destination
-        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", 
true);
+        final ActiveMQTopic dest = (ActiveMQTopic) 
createDestination("TEST.FOO", true);
 
         // Setup consumers
-        Session ses = createSession("Broker_A_A");
-        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+        final Session ses = createSession("Broker_A_A");
+        final MessageConsumer clientA = ses.createDurableSubscriber(dest, 
"subA");
 
-        // let consumers propagate around the network 
(assertNCDurableSubsCount waits internally)
+        // let consumers propagate around the network
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
         assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
         assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
@@ -325,10 +325,10 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         assertNotNull(clientA.receive(1000));
 
         //bring online a consumer on the other side
-        Session ses2 = createSession("Broker_E_E");
-        MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE");
+        final Session ses2 = createSession("Broker_E_E");
+        final MessageConsumer clientE = ses2.createDurableSubscriber(dest, 
"subE");
 
-        //there will be 2 network durables, 1 for each direction of the bridge 
(assertNCDurableSubsCount waits internally)
+        //there will be 2 network durables, 1 for each direction of the bridge
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
         assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 2);
         assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 2);
@@ -345,7 +345,6 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
         assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 0);
         assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 0);
-
     }
 
     public void testDurablePropagationSpokeDuplex() throws Exception {
@@ -370,26 +369,27 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         }
 
         startAllBrokers();
+        waitForBridgeFormation();
 
         // Setup destination
-        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", 
true);
+        final ActiveMQTopic dest = (ActiveMQTopic) 
createDestination("TEST.FOO", true);
 
         // Setup consumers
-        Session ses = createSession("Broker_A_A");
-        Session ses2 = createSession("Broker_B_B");
-        Session ses3 = createSession("Broker_C_C");
-        Session ses4 = createSession("Broker_D_D");
+        final Session ses = createSession("Broker_A_A");
+        final Session ses2 = createSession("Broker_B_B");
+        final Session ses3 = createSession("Broker_C_C");
+        final Session ses4 = createSession("Broker_D_D");
 
-        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
-        MessageConsumer clientAB = ses.createDurableSubscriber(dest, "subAB");
+        final MessageConsumer clientA = ses.createDurableSubscriber(dest, 
"subA");
+        final MessageConsumer clientAB = ses.createDurableSubscriber(dest, 
"subAB");
 
-        // let consumers propagate around the network 
(assertNCDurableSubsCount waits internally)
+        // let consumers propagate around the network
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
         assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
         assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
         assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
 
-        MessageConsumer clientD = ses4.createDurableSubscriber(dest, "subD");
+        final MessageConsumer clientD = ses4.createDurableSubscriber(dest, 
"subD");
 
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
         assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
@@ -401,10 +401,10 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         sendMessages("Broker_C_C", dest, 1);
         assertNotNull(clientD.receive(1000));
 
-        MessageConsumer clientB = ses2.createDurableSubscriber(dest, "subB");
-        MessageConsumer clientC = ses3.createDurableSubscriber(dest, "subC");
-        Thread.sleep(1000);
+        final MessageConsumer clientB = ses2.createDurableSubscriber(dest, 
"subB");
+        final MessageConsumer clientC = ses3.createDurableSubscriber(dest, 
"subC");
 
+        // let consumers propagate around the network (Wait.waitFor polls 
internally)
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 3);
         assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
         assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
@@ -448,16 +448,16 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         }
 
         startAllBrokers();
+        waitForBridgeFormation();
 
         // Setup destination
-        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", 
true);
+        final ActiveMQTopic dest = (ActiveMQTopic) 
createDestination("TEST.FOO", true);
 
         // Setup consumers
-        Session ses = createSession("Broker_A_A");
-        MessageConsumer clientA = ses.createConsumer(dest);
-        Thread.sleep(1000);
+        final Session ses = createSession("Broker_A_A");
+        final MessageConsumer clientA = ses.createConsumer(dest);
 
-        // let consumers propagate around the network
+        // let consumers propagate around the network (Wait.waitFor polls 
internally)
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
         assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
         assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
@@ -465,10 +465,10 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         sendMessages("Broker_C_C", dest, 1);
         assertNotNull(clientA.receive(1000));
 
-        Session ses2 = createSession("Broker_C_C");
-        MessageConsumer clientC = ses2.createConsumer(dest);
-        Thread.sleep(1000);
+        final Session ses2 = createSession("Broker_C_C");
+        final MessageConsumer clientC = ses2.createConsumer(dest);
 
+        // let consumers propagate around the network (Wait.waitFor polls 
internally)
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
         assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
         assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
@@ -493,8 +493,8 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
 
     protected void testDurablePropagationSync() throws Exception {
         // Setup broker networks
-        NetworkConnector nc1 = bridgeBrokers("Broker_A_A", "Broker_B_B");
-        NetworkConnector nc2 = bridgeBrokers("Broker_B_B", "Broker_C_C");
+        final NetworkConnector nc1 = bridgeBrokers("Broker_A_A", "Broker_B_B");
+        final NetworkConnector nc2 = bridgeBrokers("Broker_B_B", "Broker_C_C");
 
         NetworkConnector nc3 = null;
         NetworkConnector nc4 = null;
@@ -514,16 +514,16 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         }
 
         // Setup destination
-        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", 
true);
+        final ActiveMQTopic dest = (ActiveMQTopic) 
createDestination("TEST.FOO", true);
 
-        // Setup consumers
-        Session ses = createSession("Broker_A_A");
-        Session ses2 = createSession("Broker_C_C");
-        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
-        MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
-        MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
-        Thread.sleep(1000);
+        // Setup consumers -- no bridges are running so no NC subs should be 
created
+        final Session ses = createSession("Broker_A_A");
+        final Session ses2 = createSession("Broker_C_C");
+        final MessageConsumer clientA = ses.createDurableSubscriber(dest, 
"subA");
+        final MessageConsumer clientB = ses.createDurableSubscriber(dest, 
"subB");
+        final MessageConsumer clientC = ses2.createDurableSubscriber(dest, 
"subC");
 
+        // No bridges running, so no NC durable subs should exist
         assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
         assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
@@ -553,31 +553,31 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         bridgeBrokers("Broker_B_B", "Broker_C_C");
 
         //Duplicate the bridges with different included destinations - valid 
use case
-        NetworkConnector nc3 = bridgeBrokers("Broker_A_A", "Broker_B_B");
-        NetworkConnector nc4 = bridgeBrokers("Broker_B_B", "Broker_C_C");
+        final NetworkConnector nc3 = bridgeBrokers("Broker_A_A", "Broker_B_B");
+        final NetworkConnector nc4 = bridgeBrokers("Broker_B_B", "Broker_C_C");
         nc3.setName("nc_3_3");
         nc4.setName("nc_4_4");
-        ArrayList<ActiveMQDestination> includedDestinations = new 
ArrayList<>();
+        final ArrayList<ActiveMQDestination> includedDestinations = new 
ArrayList<>();
         includedDestinations.add(new 
ActiveMQTopic("TEST.FOO2?forceDurable=true"));
         nc3.setDynamicallyIncludedDestinations(includedDestinations);
         nc4.setDynamicallyIncludedDestinations(includedDestinations);
 
         startAllBrokers();
+        waitForBridgeFormation();
 
         // Setup destination
-        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", 
true);
-        ActiveMQTopic dest2 = (ActiveMQTopic) createDestination("TEST.FOO2", 
true);
+        final ActiveMQTopic dest = (ActiveMQTopic) 
createDestination("TEST.FOO", true);
+        final ActiveMQTopic dest2 = (ActiveMQTopic) 
createDestination("TEST.FOO2", true);
 
         // Setup consumers
-        Session ses = createSession("Broker_A_A");
-        Session ses2 = createSession("Broker_C_C");
-        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
-        MessageConsumer clientAa = ses.createDurableSubscriber(dest2, "subAa");
-        MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
-        MessageConsumer clientCc = ses2.createDurableSubscriber(dest2, 
"subCc");
-        Thread.sleep(1000);
-
-        //make sure network durables are online
+        final Session ses = createSession("Broker_A_A");
+        final Session ses2 = createSession("Broker_C_C");
+        final MessageConsumer clientA = ses.createDurableSubscriber(dest, 
"subA");
+        final MessageConsumer clientAa = ses.createDurableSubscriber(dest2, 
"subAa");
+        final MessageConsumer clientC = ses2.createDurableSubscriber(dest, 
"subC");
+        final MessageConsumer clientCc = ses2.createDurableSubscriber(dest2, 
"subCc");
+
+        //make sure network durables are online (Wait.waitFor polls internally)
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
         assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
         assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
@@ -729,32 +729,32 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         testDurablePropagation(-1, true, List.of(1, 2, 2, 2, 1));
     }
 
-    private void testDurablePropagation(int ttl, boolean dynamicOnly,
-                                        List<Integer> expected) throws 
Exception {
+    private void testDurablePropagation(final int ttl, final boolean 
dynamicOnly,
+                                        final List<Integer> expected) throws 
Exception {
         testDurablePropagation(ttl, dynamicOnly, false, expected);
     }
 
-    private void testDurablePropagation(int ttl, boolean dynamicOnly, boolean 
restartBrokers,
-                                         List<Integer> expected) throws 
Exception {
+    private void testDurablePropagation(final int ttl, final boolean 
dynamicOnly, final boolean restartBrokers,
+                                         final List<Integer> expected) throws 
Exception {
         duplex = true;
 
         // Setup broker networks
-        NetworkConnector nc1 = bridgeBrokers("Broker_A_A", "Broker_B_B", 
dynamicOnly, ttl);
-        NetworkConnector nc2 = bridgeBrokers("Broker_B_B", "Broker_C_C", 
dynamicOnly, ttl);
-        NetworkConnector nc3 = bridgeBrokers("Broker_C_C", "Broker_D_D", 
dynamicOnly, ttl);
-        NetworkConnector nc4 = bridgeBrokers("Broker_D_D", "Broker_E_E", 
dynamicOnly, ttl);
+        final NetworkConnector nc1 = bridgeBrokers("Broker_A_A", "Broker_B_B", 
dynamicOnly, ttl);
+        final NetworkConnector nc2 = bridgeBrokers("Broker_B_B", "Broker_C_C", 
dynamicOnly, ttl);
+        final NetworkConnector nc3 = bridgeBrokers("Broker_C_C", "Broker_D_D", 
dynamicOnly, ttl);
+        final NetworkConnector nc4 = bridgeBrokers("Broker_D_D", "Broker_E_E", 
dynamicOnly, ttl);
 
         startAllBrokers();
         stopNetworkConnectors(nc1, nc2, nc3, nc4);
 
         // Setup destination
-        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", 
true);
+        final ActiveMQTopic dest = (ActiveMQTopic) 
createDestination("TEST.FOO", true);
 
         // Setup consumers
-        Session ses = createSession("Broker_A_A");
-        Session ses2 = createSession("Broker_E_E");
-        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
-        MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE");
+        final Session ses = createSession("Broker_A_A");
+        final Session ses2 = createSession("Broker_E_E");
+        final MessageConsumer clientA = ses.createDurableSubscriber(dest, 
"subA");
+        final MessageConsumer clientE = ses2.createDurableSubscriber(dest, 
"subE");
 
         assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
@@ -764,7 +764,7 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
 
         startNetworkConnectors(nc1, nc2, nc3, nc4);
 
-        // Check that the correct network durables exist 
(assertNCDurableSubsCount waits internally)
+        // Check that the correct network durables exist
         assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 
expected.get(0));
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 
expected.get(1));
         assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 
expected.get(2));
@@ -784,6 +784,7 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
             bridgeBrokers("Broker_C_C", "Broker_D_D", dynamicOnly, ttl);
             bridgeBrokers("Broker_D_D", "Broker_E_E", dynamicOnly, ttl);
             startAllBrokers();
+            waitForBridgeFormation();
         } else {
             // restart just the network connectors but leave the consumers 
online
             // to test sync works ok. Things should work for all cases both 
dynamicOnly
@@ -792,7 +793,7 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
             startNetworkConnectors(nc1, nc2, nc3, nc4);
         }
 
-        // after restarting the bridges, check sync/demand are correct 
(assertNCDurableSubsCount waits internally)
+        // after restarting the bridges, check sync/demand are correct
         assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 
expected.get(0));
         assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 
expected.get(1));
         assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 
expected.get(2));
@@ -802,28 +803,27 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
 
     protected void assertNCDurableSubsCount(final BrokerService brokerService, final ActiveMQTopic dest,
             final int count) throws Exception {
-        assertTrue(Wait.waitFor(new Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return count == getNCDurableSubs(brokerService, dest).size();
-            }
-        }, 10000, 500));
+        assertTrue("Expected " + count + " NC durable sub(s) on " + brokerService.getBrokerName()
+                + " for " + dest.getTopicName() + ", but got "
+                + getNCDurableSubs(brokerService, dest).size(),
+            Wait.waitFor(() -> count == getNCDurableSubs(brokerService, dest).size(),
+                TimeUnit.SECONDS.toMillis(30), 500));
     }
 
     protected List<DurableTopicSubscription> getNCDurableSubs(final 
BrokerService brokerService,
             final ActiveMQTopic dest) throws Exception {
-        List<DurableTopicSubscription> subs = new ArrayList<>();
-        Destination d = brokerService.getDestination(dest);
-        org.apache.activemq.broker.region.Topic destination = null;
+        final List<DurableTopicSubscription> subs = new ArrayList<>();
+        final Destination d = brokerService.getDestination(dest);
+        final org.apache.activemq.broker.region.Topic destination;
         if (d instanceof DestinationFilter) {
             destination = ((DestinationFilter) 
d).getAdaptor(org.apache.activemq.broker.region.Topic.class);
         } else {
             destination = (org.apache.activemq.broker.region.Topic) d;
         }
 
-        for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) {
+        for (final SubscriptionKey key : destination.getDurableTopicSubs().keySet()) {
             if (key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX)) {
-                DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key);
+                final DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key);
                 if (sub != null && sub.isActive()) {
                     subs.add(sub);
                 }
@@ -855,19 +855,19 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
     }
 
     protected void startNetworkConnectors(NetworkConnector... connectors) 
throws Exception {
-        for (NetworkConnector connector : connectors) {
+        for (final NetworkConnector connector : connectors) {
             connector.start();
         }
     }
 
     protected void stopNetworkConnectors(NetworkConnector... connectors) 
throws Exception {
-        for (NetworkConnector connector : connectors) {
+        for (final NetworkConnector connector : connectors) {
             connector.stop();
         }
     }
 
     protected Session createSession(String broker) throws Exception {
-        Connection con = createConnection(broker);
+        final Connection con = createConnection(broker);
         con.start();
         return con.createSession(false, Session.AUTO_ACKNOWLEDGE);
     }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/security/AbstractCachedLDAPAuthorizationMapLegacyTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/security/AbstractCachedLDAPAuthorizationMapLegacyTest.java
index 36069630ea..8d7339bc42 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/security/AbstractCachedLDAPAuthorizationMapLegacyTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/security/AbstractCachedLDAPAuthorizationMapLegacyTest.java
@@ -202,16 +202,23 @@ public abstract class 
AbstractCachedLDAPAuthorizationMapLegacyTest extends Abstr
 
         reader.close();
 
-        assertTrue("did not get expected size after remove", Wait.waitFor(new 
Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return map.getReadACLs(new ActiveMQQueue("TEST.FOO")).size() 
== 0;
-            }
+        assertTrue("did not get expected size after remove", Wait.waitFor(
+                () -> map.getReadACLs(new ActiveMQQueue("TEST.FOO")).size() == 
0));
+
+        // Temp destination ACLs are removed by a separate LDAP listener
+        // (on the Temp subtree), so events may arrive after the Queue events.
+        assertTrue("Temp read ACLs not cleared after remove", Wait.waitFor(() 
-> {
+            final Set<?> acls = map.getTempDestinationReadACLs();
+            return acls == null || acls.isEmpty();
+        }));
+        assertTrue("Temp write ACLs not cleared after remove", Wait.waitFor(() 
-> {
+            final Set<?> acls = map.getTempDestinationWriteACLs();
+            return acls == null || acls.isEmpty();
+        }));
+        assertTrue("Temp admin ACLs not cleared after remove", Wait.waitFor(() 
-> {
+            final Set<?> acls = map.getTempDestinationAdminACLs();
+            return acls == null || acls.isEmpty();
         }));
-
-        assertTrue(map.getTempDestinationReadACLs() == null || 
map.getTempDestinationReadACLs().isEmpty());
-        assertTrue(map.getTempDestinationWriteACLs() == null || 
map.getTempDestinationWriteACLs().isEmpty());
-        assertTrue(map.getTempDestinationAdminACLs() == null || 
map.getTempDestinationAdminACLs().isEmpty());
     }
 
     @Test
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/security/SimpleAuthenticationPluginTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/security/SimpleAuthenticationPluginTest.java
index 269a2ee056..215947280d 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/security/SimpleAuthenticationPluginTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/security/SimpleAuthenticationPluginTest.java
@@ -43,6 +43,7 @@ import java.net.URI;
 import java.util.Arrays;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import org.junit.experimental.categories.Category;
 import org.apache.activemq.test.annotations.ParallelTest;
 
@@ -111,11 +112,12 @@ public class SimpleAuthenticationPluginTest extends 
SecurityTestSupport {
 
     public void testConnectionStartThrowsJMSSecurityException() throws 
Exception {
         final CountDownLatch exceptionLatch = new CountDownLatch(1);
+        final AtomicReference<JMSException> receivedException = new AtomicReference<>();
 
         try (final Connection connection = factory.createConnection("badUser", 
"password")) {
             connection.setExceptionListener(e -> {
                 LOG.info("Connection received exception: {}", e.getMessage());
-                assertTrue(e instanceof JMSSecurityException);
+                receivedException.set(e);
                 exceptionLatch.countDown();
             });
 
@@ -123,14 +125,13 @@ public class SimpleAuthenticationPluginTest extends 
SecurityTestSupport {
                 connection.start();
 
                 // If start() doesn't throw synchronously, wait for async 
exception
-                assertTrue("Should receive security exception via listener", 
exceptionLatch.await(5, TimeUnit.SECONDS));
-
+                assertTrue("Should receive security exception via listener",
+                    exceptionLatch.await(5, TimeUnit.SECONDS));
+                assertNotNull("Exception should have been received", 
receivedException.get());
+                assertTrue("Should be JMSSecurityException but was: " + 
receivedException.get().getClass(),
+                    receivedException.get() instanceof JMSSecurityException);
             } catch (final JMSSecurityException jmsEx) {
                 // Synchronous security exception - expected
-            } catch (final JMSException e) {
-                // with the latch, we should always pass first into the 
listener and assert the right exception
-                LOG.info("Expected JMSSecurityException but was: {}", 
e.getClass());
-                fail("Should throw JMSSecurityException");
             }
         }
     }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
index e857de100e..9c49e19d2e 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
@@ -558,9 +558,10 @@ public class VMTransportThreadSafeTest {
     @Test(timeout=60000)
     public void testStopWhileStartingAsyncWithNoAsyncLimit() throws Exception {
         // In the async case the iterate method should see that we are 
stopping and
-        // drop out before we dispatch all the messages but it should get at 
least 49 since
-        // the stop thread waits 500 mills and the listener is waiting 10 
mills on each receive.
-        doTestStopWhileStartingWithNoAsyncLimit(true, 49);
+        // drop out before we dispatch all the messages. We wait until the 
TaskRunner
+        // has started processing (at least 1 message received), then stop 
mid-stream.
+        // Some messages should be received but not all 100.
+        doTestStopWhileStartingWithNoAsyncLimit(true, 1);
     }
 
     @Test(timeout=60000)
@@ -569,7 +570,7 @@ public class VMTransportThreadSafeTest {
         doTestStopWhileStartingWithNoAsyncLimit(false, 100);
     }
 
-    private void doTestStopWhileStartingWithNoAsyncLimit(boolean async, final 
int expect) throws Exception {
+    private void doTestStopWhileStartingWithNoAsyncLimit(final boolean async, 
final int expect) throws Exception {
 
         final VMTransport local = new VMTransport(new URI(location1));
         final VMTransport remote = new VMTransport(new URI(location2));
@@ -588,39 +589,31 @@ public class VMTransportThreadSafeTest {
             local.oneway(new DummyCommand(i));
         }
 
-        Thread t = new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                try {
-                    Thread.sleep(1000);
-                    remote.stop();
-                } catch (Exception e) {
-                }
-            }
-        });
-
         remote.start();
 
-        t.start();
+        if (async) {
+            // Wait until the TaskRunner has actually started processing 
messages
+            // before stopping, so we don't race and stop before any delivery 
occurs.
+            assertTrue("Remote should start receiving messages",
+                Wait.waitFor(() -> remoteReceived.size() > 0, 5000, 10));
 
-        assertTrue("Remote should receive: " + expect + ", commands but got: " 
+ remoteReceived.size(), Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return remoteReceived.size() >= expect;
-            }
-        }));
+            // Now stop mid-stream - some messages have been delivered, 
transport should dispose cleanly
+            remote.stop();
+        } else {
+            // Non-async: start() dispatches all messages synchronously, so 
they are
+            // already received. Just stop normally.
+            remote.stop();
+        }
+
+        assertTrue("Remote should receive at least " + expect + " commands but 
got: " + remoteReceived.size(),
+            Wait.waitFor(() -> remoteReceived.size() >= expect));
 
         LOG.debug("Remote listener received " + remoteReceived.size() + " 
messages");
 
         local.stop();
 
-        assertTrue("Remote transport never was disposed.", Wait.waitFor(new 
Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return remote.isDisposed();
-            }
-        }));
+        assertTrue("Remote transport never was disposed.",
+            Wait.waitFor(() -> remote.isDisposed()));
     }
 
     @Test(timeout=120000)
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java
index ec57b22303..afe382934c 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java
@@ -34,6 +34,7 @@ import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.network.DiscoveryNetworkConnector;
 import org.apache.activemq.network.NetworkConnector;
 import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 /**
@@ -63,20 +64,31 @@ public class AMQ6366Test extends 
JmsMultipleBrokersTestSupport {
         testNonDurableReceiveThrougRestart("BrokerB", "BrokerA");
     }
 
-    protected void testNonDurableReceiveThrougRestart(String pubBroker, String 
conBroker) throws Exception {
-        NetworkConnector networkConnector = bridgeBrokerPair("BrokerA", 
"BrokerB");
+    protected void testNonDurableReceiveThrougRestart(final String pubBroker, 
final String conBroker) throws Exception {
+        final NetworkConnector networkConnector = bridgeBrokerPair("BrokerA", 
"BrokerB");
 
         startAllBrokers();
         waitForBridgeFormation();
 
-        MessageConsumer client = createDurableSubscriber(conBroker, dest, 
"sub1");
+        final MessageConsumer client = createDurableSubscriber(conBroker, 
dest, "sub1");
         client.close();
 
-        Thread.sleep(1000);
+        // Wait for the durable subscription to become inactive after closing 
the consumer
+        final Topic conBrokerDest = (Topic) 
brokers.get(conBroker).broker.getDestination(dest);
+        assertTrue("Durable sub should become inactive after close",
+                Wait.waitFor(() -> {
+                    final DurableTopicSubscription[] subs = 
conBrokerDest.getDurableTopicSubs()
+                            .values().toArray(new DurableTopicSubscription[0]);
+                    return subs.length > 0 && !subs[0].isActive();
+                }, 5000, 100));
+
         networkConnector.stop();
-        Thread.sleep(1000);
 
-        Set<ActiveMQDestination> durableDests = new HashSet<>();
+        // Wait for the network connector to fully stop
+        assertTrue("Network connector should stop",
+                Wait.waitFor(networkConnector::isStopped, 5000, 100));
+
+        final Set<ActiveMQDestination> durableDests = new HashSet<>();
         durableDests.add(dest);
         //Normally set on broker start from the persistence layer but
         //simulate here since we just stopped and started the network connector
@@ -87,14 +99,15 @@ public class AMQ6366Test extends 
JmsMultipleBrokersTestSupport {
 
         // Send messages
         sendMessages(pubBroker, dest, 1);
-        Thread.sleep(1000);
-
-        Topic destination = (Topic) 
brokers.get(conBroker).broker.getDestination(dest);
-        DurableTopicSubscription sub = destination.getDurableTopicSubs().
-                values().toArray(new DurableTopicSubscription[0])[0];
 
-        //Assert that the message made it to the other broker
-        assertEquals(1, sub.getSubscriptionStatistics().getEnqueues().getCount());
+        // Wait for the message to be enqueued through the network bridge
+        final Topic destination = (Topic) 
brokers.get(conBroker).broker.getDestination(dest);
+        assertTrue("Message should be enqueued to durable subscription",
+                Wait.waitFor(() -> {
+                    final DurableTopicSubscription sub = destination.getDurableTopicSubs()
+                            .values().toArray(new DurableTopicSubscription[0])[0];
+                    return sub.getSubscriptionStatistics().getEnqueues().getCount() == 1;
+                }, 10000, 100));
     }
 
     @Override
@@ -103,16 +116,15 @@ public class AMQ6366Test extends 
JmsMultipleBrokersTestSupport {
         broker.setAdvisorySupport(true);
     }
 
-    protected NetworkConnector bridgeBrokerPair(String localBrokerName, String 
remoteBrokerName) throws Exception {
-        BrokerService localBroker = brokers.get(localBrokerName).broker;
-        BrokerService remoteBroker = brokers.get(remoteBrokerName).broker;
+    protected NetworkConnector bridgeBrokerPair(final String localBrokerName, 
final String remoteBrokerName) throws Exception {
+        final BrokerService localBroker = brokers.get(localBrokerName).broker;
+        final BrokerService remoteBroker = 
brokers.get(remoteBrokerName).broker;
 
-        List<TransportConnector> transportConnectors = 
remoteBroker.getTransportConnectors();
-        URI remoteURI;
+        final List<TransportConnector> transportConnectors = 
remoteBroker.getTransportConnectors();
         if (!transportConnectors.isEmpty()) {
-            remoteURI = transportConnectors.get(0).getConnectUri();
-            String uri = "static:(" + remoteURI + ")";
-            NetworkConnector connector = new DiscoveryNetworkConnector(new 
URI(uri));
+            final URI remoteURI = transportConnectors.get(0).getConnectUri();
+            final String uri = "static:(" + remoteURI + ")";
+            final NetworkConnector connector = new 
DiscoveryNetworkConnector(new URI(uri));
             connector.setDynamicOnly(false); // so matching durable subs are 
loaded on start
             connector.setStaticBridge(false);
             connector.setDuplex(true);
@@ -126,7 +138,7 @@ public class AMQ6366Test extends 
JmsMultipleBrokersTestSupport {
 
     @Override
     public void setUp() throws Exception {
-        File dataDir = new File(IOHelper.getDefaultDataDirectory());
+        final File dataDir = new File(IOHelper.getDefaultDataDirectory());
         LOG.info("Delete dataDir.." + dataDir.getCanonicalPath());
         org.apache.activemq.TestSupport.recursiveDelete(dataDir);
         super.setAutoFail(true);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to