This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new c727a2d4f4 ARTEMIS-5121 improve perf of MQTT sub ID matching
     new 832a736c67 This closes #5719
c727a2d4f4 is described below

commit c727a2d4f4ec519788edd1810cf6f6f2e83910b8
Author: Evgeniy Devyatykh <[email protected]>
AuthorDate: Thu May 29 13:05:22 2025 +0500

    ARTEMIS-5121 improve perf of MQTT sub ID matching
---
 .../core/protocol/mqtt/MQTTSessionState.java       | 98 ++++++++++++++--------
 .../core/protocol/mqtt/MQTTStateManager.java       | 18 ++--
 .../protocol/mqtt/MQTTSubscriptionManager.java     |  8 +-
 .../artemis/core/protocol/mqtt/StateSerDeTest.java | 25 +++---
 4 files changed, 88 insertions(+), 61 deletions(-)

diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index 7cde4206da..c29163fc03 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -52,7 +53,7 @@ public class MQTTSessionState {
 
    private final String clientId;
 
-   private final ConcurrentMap<String, Pair<MqttTopicSubscription, Integer>> 
subscriptions = new ConcurrentHashMap<>();
+   private final ConcurrentMap<String, SubscriptionItem> subscriptions = new 
ConcurrentHashMap<>();
 
    // Used to store Packet ID of Publish QoS1 and QoS2 message.  See spec: 
4.3.3 QoS 2: Exactly once delivery.  Method B.
    private final Map<Integer, MQTTMessageInfo> messageRefStore = new 
ConcurrentHashMap<>();
@@ -135,7 +136,7 @@ public class MQTTSessionState {
          MqttSubscriptionOption.RetainedHandlingPolicy retainedHandlingPolicy 
= MqttSubscriptionOption.RetainedHandlingPolicy.valueOf(buf.readInt());
          Integer subscriptionId = buf.readNullableInt();
 
-         subscriptions.put(topicName, new Pair<>(new 
MqttTopicSubscription(topicName, new MqttSubscriptionOption(qos, nolocal, 
retainAsPublished, retainedHandlingPolicy)), subscriptionId));
+         subscriptions.put(topicName, new SubscriptionItem(new 
MqttTopicSubscription(topicName, new MqttSubscriptionOption(qos, nolocal, 
retainAsPublished, retainedHandlingPolicy)), subscriptionId));
       }
    }
 
