This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new df6e67d1b8 ARTEMIS-5517 MQTT sub ID can't be nullified
     new f40fb79f78 This closes #5740
df6e67d1b8 is described below

commit df6e67d1b8c2df4384fa6c78f4b7a63cfc4d693f
Author: Evgeniy Devyatykh <[email protected]>
AuthorDate: Wed Jun 4 19:11:45 2025 +0500

    ARTEMIS-5517 MQTT sub ID can't be nullified
---
 .../core/protocol/mqtt/MQTTSessionState.java       |  2 +-
 .../mqtt5/spec/controlpackets/PublishTests.java    | 66 +++++++++++++---------
 2 files changed, 41 insertions(+), 27 deletions(-)

diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index 12918bf7b4..ffc9fcac6e 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -205,7 +205,7 @@ public class MQTTSessionState {
          SubscriptionItem existingSubscription = 
subscriptions.get(subscription.topicFilter());
          if (existingSubscription != null) {
             if (subscription.qualityOfService().value() > 
existingSubscription.getSubscription().qualityOfService().value()
-               || (subscriptionIdentifier != null && 
!Objects.equals(subscriptionIdentifier, existingSubscription.getId()))) {
+               || !Objects.equals(subscriptionIdentifier, 
existingSubscription.getId())) {
                existingSubscription.update(subscription, 
subscriptionIdentifier);
                return true;
             } else {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
index 840c99b7d9..67f02089ca 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
@@ -1468,24 +1468,24 @@ public class PublishTests extends MQTT5TestSupport {
          @Override
          public void messageArrived(String topic, MqttMessage message) throws 
Exception {
             try {
-               List<Integer> subscriptionIdentifers = message.getProperties() 
!= null ? message
+               List<Integer> subscriptionIdentifiers = message.getProperties() 
!= null ? message
                   .getProperties()
                   .getSubscriptionIdentifiers() : null;
-               System.out.println("subscriptionIdentifers: " + 
subscriptionIdentifers + "; message: " + message);
-               if (Arrays.equals(message.getPayload(), 
"foo/a".getBytes(StandardCharsets.UTF_8))) {
-                  assertTrue(subscriptionIdentifers.contains(3));
-                  assertEquals(1, subscriptionIdentifers.size());
+               System.out.println("subscriptionIdentifiers: " + 
subscriptionIdentifiers + "; message: " + message);
+               if (isPayloadEqual(message, "foo/a")) {
+                  assertTrue(subscriptionIdentifiers.contains(3));
+                  assertEquals(1, subscriptionIdentifiers.size());
                } else if (Arrays.equals(message.getPayload(), 
"foo/a/b".getBytes(StandardCharsets.UTF_8))) {
-                  assertTrue(subscriptionIdentifers.contains(2));
-                  assertTrue(subscriptionIdentifers.contains(3));
-                  assertEquals(2, subscriptionIdentifers.size());
+                  assertTrue(subscriptionIdentifiers.contains(2));
+                  assertTrue(subscriptionIdentifiers.contains(3));
+                  assertEquals(2, subscriptionIdentifiers.size());
                } else if (Arrays.equals(message.getPayload(), 
"foo/a/b/c".getBytes(StandardCharsets.UTF_8))) {
-                  assertTrue(subscriptionIdentifers.contains(1));
-                  assertTrue(subscriptionIdentifers.contains(2));
-                  assertTrue(subscriptionIdentifers.contains(3));
-                  assertEquals(3, subscriptionIdentifers.size());
+                  assertTrue(subscriptionIdentifiers.contains(1));
+                  assertTrue(subscriptionIdentifiers.contains(2));
+                  assertTrue(subscriptionIdentifiers.contains(3));
+                  assertEquals(3, subscriptionIdentifiers.size());
                } else {
-                  fail("invalid subscription identifer");
+                  fail("invalid subscription identifier");
                }
 
                consumerLatch.countDown();
@@ -1535,7 +1535,7 @@ public class PublishTests extends MQTT5TestSupport {
    @Test
    @Timeout(DEFAULT_TIMEOUT_SEC)
    public void testSubscriptionIdentifierSingleLevel() throws Exception {
-      final CountDownLatch consumerLatch = new CountDownLatch(3);
+      final CountDownLatch consumerLatch = new CountDownLatch(6);
 
       MqttAsyncClient consumer = 
createAsyncPahoClient(RandomUtil.randomUUIDString());
       consumer.connect().waitForCompletion();
@@ -1543,21 +1543,23 @@ public class PublishTests extends MQTT5TestSupport {
          @Override
          public void messageArrived(String topic, MqttMessage message) throws 
Exception {
             try {
-               List<Integer> subscriptionIdentifers = message.getProperties() 
!= null ? message
+               List<Integer> subscriptionIdentifiers = message.getProperties() 
!= null ? message
                   .getProperties()
                   .getSubscriptionIdentifiers() : null;
-               System.out.println("subscriptionIdentifers: " + 
subscriptionIdentifers + "; message: " + message);
-               if (Arrays.equals(message.getPayload(), 
"foo/a".getBytes(StandardCharsets.UTF_8))) {
-                  assertTrue(subscriptionIdentifers.contains(3));
-                  assertEquals(1, subscriptionIdentifers.size());
-               } else if (Arrays.equals(message.getPayload(), 
"foo/a/b".getBytes(StandardCharsets.UTF_8))) {
-                  assertTrue(subscriptionIdentifers.contains(2));
-                  assertEquals(1, subscriptionIdentifers.size());
-               } else if (Arrays.equals(message.getPayload(), 
"foo/a/b/c".getBytes(StandardCharsets.UTF_8))) {
-                  assertTrue(subscriptionIdentifers.contains(1));
-                  assertEquals(1, subscriptionIdentifers.size());
+               System.out.println("subscriptionIdentifiers: " + 
subscriptionIdentifiers + "; message: " + message);
+               if (isPayloadEqual(message, "foo/a")) {
+                  assertTrue(subscriptionIdentifiers.contains(3));
+                  assertEquals(1, subscriptionIdentifiers.size());
+               } else if (isPayloadEqual(message, "foo/a-reset")) {
+                  assertEquals(0, subscriptionIdentifiers.size());
+               } else if (isPayloadEqual(message, "foo/a/b") || 
isPayloadEqual(message, "foo/a/b-reset")) {
+                  assertTrue(subscriptionIdentifiers.contains(2));
+                  assertEquals(1, subscriptionIdentifiers.size());
+               } else if (isPayloadEqual(message, "foo/a/b/c") || 
isPayloadEqual(message, "foo/a/b/c-reset")) {
+                  assertTrue(subscriptionIdentifiers.contains(1));
+                  assertEquals(1, subscriptionIdentifiers.size());
                } else {
-                  fail("invalid subscription identifer");
+                  fail("invalid subscription identifier");
                }
 
                consumerLatch.countDown();
@@ -1584,6 +1586,14 @@ public class PublishTests extends MQTT5TestSupport {
       producer.publish("foo/a", "foo/a".getBytes(StandardCharsets.UTF_8), 2, 
false);
       producer.publish("foo/a/b", "foo/a/b".getBytes(StandardCharsets.UTF_8), 
2, false);
       producer.publish("foo/a/b/c", 
"foo/a/b/c".getBytes(StandardCharsets.UTF_8), 2, false);
+
+      //remove association with subscription ID from the topic `foo/+`
+      consumer.subscribe(new MqttSubscription[]{new MqttSubscription("foo/+", 
2)}, null, null, null).waitForCompletion();
+      producer.publish("foo/a", 
"foo/a-reset".getBytes(StandardCharsets.UTF_8), 2, false);
+      producer.publish("foo/a/b", 
"foo/a/b-reset".getBytes(StandardCharsets.UTF_8), 2, false);
+      producer.publish("foo/a/b/c", 
"foo/a/b/c-reset".getBytes(StandardCharsets.UTF_8), 2, false);
+
+
       producer.disconnect();
       producer.close();
 
@@ -1592,6 +1602,10 @@ public class PublishTests extends MQTT5TestSupport {
       consumer.close();
    }
 
+   private static boolean isPayloadEqual(MqttMessage message, String compare) {
+      return Arrays.equals(message.getPayload(), 
compare.getBytes(StandardCharsets.UTF_8));
+   }
+
    /**
     * [MQTT-3.3.4-9] The Server MUST NOT send more than Receive Maximum QoS 1 
and QoS 2 PUBLISH packets for which it has
     * not received PUBACK, PUBCOMP, or PUBREC with a Reason Code of 128 or 
greater from the Client.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to