This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new b5e25eb4fe ARTEMIS-3871 uniquely name MQTT share sub queues
b5e25eb4fe is described below

commit b5e25eb4fe5b49b0562c9b94fbe290c9fd48926b
Author: Justin Bertram <[email protected]>
AuthorDate: Mon Nov 14 11:00:34 2022 -0600

    ARTEMIS-3871 uniquely name MQTT share sub queues
---
 .../protocol/mqtt/MQTTSubscriptionManager.java     |  59 ++++++------
 docs/user-manual/en/versions.md                    |  17 +++-
 .../artemis/tests/integration/mqtt/MQTTTest.java   |   2 +-
 .../artemis/tests/integration/mqtt5/MQTT5Test.java | 100 +++++++++++++++++++++
 .../tests/integration/mqtt5/MQTT5TestSupport.java  |  10 ++-
 .../integration/mqtt5/spec/SubscriptionTests.java  |   8 +-
 6 files changed, 158 insertions(+), 38 deletions(-)

diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index 83757b7c16..4b0ea51ab1 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -103,37 +103,35 @@ public class MQTTSubscriptionManager {
    }
 
    private void addSubscription(MqttTopicSubscription subscription, Integer 
subscriptionIdentifier, boolean initialStart) throws Exception {
-      String topicName = 
CompositeAddress.extractAddressName(subscription.topicName());
-      String sharedSubscriptionName = null;
-
-      // if using a shared subscription then parse the subscription name and 
topic
-      if (topicName.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX)) {
-         int slashIndex = topicName.indexOf(SLASH) + 1;
-         sharedSubscriptionName = topicName.substring(slashIndex, 
topicName.indexOf(SLASH, slashIndex));
-         topicName = topicName.substring(topicName.indexOf(SLASH, slashIndex) 
+ 1);
+      String rawTopicName = 
CompositeAddress.extractAddressName(subscription.topicName());
+      String parsedTopicName = rawTopicName;
+
+      // if using a shared subscription then parse
+      if (rawTopicName.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX)) {
+         parsedTopicName = rawTopicName.substring(rawTopicName.indexOf(SLASH, 
rawTopicName.indexOf(SLASH) + 1) + 1);
       }
       int qos = subscription.qualityOfService().value();
-      String coreAddress = 
MQTTUtil.convertMqttTopicFilterToCoreAddress(topicName, 
session.getWildcardConfiguration());
+      String coreAddress = 
MQTTUtil.convertMqttTopicFilterToCoreAddress(parsedTopicName, 
session.getWildcardConfiguration());
 
-      Queue q = createQueueForSubscription(coreAddress, 
sharedSubscriptionName);
+      Queue q = createQueueForSubscription(coreAddress, 
getQueueNameForTopic(rawTopicName));
 
       if (initialStart) {
-         createConsumerForSubscriptionQueue(q, topicName, qos, 
subscription.option().isNoLocal(), null);
+         createConsumerForSubscriptionQueue(q, parsedTopicName, qos, 
subscription.option().isNoLocal(), null);
       } else {
-         MqttTopicSubscription existingSubscription = 
session.getState().getSubscription(topicName);
+         MqttTopicSubscription existingSubscription = 
session.getState().getSubscription(parsedTopicName);
          if (existingSubscription == null) {
-            createConsumerForSubscriptionQueue(q, topicName, qos, 
subscription.option().isNoLocal(), null);
+            createConsumerForSubscriptionQueue(q, parsedTopicName, qos, 
subscription.option().isNoLocal(), null);
          } else {
-            Long existingConsumerId = consumers.get(topicName).getID();
+            Long existingConsumerId = consumers.get(parsedTopicName).getID();
             consumerQoSLevels.put(existingConsumerId, qos);
             if (existingSubscription.option().isNoLocal() != 
subscription.option().isNoLocal()) {
-               createConsumerForSubscriptionQueue(q, topicName, qos, 
subscription.option().isNoLocal(), existingConsumerId);
+               createConsumerForSubscriptionQueue(q, parsedTopicName, qos, 
subscription.option().isNoLocal(), existingConsumerId);
             }
          }
 
          if (subscription.option().retainHandling() == 
MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE ||
             (subscription.option().retainHandling() == 
MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS
 && existingSubscription == null)) {
-            session.getRetainMessageManager().addRetainedMessagesToQueue(q, 
topicName);
+            session.getRetainMessageManager().addRetainedMessagesToQueue(q, 
parsedTopicName);
          }
 
          session.getState().addSubscription(subscription, 
session.getWildcardConfiguration(), subscriptionIdentifier);
@@ -149,17 +147,9 @@ public class MQTTSubscriptionManager {
       }
    }
 
