This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 7d11cf81ba0c89934eb48ec2653a5ee79e485e7e Author: Justin Bertram <[email protected]> AuthorDate: Wed Apr 13 11:45:14 2022 -0500 ARTEMIS-3777 fix MQTT request/response + nolocal Removing the connection ID property from the actual *message* breaks the nolocal functionality. Removing the property isn't necessary in the first place so this commit removes that code. --- .../core/protocol/mqtt/MQTTPublishManager.java | 4 -- .../mqtt5/spec/controlpackets/SubscribeTests.java | 71 ++++++++++++++++++++++ 2 files changed, 71 insertions(+), 4 deletions(-) diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index 17230a4cb6..bcf278e542 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -45,7 +45,6 @@ import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; import org.apache.activemq.artemis.core.transaction.Transaction; -import org.apache.activemq.artemis.reader.MessageUtil; import org.jboss.logging.Logger; import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CONTENT_TYPE; @@ -395,9 +394,6 @@ public class MQTTPublishManager { isRetain = false; } - // [MQTT-3.8.3-3] remove property used for no-local implementation - message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME); - if (session.getState().getClientTopicAliasMaximum() != null) { Integer alias = session.getState().getServerTopicAlias(address); if (alias == null) { diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTests.java index 75c2f91064..a7e9ccafc1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTests.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTests.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.tests.integration.mqtt5.spec.controlpackets; +import java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -34,6 +35,7 @@ import org.eclipse.paho.mqttv5.client.MqttAsyncClient; import org.eclipse.paho.mqttv5.client.MqttClient; import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.MqttSubscription; +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; import org.eclipse.paho.mqttv5.common.packet.MqttSubAck; import org.jboss.logging.Logger; import org.junit.Test; @@ -82,6 +84,75 @@ public class SubscribeTests extends MQTT5TestSupport { client.close(); } + /* + * [MQTT-3.8.3-3] + * + * This test was adapted from Test.test_request_response in client_test5.py at https://github.com/eclipse/paho.mqtt.testing/tree/master/interoperability + * + * It involves 2 clients subscribing to and performing a request/response on the same topic so it's imperative they + * don't receive the messages that they send themselves. 
+ */ + @Test(timeout = DEFAULT_TIMEOUT) + public void testRequestResponseNoLocal() throws Exception { + final String TOPIC = RandomUtil.randomString(); + final String REQUEST = "request"; + final String RESPONSE = "response"; + final CountDownLatch aclientLatch = new CountDownLatch(2); + final CountDownLatch bclientLatch = new CountDownLatch(1); + + MqttClient aclient = createPahoClient("aclientid"); + aclient.connect(); + aclient.setCallback(new DefaultMqttCallback() { + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + assertEquals(RESPONSE, new String(message.getPayload())); + aclientLatch.countDown(); + } + }); + + MqttClient bclient = createPahoClient("bclientid"); + bclient.connect(); + bclient.setCallback(new DefaultMqttCallback() { + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + assertEquals(REQUEST, new String(message.getPayload())); + bclientLatch.countDown(); + MqttMessage m = new MqttMessage(); + m.setPayload(RESPONSE.getBytes(StandardCharsets.UTF_8)); + m.setQos(1); + MqttProperties properties = new MqttProperties(); + properties.setResponseTopic(TOPIC); + properties.setCorrelationData("334".getBytes(StandardCharsets.UTF_8)); + m.setProperties(properties); + bclient.publish(TOPIC, m); + } + }); + + MqttSubscription sub = new MqttSubscription(TOPIC, 2); + sub.setNoLocal(true); + aclient.subscribe(new MqttSubscription[]{sub}); + bclient.subscribe(new MqttSubscription[]{sub}); + + MqttMessage m = new MqttMessage(); + m.setPayload(REQUEST.getBytes(StandardCharsets.UTF_8)); + m.setQos(1); + MqttProperties properties = new MqttProperties(); + properties.setResponseTopic(TOPIC); + properties.setCorrelationData("334".getBytes(StandardCharsets.UTF_8)); + m.setProperties(properties); + aclient.publish(TOPIC, m); + + assertTrue(bclientLatch.await(2, TimeUnit.SECONDS)); + + Wait.assertEquals(1L, () -> aclientLatch.getCount(), 2000, 100); + 
assertFalse(aclientLatch.await(2, TimeUnit.SECONDS)); + + aclient.disconnect(); + aclient.close(); + bclient.disconnect(); + bclient.close(); + } + /* * [MQTT-3.8.4-1] When the Server receives a SUBSCRIBE packet from a Client, the Server MUST respond with a SUBACK packet. */
