This is an automated email from the ASF dual-hosted git repository.
jlmonteiro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 1f3b4cc017 fix(test): fix MQTTTest#testReceiveMessageSentWhileOffline
flakiness (#1751)
1f3b4cc017 is described below
commit 1f3b4cc017c8c1ded35301d1ae18c6ea837c2181
Author: JB Onofré <[email protected]>
AuthorDate: Tue Mar 10 22:49:11 2026 +0100
fix(test): fix MQTTTest#testReceiveMessageSentWhileOffline flakiness (#1751)
* fix(test): fix MQTTTest#testReceiveMessageSentWhileOffline flakiness
subscribe() returns on SUBACK but the broker processes ConsumerInfo
asynchronously, so the durable subscription may not be fully reactivated
when receive() is called. Add Wait.waitFor on isSubscriptionActive()
after subscribe to ensure the broker has activated the subscription
before attempting to receive queued messages.
Also remove the try/catch that silently swallowed assertion failures
mid-loop, which masked the real failure with a misleading count mismatch.
* fix(test): increase subscription activation timeout to 30s in MQTTTest
The NIO+SSL transport variant (MQTTNIOSSLTest) can be slower to
reactivate subscriptions under CI load, causing the 15s timeout to
be insufficient.
---
.../apache/activemq/transport/mqtt/MQTTTest.java | 68 +++++++++++++---------
1 file changed, 42 insertions(+), 26 deletions(-)
diff --git
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 1f95b03e8d..6729a968d7 100644
---
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -1640,7 +1640,7 @@ public class MQTTTest extends MQTTTestSupport {
assertEquals("Should have received " + topics.length + " messages",
topics.length, received);
}
- @Test(timeout = 60 * 1000)
+ @Test(timeout = 2 * 60 * 1000)
public void testReceiveMessageSentWhileOffline() throws Exception {
final byte[] payload = new byte[1024 * 32];
for (int i = 0; i < payload.length; i++) {
@@ -1679,36 +1679,38 @@ public class MQTTTest extends MQTTTestSupport {
// Wait for broker to process disconnect before publishing messages
for offline delivery.
assertTrue("Subscription should become inactive",
Wait.waitFor(() -> isSubscriptionInactive(topics[0],
mqttSub.getClientId().toString()),
- TimeUnit.SECONDS.toMillis(5), 100));
-
- try {
- for (int j = 0; j < numberOfRuns; j++) {
+ TimeUnit.SECONDS.toMillis(10), 100));
- for (int i = 0; i < messagesPerRun; ++i) {
- connectionPub.publish(topics[0].name().toString(),
payload, QoS.AT_LEAST_ONCE, false);
- }
+ for (int j = 0; j < numberOfRuns; j++) {
- connectionSub = mqttSub.blockingConnection();
- connectionSub.connect();
- connectionSub.subscribe(topics);
+ for (int i = 0; i < messagesPerRun; ++i) {
+ connectionPub.publish(topics[0].name().toString(), payload,
QoS.AT_LEAST_ONCE, false);
+ }
- for (int i = 0; i < messagesPerRun; ++i) {
- Message message = connectionSub.receive(5,
TimeUnit.SECONDS);
- assertNotNull(message);
- received++;
- assertTrue(Arrays.equals(payload, message.getPayload()));
- message.ack();
- }
- connectionSub.disconnect();
+ connectionSub = mqttSub.blockingConnection();
+ connectionSub.connect();
+ connectionSub.subscribe(topics);
- // Wait for broker to process disconnect before next iteration
publishes
- assertTrue("Subscription should become inactive",
- Wait.waitFor(() -> isSubscriptionInactive(topics[0],
mqttSub.getClientId().toString()),
- TimeUnit.SECONDS.toMillis(5), 100));
+ // Wait for broker to fully activate the subscription and start
dispatching
+ // queued messages. subscribe() returns on SUBACK but broker
processes the
+ // ConsumerInfo asynchronously, so messages may not be ready for
dispatch yet.
+ assertTrue("Subscription should become active in run " + (j + 1),
+ Wait.waitFor(() -> isSubscriptionActive(topics[0],
mqttSub.getClientId().toString()),
+ TimeUnit.SECONDS.toMillis(30), 100));
+
+ for (int i = 0; i < messagesPerRun; ++i) {
+ Message message = connectionSub.receive(5, TimeUnit.SECONDS);
+ assertNotNull("Should have received message " + (i + 1) + " of
" + messagesPerRun + " in run " + (j + 1), message);
+ received++;
+ assertTrue(Arrays.equals(payload, message.getPayload()));
+ message.ack();
}
- } catch (Exception exception) {
- LOG.error("unexpected exception", exception);
- exception.printStackTrace();
+ connectionSub.disconnect();
+
+ // Wait for broker to process disconnect before next iteration
publishes
+ assertTrue("Subscription should become inactive",
+ Wait.waitFor(() -> isSubscriptionInactive(topics[0],
mqttSub.getClientId().toString()),
+ TimeUnit.SECONDS.toMillis(10), 100));
}
assertEquals("Should have received " + (messagesPerRun * (numberOfRuns
+ 1)) + " messages", (messagesPerRun * (numberOfRuns + 1)), received);
}
@@ -1727,6 +1729,20 @@ public class MQTTTest extends MQTTTestSupport {
}
}
+ private boolean isSubscriptionActive(Topic topic, String clientId) throws
Exception {
+ if (isVirtualTopicSubscriptionStrategy()) {
+ String queueName = buildVirtualTopicQueueName(topic, clientId);
+ try {
+ return getProxyToQueue(queueName).getConsumerCount() > 0;
+ } catch (Exception ignore) {
+ return false;
+ }
+ } else {
+ return
brokerService.getAdminView().getDurableTopicSubscribers().length == 1 &&
+
brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 0;
+ }
+ }
+
private boolean isVirtualTopicSubscriptionStrategy() {
String config = getProtocolConfig();
return config != null &&
config.contains("mqtt-virtual-topic-subscriptions");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact