This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 77d2327ba7 ARTEMIS-5469 clear MQTT send quota when session stops
77d2327ba7 is described below
commit 77d2327ba722083d54c9583153c097b2da9ce40f
Author: Justin Bertram <[email protected]>
AuthorDate: Wed May 14 12:08:26 2025 -0500
ARTEMIS-5469 clear MQTT send quota when session stops
---
.../artemis/core/protocol/mqtt/MQTTSession.java | 1 +
.../artemis/tests/integration/mqtt5/MQTT5Test.java | 94 ++++++++++++++++++++--
2 files changed, 88 insertions(+), 7 deletions(-)
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index be40225d5d..769156c9ed 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -133,6 +133,7 @@ public class MQTTSession {
state.setAttached(false);
state.setDisconnectedTime(System.currentTimeMillis());
state.clearTopicAliases();
+ state.getOutboundStore().clear();
if (getVersion() == MQTTVersion.MQTT_5) {
if (state.getClientSessionExpiryInterval() == 0) {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index 5829bfdbde..ef1ddd2396 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -16,13 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.mqtt5;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.Message;
@@ -35,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import io.netty.handler.codec.mqtt.MqttMessageType;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
@@ -43,9 +37,11 @@ import
org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImplAccessor;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeTestAccessor;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionAccessor;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionState;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerSession;
@@ -69,6 +65,13 @@ import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
/**
* General tests for things not covered directly in the specification.
*/
@@ -791,4 +794,81 @@ public class MQTT5Test extends MQTT5TestSupport {
}
client.close();
}
+
+ @Test
+ @Timeout(DEFAULT_TIMEOUT_SEC)
+ public void testFlowControlAfterDisconnect() throws Exception {
+ final String topic = getName();
+ final String clientId = "subscriber";
+
+ // set up interceptor to block first PUBACK; this simulates a client
that doesn't ack the QoS 1 message
+ CountDownLatch interceptorBlockedLatch = new CountDownLatch(1);
+ CountDownLatch pendingCountCheckLatch = new CountDownLatch(1);
+ MQTTInterceptor incomingInterceptor = (packet, connection) -> {
+ if (packet.fixedHeader().messageType() == MqttMessageType.PUBACK &&
interceptorBlockedLatch.getCount() > 0) {
+ interceptorBlockedLatch.countDown();
+ try {
+ // as soon as we return false here the broker will disconnect
the client which will trigger a clean-up
+ // but we want to perform a check before that happens
+ pendingCountCheckLatch.await(2000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ return false;
+ }
+ return true;
+ };
+ server.getRemotingService().addIncomingInterceptor(incomingInterceptor);
+
+ // set up subscriber
+ CountDownLatch subscriberLatch = new CountDownLatch(1);
+ MqttClient subscriber = createPahoClient(clientId);
+ MqttConnectionOptions options = new MqttConnectionOptions();
+ options.setReceiveMaximum(1);
+ options.setCleanStart(false);
+ options.setSessionExpiryInterval(9999L);
+ subscriber.setCallback(new LatchedMqttCallback(subscriberLatch));
+ subscriber.connect(options);
+ subscriber.subscribe(topic, AT_LEAST_ONCE);
+ Queue subscriptionQueue = getSubscriptionQueue(topic, clientId);
+ assertNotNull(subscriptionQueue);
+ MQTTSessionState mqttSessionState = getSessionStates().get(clientId);
+ assertNotNull(mqttSessionState);
+ assertTrue(mqttSessionState.isAttached());
+ assertEquals(1L, subscriptionQueue.getConsumerCount());
+
+ // publish message
+ MqttClient producer = createPahoClient("producer");
+ producer.connect();
+ producer.publish(topic, "myMessage".getBytes(StandardCharsets.UTF_8), 1,
false);
+ Wait.assertEquals(1L, () -> subscriptionQueue.getMessageCount(), 2000,
50);
+ Wait.assertEquals(1L, () -> subscriptionQueue.getMessagesAdded(), 2000,
50);
+
+ // ensure subscriber got message and ack was blocked
+ assertTrue(subscriberLatch.await(500, TimeUnit.MILLISECONDS));
+ assertTrue(interceptorBlockedLatch.await(500, TimeUnit.MILLISECONDS));
+ Wait.assertEquals(1L, () ->
mqttSessionState.getOutboundStore().getPendingMessages(), 2000, 10);
+ pendingCountCheckLatch.countDown();
+
+ // disconnect subscriber
+ subscriber.disconnect();
+ Wait.assertFalse(() -> mqttSessionState.isAttached(), 2000, 50);
+ assertEquals(0,
mqttSessionState.getOutboundStore().getPendingMessages());
+ assertEquals(1L, subscriptionQueue.getMessageCount());
+ assertEquals(0L, subscriptionQueue.getMessagesAcknowledged());
+ assertEquals(0L, subscriptionQueue.getConsumerCount());
+
+ // reconnect subscriber w/same options as before
+ subscriberLatch = new CountDownLatch(1);
+ subscriber.setCallback(new LatchedMqttCallback(subscriberLatch));
+ subscriber.connect(options);
+ assertTrue(mqttSessionState.isAttached());
+ subscriber.subscribe(topic, AT_LEAST_ONCE);
+ assertEquals(1L, subscriptionQueue.getConsumerCount());
+
+ // ensure subscriber got message and ack received successfully
+ assertTrue(subscriberLatch.await(500, TimeUnit.MILLISECONDS));
+ Wait.assertEquals(1L, () -> subscriptionQueue.getMessagesAcknowledged(),
2000, 50);
+ Wait.assertEquals(0L, () -> subscriptionQueue.getMessageCount(), 2000,
50);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact