This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new c8d685ee87 ARTEMIS-4365 MQTT retain flag not set correctly
c8d685ee87 is described below

commit c8d685ee871272e49a6b542695bb4b16fca3b3a0
Author: Justin Bertram <[email protected]>
AuthorDate: Mon Jul 17 15:14:29 2023 -0500

    ARTEMIS-4365 MQTT retain flag not set correctly
---
 .../core/protocol/mqtt/MQTTPublishManager.java     | 11 +++-
 .../protocol/mqtt/MQTTRetainMessageManager.java    |  4 +-
 .../artemis/core/protocol/mqtt/MQTTUtil.java       |  2 +
 .../mqtt/MQTTInterceptorPropertiesTest.java        | 34 ++++------
 .../artemis/tests/integration/mqtt/MQTTTest.java   | 72 ++++++++++++++++++++-
 .../tests/integration/mqtt/MQTTTestSupport.java    | 22 +++++++
 .../mqtt5/spec/controlpackets/PublishTests.java    | 74 ++++++++++++++++------
 7 files changed, 174 insertions(+), 45 deletions(-)

diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 31a5409a44..57b9605d21 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -29,6 +29,7 @@ import io.netty.handler.codec.mqtt.MqttProperties;
 import io.netty.handler.codec.mqtt.MqttPublishMessage;
 import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
 import io.netty.handler.codec.mqtt.MqttQoS;
