This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 3cf670d8d3 ARTEMIS-5467 high CPU usage in OutboundStore.generateMqttId
3cf670d8d3 is described below

commit 3cf670d8d3cff7e13d91d52553b0f388ff3490f9
Author: Justin Bertram <[email protected]>
AuthorDate: Tue May 13 11:32:18 2025 -0500

    ARTEMIS-5467 high CPU usage in OutboundStore.generateMqttId
---
 .../artemis/core/protocol/mqtt/MQTTBundle.java     |  3 +
 .../core/protocol/mqtt/MQTTSessionState.java       | 25 +++++---
 .../core/protocol/mqtt/MQTTSessionStateTest.java   | 66 ++++++++++++++++++++++
 3 files changed, 87 insertions(+), 7 deletions(-)

diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTBundle.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTBundle.java
index 10bed9c38d..42e5d472e5 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTBundle.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTBundle.java
@@ -30,4 +30,7 @@ public interface MQTTBundle {
 
    @Message(id = 850000, value = "Unable to store MQTT state within given 
timeout: {}ms")
    IllegalStateException unableToStoreMqttState(long timeout);
+
+   @Message(id = 850001, value = "Unable to generate MQTT packet ID. All valid 
values between 1 and 65535 are in use. IDs will become available as messages 
are acknowledged by the client that has received them.")
+   IllegalStateException unableToGenerateID();
 }
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index 1535c7792a..6d7a53daf0 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.protocol.mqtt;
 
+import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -40,7 +41,6 @@ import 
org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.settings.impl.Match;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
 
 public class MQTTSessionState {
 
@@ -450,7 +450,9 @@ public class MQTTSessionState {
 
       private final Object dataStoreLock = new Object();
 
-      private int currentId = 0;
+      private final int INITIAL_ID = 0;
+
+      private int currentId = INITIAL_ID;
 
       private Pair<Long, Long> generateKey(long messageId, long consumerID) {
          return new Pair<>(messageId, consumerID);
@@ -460,13 +462,22 @@ public class MQTTSessionState {
          synchronized (dataStoreLock) {
             Integer id = artemisToMqttMessageMap.get(generateKey(messageId, 
consumerId));
             if (id == null) {
+               final int start = currentId;
                do {
-                  if (currentId == MQTTUtil.TWO_BYTE_INT_MAX) {
-                     currentId = 0;
+                  // wrap around to the start if we reach the max
+                  if (++currentId > MQTTUtil.TWO_BYTE_INT_MAX) {
+                     currentId = INITIAL_ID;
+                  }
+                  // check to see if we looped all the way back around to 
where we started
+                  if (start == currentId) {
+                     // this detects an edge case where the same ID is acked & 
then generated again
+                     if (currentId != INITIAL_ID && 
!mqttToServerIds.containsKey(currentId)) {
+                        break;
+                     }
+                     throw MQTTBundle.BUNDLE.unableToGenerateID();
                   }
-                  ++currentId;
                }
-               while (mqttToServerIds.containsKey(currentId));
+               while (mqttToServerIds.containsKey(currentId) || currentId == 
INITIAL_ID);
                id = currentId;
             }
             return id;
@@ -515,7 +526,7 @@ public class MQTTSessionState {
          synchronized (dataStoreLock) {
             artemisToMqttMessageMap.clear();
             mqttToServerIds.clear();
-            currentId = 0;
+            currentId = INITIAL_ID;
          }
       }
    }
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionStateTest.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionStateTest.java
new file mode 100644
index 0000000000..7e37bd3e9a
--- /dev/null
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionStateTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class MQTTSessionStateTest {
+
+   @Test
+   public void testGenerateMqttIdOrder() {
+      MQTTSessionState mqttSessionState = new 
MQTTSessionState(RandomUtil.randomUUIDString());
+      for (int i = 1; i <= MQTTUtil.TWO_BYTE_INT_MAX; i++) {
+         assertEquals(i, 
mqttSessionState.getOutboundStore().generateMqttId(RandomUtil.randomLong(), 
RandomUtil.randomLong()));
+         mqttSessionState.getOutboundStore().publish(i, 
RandomUtil.randomLong(), RandomUtil.randomLong());
+      }
+   }
+
+   @Test
+   public void testGenerateMqttIdWithRandomAcks() {
+      MQTTSessionState mqttSessionState = 
createMqttSessionStateWithFullOutboundStore();
+      for (int i = 0; i < 10_000; i++) {
+         int random = RandomUtil.randomInterval(1, MQTTUtil.TWO_BYTE_INT_MAX);
+         // acknowledge a random ID
+         mqttSessionState.getOutboundStore().publishAckd(random);
+         // the ID that was acked is now the only one available so ensure 
generator finds it
+         assertEquals(random, 
mqttSessionState.getOutboundStore().generateMqttId(RandomUtil.randomLong(), 
RandomUtil.randomLong()));
+         // "publish" a new message with the ID to ensure the store is full 
for the next loop
+         mqttSessionState.getOutboundStore().publish(random, 
RandomUtil.randomLong(), RandomUtil.randomLong());
+      }
+   }
+
+   @Test
+   public void testGenerateMqttIdExhausted() {
+      MQTTSessionState mqttSessionState = 
createMqttSessionStateWithFullOutboundStore();
+      // ensure we throw an exception when we try generate a new ID once the 
store is full
+      assertThrows(IllegalStateException.class, () -> 
mqttSessionState.getOutboundStore().generateMqttId(RandomUtil.randomLong(), 
RandomUtil.randomLong()));
+   }
+
+   private static MQTTSessionState 
createMqttSessionStateWithFullOutboundStore() {
+      MQTTSessionState mqttSessionState = new 
MQTTSessionState(RandomUtil.randomUUIDString());
+      for (int i = 1; i <= MQTTUtil.TWO_BYTE_INT_MAX; i++) {
+         mqttSessionState.getOutboundStore().publish(i, 
RandomUtil.randomLong(), RandomUtil.randomLong());
+      }
+      return mqttSessionState;
+   }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to