This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new fde9d223ae ARTEMIS-4249 failure to create internal MQTT consumer can
orphan sub q
fde9d223ae is described below
commit fde9d223aeff198b1d6529d7457a732606112f11
Author: Justin Bertram <[email protected]>
AuthorDate: Fri Apr 21 11:30:18 2023 -0500
ARTEMIS-4249 failure to create internal MQTT consumer can orphan sub q
---
.../protocol/mqtt/MQTTSubscriptionManager.java | 35 ++++++++++++----------
.../artemis/tests/integration/mqtt5/MQTT5Test.java | 24 +++++++++++++++
2 files changed, 44 insertions(+), 15 deletions(-)
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index bc762e88e9..176a1c18ca 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -108,26 +108,31 @@ public class MQTTSubscriptionManager {
Queue q = createQueueForSubscription(coreAddress,
getQueueNameForTopic(rawTopicName));
- if (initialStart) {
- createConsumerForSubscriptionQueue(q, parsedTopicName, qos,
subscription.option().isNoLocal(), null);
- } else {
- MqttTopicSubscription existingSubscription =
session.getState().getSubscription(parsedTopicName);
- if (existingSubscription == null) {
+ try {
+ if (initialStart) {
createConsumerForSubscriptionQueue(q, parsedTopicName, qos,
subscription.option().isNoLocal(), null);
} else {
- Long existingConsumerId = consumers.get(parsedTopicName).getID();
- consumerQoSLevels.put(existingConsumerId, qos);
- if (existingSubscription.option().isNoLocal() !=
subscription.option().isNoLocal()) {
- createConsumerForSubscriptionQueue(q, parsedTopicName, qos,
subscription.option().isNoLocal(), existingConsumerId);
+ MqttTopicSubscription existingSubscription =
session.getState().getSubscription(parsedTopicName);
+ if (existingSubscription == null) {
+ createConsumerForSubscriptionQueue(q, parsedTopicName, qos,
subscription.option().isNoLocal(), null);
+ } else {
+ Long existingConsumerId =
consumers.get(parsedTopicName).getID();
+ consumerQoSLevels.put(existingConsumerId, qos);
+ if (existingSubscription.option().isNoLocal() !=
subscription.option().isNoLocal()) {
+ createConsumerForSubscriptionQueue(q, parsedTopicName, qos,
subscription.option().isNoLocal(), existingConsumerId);
+ }
}
- }
- if (subscription.option().retainHandling() ==
MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE ||
- (subscription.option().retainHandling() ==
MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS
&& existingSubscription == null)) {
- session.getRetainMessageManager().addRetainedMessagesToQueue(q,
parsedTopicName);
- }
+ if (subscription.option().retainHandling() ==
MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE ||
(subscription.option().retainHandling() ==
MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS
&& existingSubscription == null)) {
+ session.getRetainMessageManager().addRetainedMessagesToQueue(q,
parsedTopicName);
+ }
- session.getState().addSubscription(subscription,
session.getWildcardConfiguration(), subscriptionIdentifier);
+ session.getState().addSubscription(subscription,
session.getWildcardConfiguration(), subscriptionIdentifier);
+ }
+ } catch (Exception e) {
+ // if anything broke during the creation of the consumer (or
otherwise) then ensure the subscription queue is removed
+ q.deleteQueue();
+ throw e;
}
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index 7ac7639304..85e0e3369a 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -472,4 +472,28 @@ public class MQTT5Test extends MQTT5TestSupport {
client.disconnect();
client.close();
}
+
+ @Test(timeout = DEFAULT_TIMEOUT)
+ public void testQueueCleanedUpOnConsumerFail() throws Exception {
+ final String topic = getName();
+ final String clientID = getName();
+
+ // force the creation of the consumer to fail
+ server.getAddressSettingsRepository().addMatch(topic, new
AddressSettings().setDefaultMaxConsumers(0));
+
+ MqttClient client = createPahoClient(clientID);
+ client.connect();
+ try {
+ client.subscribe(topic, 1);
+ } catch (Exception e) {
+ // ignore
+ }
+
+ Wait.assertTrue(() -> getSubscriptionQueue(topic, clientID) == null,
2000, 100);
+
+ if (client.isConnected()) {
+ client.disconnect();
+ }
+ client.close();
+ }
}