This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 77d2327ba7 ARTEMIS-5469 clear MQTT send quota when session stops
77d2327ba7 is described below

commit 77d2327ba722083d54c9583153c097b2da9ce40f
Author: Justin Bertram <[email protected]>
AuthorDate: Wed May 14 12:08:26 2025 -0500

    ARTEMIS-5469 clear MQTT send quota when session stops
---
 .../artemis/core/protocol/mqtt/MQTTSession.java    |  1 +
 .../artemis/tests/integration/mqtt5/MQTT5Test.java | 94 ++++++++++++++++++++--
 2 files changed, 88 insertions(+), 7 deletions(-)

diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index be40225d5d..769156c9ed 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -133,6 +133,7 @@ public class MQTTSession {
          state.setAttached(false);
          state.setDisconnectedTime(System.currentTimeMillis());
          state.clearTopicAliases();
+         state.getOutboundStore().clear();
 
          if (getVersion() == MQTTVersion.MQTT_5) {
             if (state.getClientSessionExpiryInterval() == 0) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index 5829bfdbde..ef1ddd2396 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -16,13 +16,6 @@
  */
 package org.apache.activemq.artemis.tests.integration.mqtt5;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-
 import javax.jms.JMSConsumer;
 import javax.jms.JMSContext;
 import javax.jms.Message;
@@ -35,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import io.netty.handler.codec.mqtt.MqttMessageType;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -43,9 +37,11 @@ import 
org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
 import org.apache.activemq.artemis.core.paging.impl.PagingManagerImplAccessor;
 import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
 import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeTestAccessor;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionAccessor;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionState;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerSession;
@@ -69,6 +65,13 @@ import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
 /**
  * General tests for things not covered directly in the specification.
  */
@@ -791,4 +794,81 @@ public class MQTT5Test extends MQTT5TestSupport {
       }
       client.close();
    }
+
+   @Test
+   @Timeout(DEFAULT_TIMEOUT_SEC)
+   public void testFlowControlAfterDisconnect() throws Exception {
+      final String topic = getName();
+      final String clientId = "subscriber";
+
+      // set up interceptor to block first PUBACK; this simulates a client 
that doesn't ack the QoS 1 message
+      CountDownLatch interceptorBlockedLatch = new CountDownLatch(1);
+      CountDownLatch pendingCountCheckLatch = new CountDownLatch(1);
+      MQTTInterceptor incomingInterceptor = (packet, connection) -> {
+         if (packet.fixedHeader().messageType() == MqttMessageType.PUBACK && 
interceptorBlockedLatch.getCount() > 0) {
+            interceptorBlockedLatch.countDown();
+            try {
+               // as soon as we return false here the broker will disconnect 
the client which will trigger a clean-up
+               // but we want to perform a check before that happens
+               pendingCountCheckLatch.await(2000, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+               // ignore
+            }
+            return false;
+         }
+         return true;
+      };
+      server.getRemotingService().addIncomingInterceptor(incomingInterceptor);
+
+      // set up subscriber
+      CountDownLatch subscriberLatch = new CountDownLatch(1);
+      MqttClient subscriber = createPahoClient(clientId);
+      MqttConnectionOptions options = new MqttConnectionOptions();
+      options.setReceiveMaximum(1);
+      options.setCleanStart(false);
+      options.setSessionExpiryInterval(9999L);
+      subscriber.setCallback(new LatchedMqttCallback(subscriberLatch));
+      subscriber.connect(options);
+      subscriber.subscribe(topic, AT_LEAST_ONCE);
+      Queue subscriptionQueue = getSubscriptionQueue(topic, clientId);
+      assertNotNull(subscriptionQueue);
+      MQTTSessionState mqttSessionState = getSessionStates().get(clientId);
+      assertNotNull(mqttSessionState);
+      assertTrue(mqttSessionState.isAttached());
+      assertEquals(1L, subscriptionQueue.getConsumerCount());
+
+      // publish message
+      MqttClient producer = createPahoClient("producer");
+      producer.connect();
+      producer.publish(topic, "myMessage".getBytes(StandardCharsets.UTF_8), 1, 
false);
+      Wait.assertEquals(1L, () -> subscriptionQueue.getMessageCount(), 2000, 
50);
+      Wait.assertEquals(1L, () -> subscriptionQueue.getMessagesAdded(), 2000, 
50);
+
+      // ensure subscriber got message and ack was blocked
+      assertTrue(subscriberLatch.await(500, TimeUnit.MILLISECONDS));
+      assertTrue(interceptorBlockedLatch.await(500, TimeUnit.MILLISECONDS));
+      Wait.assertEquals(1L, () -> 
mqttSessionState.getOutboundStore().getPendingMessages(), 2000, 10);
+      pendingCountCheckLatch.countDown();
+
+      // disconnect subscriber
+      subscriber.disconnect();
+      Wait.assertFalse(() -> mqttSessionState.isAttached(), 2000, 50);
+      assertEquals(0, 
mqttSessionState.getOutboundStore().getPendingMessages());
+      assertEquals(1L, subscriptionQueue.getMessageCount());
+      assertEquals(0L, subscriptionQueue.getMessagesAcknowledged());
+      assertEquals(0L, subscriptionQueue.getConsumerCount());
+
+      // reconnect subscriber w/same options as before
+      subscriberLatch = new CountDownLatch(1);
+      subscriber.setCallback(new LatchedMqttCallback(subscriberLatch));
+      subscriber.connect(options);
+      assertTrue(mqttSessionState.isAttached());
+      subscriber.subscribe(topic, AT_LEAST_ONCE);
+      assertEquals(1L, subscriptionQueue.getConsumerCount());
+
+      // ensure subscriber got message and ack received successfully
+      assertTrue(subscriberLatch.await(500, TimeUnit.MILLISECONDS));
+      Wait.assertEquals(1L, () -> subscriptionQueue.getMessagesAcknowledged(), 
2000, 50);
+      Wait.assertEquals(0L, () -> subscriptionQueue.getMessageCount(), 2000, 
50);
+   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to