This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new c727a2d4f4 ARTEMIS-5121 improve perf of MQTT sub ID matching
new 832a736c67 This closes #5719
c727a2d4f4 is described below
commit c727a2d4f4ec519788edd1810cf6f6f2e83910b8
Author: Evgeniy Devyatykh <[email protected]>
AuthorDate: Thu May 29 13:05:22 2025 +0500
ARTEMIS-5121 improve perf of MQTT sub ID matching
---
.../core/protocol/mqtt/MQTTSessionState.java | 98 ++++++++++++++--------
.../core/protocol/mqtt/MQTTStateManager.java | 18 ++--
.../protocol/mqtt/MQTTSubscriptionManager.java | 8 +-
.../artemis/core/protocol/mqtt/StateSerDeTest.java | 25 +++---
4 files changed, 88 insertions(+), 61 deletions(-)
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index 7cde4206da..c29163fc03 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -52,7 +53,7 @@ public class MQTTSessionState {
private final String clientId;
- private final ConcurrentMap<String, Pair<MqttTopicSubscription, Integer>>
subscriptions = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, SubscriptionItem> subscriptions = new
ConcurrentHashMap<>();
// Used to store Packet ID of Publish QoS1 and QoS2 message. See spec:
4.3.3 QoS 2: Exactly once delivery. Method B.
private final Map<Integer, MQTTMessageInfo> messageRefStore = new
ConcurrentHashMap<>();
@@ -135,7 +136,7 @@ public class MQTTSessionState {
MqttSubscriptionOption.RetainedHandlingPolicy retainedHandlingPolicy
= MqttSubscriptionOption.RetainedHandlingPolicy.valueOf(buf.readInt());
Integer subscriptionId = buf.readNullableInt();
- subscriptions.put(topicName, new Pair<>(new
MqttTopicSubscription(topicName, new MqttSubscriptionOption(qos, nolocal,
retainAsPublished, retainedHandlingPolicy)), subscriptionId));
+ subscriptions.put(topicName, new SubscriptionItem(new
MqttTopicSubscription(topicName, new MqttSubscriptionOption(qos, nolocal,
retainAsPublished, retainedHandlingPolicy)), subscriptionId));
}
}
@@ -186,35 +187,32 @@ public class MQTTSessionState {
public Collection<MqttTopicSubscription> getSubscriptions() {
Collection<MqttTopicSubscription> result = new HashSet<>();
- for (Pair<MqttTopicSubscription, Integer> pair : subscriptions.values())
{
- result.add(pair.getA());
+ for (SubscriptionItem item : subscriptions.values()) {
+ result.add(item.getSubscription());
}
return result;
}
- public Collection<Pair<MqttTopicSubscription, Integer>>
getSubscriptionsPlusID() {
- return subscriptions.values();
+ public Map<String, SubscriptionItem> getSubscriptionsPlusID() {
+ return new HashMap<>(subscriptions);
}
public boolean addSubscription(MqttTopicSubscription subscription,
WildcardConfiguration wildcardConfiguration, Integer subscriptionIdentifier)
throws Exception {
// synchronized to prevent race with removeSubscription
synchronized (subscriptions) {
-
addressMessageMap.putIfAbsent(MQTTUtil.getCoreAddressFromMqttTopic(subscription.topicName(),
wildcardConfiguration), new ConcurrentHashMap<>());
+
addressMessageMap.putIfAbsent(MQTTUtil.getCoreAddressFromMqttTopic(subscription.topicFilter(),
wildcardConfiguration), new ConcurrentHashMap<>());
- Pair<MqttTopicSubscription, Integer> existingSubscription =
subscriptions.get(subscription.topicName());
+ SubscriptionItem existingSubscription =
subscriptions.get(subscription.topicFilter());
if (existingSubscription != null) {
- boolean updated = false;
- if (subscription.qualityOfService().value() >
existingSubscription.getA().qualityOfService().value()) {
- existingSubscription.setA(subscription);
- updated = true;
+ if (subscription.qualityOfService().value() >
existingSubscription.getSubscription().qualityOfService().value()
+ || (subscriptionIdentifier != null &&
!Objects.equals(subscriptionIdentifier, existingSubscription.getId()))) {
+ existingSubscription.update(subscription,
subscriptionIdentifier);
+ return true;
+ } else {
+ return false;
}
- if (subscriptionIdentifier != null &&
!subscriptionIdentifier.equals(existingSubscription.getB())) {
- existingSubscription.setB(subscriptionIdentifier);
- updated = true;
- }
- return updated;
} else {
- subscriptions.put(subscription.topicName(), new
Pair<>(subscription, subscriptionIdentifier));
+ subscriptions.put(subscription.topicFilter(), new
SubscriptionItem(subscription, subscriptionIdentifier));
return true;
}
}
@@ -229,27 +227,23 @@ public class MQTTSessionState {
}
public MqttTopicSubscription getSubscription(String address) {
- return subscriptions.get(address) != null ?
subscriptions.get(address).getA() : null;
+ return subscriptions.get(address) != null ?
subscriptions.get(address).getSubscription() : null;
}
- public Pair<MqttTopicSubscription, Integer> getSubscriptionPlusID(String
address) {
- return subscriptions.get(address) != null ? subscriptions.get(address) :
null;
+ public SubscriptionItem getSubscriptionPlusID(String address) {
+ return subscriptions.get(address);
}
public List<Integer> getMatchingSubscriptionIdentifiers(String address) {
- address = MQTTUtil.getMqttTopicFromCoreAddress(address,
session.getServer().getConfiguration().getWildcardConfiguration());
+ String topic = MQTTUtil.getMqttTopicFromCoreAddress(address,
session.getServer().getConfiguration().getWildcardConfiguration());
List<Integer> result = null;
- for (Pair<MqttTopicSubscription, Integer> pair : subscriptions.values())
{
- Pattern pattern = Match.createPattern(pair.getA().topicName(),
MQTTUtil.MQTT_WILDCARD, true);
- boolean matches = pattern.matcher(address).matches();
- logger.debug("Matching {} with {}: {}", address, pattern, matches);
- if (matches) {
+ for (SubscriptionItem item : subscriptions.values()) {
+ Integer matchingId = item.getMatchingId(topic);
+ if (matchingId != null) {
if (result == null) {
result = new ArrayList<>();
}
- if (pair.getB() != null) {
- result.add(pair.getB());
- }
+ result.add(matchingId);
}
}
return result;
@@ -443,14 +437,14 @@ public class MQTTSessionState {
"]@" + System.identityHashCode(this);
}
- public class OutboundStore {
- private Map<Pair<Long, Long>, Integer> artemisToMqttMessageMap = new
HashMap<>();
+ public static class OutboundStore {
+ private final Map<Pair<Long, Long>, Integer> artemisToMqttMessageMap =
new HashMap<>();
- private Map<Integer, Pair<Long, Long>> mqttToServerIds = new HashMap<>();
+ private final Map<Integer, Pair<Long, Long>> mqttToServerIds = new
HashMap<>();
private final Object dataStoreLock = new Object();
- private final int INITIAL_ID = 0;
+ private static final int INITIAL_ID = 0;
private int currentId = INITIAL_ID;
@@ -565,4 +559,40 @@ public class MQTTSessionState {
};
}
}
+
+ public static class SubscriptionItem { // holds one subscription, its optional subscription ID, and a cached Pattern built from its topic filter
+ private MqttTopicSubscription subscription;
+ private Integer id; // may be null; getMatchingId and update both null-check it
+ private Pattern topicFilterPattern; // assigned only inside update(); remains null until a non-null id has been supplied
+
+ public SubscriptionItem(MqttTopicSubscription subscription, Integer id) {
+ update(subscription, id); // the constructor shares the same path as later updates
+ }
+
+ public MqttTopicSubscription getSubscription() {
+ return subscription;
+ }
+
+ public Integer getId() {
+ return id;
+ }
+
+ public Integer getMatchingId(String topic) { // returns id when id is non-null and topic matches the cached pattern; otherwise null
+ if (id != null && topicFilterPattern.matcher(topic).matches()) { // id is tested first, so a pattern that was never built is not dereferenced
+ return id;
+ } else {
+ return null;
+ }
+ }
+
+ private void update(MqttTopicSubscription newSub, Integer newId) { // replaces subscription and id unconditionally; rebuilds the pattern only when needed
+ if (newId != null && !newId.equals(id)) { // pattern work is skipped when no id is given or the id is unchanged
+ if (this.topicFilterPattern == null ||
!subscription.topicFilter().equals(newSub.topicFilter())) {
+ topicFilterPattern = Match.createPattern(newSub.topicFilter(),
MQTTUtil.MQTT_WILDCARD, true);
+ }
+ }
+ subscription = newSub;
+ id = newId; // a null newId clears any previously stored id
+ }
+ }
}
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
index d27d992d45..fc2513c918 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
@@ -20,7 +20,6 @@ package org.apache.activemq.artemis.core.protocol.mqtt;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -32,7 +31,6 @@ import java.util.concurrent.TimeUnit;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
@@ -50,10 +48,10 @@ import org.slf4j.LoggerFactory;
public class MQTTStateManager {
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private ActiveMQServer server;
+ private final ActiveMQServer server;
private final Map<String, MQTTSessionState> sessionStates = new
ConcurrentHashMap<>();
private final Queue sessionStore;
- private static Map<Integer, MQTTStateManager> INSTANCES = new HashMap<>();
+ private static final Map<Integer, MQTTStateManager> INSTANCES = new
HashMap<>();
private final Map<String, MQTTConnection> connectedClients = new
ConcurrentHashMap<>();
private final long timeout;
@@ -109,7 +107,7 @@ public class MQTTStateManager {
}
public void scanSessions() {
- List<String> toRemove = new ArrayList();
+ List<String> toRemove = new ArrayList<>();
for (Map.Entry<String, MQTTSessionState> entry :
sessionStates.entrySet()) {
MQTTSessionState state = entry.getValue();
logger.debug("Inspecting session: {}", state);
@@ -199,7 +197,7 @@ public class MQTTStateManager {
message.setAddress(MQTTUtil.MQTT_SESSION_STORE);
message.setDurable(true);
message.putStringProperty(Message.HDR_LAST_VALUE_NAME,
state.getClientId());
- Collection<Pair<MqttTopicSubscription, Integer>> subscriptions =
state.getSubscriptionsPlusID();
+ Map<String, MQTTSessionState.SubscriptionItem> subscriptions =
state.getSubscriptionsPlusID();
ActiveMQBuffer buf = message.getBodyBuffer();
/*
@@ -210,14 +208,14 @@ public class MQTTStateManager {
buf.writeInt(subscriptions.size());
logger.debug("Serializing {} subscriptions", subscriptions.size());
- for (Pair<MqttTopicSubscription, Integer> pair : subscriptions) {
- MqttTopicSubscription sub = pair.getA();
- buf.writeString(sub.topicName());
+ for (MQTTSessionState.SubscriptionItem item : subscriptions.values()) {
+ MqttTopicSubscription sub = item.getSubscription();
+ buf.writeString(sub.topicFilter());
buf.writeInt(sub.option().qos().value());
buf.writeBoolean(sub.option().isNoLocal());
buf.writeBoolean(sub.option().isRetainAsPublished());
buf.writeInt(sub.option().retainHandling().value());
- buf.writeNullableInt(pair.getB());
+ buf.writeNullableInt(item.getId());
}
return message;
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index 69c5424e3a..68c2765fa8 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -100,9 +100,9 @@ public class MQTTSubscriptionManager {
}
private void addSubscription(MqttTopicSubscription subscription, Integer
subscriptionIdentifier, boolean initialStart) throws Exception {
- String rawTopicName =
CompositeAddress.extractAddressName(subscription.topicName());
+ String rawTopicName =
CompositeAddress.extractAddressName(subscription.topicFilter());
String parsedTopicName =
MQTTUtil.decomposeSharedSubscriptionTopicFilter(rawTopicName).getB();
- boolean isFullyQualified =
CompositeAddress.isFullyQualified(subscription.topicName());
+ boolean isFullyQualified =
CompositeAddress.isFullyQualified(subscription.topicFilter());
Queue q = createQueueForSubscription(rawTopicName, parsedTopicName,
isFullyQualified);
@@ -299,7 +299,7 @@ public class MQTTSubscriptionManager {
addSubscription(subscriptions.get(i), subscriptionIdentifier,
false);
qos[i] = subscriptions.get(i).qualityOfService().value();
} catch (ActiveMQSecurityException e) {
- // user is not authorized to create subsription
+ // user is not authorized to create subscription
if (session.getVersion() == MQTTVersion.MQTT_5) {
qos[i] = MQTTReasonCodes.NOT_AUTHORIZED;
} else if (session.getVersion() == MQTTVersion.MQTT_3_1_1) {
@@ -340,7 +340,7 @@ public class MQTTSubscriptionManager {
void clean(boolean enforceSecurity) throws Exception {
List<String> topics = new ArrayList<>();
for (MqttTopicSubscription mqttTopicSubscription :
session.getState().getSubscriptions()) {
- topics.add(mqttTopicSubscription.topicName());
+ topics.add(mqttTopicSubscription.topicFilter());
}
removeSubscriptions(topics, enforceSecurity);
}
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/StateSerDeTest.java b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/StateSerDeTest.java
index e1721a8e54..1d5630e99a 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/StateSerDeTest.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/StateSerDeTest.java
@@ -18,18 +18,17 @@
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscriptionOption;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
-import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
public class StateSerDeTest {
@Test
@@ -52,12 +51,12 @@ public class StateSerDeTest {
MQTTSessionState deserialized = new MQTTSessionState(serializedState);
assertEquals(unserialized.getClientId(), deserialized.getClientId());
- for (Pair<MqttTopicSubscription, Integer> unserializedEntry :
unserialized.getSubscriptionsPlusID()) {
- MqttTopicSubscription unserializedSub = unserializedEntry.getA();
- Integer unserializedSubId = unserializedEntry.getB();
- Pair<MqttTopicSubscription, Integer> deserializedEntry =
deserialized.getSubscriptionPlusID(unserializedSub.topicName());
- MqttTopicSubscription deserializedSub = deserializedEntry.getA();
- Integer deserializedSubId = deserializedEntry.getB();
+ for (MQTTSessionState.SubscriptionItem unserializedItem :
unserialized.getSubscriptionsPlusID().values()) {
+ MqttTopicSubscription unserializedSub =
unserializedItem.getSubscription();
+ Integer unserializedSubId = unserializedItem.getId();
+ MQTTSessionState.SubscriptionItem deserializedEntry =
deserialized.getSubscriptionPlusID(unserializedSub.topicFilter());
+ MqttTopicSubscription deserializedSub =
deserializedEntry.getSubscription();
+ Integer deserializedSubId = deserializedEntry.getId();
assertTrue(compareSubs(unserializedSub, deserializedSub));
assertEquals(unserializedSubId, deserializedSubId);
@@ -72,11 +71,11 @@ public class StateSerDeTest {
if (a == null || b == null) {
return false;
}
- if (a.topicName() == null) {
- if (b.topicName() != null) {
+ if (a.topicFilter() == null) {
+ if (b.topicFilter() != null) {
return false;
}
- } else if (!a.topicName().equals(b.topicName())) {
+ } else if (!a.topicFilter().equals(b.topicFilter())) {
return false;
}
if (a.option() == null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact