This is an automated email from the ASF dual-hosted git repository.
jlmonteiro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 8d51a293f5 test(usecases): various fixes for flaky tests (#1704)
8d51a293f5 is described below
commit 8d51a293f5b374e6d685f97f493d7627f3915c1b
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Fri Feb 20 17:03:53 2026 +0100
test(usecases): various fixes for flaky tests (#1704)
* Enhance QueueZeroPrefetchLazyDispatchPriorityTest with improved message
enqueuing checks and logging consistency
* Refactor ConnectionFailureEvictsFromPoolTest and
BrokerNetworkWithStuckMessagesTest for improved readability and consistency in
message assertions
* Update assertDeqInflight method to use >= for dequeues and inflight
checks, so that additional advisory messages generated by duplex bridge
subscriptions no longer fail the advisory message assertions
* Add wait for broker to process connection drop in
RedeliveryRestartWithExceptionTest
* Fix handling of IllegalStateException in AMQ3166Test to prevent
transaction hang during async sends
* fix: avoid test hanging on Windows
* test(usecases): fix flaky advisory propagation waits in
TwoBrokerVirtualTopicSelectorAwareForwardingTest
---
.../JournalCorruptionEofIndexRecoveryTest.java | 12 ++--
.../pool/ConnectionFailureEvictsFromPoolTest.java | 6 +-
.../broker/RedeliveryRestartWithExceptionTest.java | 11 +++
.../java/org/apache/activemq/bugs/AMQ3166Test.java | 33 +++++----
.../BrokerNetworkWithStuckMessagesTest.java | 62 +++++------------
.../activemq/usecases/AdvisoryViaNetworkTest.java | 30 ++++-----
.../QueueZeroPrefetchLazyDispatchPriorityTest.java | 51 +++++++++-----
...kerVirtualTopicSelectorAwareForwardingTest.java | 78 ++++++----------------
8 files changed, 129 insertions(+), 154 deletions(-)
diff --git
a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
index c3b9fc2046..a49d37c916 100644
---
a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
+++
b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
@@ -100,9 +100,9 @@ public class JournalCorruptionEofIndexRecoveryTest {
}
if (whackIndex) {
- File indexToDelete = new File(brokerDataDir, "db.data");
+ final File indexToDelete = new File(brokerDataDir, "db.data");
LOG.info("Whacking index: " + indexToDelete);
- indexToDelete.delete();
+ IOHelper.deleteFileNonBlocking(indexToDelete);
}
doStartBroker(false, forceRecoverIndex);
@@ -219,14 +219,15 @@ public class JournalCorruptionEofIndexRecoveryTest {
broker.getPersistenceAdapter().checkpoint(true);
Location location = ((KahaDBPersistenceAdapter)
broker.getPersistenceAdapter()).getStore().getMetadata().producerSequenceIdTrackerLocation;
- DataFile dataFile = ((KahaDBPersistenceAdapter)
broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().get(Integer.valueOf(location.getDataFileId()));
- RecoverableRandomAccessFile randomAccessFile =
dataFile.openRandomAccessFile();
+ final DataFile dataFile = ((KahaDBPersistenceAdapter)
broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().get(Integer.valueOf(location.getDataFileId()));
+ final RecoverableRandomAccessFile randomAccessFile =
dataFile.openRandomAccessFile();
randomAccessFile.seek(location.getOffset());
// Use an invalid size well past the end of the data file to trigger
corruption handling without large allocation.
- int bogusSize = ((KahaDBPersistenceAdapter)
broker.getPersistenceAdapter()).getStore().getJournal()
+ final int bogusSize = ((KahaDBPersistenceAdapter)
broker.getPersistenceAdapter()).getStore().getJournal()
.getFileMap().get(location.getDataFileId()).getLength() * 10;
randomAccessFile.writeInt(bogusSize);
randomAccessFile.getChannel().force(true);
+ dataFile.closeRandomAccessFile(randomAccessFile);
((KahaDBPersistenceAdapter)
broker.getPersistenceAdapter()).getStore().getJournal().close();
try {
@@ -437,6 +438,7 @@ public class JournalCorruptionEofIndexRecoveryTest {
randomAccessFile.writeInt(4 * 1024 * 1024);
randomAccessFile.writeLong(0l);
randomAccessFile.getChannel().force(true);
+ dataFile.closeRandomAccessFile(randomAccessFile);
}
private void corruptOrderIndex(final int num, final int size) throws
Exception {
diff --git
a/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java
b/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java
index 12c46be421..596eb00fe3 100644
---
a/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java
+++
b/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java
@@ -116,8 +116,12 @@ public class ConnectionFailureEvictsFromPoolTest extends
TestSupport {
TestCase.fail("Expected Error");
} catch (JMSException e) {
}
+ // Wait for async exception event BEFORE the try-with-resources
closes the connection.
+ // ActiveMQConnection.onException() fires TransportListener
callbacks via executeAsync(),
+ // so the callback runs in a separate thread. If we wait after
connection.close(), the
+ // async executor may already be shut down and the callback never
fires.
+ TestCase.assertTrue("exception event propagated ok",
gotExceptionEvent.await(15, TimeUnit.SECONDS));
}
- TestCase.assertTrue("exception event propagated ok",
gotExceptionEvent.await(15, TimeUnit.SECONDS));
// If we get another connection now it should be a new connection that
// works.
LOG.info("expect new connection after failure");
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
index 7941f64703..36daff54e1 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
@@ -47,6 +47,7 @@ import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
@@ -241,6 +242,16 @@ public class RedeliveryRestartWithExceptionTest extends
TestSupport {
connection.getTransport().narrow(TcpTransport.class).getTransportListener().onException(new
IOException("Die"));
+ // Wait for the broker to fully process the connection drop and return
+ // unacked messages to the queue. The onException triggers async
cleanup
+ // via executeAsync(), so without this wait the new consumer may
receive
+ // fresh messages (6-10) instead of the redelivered ones (1-5).
+ final ActiveMQQueue dest = new ActiveMQQueue(queueName);
+ assertTrue("unacked messages returned to queue", Wait.waitFor(() -> {
+ final org.apache.activemq.broker.region.Destination d =
broker.getDestination(dest);
+ return d != null &&
d.getDestinationStatistics().getInflight().getCount() == 0;
+ }, 10000, 100));
+
connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start();
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3166Test.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3166Test.java
index 47d7753ce3..66d9a108a3 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3166Test.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3166Test.java
@@ -191,18 +191,25 @@ public class AMQ3166Test {
ActiveMQMessageProducer producer = (ActiveMQMessageProducer)
session.createProducer(session.createQueue("QAT"));
for (int i=0; i<batchSize; i++) {
- producer.send(session.createTextMessage("Hello A"), new
AsyncCallback() {
- @Override
- public void onSuccess() {
- batchSent.countDown();
- }
+ try {
+ producer.send(session.createTextMessage("Hello A"), new
AsyncCallback() {
+ @Override
+ public void onSuccess() {
+ batchSent.countDown();
+ }
- @Override
- public void onException(JMSException e) {
- session.getTransactionContext().setRollbackOnly(true);
- batchSent.countDown();
- }
- });
+ @Override
+ public void onException(JMSException e) {
+ session.getTransactionContext().setRollbackOnly(true);
+ batchSent.countDown();
+ }
+ });
+ } catch (jakarta.jms.IllegalStateException alreadyRolledBack) {
+ // Async error from an earlier send may have already marked
the transaction
+ // rollback-only. Count down the latch so beforeEnd doesn't
hang.
+ batchSent.countDown();
+ continue;
+ }
if (i==0) {
// transaction context begun on first send
@@ -211,9 +218,9 @@ public class AMQ3166Test {
public void beforeEnd() throws Exception {
// await response to all sends in the batch
if (!batchSent.await(10, TimeUnit.SECONDS)) {
- LOG.error("TimedOut waiting for aync send
requests!");
+ LOG.error("TimedOut waiting for async send
requests!");
session.getTransactionContext().setRollbackOnly(true);
- };
+ }
super.beforeEnd();
}
});
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
index 0a199ded33..cca1d6401b 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
@@ -235,15 +235,8 @@ public class BrokerNetworkWithStuckMessagesTest {
// Ensure that there are zero messages on the local broker. This tells
// us that those messages have been prefetched to the remote broker
// where the demand exists.
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- Object[] result = browseQueueWithJmx(localBroker);
- return 0 == result.length;
- }
- });
- messages = browseQueueWithJmx(localBroker);
- assertEquals(0, messages.length);
+ assertTrue("local broker drained", Wait.waitFor(() ->
+ browseQueueWithJmx(localBroker).length == 0));
// try and pull the messages from remote, should be denied b/c on
networkTtl
LOG.info("creating demand on second remote...");
@@ -270,15 +263,9 @@ public class BrokerNetworkWithStuckMessagesTest {
connection2.send(connectionInfo2.createRemoveCommand());
// There should now be 5 messages stuck on the remote broker
- assertTrue("correct stuck message count", Wait.waitFor(new
Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- Object[] result = browseQueueWithJmx(remoteBroker);
- return 5 == result.length;
- }
- }));
+ assertTrue("correct stuck message count", Wait.waitFor(() ->
+ browseQueueWithJmx(remoteBroker).length == 5));
messages = browseQueueWithJmx(remoteBroker);
- assertEquals(5, messages.length);
assertTrue("can see broker path property",
((String)((CompositeData)messages[1]).get("BrokerPath")).contains(localBroker.getBroker().getBrokerId().toString()));
@@ -295,15 +282,15 @@ public class BrokerNetworkWithStuckMessagesTest {
connection1.send(createAck(consumerInfo1, message1, 1,
MessageAck.INDIVIDUAL_ACK_TYPE));
LOG.info("acked one message on origin, waiting for all messages to
percolate back");
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- Object[] result = browseQueueWithJmx(localBroker);
- return 4 == result.length;
- }
- });
- messages = browseQueueWithJmx(localBroker);
- assertEquals(4, messages.length);
+ // Wait for ALL stuck messages to replay from remote back to local.
+ // Must check both brokers: local == 4 (5 replayed - 1 acked) AND
remote == 0
+ // (replay complete). Without the remote check, the Wait can return
when only
+ // 4 of 5 messages have arrived on local (transient match), then the
5th arrives.
+ assertTrue("messages percolated back", Wait.waitFor(() -> {
+ final Object[] localResult = browseQueueWithJmx(localBroker);
+ final Object[] remoteResult = browseQueueWithJmx(remoteBroker);
+ return localResult.length == 4 && remoteResult.length == 0;
+ }, TimeUnit.SECONDS.toMillis(30), 100));
LOG.info("checking for messages on remote again");
// messages won't migrate back again till consumer closes
@@ -329,25 +316,10 @@ public class BrokerNetworkWithStuckMessagesTest {
assertEquals(receiveNumMessages, counter);
// verify all messages consumed
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- Object[] result = browseQueueWithJmx(remoteBroker);
- return 0 == result.length;
- }
- });
- messages = browseQueueWithJmx(remoteBroker);
- assertEquals(0, messages.length);
-
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- Object[] result = browseQueueWithJmx(localBroker);
- return 0 == result.length;
- }
- });
- messages = browseQueueWithJmx(localBroker);
- assertEquals(0, messages.length);
+ assertTrue("remote drained", Wait.waitFor(() ->
+ browseQueueWithJmx(remoteBroker).length == 0));
+ assertTrue("local drained", Wait.waitFor(() ->
+ browseQueueWithJmx(localBroker).length == 0));
// Close the consumer on the remote broker
connection2.send(consumerInfo3.createRemoveCommand());
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
index 091957a075..e865b35670 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java
@@ -277,23 +277,23 @@ public class AdvisoryViaNetworkTest extends
JmsMultipleBrokersTestSupport {
private void assertDeqInflight(final int dequeue, final int inflight,
final ActiveMQTopic... topics) throws
Exception {
- assertTrue("deq and inflight as expected", Wait.waitFor(new
Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- long actualDeq = 0;
- long actualInflight = 0;
- for (ActiveMQTopic topic : topics) {
- ActiveMQTopic advisory =
AdvisorySupport.getConsumerAdvisoryTopic(topic);
- Destination destination =
brokers.get("A").broker.getDestination(advisory);
- if (destination != null) {
- actualDeq +=
destination.getDestinationStatistics().getDequeues().getCount();
- actualInflight +=
destination.getDestinationStatistics().getInflight().getCount();
- }
+ // Use >= instead of == because duplex bridges with statically
included destinations
+ // may generate additional advisory messages from the bridge's own
subscriptions,
+ // depending on subscription registration ordering.
+ assertTrue("deq and inflight as expected", Wait.waitFor(() -> {
+ long actualDeq = 0;
+ long actualInflight = 0;
+ for (final ActiveMQTopic topic : topics) {
+ final ActiveMQTopic advisory =
AdvisorySupport.getConsumerAdvisoryTopic(topic);
+ final Destination destination =
brokers.get("A").broker.getDestination(advisory);
+ if (destination != null) {
+ actualDeq +=
destination.getDestinationStatistics().getDequeues().getCount();
+ actualInflight +=
destination.getDestinationStatistics().getInflight().getCount();
}
- LOG.info("A Deq:" + actualDeq);
- LOG.info("A Inflight:" + actualInflight);
- return actualDeq == dequeue && actualInflight == inflight;
}
+ LOG.info("A Deq:{} (expected >={}), Inflight:{} (expected >={})",
+ actualDeq, dequeue, actualInflight, inflight);
+ return actualDeq >= dequeue && actualInflight >= inflight;
}));
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
index 880b394153..dd4dd8e356 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
@@ -74,6 +74,7 @@ public class QueueZeroPrefetchLazyDispatchPriorityTest {
@Test(timeout=120000)
public void testPriorityMessages() throws Exception {
+ final ActiveMQQueue destination = new ActiveMQQueue("TestQ");
for (int i = 0; i < ITERATIONS; i++) {
@@ -85,11 +86,15 @@ public class QueueZeroPrefetchLazyDispatchPriorityTest {
LOG.info("On iteration {}", i);
- Thread.sleep(1000);
+ // Wait for all messages to be enqueued before consuming
+ assertTrue("Messages enqueued", Wait.waitFor(() -> {
+ final Queue queue = (Queue) broker.getDestination(destination);
+ return queue != null &&
queue.getDestinationStatistics().getMessages().getCount() == 5;
+ }, 5000, 100));
// consume messages
- ArrayList<Message> consumeList = consumeMessages("TestQ", 5,
TimeUnit.SECONDS.toMillis(30));
- LOG.info("Consumed list " + consumeList.size());
+ final ArrayList<Message> consumeList = consumeMessages("TestQ", 5,
TimeUnit.SECONDS.toMillis(30));
+ LOG.info("Consumed list {}", consumeList.size());
// compare lists
assertEquals("message 1 should be priority high", 5,
consumeList.get(0).getJMSPriority());
@@ -170,27 +175,32 @@ public class QueueZeroPrefetchLazyDispatchPriorityTest {
public void testPriorityMessagesWithJmsBrowser() throws Exception {
final int numToSend = 5;
+ final ActiveMQQueue destination = new ActiveMQQueue("TestQ");
for (int i = 0; i < ITERATIONS; i++) {
produceMessages(numToSend - 1, 4, "TestQ");
- ArrayList<Message> browsed = browseMessages("TestQ");
+ final ArrayList<Message> browsed = browseMessages("TestQ");
LOG.info("Browsed: {}", browsed.size());
// send 1 message priority HIGH
produceMessages(1, 5, "TestQ");
- Thread.sleep(1000);
+ // Wait for all messages to be enqueued
+ assertTrue("Messages enqueued", Wait.waitFor(() -> {
+ final Queue queue = (Queue) broker.getDestination(destination);
+ return queue != null &&
queue.getDestinationStatistics().getMessages().getCount() == numToSend;
+ }, 5000, 100));
LOG.info("On iteration {}", i);
- Message message = consumeOneMessage("TestQ");
+ final Message message = consumeOneMessage("TestQ");
assertNotNull(message);
assertEquals(5, message.getJMSPriority());
// consume messages
- ArrayList<Message> consumeList = consumeMessages("TestQ");
+ final ArrayList<Message> consumeList = consumeMessages("TestQ");
LOG.info("Consumed list {}", consumeList.size());
// compare lists
@@ -217,30 +227,37 @@ public class QueueZeroPrefetchLazyDispatchPriorityTest {
return queue != null &&
queue.getDestinationStatistics().getMessages().getCount() == numToSend;
}, 5000, 100));
- ArrayList<Message> browsed = browseMessages("TestQ");
+ final ArrayList<Message> browsed = browseMessages("TestQ");
LOG.info("Browsed: {}", browsed.size());
assertEquals(0, browsed.size());
- Message message = consumeOneMessage("TestQ",
Session.CLIENT_ACKNOWLEDGE);
+ final Message message = consumeOneMessage("TestQ",
Session.CLIENT_ACKNOWLEDGE);
assertNotNull(message);
- browsed = browseMessages("TestQ");
+ final ArrayList<Message> browsedAfterPull =
browseMessages("TestQ");
- LOG.info("Browsed: {}", browsed.size());
+ LOG.info("Browsed: {}", browsedAfterPull.size());
- assertEquals("see only the paged in for pull", 1, browsed.size());
+ assertEquals("see only the paged in for pull", 1,
browsedAfterPull.size());
- // Wait for all messages to be available (including redelivery of
unacked message)
+ // Wait for the unacked message to be fully redelivered after
connection close.
+ // messages.getCount() includes in-flight messages so it's already
== numToSend;
+ // we must also check inflight == 0 to ensure the redelivery
processing is complete
+ // and all messages are truly available for dispatch to a new
consumer.
assertTrue("All messages available for consumption",
Wait.waitFor(() -> {
final Queue queue = (Queue) broker.getDestination(destination);
- return queue != null &&
queue.getDestinationStatistics().getMessages().getCount() == numToSend;
+ return queue != null
+ &&
queue.getDestinationStatistics().getMessages().getCount() == numToSend
+ &&
queue.getDestinationStatistics().getInflight().getCount() == 0;
}, 5000, 100));
- // consume messages
- ArrayList<Message> consumeList = consumeMessages("TestQ");
- LOG.info("Consumed list " + consumeList.size());
+ // Use the retry-based consume to handle zero-prefetch dispatch
timing:
+ // with prioritized messages + lazy dispatch + redelivered
messages in the
+ // dispatch pending list, the pull mechanism may need multiple
attempts on slow CI.
+ final ArrayList<Message> consumeList = consumeMessages("TestQ",
numToSend, TimeUnit.SECONDS.toMillis(30));
+ LOG.info("Consumed list {}", consumeList.size());
assertEquals(numToSend, consumeList.size());
}
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java
index 73375b727a..905ce34294 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java
@@ -89,15 +89,9 @@ public class
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
"ceposta = 'redhat'");
- Wait.waitFor(new Wait.Condition() {
-
- Destination dest = brokerA.getDestination(new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
-
- @Override
- public boolean isSatisified() throws Exception {
- return dest.getConsumers().size() == 2;
- }
- }, 500);
+ final Destination destA0 = brokerA.getDestination(new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
+ assertTrue("advisories should propagate: 2 consumers on BrokerA",
+ Wait.waitFor(() -> destA0.getConsumers().size() == 2, 5000,
100));
selectors = cache.selectorsForDestination(testQueue);
assertEquals(1, selectors.size());
@@ -289,14 +283,9 @@ public class
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
MessageConsumer nonSelectingConsumer = createConsumer("BrokerB",
consumerBQueue);
// let advisories propogate
- Wait.waitFor(new Wait.Condition() {
- Destination dest = brokerA.getDestination(new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
-
- @Override
- public boolean isSatisified() throws Exception {
- return dest.getConsumers().size() == 2;
- }
- }, 500);
+ final Destination destA1 = brokerA.getDestination(new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
+ assertTrue("advisories should propagate: 2 consumers on BrokerA",
+ Wait.waitFor(() -> destA1.getConsumers().size() == 2, 5000,
100));
Destination destination = getDestination(brokerB, consumerBQueue);
@@ -344,14 +333,9 @@ public class
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
// let advisories propogate
- Wait.waitFor(new Wait.Condition() {
- Destination dest = brokerA.getDestination(new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
-
- @Override
- public boolean isSatisified() throws Exception {
- return dest.getConsumers().size() == 1;
- }
- }, 500);
+ final Destination destA2 = brokerA.getDestination(new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
+ assertTrue("advisories should propagate: 1 consumer on BrokerA",
+ Wait.waitFor(() -> destA2.getConsumers().size() == 1, 5000,
100));
// and let's send messages with a selector that doesnt' match
selectingConsumerMessages.flushMessages();
@@ -384,14 +368,9 @@ public class
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
selectingConsumer.close();
// let advisories propogate
- Wait.waitFor(new Wait.Condition() {
- Destination dest = brokerA.getDestination(new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
-
- @Override
- public boolean isSatisified() throws Exception {
- return dest.getConsumers().size() == 0;
- }
- }, 500);
+ final Destination destA3 = brokerA.getDestination(new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
+ assertTrue("advisories should propagate: 0 consumers on BrokerA",
+ Wait.waitFor(() -> destA3.getConsumers().isEmpty(), 5000,
100));
selectingConsumerMessages.flushMessages();
@@ -399,7 +378,7 @@ public class
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
// assert broker A stats
- waitForMessagesToBeConsumed(brokerA,
"Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000);
+ waitForMessagesToBeConsumed(brokerA,
"Consumer.B.VirtualTopic.tempTopic", false, 30, 20, 5000);
assertEquals(30, brokerA.getDestination(new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
.getDestinationStatistics().getEnqueues().getCount());
assertEquals(20, brokerA.getDestination(new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
@@ -422,14 +401,9 @@ public class
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
assertEquals(10, selectingConsumerMessages.getMessageCount());
// let advisories propogate
- Wait.waitFor(new Wait.Condition() {
- Destination dest = brokerA.getDestination(new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
-
- @Override
- public boolean isSatisified() throws Exception {
- return dest.getConsumers().size() == 1;
- }
- }, 500);
+ final Destination destA4 = brokerA.getDestination(new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
+ assertTrue("advisories should propagate: 1 consumer on BrokerA",
+ Wait.waitFor(() -> destA4.getConsumers().size() == 1, 5000,
100));
// assert broker A stats
waitForMessagesToBeConsumed(brokerA,
"Consumer.B.VirtualTopic.tempTopic", false, 30, 30, 5000);
@@ -684,23 +658,11 @@ public class
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
destination = new ActiveMQQueue(destinationName);
}
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
-
- return broker.getDestination(destination)
- .getDestinationStatistics().getEnqueues().getCount()
== numEnqueueMsgs;
- }
- }, waitTime);
-
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
+ Wait.waitFor(() -> broker.getDestination(destination)
+ .getDestinationStatistics().getEnqueues().getCount() >=
numEnqueueMsgs, waitTime);
- return broker.getDestination(destination)
- .getDestinationStatistics().getDequeues().getCount()
== numDequeueMsgs;
- }
- }, waitTime);
+ Wait.waitFor(() -> broker.getDestination(destination)
+ .getDestinationStatistics().getDequeues().getCount() >=
numDequeueMsgs, waitTime);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact