turcsanyip commented on code in PR #6373:
URL: https://github.com/apache/nifi/pull/6373#discussion_r966731657


##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java:
##########
@@ -17,52 +17,368 @@
 
 package org.apache.nifi.processors.mqtt;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.nifi.processors.mqtt.common.MqttClient;
-import org.apache.nifi.processors.mqtt.common.MqttException;
 import org.apache.nifi.processors.mqtt.common.MqttTestClient;
 import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
-import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
+import static java.util.Arrays.asList;
+import static 
org.apache.nifi.processors.mqtt.PublishMQTT.ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+import static 
org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
+import static 
org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
+import static 
org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_FAILURE;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static 
org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetReaderService;
+import static 
org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetWriterService;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
 
-public class TestPublishMQTT extends TestPublishMqttCommon {
+public class TestPublishMQTT {
 
-    @Override
-    public void verifyPublishedMessage(byte[] payload, int qos, boolean 
retain) {
-        StandardMqttMessage lastPublishedMessage = 
mqttTestClient.getLastPublishedMessage();
-        String lastPublishedTopic = mqttTestClient.getLastPublishedTopic();
+    private static final String BROKER_URI = "tcp://localhost:1883";
+    private static final String TOPIC = "testTopic";
+    private static final String RETAIN = "false";
+
+    private MqttTestClient mqttTestClient;
+    private TestRunner testRunner;
+
+    @AfterEach
+    public void cleanup() {
+        testRunner = null;
+        mqttTestClient = null;
+    }
+
+    @Test
+    public void testQoS0() {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "0");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        assertProvenanceEvent();
+
+        verifyPublishedMessage(testMessage.getBytes(), 0, false);
+    }
+
+    @Test
+    public void testQoS1() {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "1");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 1, false);
+    }
+
+    @Test
+    public void testQoS2NotCleanSession() {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        // Publisher executes synchronously so the only time whether its Clean 
or Not matters is when the processor stops in the middle of the publishing
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.setProperty(PublishMQTT.PROP_CLEAN_SESSION, 
ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, false);
+    }
+
+    @Test
+    public void testQoS2() {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, false);
+    }
+
+    @Test
+    public void testRetainQoS2() {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.setProperty(PublishMQTT.PROP_RETAIN, "true");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, true);
+    }
+
+    @Test
+    public void testPublishRecordSet() throws InitializationException {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, 
createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, 
createJsonRecordSetWriterService(testRunner));
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.assertValid();
+
+        final ArrayNode testInput = createTestJsonInput();
+
+        testRunner.enqueue(testInput.toString().getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS,
 3));
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testInput.get(0).toString().getBytes(), 2, 
false);
+        verifyPublishedMessage(testInput.get(1).toString().getBytes(), 2, 
false);
+        verifyPublishedMessage(testInput.get(2).toString().getBytes(), 2, 
false);
+        verifyNoMorePublished();
+
+        final List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        final MockFlowFile successfulFlowFile = flowFiles.get(0);
+        final String publishFailedIndexAttributeName = 
testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+        
assertFalse(successfulFlowFile.getAttributes().containsKey(publishFailedIndexAttributeName),
 "Failed attribute should not be present on the FlowFile");
+    }
+
+    @Test
+    public void testPublishRecordSetFailed() throws InitializationException {
+        mqttTestClient = Mockito.spy(new 
MqttTestClient(MqttTestClient.ConnectType.Publisher));
+        Mockito.doCallRealMethod()
+                .doThrow(new RuntimeException("Second publish failed."))
+                .when(mqttTestClient).publish(any(), any());
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, 
createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, 
createJsonRecordSetWriterService(testRunner));
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.assertValid();
+
+        final ArrayNode testInput = createTestJsonInput();
+
+        testRunner.enqueue(testInput.toString().getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
+        
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE,
 1));
+
+        verify(mqttTestClient, Mockito.times(2)).publish(any(), any());
+        verifyPublishedMessage(testInput.get(0).toString().getBytes(), 2, 
false);
+        verifyNoMorePublished();
+
+        List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(REL_FAILURE);
+        assertEquals(1, flowFiles.size());
+
+        final MockFlowFile failedFlowFile = flowFiles.get(0);
+        final String publishFailedIndexAttributeName = 
testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+        assertEquals("1", 
failedFlowFile.getAttribute(publishFailedIndexAttributeName), "Only one record 
is expected to be published successfully.");
+    }
+
+    @Test
+    public void 
testContinuePublishRecordsAndFailAgainWhenPreviousPublishFailed() throws 
InitializationException {
+        mqttTestClient = Mockito.spy(new 
MqttTestClient(MqttTestClient.ConnectType.Publisher));
+        Mockito.doCallRealMethod()
+                .doThrow(new RuntimeException("Second publish failed."))
+                .when(mqttTestClient).publish(any(), any());
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, 
createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, 
createJsonRecordSetWriterService(testRunner));
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.assertValid();
+
+        final String publishFailedIndexAttributeName = 
testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+        final ArrayNode testInput = createTestJsonInput();
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(publishFailedIndexAttributeName, "1");
+        testRunner.enqueue(testInput.toString().getBytes(), attributes);
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
+        
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE,
 2));
