nandorsoma commented on code in PR #6373:
URL: https://github.com/apache/nifi/pull/6373#discussion_r966166779


##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java:
##########
@@ -17,52 +17,373 @@
 
 package org.apache.nifi.processors.mqtt;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.nifi.processors.mqtt.common.MqttClient;
-import org.apache.nifi.processors.mqtt.common.MqttException;
 import org.apache.nifi.processors.mqtt.common.MqttTestClient;
 import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
-import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
+import javax.json.Json;
+import javax.json.JsonArray;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
+import static 
org.apache.nifi.processors.mqtt.PublishMQTT.ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+import static 
org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
+import static 
org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
+import static 
org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_FAILURE;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static 
org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetReaderService;
+import static 
org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetWriterService;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
 
-public class TestPublishMQTT extends TestPublishMqttCommon {
+public class TestPublishMQTT {
 
-    @Override
-    public void verifyPublishedMessage(byte[] payload, int qos, boolean 
retain) {
-        StandardMqttMessage lastPublishedMessage = 
mqttTestClient.getLastPublishedMessage();
-        String lastPublishedTopic = mqttTestClient.getLastPublishedTopic();
+    private static final String BROKER_URI = "tcp://localhost:1883";
+    private static final String TOPIC = "testTopic";
+    private static final String RETAIN = "false";
+
+    private TestRunner testRunner;
+    private MqttTestClient mqttTestClient;
+
+    @Test
+    public void testQoS0() {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "0");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        assertProvenanceEvent();
+
+        verifyPublishedMessage(testMessage.getBytes(), 0, false);
+    }
+
+    @Test
+    public void testQoS1() {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "1");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 1, false);
+    }
+
+    @Test
+    public void testQoS2NotCleanSession() {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        // Publisher executes synchronously, so the only time it matters whether the session is 
clean or not is when the processor stops in the middle of publishing
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.setProperty(PublishMQTT.PROP_CLEAN_SESSION, 
ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, false);
+    }
+
+    @Test
+    public void testQoS2() {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, false);
+    }
+
+    @Test
+    public void testRetainQoS2() {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.setProperty(PublishMQTT.PROP_RETAIN, "true");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, true);
+    }
+
+    @Test
+    public void testPublishRecordSet() throws InitializationException {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, 
createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, 
createJsonRecordSetWriterService(testRunner));
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.assertValid();
+
+        final JsonArray testInput = createTestJsonInput();
+
+        testRunner.enqueue(testInput.toString().getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS,
 3));
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testInput.get(0).toString().getBytes(), 2, 
false);
+        verifyPublishedMessage(testInput.get(1).toString().getBytes(), 2, 
false);
+        verifyPublishedMessage(testInput.get(2).toString().getBytes(), 2, 
false);
+        assertNull(mqttTestClient.getLastPublished(), "TestClient's queue 
should be empty.");

Review Comment:
   Changed, thanks for the idea!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to