This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new ac202810f9 ARTEMIS-5469 reformulate previous solution
ac202810f9 is described below

commit ac202810f965b81cf7a0e9fd6a8cc981a16769c6
Author: Justin Bertram <[email protected]>
AuthorDate: Thu May 15 11:19:41 2025 -0500

    ARTEMIS-5469 reformulate previous solution
    
    The previously committed solution to this problem caused
    `testPacketIdGeneratorNonCleanSession` in `o.a.a.a.t.i.m.MQTTTest` to
    fail. The problem with that solution is that it cleared out too much
    state.
    
    This commit fixes that by only clearing out the send quota and leaving
    the other state intact.
---
 .../artemis/core/protocol/mqtt/MQTTSession.java    |  2 +-
 .../core/protocol/mqtt/MQTTSessionCallback.java    |  2 +-
 .../core/protocol/mqtt/MQTTSessionState.java       | 25 ++++++++++++++++------
 .../artemis/tests/integration/mqtt5/MQTT5Test.java |  4 ++--
 4 files changed, 23 insertions(+), 10 deletions(-)

diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index 769156c9ed..1f8b9ecd4c 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -133,7 +133,7 @@ public class MQTTSession {
          state.setAttached(false);
          state.setDisconnectedTime(System.currentTimeMillis());
          state.clearTopicAliases();
-         state.getOutboundStore().clear();
+         state.getOutboundStore().resetSendQuota();
 
          if (getVersion() == MQTTVersion.MQTT_5) {
             if (state.getClientSessionExpiryInterval() == 0) {
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 08ef7976ba..7b4b03aea0 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -107,7 +107,7 @@ public class MQTTSessionCallback implements SessionCallback 
{
        *
        * Therefore, enforce flow-control based on the number of pending QoS 1 
& 2 messages
        */
-      if (ref != null && ref.isDurable() == true && 
connection.getReceiveMaximum() != -1 && 
session.getState().getOutboundStore().getPendingMessages() >= 
connection.getReceiveMaximum()) {
+      if (ref != null && ref.isDurable() == true && 
connection.getReceiveMaximum() != -1 && 
session.getState().getOutboundStore().getSendQuota() >= 
connection.getReceiveMaximum()) {
          return false;
       } else {
          return true;
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index 6d7a53daf0..7cde4206da 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -454,6 +454,9 @@ public class MQTTSessionState {
 
       private int currentId = INITIAL_ID;
 
+      // track send quota independently because it's reset when the client 
disconnects, but other state must remain in tact
+      private int sendQuota = 0;
+
       private Pair<Long, Long> generateKey(long messageId, long consumerID) {
          return new Pair<>(messageId, consumerID);
       }
@@ -489,6 +492,7 @@ public class MQTTSessionState {
             Pair<Long, Long> key = generateKey(messageId, consumerId);
             artemisToMqttMessageMap.put(key, mqtt);
             mqttToServerIds.put(mqtt, key);
+            sendQuota++;
          }
       }
 
@@ -496,6 +500,7 @@ public class MQTTSessionState {
          synchronized (dataStoreLock) {
             Pair<Long, Long> p = mqttToServerIds.remove(mqtt);
             if (p != null) {
+               sendQuota--;
                artemisToMqttMessageMap.remove(p);
             }
             return p;
@@ -509,6 +514,7 @@ public class MQTTSessionState {
       public void publishReleasedSent(int mqttId, long serverMessageId) {
          synchronized (dataStoreLock) {
             mqttToServerIds.put(mqttId, new Pair<>(serverMessageId, 0L));
+            sendQuota++;
          }
       }
 
@@ -516,17 +522,24 @@ public class MQTTSessionState {
          return publishAckd(mqtt);
       }
 
-      public int getPendingMessages() {
-         synchronized (dataStoreLock) {
-            return mqttToServerIds.size();
-         }
-      }
-
       public void clear() {
          synchronized (dataStoreLock) {
             artemisToMqttMessageMap.clear();
             mqttToServerIds.clear();
             currentId = INITIAL_ID;
+            sendQuota = 0;
+         }
+      }
+
+      public int getSendQuota() {
+         synchronized (dataStoreLock) {
+            return sendQuota;
+         }
+      }
+
+      public void resetSendQuota() {
+         synchronized (dataStoreLock) {
+            sendQuota = 0;
          }
       }
    }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index ef1ddd2396..d23a3959d8 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -847,13 +847,13 @@ public class MQTT5Test extends MQTT5TestSupport {
       // ensure subscriber got message and ack was blocked
       assertTrue(subscriberLatch.await(500, TimeUnit.MILLISECONDS));
       assertTrue(interceptorBlockedLatch.await(500, TimeUnit.MILLISECONDS));
-      Wait.assertEquals(1L, () -> 
mqttSessionState.getOutboundStore().getPendingMessages(), 2000, 10);
+      Wait.assertEquals(1L, () -> 
mqttSessionState.getOutboundStore().getSendQuota(), 2000, 10);
       pendingCountCheckLatch.countDown();
 
       // disconnect subscriber
       subscriber.disconnect();
       Wait.assertFalse(() -> mqttSessionState.isAttached(), 2000, 50);
-      assertEquals(0, 
mqttSessionState.getOutboundStore().getPendingMessages());
+      assertEquals(0, mqttSessionState.getOutboundStore().getSendQuota());
       assertEquals(1L, subscriptionQueue.getMessageCount());
       assertEquals(0L, subscriptionQueue.getMessagesAcknowledged());
       assertEquals(0L, subscriptionQueue.getConsumerCount());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to