This is an automated email from the ASF dual-hosted git repository.

jlmonteiro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 1f3b4cc017 fix(test): fix MQTTTest#testReceiveMessageSentWhileOffline 
flakiness (#1751)
1f3b4cc017 is described below

commit 1f3b4cc017c8c1ded35301d1ae18c6ea837c2181
Author: JB Onofré <[email protected]>
AuthorDate: Tue Mar 10 22:49:11 2026 +0100

    fix(test): fix MQTTTest#testReceiveMessageSentWhileOffline flakiness (#1751)
    
    * fix(test): fix MQTTTest#testReceiveMessageSentWhileOffline flakiness
    
    subscribe() returns on SUBACK but the broker processes ConsumerInfo
    asynchronously, so the durable subscription may not be fully reactivated
    when receive() is called. Add Wait.waitFor on isSubscriptionActive()
    after subscribe to ensure the broker has activated the subscription
    before attempting to receive queued messages.
    
    Also remove the try/catch that silently swallowed assertion failures
    mid-loop, which masked the real failure with a misleading count mismatch.
    
    * fix(test): increase subscription activation timeout to 30s in MQTTTest
    
    The NIO+SSL transport variant (MQTTNIOSSLTest) can be slower to
    reactivate subscriptions under CI load, causing the 15s timeout to
    be insufficient.
---
 .../apache/activemq/transport/mqtt/MQTTTest.java   | 68 +++++++++++++---------
 1 file changed, 42 insertions(+), 26 deletions(-)

diff --git 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 1f95b03e8d..6729a968d7 100644
--- 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -1640,7 +1640,7 @@ public class MQTTTest extends MQTTTestSupport {
         assertEquals("Should have received " + topics.length + " messages", 
topics.length, received);
     }
 
-    @Test(timeout = 60 * 1000)
+    @Test(timeout = 2 * 60 * 1000)
     public void testReceiveMessageSentWhileOffline() throws Exception {
         final byte[] payload = new byte[1024 * 32];
         for (int i = 0; i < payload.length; i++) {
@@ -1679,36 +1679,38 @@ public class MQTTTest extends MQTTTestSupport {
         // Wait for broker to process disconnect before publishing messages 
for offline delivery.
         assertTrue("Subscription should become inactive",
                 Wait.waitFor(() -> isSubscriptionInactive(topics[0], 
mqttSub.getClientId().toString()),
-                        TimeUnit.SECONDS.toMillis(5), 100));
-
-        try {
-            for (int j = 0; j < numberOfRuns; j++) {
+                        TimeUnit.SECONDS.toMillis(10), 100));
 
-                for (int i = 0; i < messagesPerRun; ++i) {
-                    connectionPub.publish(topics[0].name().toString(), 
payload, QoS.AT_LEAST_ONCE, false);
-                }
+        for (int j = 0; j < numberOfRuns; j++) {
 
-                connectionSub = mqttSub.blockingConnection();
-                connectionSub.connect();
-                connectionSub.subscribe(topics);
+            for (int i = 0; i < messagesPerRun; ++i) {
+                connectionPub.publish(topics[0].name().toString(), payload, 
QoS.AT_LEAST_ONCE, false);
+            }
 
-                for (int i = 0; i < messagesPerRun; ++i) {
-                    Message message = connectionSub.receive(5, 
TimeUnit.SECONDS);
-                    assertNotNull(message);
-                    received++;
-                    assertTrue(Arrays.equals(payload, message.getPayload()));
-                    message.ack();
-                }
-                connectionSub.disconnect();
+            connectionSub = mqttSub.blockingConnection();
+            connectionSub.connect();
+            connectionSub.subscribe(topics);
 
-                // Wait for broker to process disconnect before next iteration 
publishes
-                assertTrue("Subscription should become inactive",
-                        Wait.waitFor(() -> isSubscriptionInactive(topics[0], 
mqttSub.getClientId().toString()),
-                                TimeUnit.SECONDS.toMillis(5), 100));
+            // Wait for broker to fully activate the subscription and start 
dispatching
+            // queued messages. subscribe() returns on SUBACK but broker 
processes the
+            // ConsumerInfo asynchronously, so messages may not be ready for 
dispatch yet.
+            assertTrue("Subscription should become active in run " + (j + 1),
+                    Wait.waitFor(() -> isSubscriptionActive(topics[0], 
mqttSub.getClientId().toString()),
+                            TimeUnit.SECONDS.toMillis(30), 100));
+
+            for (int i = 0; i < messagesPerRun; ++i) {
+                Message message = connectionSub.receive(5, TimeUnit.SECONDS);
+                assertNotNull("Should have received message " + (i + 1) + " of 
" + messagesPerRun + " in run " + (j + 1), message);
+                received++;
+                assertTrue(Arrays.equals(payload, message.getPayload()));
+                message.ack();
             }
-        } catch (Exception exception) {
-            LOG.error("unexpected exception", exception);
-            exception.printStackTrace();
+            connectionSub.disconnect();
+
+            // Wait for broker to process disconnect before next iteration 
publishes
+            assertTrue("Subscription should become inactive",
+                    Wait.waitFor(() -> isSubscriptionInactive(topics[0], 
mqttSub.getClientId().toString()),
+                            TimeUnit.SECONDS.toMillis(10), 100));
         }
         assertEquals("Should have received " + (messagesPerRun * (numberOfRuns 
+ 1)) + " messages", (messagesPerRun * (numberOfRuns + 1)), received);
     }
@@ -1727,6 +1729,20 @@ public class MQTTTest extends MQTTTestSupport {
         }
     }
 
+    private boolean isSubscriptionActive(Topic topic, String clientId) throws 
Exception {
+        if (isVirtualTopicSubscriptionStrategy()) {
+            String queueName = buildVirtualTopicQueueName(topic, clientId);
+            try {
+                return getProxyToQueue(queueName).getConsumerCount() > 0;
+            } catch (Exception ignore) {
+                return false;
+            }
+        } else {
+            return 
brokerService.getAdminView().getDurableTopicSubscribers().length == 1 &&
+                   
brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 0;
+        }
+    }
+
     private boolean isVirtualTopicSubscriptionStrategy() {
         String config = getProtocolConfig();
         return config != null && 
config.contains("mqtt-virtual-topic-subscriptions");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to