This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new fde9d223ae ARTEMIS-4249 failure to create internal MQTT consumer can orphan sub q
fde9d223ae is described below

commit fde9d223aeff198b1d6529d7457a732606112f11
Author: Justin Bertram <[email protected]>
AuthorDate: Fri Apr 21 11:30:18 2023 -0500

    ARTEMIS-4249 failure to create internal MQTT consumer can orphan sub q
---
 .../protocol/mqtt/MQTTSubscriptionManager.java     | 35 ++++++++++++----------
 .../artemis/tests/integration/mqtt5/MQTT5Test.java | 24 +++++++++++++++
 2 files changed, 44 insertions(+), 15 deletions(-)

diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index bc762e88e9..176a1c18ca 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -108,26 +108,31 @@ public class MQTTSubscriptionManager {
 
       Queue q = createQueueForSubscription(coreAddress, 
getQueueNameForTopic(rawTopicName));
 
-      if (initialStart) {
-         createConsumerForSubscriptionQueue(q, parsedTopicName, qos, 
subscription.option().isNoLocal(), null);
-      } else {
-         MqttTopicSubscription existingSubscription = 
session.getState().getSubscription(parsedTopicName);
-         if (existingSubscription == null) {
+      try {
+         if (initialStart) {
             createConsumerForSubscriptionQueue(q, parsedTopicName, qos, 
subscription.option().isNoLocal(), null);
          } else {
-            Long existingConsumerId = consumers.get(parsedTopicName).getID();
-            consumerQoSLevels.put(existingConsumerId, qos);
-            if (existingSubscription.option().isNoLocal() != 
subscription.option().isNoLocal()) {
-               createConsumerForSubscriptionQueue(q, parsedTopicName, qos, 
subscription.option().isNoLocal(), existingConsumerId);
+            MqttTopicSubscription existingSubscription = 
session.getState().getSubscription(parsedTopicName);
+            if (existingSubscription == null) {
+               createConsumerForSubscriptionQueue(q, parsedTopicName, qos, 
subscription.option().isNoLocal(), null);
+            } else {
+               Long existingConsumerId = 
consumers.get(parsedTopicName).getID();
+               consumerQoSLevels.put(existingConsumerId, qos);
+               if (existingSubscription.option().isNoLocal() != 
subscription.option().isNoLocal()) {
+                  createConsumerForSubscriptionQueue(q, parsedTopicName, qos, 
subscription.option().isNoLocal(), existingConsumerId);
+               }
             }
-         }
 
-         if (subscription.option().retainHandling() == 
MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE ||
-            (subscription.option().retainHandling() == 
MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS
 && existingSubscription == null)) {
-            session.getRetainMessageManager().addRetainedMessagesToQueue(q, 
parsedTopicName);
-         }
+            if (subscription.option().retainHandling() == 
MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE || 
(subscription.option().retainHandling() == 
MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS
 && existingSubscription == null)) {
+               session.getRetainMessageManager().addRetainedMessagesToQueue(q, 
parsedTopicName);
+            }
 
-         session.getState().addSubscription(subscription, 
session.getWildcardConfiguration(), subscriptionIdentifier);
+            session.getState().addSubscription(subscription, 
session.getWildcardConfiguration(), subscriptionIdentifier);
+         }
+      } catch (Exception e) {
+         // if anything broke during the creation of the consumer (or otherwise) then ensure the subscription queue is removed
+         q.deleteQueue();
+         throw e;
       }
    }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index 7ac7639304..85e0e3369a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -472,4 +472,28 @@ public class MQTT5Test extends MQTT5TestSupport {
       client.disconnect();
       client.close();
    }
+
+   @Test(timeout = DEFAULT_TIMEOUT)
+   public void testQueueCleanedUpOnConsumerFail() throws Exception {
+      final String topic = getName();
+      final String clientID = getName();
+
+      // force the creation of the consumer to fail
+      server.getAddressSettingsRepository().addMatch(topic, new 
AddressSettings().setDefaultMaxConsumers(0));
+
+      MqttClient client = createPahoClient(clientID);
+      client.connect();
+      try {
+         client.subscribe(topic, 1);
+      } catch (Exception e) {
+         // ignore
+      }
+
+      Wait.assertTrue(() -> getSubscriptionQueue(topic, clientID) == null, 
2000, 100);
+
+      if (client.isConnected()) {
+         client.disconnect();
+      }
+      client.close();
+   }
 }

Reply via email to