This is an automated email from the ASF dual-hosted git repository.

jlmonteiro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 8d51a293f5 test(usecases): various fixes for flaky tests (#1704)
8d51a293f5 is described below

commit 8d51a293f5b374e6d685f97f493d7627f3915c1b
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Fri Feb 20 17:03:53 2026 +0100

    test(usecases): various fixes for flaky tests (#1704)
    
    * Enhance QueueZeroPrefetchLazyDispatchPriorityTest with improved message enqueuing checks and logging consistency
    
    * Refactor ConnectionFailureEvictsFromPoolTest and BrokerNetworkWithStuckMessagesTest for improved readability and consistency in message assertions
    
    * Update assertDeqInflight method to use >= for dequeues and inflight checks, enhancing accuracy in advisory message assertions
    
    * Add wait for broker to process connection drop in RedeliveryRestartWithExceptionTest
    
    * Fix handling of IllegalStateException in AMQ3166Test to prevent transaction hang during async sends
    
    * fix: avoid test hanging on Windows
    
    * test(usecases): fix flaky advisory propagation waits in TwoBrokerVirtualTopicSelectorAwareForwardingTest
---
 .../JournalCorruptionEofIndexRecoveryTest.java     | 12 ++--
 .../pool/ConnectionFailureEvictsFromPoolTest.java  |  6 +-
 .../broker/RedeliveryRestartWithExceptionTest.java | 11 +++
 .../java/org/apache/activemq/bugs/AMQ3166Test.java | 33 +++++----
 .../BrokerNetworkWithStuckMessagesTest.java        | 62 +++++------------
 .../activemq/usecases/AdvisoryViaNetworkTest.java  | 30 ++++-----
 .../QueueZeroPrefetchLazyDispatchPriorityTest.java | 51 +++++++++-----
 ...kerVirtualTopicSelectorAwareForwardingTest.java | 78 ++++++----------------
 8 files changed, 129 insertions(+), 154 deletions(-)

diff --git 
a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
 
b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
index c3b9fc2046..a49d37c916 100644
--- 
a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
+++ 
b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
@@ -100,9 +100,9 @@ public class JournalCorruptionEofIndexRecoveryTest {
         }
 
         if (whackIndex) {
-            File indexToDelete = new File(brokerDataDir, "db.data");
+            final File indexToDelete = new File(brokerDataDir, "db.data");
             LOG.info("Whacking index: " + indexToDelete);
-            indexToDelete.delete();
+            IOHelper.deleteFileNonBlocking(indexToDelete);
         }
 
         doStartBroker(false, forceRecoverIndex);
@@ -219,14 +219,15 @@ public class JournalCorruptionEofIndexRecoveryTest {
         broker.getPersistenceAdapter().checkpoint(true);
         Location location = ((KahaDBPersistenceAdapter) 
broker.getPersistenceAdapter()).getStore().getMetadata().producerSequenceIdTrackerLocation;
 
-        DataFile dataFile = ((KahaDBPersistenceAdapter) 
broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().get(Integer.valueOf(location.getDataFileId()));
-        RecoverableRandomAccessFile randomAccessFile = 
dataFile.openRandomAccessFile();
+        final DataFile dataFile = ((KahaDBPersistenceAdapter) 
broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().get(Integer.valueOf(location.getDataFileId()));
+        final RecoverableRandomAccessFile randomAccessFile = 
dataFile.openRandomAccessFile();
         randomAccessFile.seek(location.getOffset());
         // Use an invalid size well past the end of the data file to trigger 
corruption handling without large allocation.
-        int bogusSize = ((KahaDBPersistenceAdapter) 
broker.getPersistenceAdapter()).getStore().getJournal()
+        final int bogusSize = ((KahaDBPersistenceAdapter) 
broker.getPersistenceAdapter()).getStore().getJournal()
                .getFileMap().get(location.getDataFileId()).getLength() * 10;
         randomAccessFile.writeInt(bogusSize);
         randomAccessFile.getChannel().force(true);
+        dataFile.closeRandomAccessFile(randomAccessFile);
 
         ((KahaDBPersistenceAdapter) 
broker.getPersistenceAdapter()).getStore().getJournal().close();
         try {
@@ -437,6 +438,7 @@ public class JournalCorruptionEofIndexRecoveryTest {
         randomAccessFile.writeInt(4 * 1024 * 1024);
         randomAccessFile.writeLong(0l);
         randomAccessFile.getChannel().force(true);
+        dataFile.closeRandomAccessFile(randomAccessFile);
     }
 
     private void corruptOrderIndex(final int num, final int size) throws 
Exception {
diff --git 
a/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java
 
b/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java
index 12c46be421..596eb00fe3 100644
--- 
a/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java
+++ 
b/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java
@@ -116,8 +116,12 @@ public class ConnectionFailureEvictsFromPoolTest extends 
TestSupport {
                 TestCase.fail("Expected Error");
             } catch (JMSException e) {
             }
+            // Wait for async exception event BEFORE the try-with-resources 
closes the connection.
+            // ActiveMQConnection.onException() fires TransportListener 
callbacks via executeAsync(),
+            // so the callback runs in a separate thread. If we wait after 
connection.close(), the
+            // async executor may already be shut down and the callback never 
fires.
+            TestCase.assertTrue("exception event propagated ok", 
gotExceptionEvent.await(15, TimeUnit.SECONDS));
         }
-        TestCase.assertTrue("exception event propagated ok", 
gotExceptionEvent.await(15, TimeUnit.SECONDS));
         // If we get another connection now it should be a new connection that
         // works.
         LOG.info("expect new connection after failure");
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
index 7941f64703..36daff54e1 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
@@ -47,6 +47,7 @@ import org.apache.activemq.store.TransactionStore;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.transport.tcp.TcpTransport;
 import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.Wait;
 import org.junit.After;
 import org.junit.Before;
 import org.slf4j.Logger;
@@ -241,6 +242,16 @@ public class RedeliveryRestartWithExceptionTest extends 
TestSupport {
 
         
connection.getTransport().narrow(TcpTransport.class).getTransportListener().onException(new
 IOException("Die"));
 
+        // Wait for the broker to fully process the connection drop and return
+        // unacked messages to the queue. The onException triggers async 
cleanup
+        // via executeAsync(), so without this wait the new consumer may 
receive
+        // fresh messages (6-10) instead of the redelivered ones (1-5).
+        final ActiveMQQueue dest = new ActiveMQQueue(queueName);
+        assertTrue("unacked messages returned to queue", Wait.waitFor(() -> {
+            final org.apache.activemq.broker.region.Destination d = 
broker.getDestination(dest);
+            return d != null && 
d.getDestinationStatistics().getInflight().getCount() == 0;
+        }, 10000, 100));
+
         connection = (ActiveMQConnection) connectionFactory.createConnection();
         connection.start();
 
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3166Test.java 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3166Test.java
index 47d7753ce3..66d9a108a3 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3166Test.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3166Test.java
@@ -191,18 +191,25 @@ public class AMQ3166Test {
         ActiveMQMessageProducer producer = (ActiveMQMessageProducer) 
session.createProducer(session.createQueue("QAT"));
 
         for (int i=0; i<batchSize; i++) {
-            producer.send(session.createTextMessage("Hello A"), new 
AsyncCallback() {
-                @Override
-                public void onSuccess() {
-                    batchSent.countDown();
-                }
+            try {
+                producer.send(session.createTextMessage("Hello A"), new 
AsyncCallback() {
+                    @Override
+                    public void onSuccess() {
+                        batchSent.countDown();
+                    }
 
-                @Override
-                public void onException(JMSException e) {
-                    session.getTransactionContext().setRollbackOnly(true);
-                    batchSent.countDown();
-                }
-            });
+                    @Override
+                    public void onException(JMSException e) {
+                        session.getTransactionContext().setRollbackOnly(true);
+                        batchSent.countDown();
+                    }
+                });
+            } catch (jakarta.jms.IllegalStateException alreadyRolledBack) {
+                // Async error from an earlier send may have already marked 
the transaction
+                // rollback-only. Count down the latch so beforeEnd doesn't 
hang.
+                batchSent.countDown();
+                continue;
+            }
 
             if (i==0) {
                 // transaction context begun on first send
@@ -211,9 +218,9 @@ public class AMQ3166Test {
                     public void beforeEnd() throws Exception {
                         // await response to all sends in the batch
                         if (!batchSent.await(10, TimeUnit.SECONDS)) {
-                            LOG.error("TimedOut waiting for aync send 
requests!");
+                            LOG.error("TimedOut waiting for async send 
requests!");
                             
session.getTransactionContext().setRollbackOnly(true);
-                        };
+                        }
                         super.beforeEnd();
                     }
                 });
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
index 0a199ded33..cca1d6401b 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
@@ -235,15 +235,8 @@ public class BrokerNetworkWithStuckMessagesTest {
         // Ensure that there are zero messages on the local broker. This tells
         // us that those messages have been prefetched to the remote broker
         // where the demand exists.
-        Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                Object[] result = browseQueueWithJmx(localBroker);
-               return 0 == result.length;
-            }
-        });
-        messages = browseQueueWithJmx(localBroker);
-        assertEquals(0, messages.length);
+        assertTrue("local broker drained", Wait.waitFor(() ->
+                browseQueueWithJmx(localBroker).length == 0));
 
         // try and pull the messages from remote, should be denied b/c on 
networkTtl
         LOG.info("creating demand on second remote...");
@@ -270,15 +263,9 @@ public class BrokerNetworkWithStuckMessagesTest {
         connection2.send(connectionInfo2.createRemoveCommand());
 
         // There should now be 5 messages stuck on the remote broker
-        assertTrue("correct stuck message count", Wait.waitFor(new 
Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                Object[] result = browseQueueWithJmx(remoteBroker);
-                return 5 == result.length;
-            }
-        }));
+        assertTrue("correct stuck message count", Wait.waitFor(() ->
+                browseQueueWithJmx(remoteBroker).length == 5));
         messages = browseQueueWithJmx(remoteBroker);
-        assertEquals(5, messages.length);
 
         assertTrue("can see broker path property",
                 
((String)((CompositeData)messages[1]).get("BrokerPath")).contains(localBroker.getBroker().getBrokerId().toString()));
@@ -295,15 +282,15 @@ public class BrokerNetworkWithStuckMessagesTest {
         connection1.send(createAck(consumerInfo1, message1, 1, 
MessageAck.INDIVIDUAL_ACK_TYPE));
         LOG.info("acked one message on origin, waiting for all messages to 
percolate back");
 
-        Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                Object[] result = browseQueueWithJmx(localBroker);
-               return 4 == result.length;
-            }
-        });
-        messages = browseQueueWithJmx(localBroker);
-        assertEquals(4, messages.length);
+        // Wait for ALL stuck messages to replay from remote back to local.
+        // Must check both brokers: local == 4 (5 replayed - 1 acked) AND 
remote == 0
+        // (replay complete). Without the remote check, the Wait can return 
when only
+        // 4 of 5 messages have arrived on local (transient match), then the 
5th arrives.
+        assertTrue("messages percolated back", Wait.waitFor(() -> {
+            final Object[] localResult = browseQueueWithJmx(localBroker);
+            final Object[] remoteResult = browseQueueWithJmx(remoteBroker);
+            return localResult.length == 4 && remoteResult.length == 0;
+        }, TimeUnit.SECONDS.toMillis(30), 100));
 
         LOG.info("checking for messages on remote again");
         // messages won't migrate back again till consumer closes
@@ -329,25 +316,10 @@ public class BrokerNetworkWithStuckMessagesTest {
         assertEquals(receiveNumMessages, counter);
 
         // verify all messages consumed
-        Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                Object[] result = browseQueueWithJmx(remoteBroker);
-               return 0 == result.length;
-            }
-        });
-        messages = browseQueueWithJmx(remoteBroker);
-        assertEquals(0, messages.length);
-
-        Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                Object[] result = browseQueueWithJmx(localBroker);
-               return 0 == result.length;
-            }
-        });
-        messages = browseQueueWithJmx(localBroker);
-        assertEquals(0, messages.length);
+        assertTrue("remote drained", Wait.waitFor(() ->
+                browseQueueWithJmx(remoteBroker).length == 0));
+        assertTrue("local drained", Wait.waitFor(() ->
+                browseQueueWithJmx(localBroker).length == 0));
 
         // Close the consumer on the remote broker
         connection2.send(consumerInfo3.createRemoveCommand());
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
index 091957a075..e865b35670 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
@@ -277,23 +277,23 @@ public class AdvisoryViaNetworkTest extends 
JmsMultipleBrokersTestSupport {
 
     private void assertDeqInflight(final int dequeue, final int inflight,
                                    final ActiveMQTopic... topics) throws 
Exception {
-        assertTrue("deq and inflight as expected", Wait.waitFor(new 
Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                long actualDeq = 0;
-                long actualInflight = 0;
-                for (ActiveMQTopic topic : topics) {
-                    ActiveMQTopic advisory = 
AdvisorySupport.getConsumerAdvisoryTopic(topic);
-                    Destination destination = 
brokers.get("A").broker.getDestination(advisory);
-                    if (destination != null) {
-                        actualDeq += 
destination.getDestinationStatistics().getDequeues().getCount();
-                        actualInflight += 
destination.getDestinationStatistics().getInflight().getCount();
-                    }
+        // Use >= instead of == because duplex bridges with statically 
included destinations
+        // may generate additional advisory messages from the bridge's own 
subscriptions,
+        // depending on subscription registration ordering.
+        assertTrue("deq and inflight as expected", Wait.waitFor(() -> {
+            long actualDeq = 0;
+            long actualInflight = 0;
+            for (final ActiveMQTopic topic : topics) {
+                final ActiveMQTopic advisory = 
AdvisorySupport.getConsumerAdvisoryTopic(topic);
+                final Destination destination = 
brokers.get("A").broker.getDestination(advisory);
+                if (destination != null) {
+                    actualDeq += 
destination.getDestinationStatistics().getDequeues().getCount();
+                    actualInflight += 
destination.getDestinationStatistics().getInflight().getCount();
                 }
-                LOG.info("A Deq:" + actualDeq);
-                LOG.info("A Inflight:" + actualInflight);
-                return actualDeq == dequeue && actualInflight == inflight;
             }
+            LOG.info("A Deq:{} (expected >={}), Inflight:{} (expected >={})",
+                    actualDeq, dequeue, actualInflight, inflight);
+            return actualDeq >= dequeue && actualInflight >= inflight;
         }));
     }
 
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
index 880b394153..dd4dd8e356 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
@@ -74,6 +74,7 @@ public class QueueZeroPrefetchLazyDispatchPriorityTest {
 
     @Test(timeout=120000)
     public void testPriorityMessages() throws Exception {
+        final ActiveMQQueue destination = new ActiveMQQueue("TestQ");
 
         for (int i = 0; i < ITERATIONS; i++) {
 
@@ -85,11 +86,15 @@ public class QueueZeroPrefetchLazyDispatchPriorityTest {
 
             LOG.info("On iteration {}", i);
 
-            Thread.sleep(1000);
+            // Wait for all messages to be enqueued before consuming
+            assertTrue("Messages enqueued", Wait.waitFor(() -> {
+                final Queue queue = (Queue) broker.getDestination(destination);
+                return queue != null && 
queue.getDestinationStatistics().getMessages().getCount() == 5;
+            }, 5000, 100));
 
             // consume messages
-            ArrayList<Message> consumeList = consumeMessages("TestQ", 5, 
TimeUnit.SECONDS.toMillis(30));
-            LOG.info("Consumed list " + consumeList.size());
+            final ArrayList<Message> consumeList = consumeMessages("TestQ", 5, 
TimeUnit.SECONDS.toMillis(30));
+            LOG.info("Consumed list {}", consumeList.size());
 
             // compare lists
             assertEquals("message 1 should be priority high", 5, 
consumeList.get(0).getJMSPriority());
@@ -170,27 +175,32 @@ public class QueueZeroPrefetchLazyDispatchPriorityTest {
     public void testPriorityMessagesWithJmsBrowser() throws Exception {
 
         final int numToSend = 5;
+        final ActiveMQQueue destination = new ActiveMQQueue("TestQ");
 
         for (int i = 0; i < ITERATIONS; i++) {
             produceMessages(numToSend - 1, 4, "TestQ");
 
-            ArrayList<Message> browsed = browseMessages("TestQ");
+            final ArrayList<Message> browsed = browseMessages("TestQ");
 
             LOG.info("Browsed: {}", browsed.size());
 
             // send 1 message priority HIGH
             produceMessages(1, 5, "TestQ");
 
-            Thread.sleep(1000);
+            // Wait for all messages to be enqueued
+            assertTrue("Messages enqueued", Wait.waitFor(() -> {
+                final Queue queue = (Queue) broker.getDestination(destination);
+                return queue != null && 
queue.getDestinationStatistics().getMessages().getCount() == numToSend;
+            }, 5000, 100));
 
             LOG.info("On iteration {}", i);
 
-            Message message = consumeOneMessage("TestQ");
+            final Message message = consumeOneMessage("TestQ");
             assertNotNull(message);
             assertEquals(5, message.getJMSPriority());
 
             // consume messages
-            ArrayList<Message> consumeList = consumeMessages("TestQ");
+            final ArrayList<Message> consumeList = consumeMessages("TestQ");
             LOG.info("Consumed list {}", consumeList.size());
 
             // compare lists
@@ -217,30 +227,37 @@ public class QueueZeroPrefetchLazyDispatchPriorityTest {
                 return queue != null && 
queue.getDestinationStatistics().getMessages().getCount() == numToSend;
             }, 5000, 100));
 
-            ArrayList<Message> browsed = browseMessages("TestQ");
+            final ArrayList<Message> browsed = browseMessages("TestQ");
 
             LOG.info("Browsed: {}", browsed.size());
 
             assertEquals(0, browsed.size());
 
-            Message message = consumeOneMessage("TestQ", 
Session.CLIENT_ACKNOWLEDGE);
+            final Message message = consumeOneMessage("TestQ", 
Session.CLIENT_ACKNOWLEDGE);
             assertNotNull(message);
 
-            browsed = browseMessages("TestQ");
+            final ArrayList<Message> browsedAfterPull = 
browseMessages("TestQ");
 
-            LOG.info("Browsed: {}", browsed.size());
+            LOG.info("Browsed: {}", browsedAfterPull.size());
 
-            assertEquals("see only the paged in for pull", 1, browsed.size());
+            assertEquals("see only the paged in for pull", 1, 
browsedAfterPull.size());
 
-            // Wait for all messages to be available (including redelivery of 
unacked message)
+            // Wait for the unacked message to be fully redelivered after 
connection close.
+            // messages.getCount() includes in-flight messages so it's already 
== numToSend;
+            // we must also check inflight == 0 to ensure the redelivery 
processing is complete
+            // and all messages are truly available for dispatch to a new 
consumer.
             assertTrue("All messages available for consumption", 
Wait.waitFor(() -> {
                 final Queue queue = (Queue) broker.getDestination(destination);
-                return queue != null && 
queue.getDestinationStatistics().getMessages().getCount() == numToSend;
+                return queue != null
+                    && 
queue.getDestinationStatistics().getMessages().getCount() == numToSend
+                    && 
queue.getDestinationStatistics().getInflight().getCount() == 0;
             }, 5000, 100));
 
-            // consume messages
-            ArrayList<Message> consumeList = consumeMessages("TestQ");
-            LOG.info("Consumed list " + consumeList.size());
+            // Use the retry-based consume to handle zero-prefetch dispatch 
timing:
+            // with prioritized messages + lazy dispatch + redelivered 
messages in the
+            // dispatch pending list, the pull mechanism may need multiple 
attempts on slow CI.
+            final ArrayList<Message> consumeList = consumeMessages("TestQ", 
numToSend, TimeUnit.SECONDS.toMillis(30));
+            LOG.info("Consumed list {}", consumeList.size());
             assertEquals(numToSend, consumeList.size());
         }
     }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java
index 73375b727a..905ce34294 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java
@@ -89,15 +89,9 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
                 "ceposta = 'redhat'");
 
 
-        Wait.waitFor(new Wait.Condition() {
-
-            Destination dest = brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return dest.getConsumers().size() == 2;
-            }
-        }, 500);
+        final Destination destA0 = brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
+        assertTrue("advisories should propagate: 2 consumers on BrokerA",
+                Wait.waitFor(() -> destA0.getConsumers().size() == 2, 5000, 
100));
 
         selectors = cache.selectorsForDestination(testQueue);
         assertEquals(1, selectors.size());
@@ -289,14 +283,9 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
         MessageConsumer nonSelectingConsumer = createConsumer("BrokerB", 
consumerBQueue);
 
         // let advisories propogate
-        Wait.waitFor(new Wait.Condition() {
-            Destination dest = brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return dest.getConsumers().size() == 2;
-            }
-        }, 500);
+        final Destination destA1 = brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
+        assertTrue("advisories should propagate: 2 consumers on BrokerA",
+                Wait.waitFor(() -> destA1.getConsumers().size() == 2, 5000, 
100));
 
 
         Destination destination = getDestination(brokerB, consumerBQueue);
@@ -344,14 +333,9 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
 
 
         // let advisories propogate
-        Wait.waitFor(new Wait.Condition() {
-            Destination dest = brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return dest.getConsumers().size() == 1;
-            }
-        }, 500);
+        final Destination destA2 = brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
+        assertTrue("advisories should propagate: 1 consumer on BrokerA",
+                Wait.waitFor(() -> destA2.getConsumers().size() == 1, 5000, 
100));
 
         // and let's send messages with a selector that doesnt' match
         selectingConsumerMessages.flushMessages();
@@ -384,14 +368,9 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
         selectingConsumer.close();
 
         // let advisories propogate
-        Wait.waitFor(new Wait.Condition() {
-            Destination dest = brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return dest.getConsumers().size() == 0;
-            }
-        }, 500);
+        final Destination destA3 = brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
+        assertTrue("advisories should propagate: 0 consumers on BrokerA",
+                Wait.waitFor(() -> destA3.getConsumers().isEmpty(), 5000, 
100));
 
         selectingConsumerMessages.flushMessages();
 
@@ -399,7 +378,7 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
 
 
         // assert broker A stats
-        waitForMessagesToBeConsumed(brokerA, 
"Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000);
+        waitForMessagesToBeConsumed(brokerA, 
"Consumer.B.VirtualTopic.tempTopic", false, 30, 20, 5000);
         assertEquals(30, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
                 .getDestinationStatistics().getEnqueues().getCount());
         assertEquals(20, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
@@ -422,14 +401,9 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
         assertEquals(10, selectingConsumerMessages.getMessageCount());
 
         // let advisories propogate
-        Wait.waitFor(new Wait.Condition() {
-            Destination dest = brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return dest.getConsumers().size() == 1;
-            }
-        }, 500);
+        final Destination destA4 = brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
+        assertTrue("advisories should propagate: 1 consumer on BrokerA",
+                Wait.waitFor(() -> destA4.getConsumers().size() == 1, 5000, 
100));
 
         // assert broker A stats
         waitForMessagesToBeConsumed(brokerA, 
"Consumer.B.VirtualTopic.tempTopic", false, 30, 30, 5000);
@@ -684,23 +658,11 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
             destination = new ActiveMQQueue(destinationName);
         }
 
-        Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-
-                return broker.getDestination(destination)
-                        .getDestinationStatistics().getEnqueues().getCount() 
== numEnqueueMsgs;
-            }
-        }, waitTime);
-
-        Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
+        Wait.waitFor(() -> broker.getDestination(destination)
+                .getDestinationStatistics().getEnqueues().getCount() >= 
numEnqueueMsgs, waitTime);
 
-                return broker.getDestination(destination)
-                        .getDestinationStatistics().getDequeues().getCount() 
== numDequeueMsgs;
-            }
-        }, waitTime);
+        Wait.waitFor(() -> broker.getDestination(destination)
+                .getDestinationStatistics().getDequeues().getCount() >= 
numDequeueMsgs, waitTime);
     }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to