This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new df6e67d1b8 ARTEMIS-5517 MQTT sub ID can't be nullified
new f40fb79f78 This closes #5740
df6e67d1b8 is described below
commit df6e67d1b8c2df4384fa6c78f4b7a63cfc4d693f
Author: Evgeniy Devyatykh <[email protected]>
AuthorDate: Wed Jun 4 19:11:45 2025 +0500
ARTEMIS-5517 MQTT sub ID can't be nullified
---
.../core/protocol/mqtt/MQTTSessionState.java | 2 +-
.../mqtt5/spec/controlpackets/PublishTests.java | 66 +++++++++++++---------
2 files changed, 41 insertions(+), 27 deletions(-)
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index 12918bf7b4..ffc9fcac6e 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -205,7 +205,7 @@ public class MQTTSessionState {
SubscriptionItem existingSubscription =
subscriptions.get(subscription.topicFilter());
if (existingSubscription != null) {
if (subscription.qualityOfService().value() >
existingSubscription.getSubscription().qualityOfService().value()
- || (subscriptionIdentifier != null &&
!Objects.equals(subscriptionIdentifier, existingSubscription.getId()))) {
+ || !Objects.equals(subscriptionIdentifier,
existingSubscription.getId())) {
existingSubscription.update(subscription,
subscriptionIdentifier);
return true;
} else {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
index 840c99b7d9..67f02089ca 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
@@ -1468,24 +1468,24 @@ public class PublishTests extends MQTT5TestSupport {
@Override
public void messageArrived(String topic, MqttMessage message) throws
Exception {
try {
- List<Integer> subscriptionIdentifers = message.getProperties()
!= null ? message
+ List<Integer> subscriptionIdentifiers = message.getProperties()
!= null ? message
.getProperties()
.getSubscriptionIdentifiers() : null;
- System.out.println("subscriptionIdentifers: " +
subscriptionIdentifers + "; message: " + message);
- if (Arrays.equals(message.getPayload(),
"foo/a".getBytes(StandardCharsets.UTF_8))) {
- assertTrue(subscriptionIdentifers.contains(3));
- assertEquals(1, subscriptionIdentifers.size());
+ System.out.println("subscriptionIdentifiers: " +
subscriptionIdentifiers + "; message: " + message);
+ if (isPayloadEqual(message, "foo/a")) {
+ assertTrue(subscriptionIdentifiers.contains(3));
+ assertEquals(1, subscriptionIdentifiers.size());
} else if (Arrays.equals(message.getPayload(),
"foo/a/b".getBytes(StandardCharsets.UTF_8))) {
- assertTrue(subscriptionIdentifers.contains(2));
- assertTrue(subscriptionIdentifers.contains(3));
- assertEquals(2, subscriptionIdentifers.size());
+ assertTrue(subscriptionIdentifiers.contains(2));
+ assertTrue(subscriptionIdentifiers.contains(3));
+ assertEquals(2, subscriptionIdentifiers.size());
} else if (Arrays.equals(message.getPayload(),
"foo/a/b/c".getBytes(StandardCharsets.UTF_8))) {
- assertTrue(subscriptionIdentifers.contains(1));
- assertTrue(subscriptionIdentifers.contains(2));
- assertTrue(subscriptionIdentifers.contains(3));
- assertEquals(3, subscriptionIdentifers.size());
+ assertTrue(subscriptionIdentifiers.contains(1));
+ assertTrue(subscriptionIdentifiers.contains(2));
+ assertTrue(subscriptionIdentifiers.contains(3));
+ assertEquals(3, subscriptionIdentifiers.size());
} else {
- fail("invalid subscription identifer");
+ fail("invalid subscription identifier");
}
consumerLatch.countDown();
@@ -1535,7 +1535,7 @@ public class PublishTests extends MQTT5TestSupport {
@Test
@Timeout(DEFAULT_TIMEOUT_SEC)
public void testSubscriptionIdentifierSingleLevel() throws Exception {
- final CountDownLatch consumerLatch = new CountDownLatch(3);
+ final CountDownLatch consumerLatch = new CountDownLatch(6);
MqttAsyncClient consumer =
createAsyncPahoClient(RandomUtil.randomUUIDString());
consumer.connect().waitForCompletion();
@@ -1543,21 +1543,23 @@ public class PublishTests extends MQTT5TestSupport {
@Override
public void messageArrived(String topic, MqttMessage message) throws
Exception {
try {
- List<Integer> subscriptionIdentifers = message.getProperties()
!= null ? message
+ List<Integer> subscriptionIdentifiers = message.getProperties()
!= null ? message
.getProperties()
.getSubscriptionIdentifiers() : null;
- System.out.println("subscriptionIdentifers: " +
subscriptionIdentifers + "; message: " + message);
- if (Arrays.equals(message.getPayload(),
"foo/a".getBytes(StandardCharsets.UTF_8))) {
- assertTrue(subscriptionIdentifers.contains(3));
- assertEquals(1, subscriptionIdentifers.size());
- } else if (Arrays.equals(message.getPayload(),
"foo/a/b".getBytes(StandardCharsets.UTF_8))) {
- assertTrue(subscriptionIdentifers.contains(2));
- assertEquals(1, subscriptionIdentifers.size());
- } else if (Arrays.equals(message.getPayload(),
"foo/a/b/c".getBytes(StandardCharsets.UTF_8))) {
- assertTrue(subscriptionIdentifers.contains(1));
- assertEquals(1, subscriptionIdentifers.size());
+ System.out.println("subscriptionIdentifiers: " +
subscriptionIdentifiers + "; message: " + message);
+ if (isPayloadEqual(message, "foo/a")) {
+ assertTrue(subscriptionIdentifiers.contains(3));
+ assertEquals(1, subscriptionIdentifiers.size());
+ } else if (isPayloadEqual(message, "foo/a-reset")) {
+ assertEquals(0, subscriptionIdentifiers.size());
+ } else if (isPayloadEqual(message, "foo/a/b") ||
isPayloadEqual(message, "foo/a/b-reset")) {
+ assertTrue(subscriptionIdentifiers.contains(2));
+ assertEquals(1, subscriptionIdentifiers.size());
+ } else if (isPayloadEqual(message, "foo/a/b/c") ||
isPayloadEqual(message, "foo/a/b/c-reset")) {
+ assertTrue(subscriptionIdentifiers.contains(1));
+ assertEquals(1, subscriptionIdentifiers.size());
} else {
- fail("invalid subscription identifer");
+ fail("invalid subscription identifier");
}
consumerLatch.countDown();
@@ -1584,6 +1586,14 @@ public class PublishTests extends MQTT5TestSupport {
producer.publish("foo/a", "foo/a".getBytes(StandardCharsets.UTF_8), 2,
false);
producer.publish("foo/a/b", "foo/a/b".getBytes(StandardCharsets.UTF_8),
2, false);
producer.publish("foo/a/b/c",
"foo/a/b/c".getBytes(StandardCharsets.UTF_8), 2, false);
+
+ //remove association with subscription ID from the topic `foo/+`
+ consumer.subscribe(new MqttSubscription[]{new MqttSubscription("foo/+",
2)}, null, null, null).waitForCompletion();
+ producer.publish("foo/a",
"foo/a-reset".getBytes(StandardCharsets.UTF_8), 2, false);
+ producer.publish("foo/a/b",
"foo/a/b-reset".getBytes(StandardCharsets.UTF_8), 2, false);
+ producer.publish("foo/a/b/c",
"foo/a/b/c-reset".getBytes(StandardCharsets.UTF_8), 2, false);
+
+
producer.disconnect();
producer.close();
@@ -1592,6 +1602,10 @@ public class PublishTests extends MQTT5TestSupport {
consumer.close();
}
+ private static boolean isPayloadEqual(MqttMessage message, String compare) {
+ return Arrays.equals(message.getPayload(),
compare.getBytes(StandardCharsets.UTF_8));
+ }
+
/**
* [MQTT-3.3.4-9] The Server MUST NOT send more than Receive Maximum QoS 1
and QoS 2 PUBLISH packets for which it has
* not received PUBACK, PUBCOMP, or PUBREC with a Reason Code of 128 or
greater from the Client.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact