This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 7d11cf81ba0c89934eb48ec2653a5ee79e485e7e
Author: Justin Bertram <[email protected]>
AuthorDate: Wed Apr 13 11:45:14 2022 -0500

    ARTEMIS-3777 fix MQTT request/response + nolocal
    
    Removing the connection ID property from the actual *message* breaks the
    nolocal functionality. Removing the property isn't necessary in the
    first place so this commit reomves that code.
---
 .../core/protocol/mqtt/MQTTPublishManager.java     |  4 --
 .../mqtt5/spec/controlpackets/SubscribeTests.java  | 71 ++++++++++++++++++++++
 2 files changed, 71 insertions(+), 4 deletions(-)

diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 17230a4cb6..bcf278e542 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -45,7 +45,6 @@ import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.reader.MessageUtil;
 import org.jboss.logging.Logger;
 
 import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CONTENT_TYPE;
@@ -395,9 +394,6 @@ public class MQTTPublishManager {
             isRetain = false;
          }
 
-         // [MQTT-3.8.3-3] remove property used for no-local implementation
-         message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
-
          if (session.getState().getClientTopicAliasMaximum() != null) {
             Integer alias = session.getState().getServerTopicAlias(address);
             if (alias == null) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTests.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTests.java
index 75c2f91064..a7e9ccafc1 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTests.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTests.java
@@ -17,6 +17,7 @@
 
 package 
org.apache.activemq.artemis.tests.integration.mqtt5.spec.controlpackets;
 
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -34,6 +35,7 @@ import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
 import org.eclipse.paho.mqttv5.client.MqttClient;
 import org.eclipse.paho.mqttv5.common.MqttMessage;
 import org.eclipse.paho.mqttv5.common.MqttSubscription;
+import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
 import org.eclipse.paho.mqttv5.common.packet.MqttSubAck;
 import org.jboss.logging.Logger;
 import org.junit.Test;
@@ -82,6 +84,75 @@ public class SubscribeTests extends MQTT5TestSupport {
       client.close();
    }
 
+   /*
+    * [MQTT-3.8.3-3]
+    *
+    * This test was adapted from Test.test_request_response in client_test5.py 
at https://github.com/eclipse/paho.mqtt.testing/tree/master/interoperability
+    *
+    * It involves 2 clients subscribing to and performing a request/response 
on the same topic so it's imperative they
+    * don't receive the messages that they send themselves.
+    */
+   @Test(timeout = DEFAULT_TIMEOUT)
+   public void testRequestResponseNoLocal() throws Exception {
+      final String TOPIC = RandomUtil.randomString();
+      final String REQUEST = "request";
+      final String RESPONSE = "response";
+      final CountDownLatch aclientLatch = new CountDownLatch(2);
+      final CountDownLatch bclientLatch = new CountDownLatch(1);
+
+      MqttClient aclient = createPahoClient("aclientid");
+      aclient.connect();
+      aclient.setCallback(new DefaultMqttCallback() {
+         @Override
+         public void messageArrived(String topic, MqttMessage message) throws 
Exception {
+            assertEquals(RESPONSE, new String(message.getPayload()));
+            aclientLatch.countDown();
+         }
+      });
+
+      MqttClient bclient = createPahoClient("bclientid");
+      bclient.connect();
+      bclient.setCallback(new DefaultMqttCallback() {
+         @Override
+         public void messageArrived(String topic, MqttMessage message) throws 
Exception {
+            assertEquals(REQUEST, new String(message.getPayload()));
+            bclientLatch.countDown();
+            MqttMessage m = new MqttMessage();
+            m.setPayload(RESPONSE.getBytes(StandardCharsets.UTF_8));
+            m.setQos(1);
+            MqttProperties properties = new MqttProperties();
+            properties.setResponseTopic(TOPIC);
+            
properties.setCorrelationData("334".getBytes(StandardCharsets.UTF_8));
+            m.setProperties(properties);
+            bclient.publish(TOPIC, m);
+         }
+      });
+
+      MqttSubscription sub = new MqttSubscription(TOPIC, 2);
+      sub.setNoLocal(true);
+      aclient.subscribe(new MqttSubscription[]{sub});
+      bclient.subscribe(new MqttSubscription[]{sub});
+
+      MqttMessage m = new MqttMessage();
+      m.setPayload(REQUEST.getBytes(StandardCharsets.UTF_8));
+      m.setQos(1);
+      MqttProperties properties = new MqttProperties();
+      properties.setResponseTopic(TOPIC);
+      properties.setCorrelationData("334".getBytes(StandardCharsets.UTF_8));
+      m.setProperties(properties);
+      aclient.publish(TOPIC, m);
+
+      assertTrue(bclientLatch.await(2, TimeUnit.SECONDS));
+
+      Wait.assertEquals(1L, () -> aclientLatch.getCount(), 2000, 100);
+      assertFalse(aclientLatch.await(2, TimeUnit.SECONDS));
+
+      aclient.disconnect();
+      aclient.close();
+      bclient.disconnect();
+      bclient.close();
+   }
+
    /*
     * [MQTT-3.8.4-1] When the Server receives a SUBSCRIBE packet from a 
Client, the Server MUST respond with a SUBACK packet.
     */

Reply via email to