This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 4b6e4a98c8 Refactor JMSConsumerTest and ZeroPrefetchConsumerTest
(#1628)
4b6e4a98c8 is described below
commit 4b6e4a98c87e8cf2088da839b595d471bfbef646
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Fri Jan 23 15:47:22 2026 +0100
Refactor JMSConsumerTest and ZeroPrefetchConsumerTest (#1628)
* Refactor JMSConsumerTest and ZeroPrefetchConsumerTest to eliminate
Thread.sleep and improve message assertion logic
* Enhance MKahaDBTxRecoveryTest to wait for async cleanup after corruption
detection
---
.../java/org/apache/activemq/JMSConsumerTest.java | 111 ++++++++++-----------
.../org/apache/activemq/JmsQueueBrowserTest.java | 23 +++++
.../apache/activemq/ZeroPrefetchConsumerTest.java | 22 +++-
.../activemq/bugs/MKahaDBTxRecoveryTest.java | 19 +++-
4 files changed, 110 insertions(+), 65 deletions(-)
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
index 8008f6fb93..b0d45f8b2c 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
@@ -43,6 +43,8 @@ import javax.management.ObjectName;
import junit.framework.Test;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.QueueSubscription;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicSubscription;
@@ -254,26 +256,23 @@ public class JMSConsumerTest extends JmsTestSupport {
}
final List<Subscription> subscriptions =
getDestinationConsumers(broker, destination);
- Thread.sleep(1000);
- assertTrue("prefetch extension..",
+ assertTrue("prefetch extension..", Wait.waitFor(() ->
subscriptions.stream().
filter(s -> s instanceof TopicSubscription).
mapToInt(s ->
((TopicSubscription)s).getPrefetchExtension().get()).
- allMatch(e -> e == 4));
+ allMatch(e -> e == 4)
+ , TimeUnit.SECONDS.toMillis(5), 100));
assertNull(consumer.receiveNoWait());
message.acknowledge();
- assertTrue("prefetch extension back to 0", Wait.waitFor(new
Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return subscriptions.stream().
+ assertTrue("prefetch extension back to 0", Wait.waitFor(() ->
+ subscriptions.stream().
filter(s -> s instanceof TopicSubscription).
mapToInt(s ->
((TopicSubscription)s).getPrefetchExtension().get()).
- allMatch(e -> e == 0);
- }
- }));
+ allMatch(e -> e == 0)
+ ));
}
@@ -299,29 +298,23 @@ public class JMSConsumerTest extends JmsTestSupport {
final List<Subscription> subscriptions =
getDestinationConsumers(broker, destination);
- assertTrue("prefetch extension..", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return subscriptions.stream().
- filter(s -> s instanceof QueueSubscription).
- mapToInt(s ->
((QueueSubscription)s).getPrefetchExtension().get()).
- allMatch(e -> e == 4);
- }
- }));
+ assertTrue("prefetch extension..", Wait.waitFor(() ->
+ subscriptions.stream().
+ filter(s -> s instanceof QueueSubscription).
+ mapToInt(s ->
((QueueSubscription)s).getPrefetchExtension().get()).
+ allMatch(e -> e == 4)
+ ));
assertNull(consumer.receiveNoWait());
message.acknowledge();
- assertTrue("prefetch extension back to 0", Wait.waitFor(new
Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return subscriptions.stream().
+ assertTrue("prefetch extension back to 0", Wait.waitFor(() ->
+ subscriptions.stream().
filter(s -> s instanceof QueueSubscription).
mapToInt(s ->
((QueueSubscription)s).getPrefetchExtension().get()).
- allMatch(e -> e == 0);
- }
- }));
+ allMatch(e -> e == 0)
+ ));
}
public void initCombosForTestDurableConsumerSelectorChange() {
@@ -429,10 +422,9 @@ public class JMSConsumerTest extends JmsTestSupport {
});
assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
- Thread.sleep(200);
// Make sure only 4 messages were delivered.
- assertEquals(4, counter.get());
+ assertNoAdditionalMessages(counter, 4);
}
public void initCombosForTestPassMessageListenerIntoCreateConsumer() {
@@ -463,10 +455,9 @@ public class JMSConsumerTest extends JmsTestSupport {
sendMessages(session, destination, 4);
assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
- Thread.sleep(200);
// Make sure only 4 messages were delivered.
- assertEquals(4, counter.get());
+ assertNoAdditionalMessages(counter, 4);
}
public void
initCombosForTestMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue()
{
@@ -551,7 +542,7 @@ public class JMSConsumerTest extends JmsTestSupport {
});
assertTrue(done2.await(1000, TimeUnit.MILLISECONDS));
- Thread.sleep(200);
+ assertNoAdditionalMessages(counter, 5);
// assert msg 2 was redelivered as close() from onMessages() will only
ack in auto_ack and dups_ok mode
assertEquals(5, counter.get());
@@ -637,11 +628,9 @@ public class JMSConsumerTest extends JmsTestSupport {
});
assertTrue(done2.await(1000, TimeUnit.MILLISECONDS));
- Thread.sleep(200);
- // close from onMessage with Auto_ack will ack
// Make sure only 4 messages were delivered.
- assertEquals(4, counter.get());
+ assertNoAdditionalMessages(counter, 4);
}
public void initCombosForTestMessageListenerWithConsumerWithPrefetch1() {
@@ -676,10 +665,9 @@ public class JMSConsumerTest extends JmsTestSupport {
sendMessages(session, destination, 4);
assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
- Thread.sleep(200);
// Make sure only 4 messages were delivered.
- assertEquals(4, counter.get());
+ assertNoAdditionalMessages(counter, 4);
}
public void initCombosForTestMessageListenerWithConsumer() {
@@ -712,10 +700,9 @@ public class JMSConsumerTest extends JmsTestSupport {
sendMessages(session, destination, 4);
assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
- Thread.sleep(200);
// Make sure only 4 messages were delivered.
- assertEquals(4, counter.get());
+ assertNoAdditionalMessages(counter, 4);
}
public void initCombosForTestUnackedWithPrefetch1StayInQueue() {
@@ -795,19 +782,16 @@ public class JMSConsumerTest extends JmsTestSupport {
MessageConsumer consumer2 = session2.createConsumer(destination);
// Wait for consumer2 to fully register with the broker
- assertTrue("consumer2 registered", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return getDestinationConsumers(broker, destination).size() ==
2;
- }
- }, 5000));
+ assertTrue("consumer2 registered", Wait.waitFor(() ->
+ getDestinationConsumers(broker, destination).size() == 2
+ , TimeUnit.SECONDS.toMillis(5), 100));
// Pick up the first message.
- Message message1 = consumer.receive(1000);
+ Message message1 = consumer.receive(10_000);
assertNotNull(message1);
// Pick up the 2nd messages.
- Message message2 = consumer2.receive(5000);
+ Message message2 = consumer2.receive(10_000);
assertNotNull(message2);
session.commit();
@@ -1019,26 +1003,27 @@ public class JMSConsumerTest extends JmsTestSupport {
Session sendSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = sendSession.createProducer(destination);
- producer.setTimeToLive(500);
+ final int ttl = 500;
+ producer.setTimeToLive(ttl);
final int count = 4;
for (int i = 0; i < count; i++) {
- TextMessage message = sendSession.createTextMessage("" + i);
+ final TextMessage message = sendSession.createTextMessage("" + i);
producer.send(message);
}
- // let first bunch in queue expire
- Thread.sleep(1000);
+ // let first bunch expire - messages expire based on TTL
+ Thread.sleep(ttl * 2L);
producer.setTimeToLive(0);
for (int i = 0; i < count; i++) {
- TextMessage message = sendSession.createTextMessage("no expiry" +
i);
+ final TextMessage message = sendSession.createTextMessage("no
expiry" + i);
producer.send(message);
}
- ActiveMQMessageConsumer amqConsumer = (ActiveMQMessageConsumer)
consumer;
+ final ActiveMQMessageConsumer amqConsumer = (ActiveMQMessageConsumer)
consumer;
for (int i=0; i<count; i++) {
- TextMessage msg = (TextMessage) amqConsumer.receive();
+ final TextMessage msg = (TextMessage) amqConsumer.receive();
assertNotNull(msg);
assertTrue("message has \"no expiry\" text: " + msg.getText(),
msg.getText().contains("no expiry"));
@@ -1047,12 +1032,14 @@ public class JMSConsumerTest extends JmsTestSupport {
}
assertEquals("consumer has expiredMessages", count,
amqConsumer.getConsumerStats().getExpiredMessageCount().getCount());
- DestinationViewMBean view = createView(destination);
-
- assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), 0,
view.getInFlightCount());
- assertEquals("Wrong dispatch count: " + view.getDispatchCount(), 8,
view.getDispatchCount());
- assertEquals("Wrong dequeue count: " + view.getDequeueCount(), 8,
view.getDequeueCount());
- assertEquals("Wrong expired count: " + view.getExpiredCount(), 4,
view.getExpiredCount());
+ // Wait for broker to update statistics
+ assertTrue("broker statistics updated", Wait.waitFor(() -> {
+ final DestinationViewMBean view = createView(destination);
+ return view.getInFlightCount() == 0 &&
+ view.getDispatchCount() == 8 &&
+ view.getDequeueCount() == 8 &&
+ view.getExpiredCount() == 4;
+ }, TimeUnit.SECONDS.toMillis(5), 100));
}
protected DestinationViewMBean createView(ActiveMQDestination destination)
throws Exception {
@@ -1066,4 +1053,10 @@ public class JMSConsumerTest extends JmsTestSupport {
}
return (DestinationViewMBean)
broker.getManagementContext().newProxyInstance(name,
DestinationViewMBean.class, true);
}
+
+ private void assertNoAdditionalMessages(final AtomicInteger counter, final
int expected) throws Exception {
+ assertFalse("unexpected additional messages received", Wait.waitFor(
+ (Wait.Condition) () -> counter.get() > expected,
+ TimeUnit.SECONDS.toMillis(2), 50));
+ }
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
index 48c0ba098c..4cfc6ea5d9 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
@@ -35,9 +36,11 @@ import junit.framework.Test;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.BaseDestination;
+import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.activemq.test.annotations.ParallelTest;
@@ -136,6 +139,16 @@ public class JmsQueueBrowserTest extends JmsTestSupport {
producer.send(outbound[i]);
}
+ // Wait for messages to be fully processed by the broker before
browsing
+ final int expectedCount = outbound.length;
+ assertTrue("messages arrived in queue", Wait.waitFor(new
Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ final Queue queueView = (Queue)
broker.getDestination(destination);
+ return queueView != null &&
queueView.getDestinationStatistics().getMessages().getCount() == expectedCount;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100)));
+
QueueBrowser browser = session.createBrowser(destination);
Enumeration<?> enumeration = browser.getEnumeration();
@@ -149,6 +162,16 @@ public class JmsQueueBrowserTest extends JmsTestSupport {
producer.send(outbound[i]);
}
+ // Wait for second batch of messages to be fully processed by the
broker before browsing
+ final int expectedCount2 = outbound.length * 2;
+ assertTrue("second batch arrived in queue", Wait.waitFor(new
Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ final Queue queueView = (Queue)
broker.getDestination(destination);
+ return queueView != null &&
queueView.getDestinationStatistics().getMessages().getCount() == expectedCount2;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100)));
+
// verify second batch is visible to browse
browser = session.createBrowser(destination);
enumeration = browser.getEnumeration();
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
index abc25438f3..d4de2ee84a 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
@@ -26,6 +26,8 @@ import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
+import java.util.concurrent.TimeUnit;
+
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
@@ -331,14 +333,24 @@ public class ZeroPrefetchConsumerTest extends
EmbeddedBrokerTestSupport {
// https://issues.apache.org/jira/browse/AMQ-4224
public void testBrokerZeroPrefetchConfig() throws Exception {
- Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ final Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(brokerZeroQueue);
+ final MessageProducer producer =
session.createProducer(brokerZeroQueue);
producer.send(session.createTextMessage("Msg1"));
// now lets receive it
- MessageConsumer consumer = session.createConsumer(brokerZeroQueue);
+ final MessageConsumer consumer =
session.createConsumer(brokerZeroQueue);
- TextMessage answer = (TextMessage)consumer.receive(5000);
+ // Wait for broker subscription to be created and policy applied (same
as testBrokerZeroPrefetchConfigWithConsumerControl)
+ final ActiveMQDestination transformedDest =
ActiveMQDestination.transform(brokerZeroQueue);
+ org.apache.activemq.util.Wait.waitFor(new
org.apache.activemq.util.Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return
broker.getRegionBroker().getDestinationMap().get(transformedDest) != null
+ &&
!broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().isEmpty();
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100));
+
+ final TextMessage answer =
(TextMessage)consumer.receive(TimeUnit.SECONDS.toMillis(5));
assertNotNull("Consumer should have read a message", answer);
assertEquals("Should have received a message!", answer.getText(),
"Msg1");
}
@@ -358,7 +370,7 @@ public class ZeroPrefetchConsumerTest extends
EmbeddedBrokerTestSupport {
return
broker.getRegionBroker().getDestinationMap().get(transformedDest) != null
&&
!broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().isEmpty();
}
- }, 5000, 100);
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100));
assertEquals("broker config prefetch in effect", 0,
consumer.info.getCurrentPrefetchSize());
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
index 6b21fa16a8..5ea39e1799 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
@@ -52,6 +52,7 @@ import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import javax.management.ObjectName;
import java.io.IOException;
+import java.io.File;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
@@ -435,7 +436,23 @@ public class MKahaDBTxRecoveryTest {
assertTrue("broker/store found corruption",
foundSomeCorruption.get());
assertTrue("broker/store ignored corruption",
ignoringCorruption.get());
+ // effectively wait for the async process to clean up after
corrupt detection
+ final File txStoreDir = new File(pathToDataDir, "mKahaDB/txStore");
+ assertTrue("txStore cleanup", Wait.waitFor(() -> {
+ File[] files = txStoreDir.listFiles((dir, name) ->
name.endsWith(".log"));
+ if (files == null || files.length == 0) {
+ return false;
+ }
+ for (File file : files) {
+ if ("db-1.log".equals(file.getName())) {
+ return false;
+ }
+ }
+ return true;
+ }, TimeUnit.SECONDS.toMillis(5), 100));
+
broker.stop();
+ broker.waitUntilStopped();
foundSomeCorruption.set(false);
ignoringCorruption.set(false);
@@ -544,4 +561,4 @@ public class MKahaDBTxRecoveryTest {
}
return template;
}
-}
\ No newline at end of file
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact