This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 6581ed7e74 NO-JIRA: Fix flaky DurableSubscriptionHangTestCase
6581ed7e74 is described below
commit 6581ed7e74cf1ad9ca95aa192a2f6cd05a464a5d
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Thu Jun 5 07:13:40 2025 -0400
NO-JIRA: Fix flaky DurableSubscriptionHangTestCase
This fixes the test by waiting until the broker reports the messages as
expired (instead of sleeping for a fixed time), and greatly speeds it up
by reducing the number of messages sent.
---
.../usecases/DurableSubscriptionHangTestCase.java | 22 ++++++++++++----------
1 file changed, 12 insertions(+), 10 deletions(-)
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java
index 708824f4eb..7aabf7338b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java
@@ -30,6 +30,8 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
import org.apache.commons.lang.RandomStringUtils;
import org.junit.After;
import org.junit.Before;
@@ -39,6 +41,7 @@ import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
public class DurableSubscriptionHangTestCase {
private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionHangTestCase.class);
@@ -55,7 +58,7 @@ public class DurableSubscriptionHangTestCase {
brokerService.setBrokerName(brokerName);
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
- defaultEntry.setExpireMessagesPeriod(5000);
+ defaultEntry.setExpireMessagesPeriod(1000);
policyMap.setDefaultEntry(defaultEntry);
brokerService.setDestinationPolicy(policyMap);
brokerService.start();
@@ -67,12 +70,13 @@ public class DurableSubscriptionHangTestCase {
}
@Test
- public void testHanging() throws Exception
- {
+ public void testHanging() throws Exception {
registerDurableSubscription();
produceExpiredAndOneNonExpiredMessages();
- TimeUnit.SECONDS.sleep(10); // make sure messages are expired
- Message message = collectMessagesFromDurableSubscriptionForOneMinute();
+ assertTrue(Wait.waitFor(() -> brokerService.getDestination(new ActiveMQTopic(topicName))
+ .getDestinationStatistics().getExpired().getCount() == 1000, 30000, 500));
+
+ Message message = getUnexpiredMessageFromDurableSubscription();
LOG.info("got message:" + message);
assertNotNull("Unable to read unexpired message", message);
}
@@ -84,8 +88,7 @@ public class DurableSubscriptionHangTestCase {
Topic topic = session.createTopic(topicName);
MessageProducer producer = session.createProducer(topic);
producer.setTimeToLive(TimeUnit.SECONDS.toMillis(1));
- for(int i=0; i<40000; i++)
- {
+ for(int i = 0; i < 1000; i++) {
sendRandomMessage(session, producer);
}
producer.setTimeToLive(TimeUnit.DAYS.toMillis(1));
@@ -108,8 +111,7 @@ public class DurableSubscriptionHangTestCase {
LOG.info("Durable Sub Registered");
}
- private Message collectMessagesFromDurableSubscriptionForOneMinute() throws Exception
- {
+ private Message getUnexpiredMessageFromDurableSubscription() throws Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + brokerName);
TopicConnection connection = connectionFactory.createTopicConnection();
@@ -119,7 +121,7 @@ public class DurableSubscriptionHangTestCase {
connection.start();
TopicSubscriber subscriber = topicSession.createDurableSubscriber(topic, durableSubName);
LOG.info("About to receive messages");
- Message message = subscriber.receive(120000);
+ Message message = subscriber.receive(1000);
subscriber.close();
connection.close();
LOG.info("collectMessagesFromDurableSubscriptionForOneMinute done");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact