gemmellr commented on code in PR #4556:
URL: https://github.com/apache/activemq-artemis/pull/4556#discussion_r1266701120


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java:
##########
@@ -2219,4 +2223,71 @@ public void testAutoDeleteRetainedQueue() throws 
Exception {
       Wait.assertTrue(() -> 
server.locateQueue(RETAINED_QUEUE).getMessageCount() == 0, 3000, 50);
       Wait.assertTrue(() -> server.locateQueue(RETAINED_QUEUE) == null, 3000, 
50);
    }
+
+   /*
+    * [MQTT-3.3.1-9] When sending a PUBLISH Packet to a Client the 
Server...MUST set the RETAIN flag to 0 when a PUBLISH
+    * Packet is sent to a Client because it matches an *established* 
subscription regardless of how the flag was set in
+    * the message it received.
+    */
+   @Test(timeout = 60 * 1000)
+   public void testRetainFlagOnEstablishedSubscription() throws Exception {
+      CountDownLatch latch = new CountDownLatch(1);
+      final String topic = RandomUtil.randomString();
+
+      MqttClient subscriber = createPaho3_1_1Client("subscriber");
+      subscriber.connect();
+      subscriber.subscribe(topic, 1);
+      subscriber.setCallback(new DefaultMqtt3Callback() {
+         @Override
+         public void messageArrived(String topic, MqttMessage message) throws 
Exception {
+            if (!message.isRetained()) {
+               latch.countDown();
+            }
+         }
+      });
+
+      MqttClient publisher = createPaho3_1_1Client("publisher");
+      publisher.connect();
+      publisher.publish(topic, "retained".getBytes(StandardCharsets.UTF_8), 1, 
true);
+      publisher.disconnect();
+      publisher.close();
+
+      assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
+
+      subscriber.disconnect();
+      subscriber.close();
+   }
+
+   /*
+    * [MQTT-3.3.1-8] When sending a PUBLISH Packet to a Client the Server MUST 
set the RETAIN flag to 1 if a message is
+    * sent as a result of a new subscription being made by a Client.
+    */
+   @Test(timeout = 60 * 1000)
+   public void testRetainFlagOnNewSubscription() throws Exception {
+      CountDownLatch latch = new CountDownLatch(1);
+      final String topic = RandomUtil.randomString();
+
+      MqttClient publisher = createPaho3_1_1Client("publisher");
+      publisher.connect();
+      publisher.publish(topic, "retained".getBytes(StandardCharsets.UTF_8), 1, 
true);
+      publisher.disconnect();
+      publisher.close();
+
+      MqttClient subscriber = createPaho3_1_1Client("subscriber");
+      subscriber.connect();
+      subscriber.subscribe(topic, 1);
+      subscriber.setCallback(new DefaultMqtt3Callback() {
+         @Override
+         public void messageArrived(String topic, MqttMessage message) throws 
Exception {
+            if (message.isRetained()) {
+               latch.countDown();
+            }
+         }
+      });
+
+      assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));

Review Comment:
   I saw this test fail here on a CI run.
   
   Adding a failure message would be good. Its annoying having to go and match 
up line numbers in remote repositories to see what the failure actually meant.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to