This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 6581ed7e74 NO-JIRA: Fix flaky DurableSubscriptionHangTestCase
6581ed7e74 is described below

commit 6581ed7e74cf1ad9ca95aa192a2f6cd05a464a5d
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Thu Jun 5 07:13:40 2025 -0400

    NO-JIRA: Fix flaky DurableSubscriptionHangTestCase
    
    This fixes the test by waiting until the broker reports the expected
    number of expired messages instead of sleeping for a fixed time, and
    greatly speeds things up by reducing how many messages are sent.
---
 .../usecases/DurableSubscriptionHangTestCase.java  | 22 ++++++++++++----------
 1 file changed, 12 insertions(+), 10 deletions(-)

diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java
index 708824f4eb..7aabf7338b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java
@@ -30,6 +30,8 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
 import org.apache.commons.lang.RandomStringUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -39,6 +41,7 @@ import org.slf4j.LoggerFactory;
 
 
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class DurableSubscriptionHangTestCase {
     private static final Logger LOG = 
LoggerFactory.getLogger(DurableSubscriptionHangTestCase.class);
@@ -55,7 +58,7 @@ public class DurableSubscriptionHangTestCase {
         brokerService.setBrokerName(brokerName);
         PolicyMap policyMap = new PolicyMap();
         PolicyEntry defaultEntry = new PolicyEntry();
-        defaultEntry.setExpireMessagesPeriod(5000);
+        defaultEntry.setExpireMessagesPeriod(1000);
         policyMap.setDefaultEntry(defaultEntry);
         brokerService.setDestinationPolicy(policyMap);
         brokerService.start();
@@ -67,12 +70,13 @@ public class DurableSubscriptionHangTestCase {
     }
 
        @Test
-       public void testHanging() throws Exception
-       {
+       public void testHanging() throws Exception {
                registerDurableSubscription();
                produceExpiredAndOneNonExpiredMessages();
-               TimeUnit.SECONDS.sleep(10);             // make sure messages 
are expired
-        Message message = collectMessagesFromDurableSubscriptionForOneMinute();
+               assertTrue(Wait.waitFor(() -> brokerService.getDestination(new 
ActiveMQTopic(topicName))
+                               
.getDestinationStatistics().getExpired().getCount() == 1000, 30000, 500));
+
+        Message message = getUnexpiredMessageFromDurableSubscription();
         LOG.info("got message:" + message);
         assertNotNull("Unable to read unexpired message", message);
        }
@@ -84,8 +88,7 @@ public class DurableSubscriptionHangTestCase {
         Topic topic = session.createTopic(topicName);
         MessageProducer producer = session.createProducer(topic);
         producer.setTimeToLive(TimeUnit.SECONDS.toMillis(1));
-        for(int i=0; i<40000; i++)
-        {
+        for(int i = 0; i < 1000; i++) {
                sendRandomMessage(session, producer);
         }
         producer.setTimeToLive(TimeUnit.DAYS.toMillis(1));
@@ -108,8 +111,7 @@ public class DurableSubscriptionHangTestCase {
                LOG.info("Durable Sub Registered");
        }
 
-       private Message collectMessagesFromDurableSubscriptionForOneMinute() 
throws Exception
-       {
+       private Message getUnexpiredMessageFromDurableSubscription() throws 
Exception {
                ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory("vm://" + brokerName);
                TopicConnection connection = 
connectionFactory.createTopicConnection();
 
@@ -119,7 +121,7 @@ public class DurableSubscriptionHangTestCase {
                connection.start();
                TopicSubscriber subscriber = 
topicSession.createDurableSubscriber(topic, durableSubName);
                LOG.info("About to receive messages");
-               Message message = subscriber.receive(120000);
+               Message message = subscriber.receive(1000);
                subscriber.close();
                connection.close();
                LOG.info("collectMessagesFromDurableSubscriptionForOneMinute 
done");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to