This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new c8d685ee87 ARTEMIS-4365 MQTT retain flag not set correctly
c8d685ee87 is described below
commit c8d685ee871272e49a6b542695bb4b16fca3b3a0
Author: Justin Bertram <[email protected]>
AuthorDate: Mon Jul 17 15:14:29 2023 -0500
ARTEMIS-4365 MQTT retain flag not set correctly
---
.../core/protocol/mqtt/MQTTPublishManager.java | 11 +++-
.../protocol/mqtt/MQTTRetainMessageManager.java | 4 +-
.../artemis/core/protocol/mqtt/MQTTUtil.java | 2 +
.../mqtt/MQTTInterceptorPropertiesTest.java | 34 ++++------
.../artemis/tests/integration/mqtt/MQTTTest.java | 72 ++++++++++++++++++++-
.../tests/integration/mqtt/MQTTTestSupport.java | 22 +++++++
.../mqtt5/spec/controlpackets/PublishTests.java | 74 ++++++++++++++++------
7 files changed, 174 insertions(+), 45 deletions(-)
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 31a5409a44..57b9605d21 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -29,6 +29,7 @@ import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
+import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
@@ -60,6 +61,7 @@ import static
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SUBSCR
import static
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.TOPIC_ALIAS;
import static
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_CONTENT_TYPE_KEY;
import static
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_CORRELATION_DATA_KEY;
+import static
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_MESSAGE_RETAIN_INITIAL_DISTRIBUTION_KEY;
import static
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_MESSAGE_RETAIN_KEY;
import static
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_PAYLOAD_FORMAT_INDICATOR_KEY;
import static
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_RESPONSE_TOPIC_KEY;
@@ -404,12 +406,15 @@ public class MQTTPublishManager {
// [MQTT-3.3.1-2] The DUP flag MUST be set to 0 for all QoS 0 messages.
boolean redelivery = qos == 0 ? false : (deliveryCount > 1);
- boolean isRetain = message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY);
+ boolean isRetain =
message.containsProperty(MQTT_MESSAGE_RETAIN_INITIAL_DISTRIBUTION_KEY);
MqttProperties mqttProperties = getPublishProperties(message);
if (session.getVersion() == MQTTVersion.MQTT_5) {
- if (session.getState().getSubscription(message.getAddress()) != null
&&
!session.getState().getSubscription(message.getAddress()).option().isRetainAsPublished())
{
- isRetain = false;
+ if (!isRetain && message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY))
{
+ MqttTopicSubscription sub =
session.getState().getSubscription(message.getAddress());
+ if (sub != null && sub.option().isRetainAsPublished()) {
+ isRetain = true;
+ }
}
if (session.getState().getClientTopicAliasMaximum() != null) {
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
index 6b3e03a5d4..1a21aa2c9f 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
@@ -28,6 +28,8 @@ import
org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import static
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_MESSAGE_RETAIN_INITIAL_DISTRIBUTION_KEY;
+
public class MQTTRetainMessageManager {
private MQTTSession session;
@@ -59,7 +61,6 @@ public class MQTTRetainMessageManager {
Message message =
LargeServerMessageImpl.checkLargeMessage(messageParameter,
session.getServer().getStorageManager());
sendToQueue(message.copy(session.getServer().getStorageManager().generateID()),
queue, tx);
}
-
}
void addRetainedMessagesToQueue(Queue queue, String address) throws
Exception {
@@ -82,6 +83,7 @@ public class MQTTRetainMessageManager {
}
}
Message message =
ref.getMessage().copy(session.getServer().getStorageManager().generateID());
+
message.putStringProperty(MQTT_MESSAGE_RETAIN_INITIAL_DISTRIBUTION_KEY,
(String) null);
sendToQueue(message, queue, tx);
}
}
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index b7bc1f8b9d..5ac19a0f0a 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -96,6 +96,8 @@ public class MQTTUtil {
public static final SimpleString MQTT_MESSAGE_RETAIN_KEY =
SimpleString.toSimpleString("mqtt.message.retain");
+ public static final SimpleString
MQTT_MESSAGE_RETAIN_INITIAL_DISTRIBUTION_KEY =
SimpleString.toSimpleString("mqtt.message.retain.initial.distribution");
+
public static final SimpleString MQTT_PAYLOAD_FORMAT_INDICATOR_KEY =
SimpleString.toSimpleString("mqtt.payload.format.indicator");
public static final SimpleString MQTT_RESPONSE_TOPIC_KEY =
SimpleString.toSimpleString("mqtt.response.topic");
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTInterceptorPropertiesTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTInterceptorPropertiesTest.java
index 279c38afb7..e1ef5ffc13 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTInterceptorPropertiesTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTInterceptorPropertiesTest.java
@@ -64,12 +64,6 @@ public class MQTTInterceptorPropertiesTest extends
MQTTTestSupport {
expectedProperties.put(MESSAGE_TEXT, msgText);
expectedProperties.put(RETAINED, retained);
-
- final MQTTClientProvider subscribeProvider = getMQTTClientProvider();
- initializeConnection(subscribeProvider);
-
- subscribeProvider.subscribe(addressQueue, AT_MOST_ONCE);
-
final CountDownLatch latch = new CountDownLatch(1);
MQTTInterceptor incomingInterceptor = (packet, connection) -> {
if (packet.fixedHeader().messageType() == MqttMessageType.PUBLISH) {
@@ -89,25 +83,25 @@ public class MQTTInterceptorPropertiesTest extends
MQTTTestSupport {
server.getRemotingService().addIncomingInterceptor(incomingInterceptor);
server.getRemotingService().addOutgoingInterceptor(outgoingInterceptor);
+ final MQTTClientProvider publishProvider = getMQTTClientProvider();
+ initializeConnection(publishProvider);
+ publishProvider.publish(addressQueue, msgText.getBytes(), AT_MOST_ONCE,
retained);
- Thread thread = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- byte[] payload = subscribeProvider.receive(10000);
- assertNotNull("Should get a message", payload);
- latch.countDown();
- } catch (Exception e) {
- e.printStackTrace();
- }
+ final MQTTClientProvider subscribeProvider = getMQTTClientProvider();
+ initializeConnection(subscribeProvider);
+ subscribeProvider.subscribe(addressQueue, AT_MOST_ONCE);
+
+ Thread thread = new Thread(() -> {
+ try {
+ byte[] payload = subscribeProvider.receive(10000);
+ assertNotNull("Should get a message", payload);
+ latch.countDown();
+ } catch (Exception e) {
+ e.printStackTrace();
}
});
thread.start();
- final MQTTClientProvider publishProvider = getMQTTClientProvider();
- initializeConnection(publishProvider);
- publishProvider.publish(addressQueue, msgText.getBytes(), AT_MOST_ONCE,
retained);
-
latch.await(10, TimeUnit.SECONDS);
subscribeProvider.disconnect();
publishProvider.disconnect();
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
index 7ceb43fa87..964a70f87f 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
@@ -23,8 +23,10 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.io.EOFException;
+import java.lang.invoke.MethodHandles;
import java.net.ProtocolException;
import java.net.URI;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
@@ -61,6 +63,8 @@ import
org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
@@ -74,7 +78,6 @@ import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
import static
org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf;
@@ -2219,4 +2222,71 @@ public class MQTTTest extends MQTTTestSupport {
Wait.assertTrue(() ->
server.locateQueue(RETAINED_QUEUE).getMessageCount() == 0, 3000, 50);
Wait.assertTrue(() -> server.locateQueue(RETAINED_QUEUE) == null, 3000,
50);
}
+
+ /*
+ * [MQTT-3.3.1-9] When sending a PUBLISH Packet to a Client the
Server...MUST set the RETAIN flag to 0 when a PUBLISH
+ * Packet is sent to a Client because it matches an *established*
subscription regardless of how the flag was set in
+ * the message it received.
+ */
+ @Test(timeout = 60 * 1000)
+ public void testRetainFlagOnEstablishedSubscription() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ final String topic = getTopicName();
+
+ MqttClient subscriber = createPaho3_1_1Client("subscriber");
+ subscriber.setCallback(new DefaultMqtt3Callback() {
+ @Override
+ public void messageArrived(String topic, MqttMessage message) throws
Exception {
+ if (!message.isRetained()) {
+ latch.countDown();
+ }
+ }
+ });
+ subscriber.connect();
+ subscriber.subscribe(topic, 1);
+
+ MqttClient publisher = createPaho3_1_1Client("publisher");
+ publisher.connect();
+ publisher.publish(topic, "retained".getBytes(StandardCharsets.UTF_8), 1,
true);
+ publisher.disconnect();
+ publisher.close();
+
+ assertTrue("Did not receive expected message within timeout",
latch.await(1, TimeUnit.SECONDS));
+
+ subscriber.disconnect();
+ subscriber.close();
+ }
+
+ /*
+ * [MQTT-3.3.1-8] When sending a PUBLISH Packet to a Client the Server MUST
set the RETAIN flag to 1 if a message is
+ * sent as a result of a new subscription being made by a Client.
+ */
+ @Test(timeout = 60 * 1000)
+ public void testRetainFlagOnNewSubscription() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ final String topic = getTopicName();
+
+ MqttClient publisher = createPaho3_1_1Client("publisher");
+ publisher.connect();
+ publisher.publish(topic, "retained".getBytes(StandardCharsets.UTF_8), 1,
true);
+ publisher.disconnect();
+ publisher.close();
+
+ MqttClient subscriber = createPaho3_1_1Client("subscriber");
+ subscriber.setCallback(new DefaultMqtt3Callback() {
+ @Override
+ public void messageArrived(String topic, MqttMessage message) throws
Exception {
+ if (message.isRetained()) {
+ latch.countDown();
+ }
+ }
+ });
+ subscriber.connect();
+ subscriber.subscribe(topic, 1);
+
+ assertTrue("Did not receive expected message within timeout",
latch.await(1, TimeUnit.SECONDS));
+
+ subscriber.disconnect();
+ subscriber.close();
+ }
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java
index 82e9a337b4..9f8bcd0d0a 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java
@@ -56,6 +56,10 @@ import
org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import
org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.fusesource.hawtdispatch.DispatchPriority;
import org.fusesource.hawtdispatch.internal.DispatcherConfig;
import org.fusesource.mqtt.client.MQTT;
@@ -374,6 +378,10 @@ public class MQTTTestSupport extends ActiveMQTestBase {
return mqtt;
}
+ protected MqttClient createPaho3_1_1Client(String clientId) throws
org.eclipse.paho.client.mqttv3.MqttException {
+ return new MqttClient("tcp://localhost:" + port, clientId, new
MemoryPersistence());
+ }
+
public Map<String, MQTTSessionState> getSessions() {
Acceptor acceptor = server.getRemotingService().getAcceptor("MQTT");
if (acceptor instanceof AbstractAcceptor) {
@@ -481,4 +489,18 @@ public class MQTTTestSupport extends ActiveMQTestBase {
return messageCount;
}
}
+
+ protected interface DefaultMqtt3Callback extends MqttCallback {
+ @Override
+ default void connectionLost(Throwable cause) {
+ }
+
+ @Override
+ default void messageArrived(String topic,
org.eclipse.paho.client.mqttv3.MqttMessage message) throws Exception {
+ }
+
+ @Override
+ default void deliveryComplete(IMqttDeliveryToken token) {
+ }
+ }
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
index b06999fddb..84ad469961 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
@@ -339,6 +339,11 @@ public class PublishTests extends MQTT5TestSupport {
}
/*
+ * When a new Non‑shared Subscription is made, the last retained message,
if any, on each matching topic name is sent
+ * to the Client as directed by the Retain Handling Subscription Option.
These messages are sent with the RETAIN flag
+ * set to 1. Which retained messages are sent is controlled by the Retain
Handling Subscription Option. At the time
+ * of the Subscription...
+ *
* [MQTT-3.3.1-9] If Retain Handling is set to 0 the Server MUST send the
retained messages matching the Topic Filter
* of the subscription to the Client.
*
@@ -350,6 +355,11 @@ public class PublishTests extends MQTT5TestSupport {
}
/*
+ * When a new Non‑shared Subscription is made, the last retained message,
if any, on each matching topic name is sent
+ * to the Client as directed by the Retain Handling Subscription Option.
These messages are sent with the RETAIN flag
+ * set to 1. Which retained messages are sent is controlled by the Retain
Handling Subscription Option. At the time
+ * of the Subscription...
+ *
* [MQTT-3.3.1-9] If Retain Handling is set to 0 the Server MUST send the
retained messages matching the Topic Filter
* of the subscription to the Client.
*
@@ -361,6 +371,11 @@ public class PublishTests extends MQTT5TestSupport {
}
/*
+ * When a new Non‑shared Subscription is made, the last retained message,
if any, on each matching topic name is sent
+ * to the Client as directed by the Retain Handling Subscription Option.
These messages are sent with the RETAIN flag
+ * set to 1. Which retained messages are sent is controlled by the Retain
Handling Subscription Option. At the time
+ * of the Subscription...
+ *
* [MQTT-3.3.1-9] If Retain Handling is set to 0 the Server MUST send the
retained messages matching the Topic Filter
* of the subscription to the Client.
*
@@ -408,6 +423,7 @@ public class PublishTests extends MQTT5TestSupport {
}
}
assertTrue(payloadMatched);
+ assertTrue(message.isRetained());
latch.countDown();
}
});
@@ -432,6 +448,11 @@ public class PublishTests extends MQTT5TestSupport {
}
/*
+ * When a new Non‑shared Subscription is made, the last retained message,
if any, on each matching topic name is sent
+ * to the Client as directed by the Retain Handling Subscription Option.
These messages are sent with the RETAIN flag
+ * set to 1. Which retained messages are sent is controlled by the Retain
Handling Subscription Option. At the time
+ * of the Subscription...
+ *
* [MQTT-3.3.1-10] If Retain Handling is set to 1 then if the subscription
did not already exist, the Server MUST
* send all retained message matching the Topic Filter of the subscription
to the Client, and if the subscription did
* exist the Server MUST NOT send the retained messages.
@@ -462,8 +483,9 @@ public class PublishTests extends MQTT5TestSupport {
final CountDownLatch latch = new CountDownLatch(1);
consumer.setCallback(new DefaultMqttCallback() {
@Override
- public void messageArrived(String topic, MqttMessage message) throws
Exception {
+ public void messageArrived(String topic, MqttMessage message) {
assertEqualsByteArrays("retained".getBytes(StandardCharsets.UTF_8),
message.getPayload());
+ assertTrue(message.isRetained());
latch.countDown();
}
});
@@ -492,7 +514,12 @@ public class PublishTests extends MQTT5TestSupport {
}
/*
- * [MQTT-3.3.1-11] If Retain Handling is set to 2, the Server MUST NOT send
the retained
+ * When a new Non‑shared Subscription is made, the last retained message,
if any, on each matching topic name is sent
+ * to the Client as directed by the Retain Handling Subscription Option.
These messages are sent with the RETAIN flag
+ * set to 1. Which retained messages are sent is controlled by the Retain
Handling Subscription Option. At the time
+ * of the Subscription...
+ *
+ * [MQTT-3.3.1-11] If Retain Handling is set to 2, the Server MUST NOT send
the retained messages
*/
@Test(timeout = DEFAULT_TIMEOUT)
public void testRetainHandlingTwo() throws Exception {
@@ -527,23 +554,18 @@ public class PublishTests extends MQTT5TestSupport {
}
/*
+ * The setting of the RETAIN flag in an Application Message forwarded by
the Server from an *established* connection
+ * is controlled by the Retain As Published subscription option.
+ *
* [MQTT-3.3.1-12] If the value of Retain As Published subscription option
is set to 0, the Server MUST set the
* RETAIN flag to 0 when forwarding an Application Message regardless of
how the RETAIN flag was set in the received
* PUBLISH packet.
*/
@Test(timeout = DEFAULT_TIMEOUT)
- public void testRetainAsPublishedZero() throws Exception {
+ public void testRetainAsPublishedZeroOnEstablishedSubscription() throws
Exception {
final String CONSUMER_ID = RandomUtil.randomString();
final String TOPIC = this.getTopicName();
- // send retained message
- MqttClient producer = createPahoClient("producer");
- producer.connect();
- producer.publish(TOPIC, "retained".getBytes(), 2, true);
- Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
- producer.disconnect();
- producer.close();
-
// create consumer
final CountDownLatch latch = new CountDownLatch(1);
MqttClient consumer = createPahoClient(CONSUMER_ID);
@@ -560,28 +582,31 @@ public class PublishTests extends MQTT5TestSupport {
subscription.setRetainAsPublished(false);
consumer.subscribe(new MqttSubscription[]{subscription});
+ // send retained message
+ MqttClient producer = createPahoClient("producer");
+ producer.connect();
+ producer.publish(TOPIC, "retained".getBytes(), 2, true);
+ Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
+ producer.disconnect();
+ producer.close();
+
assertTrue(latch.await(2, TimeUnit.SECONDS));
consumer.disconnect();
consumer.close();
}
/*
+ * The setting of the RETAIN flag in an Application Message forwarded by
the Server from an *established* connection
+ * is controlled by the Retain As Published subscription option.
+ *
* [MQTT-3.3.1-13] If the value of Retain As Published subscription option
is set to 1, the Server MUST set the
* RETAIN flag equal to the RETAIN flag in the received PUBLISH packet.
*/
@Test(timeout = DEFAULT_TIMEOUT)
- public void testRetainAsPublishedOne() throws Exception {
+ public void testRetainAsPublishedOneOnEstablishedSubscription() throws
Exception {
final String CONSUMER_ID = RandomUtil.randomString();
final String TOPIC = this.getTopicName();
- // send retained message
- MqttClient producer = createPahoClient("producer");
- producer.connect();
- producer.publish(TOPIC, "retained".getBytes(), 2, true);
- Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
- producer.disconnect();
- producer.close();
-
// create consumer
final CountDownLatch latchOne = new CountDownLatch(1);
final CountDownLatch latchTwo = new CountDownLatch(1);
@@ -605,6 +630,15 @@ public class PublishTests extends MQTT5TestSupport {
MqttSubscription subscription = new MqttSubscription(TOPIC, 2);
subscription.setRetainAsPublished(true);
consumer.subscribe(new MqttSubscription[]{subscription});
+
+ // send retained message
+ MqttClient producer = createPahoClient("producer");
+ producer.connect();
+ producer.publish(TOPIC, "retained".getBytes(), 2, true);
+ Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
+ producer.disconnect();
+ producer.close();
+
assertTrue(latchOne.await(2, TimeUnit.SECONDS));
producer = createPahoClient("producer");