This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new ba2cbddd6b ARTEMIS-3871 fix MQTT shared sub q naming semantics
ba2cbddd6b is described below
commit ba2cbddd6bb63f34b24a3a1632d91a7a7d9139f0
Author: Justin Bertram <[email protected]>
AuthorDate: Tue Dec 13 14:44:25 2022 -0600
ARTEMIS-3871 fix MQTT shared sub q naming semantics
---
.../artemis/core/protocol/mqtt/MQTTSubscriptionManager.java | 2 +-
docs/user-manual/en/versions.md | 6 +++---
.../artemis/tests/integration/mqtt5/spec/SubscriptionTests.java | 9 +++++----
3 files changed, 9 insertions(+), 8 deletions(-)
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index 4b0ea51ab1..47f1917a58 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -306,7 +306,7 @@ public class MQTTSubscriptionManager {
int slashIndex = topic.indexOf(SLASH) + 1;
String sharedSubscriptionName = topic.substring(slashIndex, topic.indexOf(SLASH, slashIndex));
String parsedTopicName = topic.substring(topic.indexOf(SLASH, slashIndex) + 1);
- return new SimpleString(sharedSubscriptionName).concat(".").concat(session.getState().getClientId()).concat(".").concat(parsedTopicName);
+ return new SimpleString(sharedSubscriptionName).concat(".").concat(parsedTopicName);
} else {
return new SimpleString(session.getState().getClientId()).concat(".").concat(topic);
}
diff --git a/docs/user-manual/en/versions.md b/docs/user-manual/en/versions.md
index 538042471c..471e6c930c 100644
--- a/docs/user-manual/en/versions.md
+++ b/docs/user-manual/en/versions.md
@@ -19,9 +19,9 @@ Highlights:
subscription queues has changed. Previously the subscription queue was named according to the subscription name
provided in the MQTT `SUBSCRIBE` packet. However, MQTT allows the same name to be used across multiple subscriptions
whereas queues in the broker must be named uniquely. Now the subscription queue will be named according to the
- subscription name, client ID, and topic name so that all subscription queue names will be unique. Before upgrading
- please ensure all MQTT shared subscriptions are empty. When the subscribers reconnect they will get a new
- subscription queue. If they are not empty you can move the messages to the new subscription queue administratively.
+ subscription name and topic name so that all subscription queue names will be unique. Before upgrading please ensure
+ all MQTT shared subscriptions are empty. When the subscribers reconnect they will get a new subscription queue. If
+ they are not empty you can move the messages to the new subscription queue administratively.
## 2.27.1
[Full release notes](https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12352610&projectId=12315920)
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/SubscriptionTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/SubscriptionTests.java
index 8e61c17969..dd6f397480 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/SubscriptionTests.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/SubscriptionTests.java
@@ -73,8 +73,9 @@ public class SubscriptionTests extends MQTT5TestSupport {
});
consumer1.subscribe(SHARED_SUB, 0);
- assertNotNull(server.locateQueue(SUB_NAME));
- assertEquals(TOPIC, server.locateQueue(SUB_NAME).getAddress().toString());
+ Queue sharedSubQueue = server.locateQueue(SUB_NAME.concat(".").concat(TOPIC));
+ assertNotNull(sharedSubQueue);
+ assertEquals(TOPIC, sharedSubQueue.getAddress().toString());
MqttClient consumer2 = createPahoClient("consumer2");
consumer2.connect();
@@ -90,7 +91,7 @@ public class SubscriptionTests extends MQTT5TestSupport {
});
consumer2.subscribe(SHARED_SUB, 1);
- assertEquals(2, server.locateQueue(SUB_NAME).getConsumerCount());
+ assertEquals(2, sharedSubQueue.getConsumerCount());
MqttClient producer = createPahoClient("producer");
producer.connect();
@@ -127,7 +128,7 @@ public class SubscriptionTests extends MQTT5TestSupport {
consumer1.setCallback(new LatchedMqttCallback(ackLatch));
consumer1.subscribe(SHARED_SUB, 1);
- Queue sharedSubQueue = server.locateQueue(SUB_NAME.concat(".").concat(consumer1.getClientId()).concat(".").concat(TOPIC));
+ Queue sharedSubQueue = server.locateQueue(SUB_NAME.concat(".").concat(TOPIC));
assertNotNull(sharedSubQueue);
assertEquals(TOPIC, sharedSubQueue.getAddress().toString());
assertEquals(1, sharedSubQueue.getConsumerCount());