This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new b5e25eb4fe ARTEMIS-3871 uniquely name MQTT share sub queues
b5e25eb4fe is described below
commit b5e25eb4fe5b49b0562c9b94fbe290c9fd48926b
Author: Justin Bertram <[email protected]>
AuthorDate: Mon Nov 14 11:00:34 2022 -0600
ARTEMIS-3871 uniquely name MQTT share sub queues
---
.../protocol/mqtt/MQTTSubscriptionManager.java | 59 ++++++------
docs/user-manual/en/versions.md | 17 +++-
.../artemis/tests/integration/mqtt/MQTTTest.java | 2 +-
.../artemis/tests/integration/mqtt5/MQTT5Test.java | 100 +++++++++++++++++++++
.../tests/integration/mqtt5/MQTT5TestSupport.java | 10 ++-
.../integration/mqtt5/spec/SubscriptionTests.java | 8 +-
6 files changed, 158 insertions(+), 38 deletions(-)
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index 83757b7c16..4b0ea51ab1 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -103,37 +103,35 @@ public class MQTTSubscriptionManager {
}
private void addSubscription(MqttTopicSubscription subscription, Integer
subscriptionIdentifier, boolean initialStart) throws Exception {
- String topicName =
CompositeAddress.extractAddressName(subscription.topicName());
- String sharedSubscriptionName = null;
-
- // if using a shared subscription then parse the subscription name and
topic
- if (topicName.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX)) {
- int slashIndex = topicName.indexOf(SLASH) + 1;
- sharedSubscriptionName = topicName.substring(slashIndex,
topicName.indexOf(SLASH, slashIndex));
- topicName = topicName.substring(topicName.indexOf(SLASH, slashIndex)
+ 1);
+ String rawTopicName =
CompositeAddress.extractAddressName(subscription.topicName());
+ String parsedTopicName = rawTopicName;
+
+ // if using a shared subscription then parse
+ if (rawTopicName.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX)) {
+ parsedTopicName = rawTopicName.substring(rawTopicName.indexOf(SLASH,
rawTopicName.indexOf(SLASH) + 1) + 1);
}
int qos = subscription.qualityOfService().value();
- String coreAddress =
MQTTUtil.convertMqttTopicFilterToCoreAddress(topicName,
session.getWildcardConfiguration());
+ String coreAddress =
MQTTUtil.convertMqttTopicFilterToCoreAddress(parsedTopicName,
session.getWildcardConfiguration());
- Queue q = createQueueForSubscription(coreAddress,
sharedSubscriptionName);
+ Queue q = createQueueForSubscription(coreAddress,
getQueueNameForTopic(rawTopicName));
if (initialStart) {
- createConsumerForSubscriptionQueue(q, topicName, qos,
subscription.option().isNoLocal(), null);
+ createConsumerForSubscriptionQueue(q, parsedTopicName, qos,
subscription.option().isNoLocal(), null);
} else {
- MqttTopicSubscription existingSubscription =
session.getState().getSubscription(topicName);
+ MqttTopicSubscription existingSubscription =
session.getState().getSubscription(parsedTopicName);
if (existingSubscription == null) {
- createConsumerForSubscriptionQueue(q, topicName, qos,
subscription.option().isNoLocal(), null);
+ createConsumerForSubscriptionQueue(q, parsedTopicName, qos,
subscription.option().isNoLocal(), null);
} else {
- Long existingConsumerId = consumers.get(topicName).getID();
+ Long existingConsumerId = consumers.get(parsedTopicName).getID();
consumerQoSLevels.put(existingConsumerId, qos);
if (existingSubscription.option().isNoLocal() !=
subscription.option().isNoLocal()) {
- createConsumerForSubscriptionQueue(q, topicName, qos,
subscription.option().isNoLocal(), existingConsumerId);
+ createConsumerForSubscriptionQueue(q, parsedTopicName, qos,
subscription.option().isNoLocal(), existingConsumerId);
}
}
if (subscription.option().retainHandling() ==
MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE ||
(subscription.option().retainHandling() ==
MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS
&& existingSubscription == null)) {
- session.getRetainMessageManager().addRetainedMessagesToQueue(q,
topicName);
+ session.getRetainMessageManager().addRetainedMessagesToQueue(q,
parsedTopicName);
}
session.getState().addSubscription(subscription,
session.getWildcardConfiguration(), subscriptionIdentifier);
@@ -149,17 +147,9 @@ public class MQTTSubscriptionManager {
}
}
- private Queue createQueueForSubscription(String address, String
sharedSubscriptionName) throws Exception {
- // determine the proper queue name
- SimpleString queue;
- if (sharedSubscriptionName != null) {
- queue = SimpleString.toSimpleString(sharedSubscriptionName);
- } else {
- queue = getQueueNameForTopic(address);
- }
-
+ private Queue createQueueForSubscription(String address, SimpleString
queueName) throws Exception {
// check to see if a subscription queue already exists.
- Queue q = session.getServer().locateQueue(queue);
+ Queue q = session.getServer().locateQueue(queueName);
// The queue does not exist so we need to create it.
if (q == null) {
@@ -180,7 +170,7 @@ public class MQTTSubscriptionManager {
addressInfo =
session.getServerSession().createAddress(SimpleString.toSimpleString(address),
RoutingType.MULTICAST, true);
}
- return findOrCreateQueue(bindingQueryResult, addressInfo, queue);
+ return findOrCreateQueue(bindingQueryResult, addressInfo, queueName);
}
return q;
}
@@ -269,13 +259,11 @@ public class MQTTSubscriptionManager {
short reasonCode = MQTTReasonCodes.SUCCESS;
try {
- String internalAddress =
MQTTUtil.convertMqttTopicFilterToCoreAddress(address,
session.getWildcardConfiguration());
- SimpleString internalQueueName =
getQueueNameForTopic(internalAddress);
+ SimpleString internalQueueName = getQueueNameForTopic(address);
session.getState().removeSubscription(address);
Queue queue = session.getServer().locateQueue(internalQueueName);
- SimpleString sAddress = SimpleString.toSimpleString(internalAddress);
- AddressInfo addressInfo =
session.getServerSession().getAddress(sAddress);
+ AddressInfo addressInfo =
session.getServerSession().getAddress(SimpleString.toSimpleString(MQTTUtil.convertMqttTopicFilterToCoreAddress(address,
session.getWildcardConfiguration())));
if (addressInfo != null &&
addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST)) {
ServerConsumer consumer = consumers.get(address);
consumers.remove(address);
@@ -314,7 +302,14 @@ public class MQTTSubscriptionManager {
}
private SimpleString getQueueNameForTopic(String topic) {
- return new SimpleString(session.getState().getClientId() + "." + topic);
+ if (topic.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX)) {
+ int slashIndex = topic.indexOf(SLASH) + 1;
+ String sharedSubscriptionName = topic.substring(slashIndex,
topic.indexOf(SLASH, slashIndex));
+ String parsedTopicName = topic.substring(topic.indexOf(SLASH,
slashIndex) + 1);
+ return new
SimpleString(sharedSubscriptionName).concat(".").concat(session.getState().getClientId()).concat(".").concat(parsedTopicName);
+ } else {
+ return new
SimpleString(session.getState().getClientId()).concat(".").concat(topic);
+ }
}
/**
diff --git a/docs/user-manual/en/versions.md b/docs/user-manual/en/versions.md
index 3454c97317..538042471c 100644
--- a/docs/user-manual/en/versions.md
+++ b/docs/user-manual/en/versions.md
@@ -8,6 +8,21 @@ This chapter provides the following information for each
release:
- **Note:** Follow the general upgrade procedure outlined in the [Upgrading
the Broker](upgrading.md)
chapter in addition to any version-specific upgrade instructions outlined
here.
+## 2.28.0
+[Full release notes]()
+
+Highlights:
+- ...
+
+#### Upgrading from older versions
+1. Due to [ARTEMIS-3871](https://issues.apache.org/jira/browse/ARTEMIS-3871)
the naming pattern used for MQTT _shared_
+ subscription queues has changed. Previously the subscription queue was
named according to the subscription name
+ provided in the MQTT `SUBSCRIBE` packet. However, MQTT allows the same name
to be used across multiple subscriptions
+ whereas queues in the broker must be named uniquely. Now the subscription
queue will be named according to the
+ subscription name, client ID, and topic name so that all subscription queue
names will be unique. Before upgrading
+ please ensure all MQTT shared subscriptions are empty. When the subscribers
reconnect they will get a new
+ subscription queue. If they are not empty you can move the messages to the
new subscription queue administratively.
+
## 2.27.1
[Full release
notes](https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12352610&projectId=12315920)
@@ -17,7 +32,7 @@ Highlights:
- AMQP Large Message over Bridges were broken
- Rollback of massive transactions would take a long time to process
- Improvements to auto-create and auto-delete queues.
-
+
## 2.27.0
[Full release
notes](https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12352246&projectId=12315920)
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
index adc39a276c..a68d0f66d6 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
@@ -1842,7 +1842,7 @@ public class MQTTTest extends MQTTTestSupport {
Exception peerDisconnectedException = null;
try {
String clientId = "test.client";
- SimpleString coreAddress = new SimpleString("foo.bar");
+ SimpleString coreAddress = new SimpleString("foo/bar");
Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar",
QoS.AT_LEAST_ONCE)};
getServer().createQueue(new QueueConfiguration(new
SimpleString(clientId + "." +
coreAddress)).setAddress(coreAddress).setRoutingType(RoutingType.MULTICAST).setDurable(false).setTemporary(true).setMaxConsumers(0));
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index 67deeba240..d88f4a6981 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -32,6 +32,7 @@ import
org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImplAccessor;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
+import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.RandomUtil;
@@ -242,4 +243,103 @@ public class MQTT5Test extends MQTT5TestSupport {
AssertionLoggerHandler.stopCapture();
}
}
+
+ @Test(timeout = DEFAULT_TIMEOUT)
+ public void testSharedSubscriptionsWithSameName() throws Exception {
+ final String TOPIC1 = "myTopic1";
+ final String TOPIC2 = "myTopic2";
+ final String SUB_NAME = "mySub";
+ final String SHARED_SUB1 = MQTTUtil.SHARED_SUBSCRIPTION_PREFIX +
SUB_NAME + "/" + TOPIC1;
+ final String SHARED_SUB2 = MQTTUtil.SHARED_SUBSCRIPTION_PREFIX +
SUB_NAME + "/" + TOPIC2;
+ CountDownLatch ackLatch1 = new CountDownLatch(1);
+ CountDownLatch ackLatch2 = new CountDownLatch(1);
+
+ MqttClient consumer1 = createPahoClient("consumer1");
+ consumer1.connect();
+ consumer1.setCallback(new LatchedMqttCallback(ackLatch1));
+ consumer1.subscribe(SHARED_SUB1, 1);
+
+
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC1)));
+ Queue q1 = getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME);
+ assertNotNull(q1);
+ assertEquals(TOPIC1, q1.getAddress().toString());
+ assertEquals(1, q1.getConsumerCount());
+
+ MqttClient consumer2 = createPahoClient("consumer2");
+ consumer2.connect();
+ consumer2.setCallback(new LatchedMqttCallback(ackLatch2));
+ consumer2.subscribe(SHARED_SUB2, 1);
+
+
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC2)));
+ Queue q2 = getSubscriptionQueue(TOPIC2, "consumer2", SUB_NAME);
+ assertNotNull(q2);
+ assertEquals(TOPIC2, q2.getAddress().toString());
+ assertEquals(1, q2.getConsumerCount());
+
+ MqttClient producer = createPahoClient("producer");
+ producer.connect();
+ producer.publish(TOPIC1, new byte[0], 1, false);
+ producer.publish(TOPIC2, new byte[0], 1, false);
+ producer.disconnect();
+ producer.close();
+
+ assertTrue(ackLatch1.await(2, TimeUnit.SECONDS));
+ assertTrue(ackLatch2.await(2, TimeUnit.SECONDS));
+
+ consumer1.unsubscribe(SHARED_SUB1);
+ assertNull(getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME));
+
+ consumer2.unsubscribe(SHARED_SUB2);
+ assertNull(getSubscriptionQueue(TOPIC2, "consumer2", SUB_NAME));
+
+ consumer1.disconnect();
+ consumer1.close();
+ consumer2.disconnect();
+ consumer2.close();
+ }
+
+ @Test(timeout = DEFAULT_TIMEOUT)
+ public void testSharedSubscriptionsWithSameName2() throws Exception {
+ final String TOPIC1 = "myTopic1";
+ final String TOPIC2 = "myTopic2";
+ final String SUB_NAME = "mySub";
+ final String[] SHARED_SUBS = new String[]{
+ MQTTUtil.SHARED_SUBSCRIPTION_PREFIX + SUB_NAME + "/" + TOPIC1,
+ MQTTUtil.SHARED_SUBSCRIPTION_PREFIX + SUB_NAME + "/" + TOPIC2
+ };
+ CountDownLatch ackLatch = new CountDownLatch(2);
+
+ MqttClient consumer = createPahoClient("consumer1");
+ consumer.connect();
+ consumer.setCallback(new LatchedMqttCallback(ackLatch));
+ consumer.subscribe(SHARED_SUBS, new int[]{1, 1});
+
+
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC1)));
+ Queue q1 = getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME);
+ assertNotNull(q1);
+ assertEquals(TOPIC1, q1.getAddress().toString());
+ assertEquals(1, q1.getConsumerCount());
+
+
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC2)));
+ Queue q2 = getSubscriptionQueue(TOPIC2, "consumer1", SUB_NAME);
+ assertNotNull(q2);
+ assertEquals(TOPIC2, q2.getAddress().toString());
+ assertEquals(1, q2.getConsumerCount());
+
+ MqttClient producer = createPahoClient("producer");
+ producer.connect();
+ producer.publish(TOPIC1, new byte[0], 1, false);
+ producer.publish(TOPIC2, new byte[0], 1, false);
+ producer.disconnect();
+ producer.close();
+
+ assertTrue(ackLatch.await(2, TimeUnit.SECONDS));
+
+ consumer.unsubscribe(SHARED_SUBS);
+ assertNull(getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME));
+ assertNull(getSubscriptionQueue(TOPIC2, "consumer1", SUB_NAME));
+
+ consumer.disconnect();
+ consumer.close();
+ }
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
index 1f4349271d..6891d0311e 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
@@ -346,9 +346,17 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
}
protected Queue getSubscriptionQueue(String TOPIC, String clientId) {
+ return getSubscriptionQueue(TOPIC, clientId, null);
+ }
+
+ protected Queue getSubscriptionQueue(String TOPIC, String clientId, String
sharedSubscriptionName) {
try {
for (Binding b :
server.getPostOffice().getMatchingBindings(SimpleString.toSimpleString(TOPIC)))
{
- if
(((LocalQueueBinding)b).getQueue().getName().startsWith(SimpleString.toSimpleString(clientId)))
{
+ if (sharedSubscriptionName != null) {
+ if
(((LocalQueueBinding)b).getQueue().getName().startsWith(SimpleString.toSimpleString(sharedSubscriptionName)))
{
+ return ((LocalQueueBinding)b).getQueue();
+ }
+ } else if
(((LocalQueueBinding)b).getQueue().getName().startsWith(SimpleString.toSimpleString(clientId)))
{
return ((LocalQueueBinding)b).getQueue();
}
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/SubscriptionTests.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/SubscriptionTests.java
index 0619589843..8e61c17969 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/SubscriptionTests.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/SubscriptionTests.java
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
+import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
import org.apache.activemq.artemis.utils.Wait;
import org.eclipse.paho.mqttv5.client.MqttClient;
@@ -126,9 +127,10 @@ public class SubscriptionTests extends MQTT5TestSupport {
consumer1.setCallback(new LatchedMqttCallback(ackLatch));
consumer1.subscribe(SHARED_SUB, 1);
- assertNotNull(server.locateQueue(SUB_NAME));
- assertEquals(TOPIC,
server.locateQueue(SUB_NAME).getAddress().toString());
- assertEquals(1, server.locateQueue(SUB_NAME).getConsumerCount());
+ Queue sharedSubQueue =
server.locateQueue(SUB_NAME.concat(".").concat(consumer1.getClientId()).concat(".").concat(TOPIC));
+ assertNotNull(sharedSubQueue);
+ assertEquals(TOPIC, sharedSubQueue.getAddress().toString());
+ assertEquals(1, sharedSubQueue.getConsumerCount());
MqttClient producer = createPahoClient("producer");
producer.connect();