-   private Queue createQueueForSubscription(String address, String 
sharedSubscriptionName) throws Exception {
-      // determine the proper queue name
-      SimpleString queue;
-      if (sharedSubscriptionName != null) {
-         queue = SimpleString.toSimpleString(sharedSubscriptionName);
-      } else {
-         queue = getQueueNameForTopic(address);
-      }
-
+   private Queue createQueueForSubscription(String address, SimpleString 
queueName) throws Exception {
       // check to see if a subscription queue already exists.
-      Queue q = session.getServer().locateQueue(queue);
+      Queue q = session.getServer().locateQueue(queueName);
 
       // The queue does not exist so we need to create it.
       if (q == null) {
@@ -180,7 +170,7 @@ public class MQTTSubscriptionManager {
             addressInfo = 
session.getServerSession().createAddress(SimpleString.toSimpleString(address),
                                                                    
RoutingType.MULTICAST, true);
          }
-         return findOrCreateQueue(bindingQueryResult, addressInfo, queue);
+         return findOrCreateQueue(bindingQueryResult, addressInfo, queueName);
       }
       return q;
    }
@@ -269,13 +259,11 @@ public class MQTTSubscriptionManager {
       short reasonCode = MQTTReasonCodes.SUCCESS;
 
       try {
-         String internalAddress = 
MQTTUtil.convertMqttTopicFilterToCoreAddress(address, 
session.getWildcardConfiguration());
-         SimpleString internalQueueName = 
getQueueNameForTopic(internalAddress);
+         SimpleString internalQueueName = getQueueNameForTopic(address);
          session.getState().removeSubscription(address);
 
          Queue queue = session.getServer().locateQueue(internalQueueName);
-         SimpleString sAddress = SimpleString.toSimpleString(internalAddress);
-         AddressInfo addressInfo = 
session.getServerSession().getAddress(sAddress);
+         AddressInfo addressInfo = 
session.getServerSession().getAddress(SimpleString.toSimpleString(MQTTUtil.convertMqttTopicFilterToCoreAddress(address,
 session.getWildcardConfiguration())));
          if (addressInfo != null && 
addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST)) {
             ServerConsumer consumer = consumers.get(address);
             consumers.remove(address);
@@ -314,7 +302,14 @@ public class MQTTSubscriptionManager {
    }
 
    private SimpleString getQueueNameForTopic(String topic) {
-      return new SimpleString(session.getState().getClientId() + "." + topic);
+      if (topic.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX)) {
+         int slashIndex = topic.indexOf(SLASH) + 1;
+         String sharedSubscriptionName = topic.substring(slashIndex, 
topic.indexOf(SLASH, slashIndex));
+         String parsedTopicName = topic.substring(topic.indexOf(SLASH, 
slashIndex) + 1);
+         return new 
SimpleString(sharedSubscriptionName).concat(".").concat(session.getState().getClientId()).concat(".").concat(parsedTopicName);
+      } else {
+         return new 
SimpleString(session.getState().getClientId()).concat(".").concat(topic);
+      }
    }
 
    /**
diff --git a/docs/user-manual/en/versions.md b/docs/user-manual/en/versions.md
index 3454c97317..538042471c 100644
--- a/docs/user-manual/en/versions.md
+++ b/docs/user-manual/en/versions.md
@@ -8,6 +8,21 @@ This chapter provides the following information for each 
release:
   - **Note:** Follow the general upgrade procedure outlined in the [Upgrading 
the Broker](upgrading.md) 
     chapter in addition to any version-specific upgrade instructions outlined 
here.
 
+## 2.28.0
+[Full release notes]()
+
+Highlights:
+- ...
+
+#### Upgrading from older versions
+1. Due to [ARTEMIS-3871](https://issues.apache.org/jira/browse/ARTEMIS-3871) 
the naming pattern used for MQTT _shared_
+   subscription queues has changed. Previously the subscription queue was 
named according to the subscription name
+   provided in the MQTT `SUBSCRIBE` packet. However, MQTT allows the same name 
to be used across multiple subscriptions
+   whereas queues in the broker must be named uniquely. Now the subscription 
queue will be named according to the
+   subscription name, client ID, and topic name so that all subscription queue 
names will be unique. Before upgrading
+   please ensure all MQTT shared subscriptions are empty. When the subscribers 
reconnect they will get a new
+   subscription queue. If they are not empty you can move the messages to the 
new subscription queue administratively.
+
 ## 2.27.1
 [Full release 
notes](https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12352610&projectId=12315920)
 
@@ -17,7 +32,7 @@ Highlights:
 - AMQP Large Message over Bridges were broken
 - Rollback of massive transactions would take a long time to process
 - Improvements to auto-create and auto-delete queues.
- 
+
 ## 2.27.0
 [Full release 
notes](https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12352246&projectId=12315920)
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
index adc39a276c..a68d0f66d6 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
@@ -1842,7 +1842,7 @@ public class MQTTTest extends MQTTTestSupport {
       Exception peerDisconnectedException = null;
       try {
          String clientId = "test.client";
-         SimpleString coreAddress = new SimpleString("foo.bar");
+         SimpleString coreAddress = new SimpleString("foo/bar");
          Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", 
QoS.AT_LEAST_ONCE)};
 
          getServer().createQueue(new QueueConfiguration(new 
SimpleString(clientId + "." + 
coreAddress)).setAddress(coreAddress).setRoutingType(RoutingType.MULTICAST).setDurable(false).setTemporary(true).setMaxConsumers(0));
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index 67deeba240..d88f4a6981 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -32,6 +32,7 @@ import 
org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
 import org.apache.activemq.artemis.core.paging.impl.PagingManagerImplAccessor;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
+import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
 import org.apache.activemq.artemis.tests.util.RandomUtil;
@@ -242,4 +243,103 @@ public class MQTT5Test extends MQTT5TestSupport {
          AssertionLoggerHandler.stopCapture();
       }
    }
+
+   @Test(timeout = DEFAULT_TIMEOUT)
+   public void testSharedSubscriptionsWithSameName() throws Exception {
+      final String TOPIC1 = "myTopic1";
+      final String TOPIC2 = "myTopic2";
+      final String SUB_NAME = "mySub";
+      final String SHARED_SUB1 = MQTTUtil.SHARED_SUBSCRIPTION_PREFIX + 
SUB_NAME + "/" + TOPIC1;
+      final String SHARED_SUB2 = MQTTUtil.SHARED_SUBSCRIPTION_PREFIX + 
SUB_NAME + "/" + TOPIC2;
+      CountDownLatch ackLatch1 = new CountDownLatch(1);
+      CountDownLatch ackLatch2 = new CountDownLatch(1);
+
+      MqttClient consumer1 = createPahoClient("consumer1");
+      consumer1.connect();
+      consumer1.setCallback(new LatchedMqttCallback(ackLatch1));
+      consumer1.subscribe(SHARED_SUB1, 1);
+
+      
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC1)));
+      Queue q1 = getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME);
+      assertNotNull(q1);
+      assertEquals(TOPIC1, q1.getAddress().toString());
+      assertEquals(1, q1.getConsumerCount());
+
+      MqttClient consumer2 = createPahoClient("consumer2");
+      consumer2.connect();
+      consumer2.setCallback(new LatchedMqttCallback(ackLatch2));
+      consumer2.subscribe(SHARED_SUB2, 1);
+
+      
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC2)));
+      Queue q2 = getSubscriptionQueue(TOPIC2, "consumer2", SUB_NAME);
+      assertNotNull(q2);
+      assertEquals(TOPIC2, q2.getAddress().toString());
+      assertEquals(1, q2.getConsumerCount());
+
+      MqttClient producer = createPahoClient("producer");
+      producer.connect();
+      producer.publish(TOPIC1, new byte[0], 1, false);
+      producer.publish(TOPIC2, new byte[0], 1, false);
+      producer.disconnect();
+      producer.close();
+
+      assertTrue(ackLatch1.await(2, TimeUnit.SECONDS));
+      assertTrue(ackLatch2.await(2, TimeUnit.SECONDS));
+
+      consumer1.unsubscribe(SHARED_SUB1);
+      assertNull(getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME));
+
+      consumer2.unsubscribe(SHARED_SUB2);
+      assertNull(getSubscriptionQueue(TOPIC2, "consumer2", SUB_NAME));
+
+      consumer1.disconnect();
+      consumer1.close();
+      consumer2.disconnect();
+      consumer2.close();
+   }
+
+   @Test(timeout = DEFAULT_TIMEOUT)
+   public void testSharedSubscriptionsWithSameName2() throws Exception {
+      final String TOPIC1 = "myTopic1";
+      final String TOPIC2 = "myTopic2";
+      final String SUB_NAME = "mySub";
+      final String[] SHARED_SUBS = new String[]{
+         MQTTUtil.SHARED_SUBSCRIPTION_PREFIX + SUB_NAME + "/" + TOPIC1,
+         MQTTUtil.SHARED_SUBSCRIPTION_PREFIX + SUB_NAME + "/" + TOPIC2
+      };
+      CountDownLatch ackLatch = new CountDownLatch(2);
+
+      MqttClient consumer = createPahoClient("consumer1");
+      consumer.connect();
+      consumer.setCallback(new LatchedMqttCallback(ackLatch));
+      consumer.subscribe(SHARED_SUBS, new int[]{1, 1});
+
+      
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC1)));
+      Queue q1 = getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME);
+      assertNotNull(q1);
+      assertEquals(TOPIC1, q1.getAddress().toString());
+      assertEquals(1, q1.getConsumerCount());
+
+      
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC2)));
+      Queue q2 = getSubscriptionQueue(TOPIC2, "consumer1", SUB_NAME);
+      assertNotNull(q2);
+      assertEquals(TOPIC2, q2.getAddress().toString());
+      assertEquals(1, q2.getConsumerCount());
+
+      MqttClient producer = createPahoClient("producer");
+      producer.connect();
+      producer.publish(TOPIC1, new byte[0], 1, false);
+      producer.publish(TOPIC2, new byte[0], 1, false);
+      producer.disconnect();
+      producer.close();
+
+      assertTrue(ackLatch.await(2, TimeUnit.SECONDS));
+
+      consumer.unsubscribe(SHARED_SUBS);
+      assertNull(getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME));
+      assertNull(getSubscriptionQueue(TOPIC2, "consumer1", SUB_NAME));
+
+      consumer.disconnect();
+      consumer.close();
+   }
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
index 1f4349271d..6891d0311e 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
@@ -346,9 +346,17 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
    }
 
    protected Queue getSubscriptionQueue(String TOPIC, String clientId) {
+      return getSubscriptionQueue(TOPIC, clientId, null);
+   }
+
+   protected Queue getSubscriptionQueue(String TOPIC, String clientId, String 
sharedSubscriptionName) {
       try {
          for (Binding b : 
server.getPostOffice().getMatchingBindings(SimpleString.toSimpleString(TOPIC))) 
{
-            if 
(((LocalQueueBinding)b).getQueue().getName().startsWith(SimpleString.toSimpleString(clientId)))
 {
+            if (sharedSubscriptionName != null) {
+               if 
(((LocalQueueBinding)b).getQueue().getName().startsWith(SimpleString.toSimpleString(sharedSubscriptionName)))
 {
+                  return ((LocalQueueBinding)b).getQueue();
+               }
+            } else if 
(((LocalQueueBinding)b).getQueue().getName().startsWith(SimpleString.toSimpleString(clientId)))
 {
                return ((LocalQueueBinding)b).getQueue();
             }
          }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/SubscriptionTests.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/SubscriptionTests.java
index 0619589843..8e61c17969 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/SubscriptionTests.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/SubscriptionTests.java
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
+import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
 import org.apache.activemq.artemis.utils.Wait;
 import org.eclipse.paho.mqttv5.client.MqttClient;
@@ -126,9 +127,10 @@ public class SubscriptionTests extends MQTT5TestSupport {
       consumer1.setCallback(new LatchedMqttCallback(ackLatch));
       consumer1.subscribe(SHARED_SUB, 1);
 
-      assertNotNull(server.locateQueue(SUB_NAME));
-      assertEquals(TOPIC, 
server.locateQueue(SUB_NAME).getAddress().toString());
-      assertEquals(1, server.locateQueue(SUB_NAME).getConsumerCount());
+      Queue sharedSubQueue = 
server.locateQueue(SUB_NAME.concat(".").concat(consumer1.getClientId()).concat(".").concat(TOPIC));
+      assertNotNull(sharedSubQueue);
+      assertEquals(TOPIC, sharedSubQueue.getAddress().toString());
+      assertEquals(1, sharedSubQueue.getConsumerCount());
 
       MqttClient producer = createPahoClient("producer");
       producer.connect();

Reply via email to