This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b6e4a98c8 Refactor JMSConsumerTest and ZeroPrefetchConsumerTest (#1628)
4b6e4a98c8 is described below

commit 4b6e4a98c87e8cf2088da839b595d471bfbef646
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Fri Jan 23 15:47:22 2026 +0100

    Refactor JMSConsumerTest and ZeroPrefetchConsumerTest (#1628)
    
    * Refactor JMSConsumerTest and ZeroPrefetchConsumerTest to eliminate Thread.sleep and improve message assertion logic
    
    * Enhance MKahaDBTxRecoveryTest to wait for async cleanup after corruption detection
---
 .../java/org/apache/activemq/JMSConsumerTest.java  | 111 ++++++++++-----------
 .../org/apache/activemq/JmsQueueBrowserTest.java   |  23 +++++
 .../apache/activemq/ZeroPrefetchConsumerTest.java  |  22 +++-
 .../activemq/bugs/MKahaDBTxRecoveryTest.java       |  19 +++-
 4 files changed, 110 insertions(+), 65 deletions(-)

diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java 
b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
index 8008f6fb93..b0d45f8b2c 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
@@ -43,6 +43,8 @@ import javax.management.ObjectName;
 import junit.framework.Test;
 
 import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.QueueSubscription;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicSubscription;
@@ -254,26 +256,23 @@ public class JMSConsumerTest extends JmsTestSupport {
         }
 
         final List<Subscription> subscriptions = 
getDestinationConsumers(broker, destination);
-        Thread.sleep(1000);
 
-        assertTrue("prefetch extension..",
+        assertTrue("prefetch extension..", Wait.waitFor(() ->
                 subscriptions.stream().
                         filter(s -> s instanceof TopicSubscription).
                         mapToInt(s -> 
((TopicSubscription)s).getPrefetchExtension().get()).
-                        allMatch(e -> e == 4));
+                        allMatch(e -> e == 4)
+        , TimeUnit.SECONDS.toMillis(5), 100));
 
         assertNull(consumer.receiveNoWait());
         message.acknowledge();
 
-        assertTrue("prefetch extension back to 0", Wait.waitFor(new 
Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return subscriptions.stream().
+        assertTrue("prefetch extension back to 0", Wait.waitFor(() ->
+                subscriptions.stream().
                         filter(s -> s instanceof TopicSubscription).
                         mapToInt(s -> 
((TopicSubscription)s).getPrefetchExtension().get()).
-                        allMatch(e -> e == 0);
-            }
-        }));
+                        allMatch(e -> e == 0)
+        ));
 
     }
 
@@ -299,29 +298,23 @@ public class JMSConsumerTest extends JmsTestSupport {
 
         final List<Subscription> subscriptions = 
getDestinationConsumers(broker, destination);
 
-        assertTrue("prefetch extension..", Wait.waitFor(new Wait.Condition() {
-                    @Override
-                    public boolean isSatisified() throws Exception {
-                        return subscriptions.stream().
-                                filter(s -> s instanceof QueueSubscription).
-                                mapToInt(s -> 
((QueueSubscription)s).getPrefetchExtension().get()).
-                                allMatch(e -> e == 4);
-                    }
-                }));
+        assertTrue("prefetch extension..", Wait.waitFor(() ->
+                subscriptions.stream().
+                        filter(s -> s instanceof QueueSubscription).
+                        mapToInt(s -> 
((QueueSubscription)s).getPrefetchExtension().get()).
+                        allMatch(e -> e == 4)
+        ));
 
 
         assertNull(consumer.receiveNoWait());
         message.acknowledge();
 
-        assertTrue("prefetch extension back to 0", Wait.waitFor(new 
Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return subscriptions.stream().
+        assertTrue("prefetch extension back to 0", Wait.waitFor(() ->
+                subscriptions.stream().
                         filter(s -> s instanceof QueueSubscription).
                         mapToInt(s -> 
((QueueSubscription)s).getPrefetchExtension().get()).
-                        allMatch(e -> e == 0);
-            }
-        }));
+                        allMatch(e -> e == 0)
+        ));
     }
 
     public void initCombosForTestDurableConsumerSelectorChange() {
@@ -429,10 +422,9 @@ public class JMSConsumerTest extends JmsTestSupport {
         });
 
         assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