+
+        verify(mqttTestClient, Mockito.times(2)).publish(any(), any());
+        verifyPublishedMessage(testInput.get(1).toString().getBytes(), 2, 
false);
+        verifyNoMorePublished();
+
+        final List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(REL_FAILURE);
+        assertEquals(1, flowFiles.size());
+
+        final MockFlowFile failedFlowFile = flowFiles.get(0);
+        assertEquals("2", 
failedFlowFile.getAttribute(publishFailedIndexAttributeName), "Only one record 
is expected to be published successfully.");
+    }
+
+    @Test
+    public void 
testContinuePublishRecordsSuccessfullyWhenPreviousPublishFailed() throws 
InitializationException {
+        mqttTestClient = Mockito.spy(new 
MqttTestClient(MqttTestClient.ConnectType.Publisher));
+        Mockito.doCallRealMethod().when(mqttTestClient).publish(any(), any());
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, 
createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, 
createJsonRecordSetWriterService(testRunner));
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.assertValid();
+
+        final String publishFailedIndexAttributeName = 
testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+        final ArrayNode testInput = createTestJsonInput();
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(publishFailedIndexAttributeName, "1");
+        testRunner.enqueue(testInput.toString().getBytes(), attributes);
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER,
 3));
+
+        verify(mqttTestClient, Mockito.times(2)).publish(any(), any());
+        verifyPublishedMessage(testInput.get(1).toString().getBytes(), 2, 
false);
+        verifyPublishedMessage(testInput.get(2).toString().getBytes(), 2, 
false);
+        verifyNoMorePublished();
+
+        final List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        final MockFlowFile successfulFlowFile = flowFiles.get(0);
+        
assertNull(successfulFlowFile.getAttribute(publishFailedIndexAttributeName),
+                publishFailedIndexAttributeName + " is expected to be removed 
after all remaining records have been published successfully.");
+    }
+
+    private void verifyPublishedMessage(byte[] payload, int qos, boolean 
retain) {
+        final Pair<String, StandardMqttMessage> lastPublished = 
mqttTestClient.getLastPublished();
+        final String lastPublishedTopic = lastPublished.getLeft();
+        final StandardMqttMessage lastPublishedMessage = 
lastPublished.getRight();
         assertEquals(Arrays.toString(payload), 
Arrays.toString(lastPublishedMessage.getPayload()));
         assertEquals(qos, lastPublishedMessage.getQos());
         assertEquals(retain, lastPublishedMessage.isRetained());
-        assertEquals(topic, lastPublishedTopic);
+        assertEquals(TOPIC, lastPublishedTopic);
     }
 
-    private MqttTestClient mqttTestClient;
+    private void verifyNoMorePublished() {
+        assertNull(mqttTestClient.getLastPublished(), "TestClient's queue 
should be empty.");
+    }
+
+    private ProvenanceEventRecord assertProvenanceEvent() {
+        final List<ProvenanceEventRecord> provenanceEvents = 
testRunner.getProvenanceEvents();
+        assertNotNull(provenanceEvents);
+        assertEquals(1, provenanceEvents.size());
 
-    public class UnitTestablePublishMqtt extends PublishMQTT {
+        final ProvenanceEventRecord event = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.SEND, event.getEventType());
 
-        public UnitTestablePublishMqtt(){
-            super();
-        }
+        return event;
+    }
 
-        @Override
-        protected MqttClient createMqttClient() throws MqttException {
-            mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Publisher);
-            return mqttTestClient;
-        }
+    private void assertProvenanceEvent(String expectedDetails) {
+        final ProvenanceEventRecord event = assertProvenanceEvent();
+        assertEquals(expectedDetails, event.getDetails());
     }
 
-    @BeforeEach
-    public void init() {
-        UnitTestablePublishMqtt proc = new UnitTestablePublishMqtt();
-        testRunner = TestRunners.newTestRunner(proc);
-        testRunner.setProperty(PublishMQTT.PROP_BROKER_URI, 
"tcp://localhost:1883");
-        testRunner.setProperty(PublishMQTT.PROP_RETAIN, "false");
-        topic = "testTopic";
-        testRunner.setProperty(PublishMQTT.PROP_TOPIC, topic);
+    private static ArrayNode createTestJsonInput() {
+        final ObjectMapper mapper = new ObjectMapper();

Review Comment:
   I think it is a best practice to create the `ObjectMapper` once and store it 
in a `static` field, because it is a heavyweight object to construct but is 
thread-safe once configured.
   
   This can be addressed in a future commit; I will go ahead with merging this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to