[ 
https://issues.apache.org/jira/browse/ARTEMIS-4370?focusedWorklogId=872498&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-872498
 ]

ASF GitHub Bot logged work on ARTEMIS-4370:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Jul/23 11:07
            Start Date: 24/Jul/23 11:07
    Worklog Time Spent: 10m 
      Work Description: brusdev commented on code in PR #4563:
URL: https://github.com/apache/activemq-artemis/pull/4563#discussion_r1272098152


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java:
##########
@@ -1020,6 +1020,74 @@ public void messageArrived(String topic, MqttMessage 
message) throws Exception {
       consumer.disconnect();
    }
 
+   /*
+    * From section 3.3.2.3.4 of the MQTT 5 specification:
+    *
+    * A sender can modify the Topic Alias mapping by sending another PUBLISH 
in the same Network Connection with the
+    * same Topic Alias value and a different non-zero length Topic Name.
+    */
+   @Test(timeout = DEFAULT_TIMEOUT)
+   public void testModifiedTopicAlias() throws Exception {
+      final String TOPIC_1 = this.getTopicName() + "1";
+      final String TOPIC_2 = this.getTopicName() + "2";
+
+      MqttClient consumer1 = createPahoClient("consumer1");
+      CountDownLatch latch1 = new CountDownLatch(2);
+      consumer1.setCallback(new DefaultMqttCallback() {
+         @Override
+         public void messageArrived(String topic, MqttMessage message) throws 
Exception {
+            String payload = new String(message.getPayload());
+            if (payload.equals("first") || payload.equals("second")) {
+               latch1.countDown();
+            }
+         }
+      });
+      consumer1.connect();
+      consumer1.subscribe(TOPIC_1, 1);
+
+      MqttClient consumer2 = createPahoClient("consumer2");
+      CountDownLatch latch2 = new CountDownLatch(2);
+      consumer2.setCallback(new DefaultMqttCallback() {
+         @Override
+         public void messageArrived(String topic, MqttMessage message) throws 
Exception {
+            String payload = new String(message.getPayload());
+            if (payload.equals("third") || payload.equals("fourth")) {
+               latch2.countDown();
+            }
+         }
+      });
+      consumer2.connect();
+      consumer2.subscribe(TOPIC_2, 1);
+
+      MqttClient producer = createPahoClient("producer");
+      producer.connect();
+
+      MqttProperties properties = new MqttProperties();
+      properties.setTopicAlias(1);
+      MqttMessage m = new MqttMessage();
+      m.setProperties(properties);
+      m.setQos(1);
+      m.setRetained(false);
+      m.setPayload("first".getBytes(StandardCharsets.UTF_8));
+      producer.publish(TOPIC_1, m);
+      m.setPayload("second".getBytes(StandardCharsets.UTF_8));
+      producer.publish("", m);

Review Comment:
   The test `testModifiedTopicAlias` is failing in my environment with the 
following error:
   ```
   [main] 12:51:06,802 INFO  
[org.apache.activemq.artemis.tests.util.ActiveMQTestBase] **** end #test 
testModifiedTopicAlias[protocol=tcp]() ***
   
   java.lang.IllegalArgumentException: Invalid topic length, should be in 
range[1, 65535]!
   
        at 
org.eclipse.paho.mqttv5.common.util.MqttTopicValidator.validate(MqttTopicValidator.java:69)
        at 
org.eclipse.paho.mqttv5.client.MqttAsyncClient.publish(MqttAsyncClient.java:1519)
        at 
org.eclipse.paho.mqttv5.client.MqttClient.publish(MqttClient.java:564)
        at 
org.apache.activemq.artemis.tests.integration.mqtt5.spec.controlpackets.PublishTests.testModifiedTopicAlias(PublishTests.java:1074)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
        at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
        at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
        at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:829)
   ```



##########
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java:
##########
@@ -189,27 +189,31 @@ void sendToQueue(MqttPublishMessage message, boolean 
internal) throws Exception
          String topic = message.variableHeader().topicName();
          if (session.getVersion() == MQTTVersion.MQTT_5) {
             Integer alias = MQTTUtil.getProperty(Integer.class, 
message.variableHeader().properties(), TOPIC_ALIAS);
-            Integer topicAliasMax = 
session.getProtocolManager().getTopicAliasMaximum();
             if (alias != null) {
+               Integer topicAliasMax = 
session.getProtocolManager().getTopicAliasMaximum();
                if (alias == 0) {
                   // [MQTT-3.3.2-8]
                   throw new 
DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID);
                } else if (topicAliasMax != null && alias > topicAliasMax) {
                   // [MQTT-3.3.2-9]
                   throw new 
DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID);
-               } else {
-                  topic = session.getState().getClientTopicAlias(alias);
-                  if (topic == null) {
-                     topic = message.variableHeader().topicName();
-                     if (topic == null || topic.length() == 0) {
-                        // using a topic alias with no matching topic in the 
state; potentially [MQTT-3.3.2-7]
-                        throw new 
DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID);
-                     }
-                     session.getState().addClientTopicAlias(alias, topic);
+               }
+
+               String existingTopicMapping = 
session.getState().getClientTopicAlias(alias);
+               if (existingTopicMapping == null) {
+                  if (topic == null || topic.length() == 0) {
+                     // using a topic alias with no matching topic in the 
state; potentially [MQTT-3.3.2-7]
+                     throw new 
DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID);
                   }
+                  logger.debug("Adding new alias {} for topic {}", alias, 
topic);
+                  session.getState().addClientTopicAlias(alias, topic);
+               } else if (topic != null && topic.length() > 0) {
+                  logger.debug("Modifying existing alias {}. New value: {}; 
old value: {}", alias, topic, existingTopicMapping);
+                  session.getState().addClientTopicAlias(alias, topic);

Review Comment:
   The verb `add` is a bit confusing; what about `put`?
   ```suggestion
                     session.getState().putClientTopicAlias(alias, topic);
   ```





Issue Time Tracking
-------------------

    Worklog Id:     (was: 872498)
    Time Spent: 20m  (was: 10m)

> Publishing message with existing topic alias and different topic causes 
> message to be sent to incorrect topic
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: ARTEMIS-4370
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4370
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>          Components: MQTT
>    Affects Versions: 2.29.0
>            Reporter: Adam Zyzak
>            Assignee: Justin Bertram
>            Priority: Major
>             Fix For: 2.31.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> h3. Description
> When sending an MQTT 5.0 publish message with a topic alias number that 
> already has a mapping for the given connection, together with a new topic, 
> the Artemis broker incorrectly routes the message to the topic from the alias 
> mapping instead of the topic from the publish message. This is incorrect 
> behavior because the broker should route the message to the topic from the 
> publish message and update the alias mapping with the new topic. 
> Section 3.3.2.3.4 Topic Alias in MQTT 5.0 specification states the following:
> {code:java}
> A sender can modify the Topic Alias mapping by sending another PUBLISH in the 
> same Network Connection with the same Topic Alias value and a different 
> non-zero length Topic Name. {code}
>  
> h3. Steps to reproduce
>  # Create 2 MQTT subscribers: first on topic1 and second on topic2
>  # Create 1 MQTT publisher
>  # From the publisher, send a publish message on topic1 with the MQTT 
> property TOPIC_ALIAS set to 1
>  # From the publisher, send a publish message on topic2 with the MQTT 
> property TOPIC_ALIAS set to 1
> h3. Current result
> The first message is correctly routed to topic1, but the second message is 
> incorrectly routed to topic1 instead of topic2.
> The method 
> org.apache.activemq.artemis.core.protocol.mqtt.MQTTPublishManager#sendToQueue 
> adds a mapping only if the alias does not already exist.
> h3. Expected result 
> The topic field in an MQTT publish message should take priority over the 
> TOPIC_ALIAS property. The topic stored under the existing alias number should 
> be updated.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to