-        Thread.sleep(200);
 
         // Make sure only 4 messages were delivered.
-        assertEquals(4, counter.get());
+        assertNoAdditionalMessages(counter, 4);
     }
 
     public void initCombosForTestPassMessageListenerIntoCreateConsumer() {
@@ -463,10 +455,9 @@ public class JMSConsumerTest extends JmsTestSupport {
         sendMessages(session, destination, 4);
 
         assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
-        Thread.sleep(200);
 
         // Make sure only 4 messages were delivered.
-        assertEquals(4, counter.get());
+        assertNoAdditionalMessages(counter, 4);
     }
 
     public void 
initCombosForTestMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() 
{
@@ -551,7 +542,7 @@ public class JMSConsumerTest extends JmsTestSupport {
         });
 
         assertTrue(done2.await(1000, TimeUnit.MILLISECONDS));
-        Thread.sleep(200);
+        assertNoAdditionalMessages(counter, 5);
 
         // assert msg 2 was redelivered as close() from onMessages() will only 
ack in auto_ack and dups_ok mode
         assertEquals(5, counter.get());
@@ -637,11 +628,9 @@ public class JMSConsumerTest extends JmsTestSupport {
         });
 
         assertTrue(done2.await(1000, TimeUnit.MILLISECONDS));
-        Thread.sleep(200);
 
-        // close from onMessage with Auto_ack will ack
         // Make sure only 4 messages were delivered.