@@ -186,35 +187,32 @@ public class MQTTSessionState {
 
    public Collection<MqttTopicSubscription> getSubscriptions() {
       Collection<MqttTopicSubscription> result = new HashSet<>();
-      for (Pair<MqttTopicSubscription, Integer> pair : subscriptions.values()) 
{
-         result.add(pair.getA());
+      for (SubscriptionItem item : subscriptions.values()) {
+         result.add(item.getSubscription());
       }
       return result;
    }
 
-   public Collection<Pair<MqttTopicSubscription, Integer>> 
getSubscriptionsPlusID() {
-      return subscriptions.values();
+   public Map<String, SubscriptionItem> getSubscriptionsPlusID() {
+      return new HashMap<>(subscriptions);
    }
 
    public boolean addSubscription(MqttTopicSubscription subscription, 
WildcardConfiguration wildcardConfiguration, Integer subscriptionIdentifier) 
throws Exception {
       // synchronized to prevent race with removeSubscription
       synchronized (subscriptions) {
-         
addressMessageMap.putIfAbsent(MQTTUtil.getCoreAddressFromMqttTopic(subscription.topicName(),
 wildcardConfiguration), new ConcurrentHashMap<>());
+         
addressMessageMap.putIfAbsent(MQTTUtil.getCoreAddressFromMqttTopic(subscription.topicFilter(),
 wildcardConfiguration), new ConcurrentHashMap<>());
 
-         Pair<MqttTopicSubscription, Integer> existingSubscription = 
subscriptions.get(subscription.topicName());
+         SubscriptionItem existingSubscription = 
subscriptions.get(subscription.topicFilter());
          if (existingSubscription != null) {
-            boolean updated = false;
-            if (subscription.qualityOfService().value() > 
existingSubscription.getA().qualityOfService().value()) {
-               existingSubscription.setA(subscription);
-               updated = true;
+            if (subscription.qualityOfService().value() > 
existingSubscription.getSubscription().qualityOfService().value()
+               || (subscriptionIdentifier != null && 
!Objects.equals(subscriptionIdentifier, existingSubscription.getId()))) {
+               existingSubscription.update(subscription, 
subscriptionIdentifier);
+               return true;
+            } else {
+               return false;
             }
-            if (subscriptionIdentifier != null && 
!subscriptionIdentifier.equals(existingSubscription.getB())) {
-               existingSubscription.setB(subscriptionIdentifier);
-               updated = true;
-            }
-            return updated;
          } else {
-            subscriptions.put(subscription.topicName(), new 
Pair<>(subscription, subscriptionIdentifier));
+            subscriptions.put(subscription.topicFilter(), new 
SubscriptionItem(subscription, subscriptionIdentifier));
             return true;
          }
       }
@@ -229,27 +227,23 @@ public class MQTTSessionState {
    }
 
    public MqttTopicSubscription getSubscription(String address) {
-      return subscriptions.get(address) != null ? 
subscriptions.get(address).getA() : null;
+      return subscriptions.get(address) != null ? 
subscriptions.get(address).getSubscription() : null;
    }
 
-   public Pair<MqttTopicSubscription, Integer> getSubscriptionPlusID(String 
address) {
-      return subscriptions.get(address) != null ? subscriptions.get(address) : 
null;
+   public SubscriptionItem getSubscriptionPlusID(String address) {
+      return subscriptions.get(address);
    }
 
    public List<Integer> getMatchingSubscriptionIdentifiers(String address) {
-      address = MQTTUtil.getMqttTopicFromCoreAddress(address, 
session.getServer().getConfiguration().getWildcardConfiguration());
+      String topic = MQTTUtil.getMqttTopicFromCoreAddress(address, 
session.getServer().getConfiguration().getWildcardConfiguration());
       List<Integer> result = null;
-      for (Pair<MqttTopicSubscription, Integer> pair : subscriptions.values()) 
{
-         Pattern pattern = Match.createPattern(pair.getA().topicName(), 
MQTTUtil.MQTT_WILDCARD, true);
-         boolean matches = pattern.matcher(address).matches();
-         logger.debug("Matching {} with {}: {}", address, pattern, matches);
-         if (matches) {
+      for (SubscriptionItem item : subscriptions.values()) {
+         Integer matchingId = item.getMatchingId(topic);
+         if (matchingId != null) {
             if (result == null) {
                result = new ArrayList<>();
             }
-            if (pair.getB() != null) {
-               result.add(pair.getB());
-            }
+            result.add(matchingId);
          }
       }
       return result;
@@ -443,14 +437,14 @@ public class MQTTSessionState {
          "]@" + System.identityHashCode(this);
    }
 
-   public class OutboundStore {
-      private Map<Pair<Long, Long>, Integer> artemisToMqttMessageMap = new 
HashMap<>();
+   public static class OutboundStore {
+      private final Map<Pair<Long, Long>, Integer> artemisToMqttMessageMap = 
new HashMap<>();
 
-      private Map<Integer, Pair<Long, Long>> mqttToServerIds = new HashMap<>();
+      private final Map<Integer, Pair<Long, Long>> mqttToServerIds = new 
HashMap<>();
 
       private final Object dataStoreLock = new Object();
 
-      private final int INITIAL_ID = 0;
+      private static final int INITIAL_ID = 0;
 
       private int currentId = INITIAL_ID;
 
@@ -565,4 +559,40 @@ public class MQTTSessionState {
          };
       }
    }
+
+   public static class SubscriptionItem {
+      private MqttTopicSubscription subscription;
+      private Integer id;
+      private Pattern topicFilterPattern;
+
+      public SubscriptionItem(MqttTopicSubscription subscription, Integer id) {
+         update(subscription, id);
+      }
+
+      public MqttTopicSubscription getSubscription() {
+         return subscription;
+      }
+
+      public Integer getId() {
+         return id;
+      }
+
+      public Integer getMatchingId(String topic) {
+         if (id != null && topicFilterPattern.matcher(topic).matches()) {
+            return id;
+         } else {
+            return null;
+         }
+      }
+
+      private void update(MqttTopicSubscription newSub, Integer newId) {
+         if (newId != null && !newId.equals(id)) {
+            if (this.topicFilterPattern == null || 
!subscription.topicFilter().equals(newSub.topicFilter())) {
+               topicFilterPattern = Match.createPattern(newSub.topicFilter(), 
MQTTUtil.MQTT_WILDCARD, true);
+            }
+         }
+         subscription = newSub;
+         id = newId;
+      }
+   }
 }
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
index d27d992d45..fc2513c918 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
@@ -20,7 +20,6 @@ package org.apache.activemq.artemis.core.protocol.mqtt;
 
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -32,7 +31,6 @@ import java.util.concurrent.TimeUnit;
 import io.netty.handler.codec.mqtt.MqttTopicSubscription;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
@@ -50,10 +48,10 @@ import org.slf4j.LoggerFactory;
 public class MQTTStateManager {
 
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-   private ActiveMQServer server;
+   private final ActiveMQServer server;
    private final Map<String, MQTTSessionState> sessionStates = new 
ConcurrentHashMap<>();
    private final Queue sessionStore;
-   private static Map<Integer, MQTTStateManager> INSTANCES = new HashMap<>();
+   private static final Map<Integer, MQTTStateManager> INSTANCES = new 
HashMap<>();
    private final Map<String, MQTTConnection> connectedClients  = new 
ConcurrentHashMap<>();
    private final long timeout;
 
@@ -109,7 +107,7 @@ public class MQTTStateManager {
    }
 
    public void scanSessions() {
-      List<String> toRemove = new ArrayList();
+      List<String> toRemove = new ArrayList<>();
       for (Map.Entry<String, MQTTSessionState> entry : 
sessionStates.entrySet()) {
          MQTTSessionState state = entry.getValue();
          logger.debug("Inspecting session: {}", state);
@@ -199,7 +197,7 @@ public class MQTTStateManager {
       message.setAddress(MQTTUtil.MQTT_SESSION_STORE);
       message.setDurable(true);
       message.putStringProperty(Message.HDR_LAST_VALUE_NAME, 
state.getClientId());
-      Collection<Pair<MqttTopicSubscription, Integer>> subscriptions = 
state.getSubscriptionsPlusID();
+      Map<String, MQTTSessionState.SubscriptionItem> subscriptions = 
state.getSubscriptionsPlusID();
       ActiveMQBuffer buf = message.getBodyBuffer();
 
       /*
@@ -210,14 +208,14 @@ public class MQTTStateManager {
 
       buf.writeInt(subscriptions.size());
       logger.debug("Serializing {} subscriptions", subscriptions.size());
-      for (Pair<MqttTopicSubscription, Integer> pair : subscriptions) {
-         MqttTopicSubscription sub = pair.getA();
-         buf.writeString(sub.topicName());
+      for (MQTTSessionState.SubscriptionItem item : subscriptions.values()) {
+         MqttTopicSubscription sub = item.getSubscription();
+         buf.writeString(sub.topicFilter());
          buf.writeInt(sub.option().qos().value());
          buf.writeBoolean(sub.option().isNoLocal());
          buf.writeBoolean(sub.option().isRetainAsPublished());
          buf.writeInt(sub.option().retainHandling().value());
-         buf.writeNullableInt(pair.getB());
+         buf.writeNullableInt(item.getId());
       }
 
       return message;
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index 69c5424e3a..68c2765fa8 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -100,9 +100,9 @@ public class MQTTSubscriptionManager {
    }
 
    private void addSubscription(MqttTopicSubscription subscription, Integer 
subscriptionIdentifier, boolean initialStart) throws Exception {
-      String rawTopicName = 
CompositeAddress.extractAddressName(subscription.topicName());
+      String rawTopicName = 
CompositeAddress.extractAddressName(subscription.topicFilter());
       String parsedTopicName = 
MQTTUtil.decomposeSharedSubscriptionTopicFilter(rawTopicName).getB();
-      boolean isFullyQualified = 
CompositeAddress.isFullyQualified(subscription.topicName());
+      boolean isFullyQualified = 
CompositeAddress.isFullyQualified(subscription.topicFilter());
 
       Queue q = createQueueForSubscription(rawTopicName, parsedTopicName, 
isFullyQualified);
 
@@ -299,7 +299,7 @@ public class MQTTSubscriptionManager {
                addSubscription(subscriptions.get(i), subscriptionIdentifier, 
false);
                qos[i] = subscriptions.get(i).qualityOfService().value();
             } catch (ActiveMQSecurityException e) {
-               // user is not authorized to create subsription
+               // user is not authorized to create subscription
                if (session.getVersion() == MQTTVersion.MQTT_5) {
                   qos[i] = MQTTReasonCodes.NOT_AUTHORIZED;
                } else if (session.getVersion() == MQTTVersion.MQTT_3_1_1) {
@@ -340,7 +340,7 @@ public class MQTTSubscriptionManager {
    void clean(boolean enforceSecurity) throws Exception {
       List<String> topics = new ArrayList<>();
       for (MqttTopicSubscription mqttTopicSubscription : 
session.getState().getSubscriptions()) {
-         topics.add(mqttTopicSubscription.topicName());
+         topics.add(mqttTopicSubscription.topicFilter());
       }
       removeSubscriptions(topics, enforceSecurity);
    }
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/StateSerDeTest.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/StateSerDeTest.java
index e1721a8e54..1d5630e99a 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/StateSerDeTest.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/StateSerDeTest.java
@@ -18,18 +18,17 @@
  */
 package org.apache.activemq.artemis.core.protocol.mqtt;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
 import io.netty.handler.codec.mqtt.MqttQoS;
 import io.netty.handler.codec.mqtt.MqttSubscriptionOption;
 import io.netty.handler.codec.mqtt.MqttTopicSubscription;
-import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 public class StateSerDeTest {
 
    @Test
@@ -52,12 +51,12 @@ public class StateSerDeTest {
          MQTTSessionState deserialized = new MQTTSessionState(serializedState);
 
          assertEquals(unserialized.getClientId(), deserialized.getClientId());
-         for (Pair<MqttTopicSubscription, Integer> unserializedEntry : 
unserialized.getSubscriptionsPlusID()) {
-            MqttTopicSubscription unserializedSub = unserializedEntry.getA();
-            Integer unserializedSubId = unserializedEntry.getB();
-            Pair<MqttTopicSubscription, Integer> deserializedEntry = 
deserialized.getSubscriptionPlusID(unserializedSub.topicName());
-            MqttTopicSubscription deserializedSub = deserializedEntry.getA();
-            Integer deserializedSubId = deserializedEntry.getB();
+         for (MQTTSessionState.SubscriptionItem unserializedItem : 
unserialized.getSubscriptionsPlusID().values()) {
+            MqttTopicSubscription unserializedSub = 
unserializedItem.getSubscription();
+            Integer unserializedSubId = unserializedItem.getId();
+            MQTTSessionState.SubscriptionItem deserializedEntry = 
deserialized.getSubscriptionPlusID(unserializedSub.topicFilter());
+            MqttTopicSubscription deserializedSub = 
deserializedEntry.getSubscription();
+            Integer deserializedSubId = deserializedEntry.getId();
 
             assertTrue(compareSubs(unserializedSub, deserializedSub));
             assertEquals(unserializedSubId, deserializedSubId);
@@ -72,11 +71,11 @@ public class StateSerDeTest {
       if (a == null || b == null) {
          return false;
       }
-      if (a.topicName() == null) {
-         if (b.topicName() != null) {
+      if (a.topicFilter() == null) {
+         if (b.topicFilter() != null) {
             return false;
          }
-      } else if (!a.topicName().equals(b.topicName())) {
+      } else if (!a.topicFilter().equals(b.topicFilter())) {
          return false;
       }
       if (a.option() == null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to