+import io.netty.handler.codec.mqtt.MqttTopicSubscription;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
@@ -60,6 +61,7 @@ import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SUBSCR
 import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.TOPIC_ALIAS;
 import static 
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_CONTENT_TYPE_KEY;
 import static 
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_CORRELATION_DATA_KEY;
+import static 
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_MESSAGE_RETAIN_INITIAL_DISTRIBUTION_KEY;
 import static 
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_MESSAGE_RETAIN_KEY;
 import static 
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_PAYLOAD_FORMAT_INDICATOR_KEY;
 import static 
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_RESPONSE_TOPIC_KEY;
@@ -404,12 +406,15 @@ public class MQTTPublishManager {
       // [MQTT-3.3.1-2] The DUP flag MUST be set to 0 for all QoS 0 messages.
       boolean redelivery = qos == 0 ? false : (deliveryCount > 1);
 
-      boolean isRetain = message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY);
+      boolean isRetain = 
message.containsProperty(MQTT_MESSAGE_RETAIN_INITIAL_DISTRIBUTION_KEY);
       MqttProperties mqttProperties = getPublishProperties(message);
 
       if (session.getVersion() == MQTTVersion.MQTT_5) {
-         if (session.getState().getSubscription(message.getAddress()) != null 
&& 
!session.getState().getSubscription(message.getAddress()).option().isRetainAsPublished())
 {
-            isRetain = false;
+         if (!isRetain && message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY)) 
{
+            MqttTopicSubscription sub = 
session.getState().getSubscription(message.getAddress());
+            if (sub != null && sub.option().isRetainAsPublished()) {
+               isRetain = true;
+            }
          }
 
          if (session.getState().getClientTopicAliasMaximum() != null) {
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
index 6b3e03a5d4..1a21aa2c9f 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
@@ -28,6 +28,8 @@ import 
org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 
+import static 
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_MESSAGE_RETAIN_INITIAL_DISTRIBUTION_KEY;
+
 public class MQTTRetainMessageManager {
 
    private MQTTSession session;
@@ -59,7 +61,6 @@ public class MQTTRetainMessageManager {
          Message message = 
LargeServerMessageImpl.checkLargeMessage(messageParameter, 
session.getServer().getStorageManager());
          
sendToQueue(message.copy(session.getServer().getStorageManager().generateID()), 
queue, tx);
       }
-
    }
 
    void addRetainedMessagesToQueue(Queue queue, String address) throws 
Exception {
@@ -82,6 +83,7 @@ public class MQTTRetainMessageManager {
                      }
                   }
                   Message message = 
ref.getMessage().copy(session.getServer().getStorageManager().generateID());
+                  
message.putStringProperty(MQTT_MESSAGE_RETAIN_INITIAL_DISTRIBUTION_KEY, 
(String) null);
                   sendToQueue(message, queue, tx);
                }
             }
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index b7bc1f8b9d..5ac19a0f0a 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -96,6 +96,8 @@ public class MQTTUtil {
 
    public static final SimpleString MQTT_MESSAGE_RETAIN_KEY = 
SimpleString.toSimpleString("mqtt.message.retain");
 
+   public static final SimpleString 
MQTT_MESSAGE_RETAIN_INITIAL_DISTRIBUTION_KEY = 
SimpleString.toSimpleString("mqtt.message.retain.initial.distribution");
+
    public static final SimpleString MQTT_PAYLOAD_FORMAT_INDICATOR_KEY = 
SimpleString.toSimpleString("mqtt.payload.format.indicator");
 
    public static final SimpleString MQTT_RESPONSE_TOPIC_KEY = 
SimpleString.toSimpleString("mqtt.response.topic");
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTInterceptorPropertiesTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTInterceptorPropertiesTest.java
index 279c38afb7..e1ef5ffc13 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTInterceptorPropertiesTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTInterceptorPropertiesTest.java
@@ -64,12 +64,6 @@ public class MQTTInterceptorPropertiesTest extends 
MQTTTestSupport {
       expectedProperties.put(MESSAGE_TEXT, msgText);
       expectedProperties.put(RETAINED, retained);
 
-
-      final MQTTClientProvider subscribeProvider = getMQTTClientProvider();
-      initializeConnection(subscribeProvider);
-
-      subscribeProvider.subscribe(addressQueue, AT_MOST_ONCE);
-
       final CountDownLatch latch = new CountDownLatch(1);
       MQTTInterceptor incomingInterceptor = (packet, connection) -> {
          if (packet.fixedHeader().messageType() == MqttMessageType.PUBLISH) {
@@ -89,25 +83,25 @@ public class MQTTInterceptorPropertiesTest extends 
MQTTTestSupport {
       server.getRemotingService().addIncomingInterceptor(incomingInterceptor);
       server.getRemotingService().addOutgoingInterceptor(outgoingInterceptor);
 
+      final MQTTClientProvider publishProvider = getMQTTClientProvider();
+      initializeConnection(publishProvider);
+      publishProvider.publish(addressQueue, msgText.getBytes(), AT_MOST_ONCE, 
retained);
 
-      Thread thread = new Thread(new Runnable() {
-         @Override
-         public void run() {
-            try {
-               byte[] payload = subscribeProvider.receive(10000);
-               assertNotNull("Should get a message", payload);
-               latch.countDown();
-            } catch (Exception e) {
-               e.printStackTrace();
-            }
+      final MQTTClientProvider subscribeProvider = getMQTTClientProvider();
+      initializeConnection(subscribeProvider);
+      subscribeProvider.subscribe(addressQueue, AT_MOST_ONCE);
+
+      Thread thread = new Thread(() -> {
+         try {
+            byte[] payload = subscribeProvider.receive(10000);
+            assertNotNull("Should get a message", payload);
+            latch.countDown();
+         } catch (Exception e) {
+            e.printStackTrace();
          }
       });
       thread.start();
 
-      final MQTTClientProvider publishProvider = getMQTTClientProvider();
-      initializeConnection(publishProvider);
-      publishProvider.publish(addressQueue, msgText.getBytes(), AT_MOST_ONCE, 
retained);
-
       latch.await(10, TimeUnit.SECONDS);
       subscribeProvider.disconnect();
       publishProvider.disconnect();
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
index 7ceb43fa87..964a70f87f 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
@@ -23,8 +23,10 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import java.io.EOFException;
+import java.lang.invoke.MethodHandles;
 import java.net.ProtocolException;
 import java.net.URI;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
@@ -61,6 +63,8 @@ import 
org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.fusesource.mqtt.client.BlockingConnection;
 import org.fusesource.mqtt.client.MQTT;
 import org.fusesource.mqtt.client.Message;
@@ -74,7 +78,6 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
 
 import static 
org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf;
 
@@ -2219,4 +2222,71 @@ public class MQTTTest extends MQTTTestSupport {
       Wait.assertTrue(() -> 
server.locateQueue(RETAINED_QUEUE).getMessageCount() == 0, 3000, 50);
       Wait.assertTrue(() -> server.locateQueue(RETAINED_QUEUE) == null, 3000, 
50);
    }
+
+   /*
+    * [MQTT-3.3.1-9] When sending a PUBLISH Packet to a Client the 
Server...MUST set the RETAIN flag to 0 when a PUBLISH
+    * Packet is sent to a Client because it matches an *established* 
subscription regardless of how the flag was set in
+    * the message it received.
+    */
+   @Test(timeout = 60 * 1000)
+   public void testRetainFlagOnEstablishedSubscription() throws Exception {
+      CountDownLatch latch = new CountDownLatch(1);
+      final String topic = getTopicName();
+
+      MqttClient subscriber = createPaho3_1_1Client("subscriber");
+      subscriber.setCallback(new DefaultMqtt3Callback() {
+         @Override
+         public void messageArrived(String topic, MqttMessage message) throws 
Exception {
+            if (!message.isRetained()) {
+               latch.countDown();
+            }
+         }
+      });
+      subscriber.connect();
+      subscriber.subscribe(topic, 1);
+
+      MqttClient publisher = createPaho3_1_1Client("publisher");
+      publisher.connect();
+      publisher.publish(topic, "retained".getBytes(StandardCharsets.UTF_8), 1, 
true);
+      publisher.disconnect();
+      publisher.close();
+
+      assertTrue("Did not receive expected message within timeout", 
latch.await(1, TimeUnit.SECONDS));
+
+      subscriber.disconnect();
+      subscriber.close();
+   }
+
+   /*
+    * [MQTT-3.3.1-8] When sending a PUBLISH Packet to a Client the Server MUST 
set the RETAIN flag to 1 if a message is
+    * sent as a result of a new subscription being made by a Client.
+    */
+   @Test(timeout = 60 * 1000)
+   public void testRetainFlagOnNewSubscription() throws Exception {
+      CountDownLatch latch = new CountDownLatch(1);
+      final String topic = getTopicName();
+
+      MqttClient publisher = createPaho3_1_1Client("publisher");
+      publisher.connect();
+      publisher.publish(topic, "retained".getBytes(StandardCharsets.UTF_8), 1, 
true);
+      publisher.disconnect();
+      publisher.close();
+
+      MqttClient subscriber = createPaho3_1_1Client("subscriber");
+      subscriber.setCallback(new DefaultMqtt3Callback() {
+         @Override
+         public void messageArrived(String topic, MqttMessage message) throws 
Exception {
+            if (message.isRetained()) {
+               latch.countDown();
+            }
+         }
+      });
+      subscriber.connect();
+      subscriber.subscribe(topic, 1);
+
+      assertTrue("Did not receive expected message within timeout", 
latch.await(1, TimeUnit.SECONDS));
+
+      subscriber.disconnect();
+      subscriber.close();
+   }
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java
index 82e9a337b4..9f8bcd0d0a 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java
@@ -56,6 +56,10 @@ import 
org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import 
org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.fusesource.hawtdispatch.DispatchPriority;
 import org.fusesource.hawtdispatch.internal.DispatcherConfig;
 import org.fusesource.mqtt.client.MQTT;
@@ -374,6 +378,10 @@ public class MQTTTestSupport extends ActiveMQTestBase {
       return mqtt;
    }
 
+   protected MqttClient createPaho3_1_1Client(String clientId) throws 
org.eclipse.paho.client.mqttv3.MqttException {
+      return new MqttClient("tcp://localhost:" + port, clientId, new 
MemoryPersistence());
+   }
+
    public Map<String, MQTTSessionState> getSessions() {
       Acceptor acceptor = server.getRemotingService().getAcceptor("MQTT");
       if (acceptor instanceof AbstractAcceptor) {
@@ -481,4 +489,18 @@ public class MQTTTestSupport extends ActiveMQTestBase {
          return messageCount;
       }
    }
+
+   protected interface DefaultMqtt3Callback extends MqttCallback {
+      @Override
+      default void connectionLost(Throwable cause) {
+      }
+
+      @Override
+      default void messageArrived(String topic, 
org.eclipse.paho.client.mqttv3.MqttMessage message) throws Exception {
+      }
+
+      @Override
+      default void deliveryComplete(IMqttDeliveryToken token) {
+      }
+   }
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
index b06999fddb..84ad469961 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
@@ -339,6 +339,11 @@ public class PublishTests extends MQTT5TestSupport {
    }
 
    /*
+    * When a new Non‑shared Subscription is made, the last retained message, 
if any, on each matching topic name is sent
+    * to the Client as directed by the Retain Handling Subscription Option. 
These messages are sent with the RETAIN flag
+    * set to 1. Which retained messages are sent is controlled by the Retain 
Handling Subscription Option. At the time
+    * of the Subscription...
+    *
     * [MQTT-3.3.1-9] If Retain Handling is set to 0 the Server MUST send the 
retained messages matching the Topic Filter
     * of the subscription to the Client.
     *
@@ -350,6 +355,11 @@ public class PublishTests extends MQTT5TestSupport {
    }
 
    /*
+    * When a new Non‑shared Subscription is made, the last retained message, 
if any, on each matching topic name is sent
+    * to the Client as directed by the Retain Handling Subscription Option. 
These messages are sent with the RETAIN flag
+    * set to 1. Which retained messages are sent is controlled by the Retain 
Handling Subscription Option. At the time
+    * of the Subscription...
+    *
     * [MQTT-3.3.1-9] If Retain Handling is set to 0 the Server MUST send the 
retained messages matching the Topic Filter
     * of the subscription to the Client.
     *
@@ -361,6 +371,11 @@ public class PublishTests extends MQTT5TestSupport {
    }
 
    /*
+    * When a new Non‑shared Subscription is made, the last retained message, 
if any, on each matching topic name is sent
+    * to the Client as directed by the Retain Handling Subscription Option. 
These messages are sent with the RETAIN flag
+    * set to 1. Which retained messages are sent is controlled by the Retain 
Handling Subscription Option. At the time
+    * of the Subscription...
+    *
     * [MQTT-3.3.1-9] If Retain Handling is set to 0 the Server MUST send the 
retained messages matching the Topic Filter
     * of the subscription to the Client.
     *
@@ -408,6 +423,7 @@ public class PublishTests extends MQTT5TestSupport {
                }
             }
             assertTrue(payloadMatched);
+            assertTrue(message.isRetained());
             latch.countDown();
          }
       });
@@ -432,6 +448,11 @@ public class PublishTests extends MQTT5TestSupport {
    }
 
    /*
+    * When a new Non‑shared Subscription is made, the last retained message, 
if any, on each matching topic name is sent
+    * to the Client as directed by the Retain Handling Subscription Option. 
These messages are sent with the RETAIN flag
+    * set to 1. Which retained messages are sent is controlled by the Retain 
Handling Subscription Option. At the time
+    * of the Subscription...
+    *
     * [MQTT-3.3.1-10]  If Retain Handling is set to 1 then if the subscription 
did not already exist, the Server MUST
     * send all retained message matching the Topic Filter of the subscription 
to the Client, and if the subscription did
     * exist the Server MUST NOT send the retained messages.
@@ -462,8 +483,9 @@ public class PublishTests extends MQTT5TestSupport {
       final CountDownLatch latch = new CountDownLatch(1);
       consumer.setCallback(new DefaultMqttCallback() {
          @Override
-         public void messageArrived(String topic, MqttMessage message) throws 
Exception {
+         public void messageArrived(String topic, MqttMessage message) {
             
assertEqualsByteArrays("retained".getBytes(StandardCharsets.UTF_8), 
message.getPayload());
+            assertTrue(message.isRetained());
             latch.countDown();
          }
       });
@@ -492,7 +514,12 @@ public class PublishTests extends MQTT5TestSupport {
    }
 
    /*
-    * [MQTT-3.3.1-11] If Retain Handling is set to 2, the Server MUST NOT send 
the retained
+    * When a new Non‑shared Subscription is made, the last retained message, 
if any, on each matching topic name is sent
+    * to the Client as directed by the Retain Handling Subscription Option. 
These messages are sent with the RETAIN flag
+    * set to 1. Which retained messages are sent is controlled by the Retain 
Handling Subscription Option. At the time
+    * of the Subscription...
+    *
+    * [MQTT-3.3.1-11] If Retain Handling is set to 2, the Server MUST NOT send 
the retained messages
     */
    @Test(timeout = DEFAULT_TIMEOUT)
    public void testRetainHandlingTwo() throws Exception {
@@ -527,23 +554,18 @@ public class PublishTests extends MQTT5TestSupport {
    }
 
    /*
+    * The setting of the RETAIN flag in an Application Message forwarded by 
the Server from an *established* connection
+    * is controlled by the Retain As Published subscription option.
+    *
     * [MQTT-3.3.1-12] If the value of Retain As Published subscription option 
is set to 0, the Server MUST set the
     * RETAIN flag to 0 when forwarding an Application Message regardless of 
how the RETAIN flag was set in the received
     * PUBLISH packet.
     */
    @Test(timeout = DEFAULT_TIMEOUT)
-   public void testRetainAsPublishedZero() throws Exception {
+   public void testRetainAsPublishedZeroOnEstablishedSubscription() throws 
Exception {
       final String CONSUMER_ID = RandomUtil.randomString();
       final String TOPIC = this.getTopicName();
 
-      // send retained message
-      MqttClient producer = createPahoClient("producer");
-      producer.connect();
-      producer.publish(TOPIC, "retained".getBytes(), 2, true);
-      Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
-      producer.disconnect();
-      producer.close();
-
       // create consumer
       final CountDownLatch latch = new CountDownLatch(1);
       MqttClient consumer = createPahoClient(CONSUMER_ID);
@@ -560,28 +582,31 @@ public class PublishTests extends MQTT5TestSupport {
       subscription.setRetainAsPublished(false);
       consumer.subscribe(new MqttSubscription[]{subscription});
 
+      // send retained message
+      MqttClient producer = createPahoClient("producer");
+      producer.connect();
+      producer.publish(TOPIC, "retained".getBytes(), 2, true);
+      Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
+      producer.disconnect();
+      producer.close();
+
       assertTrue(latch.await(2, TimeUnit.SECONDS));
       consumer.disconnect();
       consumer.close();
    }
 
    /*
+    * The setting of the RETAIN flag in an Application Message forwarded by 
the Server from an *established* connection
+    * is controlled by the Retain As Published subscription option.
+    *
     * [MQTT-3.3.1-13] If the value of Retain As Published subscription option 
is set to 1, the Server MUST set the
     * RETAIN flag equal to the RETAIN flag in the received PUBLISH packet.
     */
    @Test(timeout = DEFAULT_TIMEOUT)
-   public void testRetainAsPublishedOne() throws Exception {
+   public void testRetainAsPublishedOneOnEstablishedSubscription() throws 
Exception {
       final String CONSUMER_ID = RandomUtil.randomString();
       final String TOPIC = this.getTopicName();
 
-      // send retained message
-      MqttClient producer = createPahoClient("producer");
-      producer.connect();
-      producer.publish(TOPIC, "retained".getBytes(), 2, true);
-      Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
-      producer.disconnect();
-      producer.close();
-
       // create consumer
       final CountDownLatch latchOne = new CountDownLatch(1);
       final CountDownLatch latchTwo = new CountDownLatch(1);
@@ -605,6 +630,15 @@ public class PublishTests extends MQTT5TestSupport {
       MqttSubscription subscription = new MqttSubscription(TOPIC, 2);
       subscription.setRetainAsPublished(true);
       consumer.subscribe(new MqttSubscription[]{subscription});
+
+      // send retained message
+      MqttClient producer = createPahoClient("producer");
+      producer.connect();
+      producer.publish(TOPIC, "retained".getBytes(), 2, true);
+      Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
+      producer.disconnect();
+      producer.close();
+
       assertTrue(latchOne.await(2, TimeUnit.SECONDS));
 
       producer = createPahoClient("producer");

Reply via email to