-        assertEquals(4, counter.get());
+        assertNoAdditionalMessages(counter, 4);
     }
 
     public void initCombosForTestMessageListenerWithConsumerWithPrefetch1() {
@@ -676,10 +665,9 @@ public class JMSConsumerTest extends JmsTestSupport {
         sendMessages(session, destination, 4);
 
         assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
-        Thread.sleep(200);
 
         // Make sure only 4 messages were delivered.
-        assertEquals(4, counter.get());
+        assertNoAdditionalMessages(counter, 4);
     }
 
     public void initCombosForTestMessageListenerWithConsumer() {
@@ -712,10 +700,9 @@ public class JMSConsumerTest extends JmsTestSupport {
         sendMessages(session, destination, 4);
 
         assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
-        Thread.sleep(200);
 
         // Make sure only 4 messages were delivered.
-        assertEquals(4, counter.get());
+        assertNoAdditionalMessages(counter, 4);
     }
 
     public void initCombosForTestUnackedWithPrefetch1StayInQueue() {
@@ -795,19 +782,16 @@ public class JMSConsumerTest extends JmsTestSupport {
         MessageConsumer consumer2 = session2.createConsumer(destination);
 
         // Wait for consumer2 to fully register with the broker
-        assertTrue("consumer2 registered", Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return getDestinationConsumers(broker, destination).size() == 
2;
-            }
-        }, 5000));
+        assertTrue("consumer2 registered", Wait.waitFor(() ->
+                getDestinationConsumers(broker, destination).size() == 2
+        , TimeUnit.SECONDS.toMillis(5), 100));
 
         // Pick up the first message.
-        Message message1 = consumer.receive(1000);
+        Message message1 = consumer.receive(10_000);
         assertNotNull(message1);
 
         // Pick up the 2nd messages.
-        Message message2 = consumer2.receive(5000);
+        Message message2 = consumer2.receive(10_000);
         assertNotNull(message2);
 
         session.commit();
@@ -1019,26 +1003,27 @@ public class JMSConsumerTest extends JmsTestSupport {
 
         Session sendSession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
         MessageProducer producer = sendSession.createProducer(destination);
-        producer.setTimeToLive(500);
+        final int ttl = 500;
+        producer.setTimeToLive(ttl);
         final int count = 4;
         for (int i = 0; i < count; i++) {
-            TextMessage message = sendSession.createTextMessage("" + i);
+            final TextMessage message = sendSession.createTextMessage("" + i);
             producer.send(message);
         }
 
-        // let first bunch in queue expire
-        Thread.sleep(1000);
+        // let first bunch expire - messages expire based on TTL
+        Thread.sleep(ttl * 2L);
 
         producer.setTimeToLive(0);
         for (int i = 0; i < count; i++) {
-            TextMessage message = sendSession.createTextMessage("no expiry" + 
i);
+            final TextMessage message = sendSession.createTextMessage("no 
expiry" + i);
             producer.send(message);
         }
 
-        ActiveMQMessageConsumer amqConsumer = (ActiveMQMessageConsumer) 
consumer;
+        final ActiveMQMessageConsumer amqConsumer = (ActiveMQMessageConsumer) 
consumer;
 
         for (int i=0; i<count; i++) {
-            TextMessage msg = (TextMessage) amqConsumer.receive();
+            final TextMessage msg = (TextMessage) amqConsumer.receive();
             assertNotNull(msg);
             assertTrue("message has \"no expiry\" text: " + msg.getText(), 
msg.getText().contains("no expiry"));
 
@@ -1047,12 +1032,14 @@ public class JMSConsumerTest extends JmsTestSupport {
         }
         assertEquals("consumer has expiredMessages", count, 
amqConsumer.getConsumerStats().getExpiredMessageCount().getCount());
 
-        DestinationViewMBean view = createView(destination);
-
-        assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), 0, 
view.getInFlightCount());
-        assertEquals("Wrong dispatch count: " + view.getDispatchCount(), 8, 
view.getDispatchCount());
-        assertEquals("Wrong dequeue count: " + view.getDequeueCount(), 8, 
view.getDequeueCount());
-        assertEquals("Wrong expired count: " + view.getExpiredCount(), 4, 
view.getExpiredCount());
+        // Wait for broker to update statistics
+        assertTrue("broker statistics updated", Wait.waitFor(() -> {
+            final DestinationViewMBean view = createView(destination);
+            return view.getInFlightCount() == 0 &&
+                   view.getDispatchCount() == 8 &&
+                   view.getDequeueCount() == 8 &&
+                   view.getExpiredCount() == 4;
+        }, TimeUnit.SECONDS.toMillis(5), 100));
     }
 
     protected DestinationViewMBean createView(ActiveMQDestination destination) 
throws Exception {
@@ -1066,4 +1053,10 @@ public class JMSConsumerTest extends JmsTestSupport {
         }
         return (DestinationViewMBean) 
broker.getManagementContext().newProxyInstance(name, 
DestinationViewMBean.class, true);
     }
+
+    private void assertNoAdditionalMessages(final AtomicInteger counter, final 
int expected) throws Exception {
+        assertFalse("unexpected additional messages received", Wait.waitFor(
+            (Wait.Condition) () -> counter.get() > expected,
+            TimeUnit.SECONDS.toMillis(2), 50));
+    }
 }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
index 48c0ba098c..4cfc6ea5d9 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import jakarta.jms.Message;
 import jakarta.jms.MessageConsumer;
@@ -35,9 +36,11 @@ import junit.framework.Test;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.broker.region.BaseDestination;
+import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.activemq.test.annotations.ParallelTest;
@@ -136,6 +139,16 @@ public class JmsQueueBrowserTest extends JmsTestSupport {
             producer.send(outbound[i]);
         }
 
+        // Wait for messages to be fully processed by the broker before 
browsing
+        final int expectedCount = outbound.length;
+        assertTrue("messages arrived in queue", Wait.waitFor(new 
Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                final Queue queueView = (Queue) 
broker.getDestination(destination);
+                return queueView != null && 
queueView.getDestinationStatistics().getMessages().getCount() == expectedCount;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100)));
+
         QueueBrowser browser = session.createBrowser(destination);
         Enumeration<?> enumeration = browser.getEnumeration();
 
@@ -149,6 +162,16 @@ public class JmsQueueBrowserTest extends JmsTestSupport {
             producer.send(outbound[i]);
         }
 
+        // Wait for second batch of messages to be fully processed by the 
broker before browsing
+        final int expectedCount2 = outbound.length * 2;
+        assertTrue("second batch arrived in queue", Wait.waitFor(new 
Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                final Queue queueView = (Queue) 
broker.getDestination(destination);
+                return queueView != null && 
queueView.getDestinationStatistics().getMessages().getCount() == expectedCount2;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100)));
+
         // verify second batch is visible to browse
         browser = session.createBrowser(destination);
         enumeration = browser.getEnumeration();
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
index abc25438f3..d4de2ee84a 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
@@ -26,6 +26,8 @@ import jakarta.jms.Queue;
 import jakarta.jms.Session;
 import jakarta.jms.TextMessage;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
@@ -331,14 +333,24 @@ public class ZeroPrefetchConsumerTest extends 
EmbeddedBrokerTestSupport {
 
     // https://issues.apache.org/jira/browse/AMQ-4224
     public void testBrokerZeroPrefetchConfig() throws Exception {
-        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        final Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
 
-        MessageProducer producer = session.createProducer(brokerZeroQueue);
+        final MessageProducer producer = 
session.createProducer(brokerZeroQueue);
         producer.send(session.createTextMessage("Msg1"));
         // now lets receive it
-        MessageConsumer consumer = session.createConsumer(brokerZeroQueue);
+        final MessageConsumer consumer = 
session.createConsumer(brokerZeroQueue);
 
-        TextMessage answer = (TextMessage)consumer.receive(5000);
+        // Wait for broker subscription to be created and policy applied (same 
as testBrokerZeroPrefetchConfigWithConsumerControl)
+        final ActiveMQDestination transformedDest = 
ActiveMQDestination.transform(brokerZeroQueue);
+        org.apache.activemq.util.Wait.waitFor(new 
org.apache.activemq.util.Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 
broker.getRegionBroker().getDestinationMap().get(transformedDest) != null
+                    && 
!broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().isEmpty();
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100));
+
+        final TextMessage answer = 
(TextMessage)consumer.receive(TimeUnit.SECONDS.toMillis(5));
         assertNotNull("Consumer should have read a message", answer);
         assertEquals("Should have received a message!", answer.getText(), 
"Msg1");
     }
@@ -358,7 +370,7 @@ public class ZeroPrefetchConsumerTest extends 
EmbeddedBrokerTestSupport {
                 return 
broker.getRegionBroker().getDestinationMap().get(transformedDest) != null
                     && 
!broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().isEmpty();
             }
-        }, 5000, 100);
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100));
 
         assertEquals("broker config prefetch in effect", 0, 
consumer.info.getCurrentPrefetchSize());
 
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
index 6b21fa16a8..5ea39e1799 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
@@ -52,6 +52,7 @@ import jakarta.jms.MessageProducer;
 import jakarta.jms.Session;
 import javax.management.ObjectName;
 import java.io.IOException;
+import java.io.File;
 import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -435,7 +436,23 @@ public class MKahaDBTxRecoveryTest {
             assertTrue("broker/store found corruption", 
foundSomeCorruption.get());
             assertTrue("broker/store ignored corruption", 
ignoringCorruption.get());
 
+            // effectively wait for the async process to clean up after 
corrupt detection
+            final File txStoreDir = new File(pathToDataDir, "mKahaDB/txStore");
+            assertTrue("txStore cleanup", Wait.waitFor(() -> {
+                File[] files = txStoreDir.listFiles((dir, name) -> 
name.endsWith(".log"));
+                if (files == null || files.length == 0) {
+                    return false;
+                }
+                for (File file : files) {
+                    if ("db-1.log".equals(file.getName())) {
+                        return false;
+                    }
+                }
+                return true;
+            }, TimeUnit.SECONDS.toMillis(5), 100));
+
             broker.stop();
+            broker.waitUntilStopped();
 
             foundSomeCorruption.set(false);
             ignoringCorruption.set(false);
@@ -544,4 +561,4 @@ public class MKahaDBTxRecoveryTest {
         }
                return template;
        }
-}
\ No newline at end of file
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to