turcsanyip commented on code in PR #6373:
URL: https://github.com/apache/nifi/pull/6373#discussion_r965946668
##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java:
##########
@@ -17,52 +17,373 @@
package org.apache.nifi.processors.mqtt;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.processors.mqtt.common.MqttClient;
-import org.apache.nifi.processors.mqtt.common.MqttException;
import org.apache.nifi.processors.mqtt.common.MqttTestClient;
import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
-import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import javax.json.Json;
+import javax.json.JsonArray;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import static
org.apache.nifi.processors.mqtt.PublishMQTT.ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+import static
org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
+import static
org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
+import static
org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_FAILURE;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
+import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static
org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetReaderService;
+import static
org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetWriterService;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
-public class TestPublishMQTT extends TestPublishMqttCommon {
+public class TestPublishMQTT {
- @Override
- public void verifyPublishedMessage(byte[] payload, int qos, boolean
retain) {
- StandardMqttMessage lastPublishedMessage =
mqttTestClient.getLastPublishedMessage();
- String lastPublishedTopic = mqttTestClient.getLastPublishedTopic();
+ private static final String BROKER_URI = "tcp://localhost:1883";
+ private static final String TOPIC = "testTopic";
+ private static final String RETAIN = "false";
+
+ private TestRunner testRunner;
+ private MqttTestClient mqttTestClient;
+
+ @Test
+ public void testQoS0() {
+ mqttTestClient = new
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(PublishMQTT.PROP_QOS, "0");
+
+ testRunner.assertValid();
+
+ final String testMessage = "testMessage";
+ testRunner.enqueue(testMessage.getBytes());
+
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ assertProvenanceEvent();
+
+ verifyPublishedMessage(testMessage.getBytes(), 0, false);
+ }
+
+ @Test
+ public void testQoS1() {
+ mqttTestClient = new
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(PublishMQTT.PROP_QOS, "1");
+
+ testRunner.assertValid();
+
+ final String testMessage = "testMessage";
+ testRunner.enqueue(testMessage.getBytes());
+
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+ assertProvenanceEvent();
+
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ verifyPublishedMessage(testMessage.getBytes(), 1, false);
+ }
+
+ @Test
+ public void testQoS2NotCleanSession() {
+ mqttTestClient = new
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ // Publisher executes synchronously so the only time whether its Clean
or Not matters is when the processor stops in the middle of the publishing
+ testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+ testRunner.setProperty(PublishMQTT.PROP_CLEAN_SESSION,
ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
+
+ testRunner.assertValid();
+
+ final String testMessage = "testMessage";
+ testRunner.enqueue(testMessage.getBytes());
+
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+ assertProvenanceEvent();
+
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ verifyPublishedMessage(testMessage.getBytes(), 2, false);
+ }
+
+ @Test
+ public void testQoS2() {
+ mqttTestClient = new
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+
+ testRunner.assertValid();
+
+ final String testMessage = "testMessage";
+ testRunner.enqueue(testMessage.getBytes());
+
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+ assertProvenanceEvent();
+
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ verifyPublishedMessage(testMessage.getBytes(), 2, false);
+ }
+
+ @Test
+ public void testRetainQoS2() {
+ mqttTestClient = new
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+ testRunner.setProperty(PublishMQTT.PROP_RETAIN, "true");
+
+ testRunner.assertValid();
+
+ final String testMessage = "testMessage";
+ testRunner.enqueue(testMessage.getBytes());
+
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+ assertProvenanceEvent();
+
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ verifyPublishedMessage(testMessage.getBytes(), 2, true);
+ }
+
+ @Test
+ public void testPublishRecordSet() throws InitializationException {
+ mqttTestClient = new
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(ConsumeMQTT.RECORD_READER,
createJsonRecordSetReaderService(testRunner));
+ testRunner.setProperty(ConsumeMQTT.RECORD_WRITER,
createJsonRecordSetWriterService(testRunner));
+ testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+ testRunner.assertValid();
+
+ final JsonArray testInput = createTestJsonInput();
+
+ testRunner.enqueue(testInput.toString().getBytes());
+
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS,
3));
+
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ verifyPublishedMessage(testInput.get(0).toString().getBytes(), 2,
false);
+ verifyPublishedMessage(testInput.get(1).toString().getBytes(), 2,
false);
+ verifyPublishedMessage(testInput.get(2).toString().getBytes(), 2,
false);
+ assertNull(mqttTestClient.getLastPublished(), "TestClient's queue
should be empty.");
Review Comment:
Minor: it might make sense to extract this assertion
to `verifyNoMorePublished()` (more descriptive, and it is used in all record tests)
##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java:
##########
@@ -17,52 +17,373 @@
package org.apache.nifi.processors.mqtt;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.processors.mqtt.common.MqttClient;
-import org.apache.nifi.processors.mqtt.common.MqttException;
import org.apache.nifi.processors.mqtt.common.MqttTestClient;
import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
-import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import javax.json.Json;
+import javax.json.JsonArray;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import static
org.apache.nifi.processors.mqtt.PublishMQTT.ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+import static
org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
+import static
org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
+import static
org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_FAILURE;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
+import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static
org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetReaderService;
+import static
org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetWriterService;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
-public class TestPublishMQTT extends TestPublishMqttCommon {
+public class TestPublishMQTT {
- @Override
- public void verifyPublishedMessage(byte[] payload, int qos, boolean
retain) {
- StandardMqttMessage lastPublishedMessage =
mqttTestClient.getLastPublishedMessage();
- String lastPublishedTopic = mqttTestClient.getLastPublishedTopic();
+ private static final String BROKER_URI = "tcp://localhost:1883";
+ private static final String TOPIC = "testTopic";
+ private static final String RETAIN = "false";
+
+ private TestRunner testRunner;
+ private MqttTestClient mqttTestClient;
+
+ @Test
+ public void testQoS0() {
+ mqttTestClient = new
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(PublishMQTT.PROP_QOS, "0");
+
+ testRunner.assertValid();
+
+ final String testMessage = "testMessage";
+ testRunner.enqueue(testMessage.getBytes());
+
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ assertProvenanceEvent();
+
+ verifyPublishedMessage(testMessage.getBytes(), 0, false);
+ }
+
+ @Test
+ public void testQoS1() {
+ mqttTestClient = new
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(PublishMQTT.PROP_QOS, "1");
+
+ testRunner.assertValid();
+
+ final String testMessage = "testMessage";
+ testRunner.enqueue(testMessage.getBytes());
+
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+ assertProvenanceEvent();
+
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ verifyPublishedMessage(testMessage.getBytes(), 1, false);
+ }
+
+ @Test
+ public void testQoS2NotCleanSession() {
+ mqttTestClient = new
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ // Publisher executes synchronously so the only time whether its Clean
or Not matters is when the processor stops in the middle of the publishing
+ testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+ testRunner.setProperty(PublishMQTT.PROP_CLEAN_SESSION,
ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
+
+ testRunner.assertValid();
+
+ final String testMessage = "testMessage";
+ testRunner.enqueue(testMessage.getBytes());
+
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+ assertProvenanceEvent();
+
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ verifyPublishedMessage(testMessage.getBytes(), 2, false);
+ }
+
+ @Test
+ public void testQoS2() {
+ mqttTestClient = new
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+
+ testRunner.assertValid();
+
+ final String testMessage = "testMessage";
+ testRunner.enqueue(testMessage.getBytes());
+
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+ assertProvenanceEvent();
+
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ verifyPublishedMessage(testMessage.getBytes(), 2, false);
+ }
+
+ @Test
+ public void testRetainQoS2() {
+ mqttTestClient = new
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+ testRunner.setProperty(PublishMQTT.PROP_RETAIN, "true");
+
+ testRunner.assertValid();
+
+ final String testMessage = "testMessage";
+ testRunner.enqueue(testMessage.getBytes());
+
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+ assertProvenanceEvent();
+
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ verifyPublishedMessage(testMessage.getBytes(), 2, true);
+ }
+
+ @Test
+ public void testPublishRecordSet() throws InitializationException {
+ mqttTestClient = new
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(ConsumeMQTT.RECORD_READER,
createJsonRecordSetReaderService(testRunner));
+ testRunner.setProperty(ConsumeMQTT.RECORD_WRITER,
createJsonRecordSetWriterService(testRunner));
+ testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+ testRunner.assertValid();
+
+ final JsonArray testInput = createTestJsonInput();
+
+ testRunner.enqueue(testInput.toString().getBytes());
+
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS,
3));
+
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ verifyPublishedMessage(testInput.get(0).toString().getBytes(), 2,
false);
+ verifyPublishedMessage(testInput.get(1).toString().getBytes(), 2,
false);
+ verifyPublishedMessage(testInput.get(2).toString().getBytes(), 2,
false);
+ assertNull(mqttTestClient.getLastPublished(), "TestClient's queue
should be empty.");
+
+ final List<MockFlowFile> flowFiles =
testRunner.getFlowFilesForRelationship(REL_SUCCESS);
+ assertEquals(1, flowFiles.size());
+
+ final MockFlowFile successfulFlowFile = flowFiles.get(0);
+ final String publishFailedIndexAttributeName =
testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+
assertFalse(successfulFlowFile.getAttributes().containsKey(publishFailedIndexAttributeName),
"Failed attribute should not be present on the FlowFile");
+
+ // clean runner by removing records reader/writer
+ testRunner.removeProperty(ConsumeMQTT.RECORD_READER);
+ testRunner.removeProperty(ConsumeMQTT.RECORD_WRITER);
Review Comment:
Minor: the `TestRunner` instance is used only in this test method (each test
method runs on its own `TestPublishMQTT` instance with its own `TestRunner` field,
if I'm not mistaken), so this clean-up is not necessary and can be omitted.
##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java:
##########
@@ -17,105 +17,583 @@
package org.apache.nifi.processors.mqtt;
+import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
import org.apache.nifi.processors.mqtt.common.MqttClient;
import org.apache.nifi.processors.mqtt.common.MqttTestClient;
import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
-import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
-import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import javax.net.ssl.SSLContext;
+import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
+import static
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
+import static
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
+import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static
org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetReaderService;
+import static
org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetWriterService;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class TestConsumeMQTT extends TestConsumeMqttCommon {
- private static TlsConfiguration tlsConfiguration;
+public class TestConsumeMQTT {
- public MqttTestClient mqttTestClient;
+ private static final int PUBLISH_WAIT_MS = 0;
+ private static final String THIS_IS_NOT_JSON = "ThisIsNotAJSON";
+ private static final String BROKER_URI = "tcp://localhost:1883";
+ private static final String CLIENT_ID = "TestClient";
+ private static final String TOPIC_NAME = "testTopic";
+ private static final String INTERNAL_QUEUE_SIZE = "100";
- public class UnitTestableConsumeMqtt extends ConsumeMQTT {
+ private static final String STRING_MESSAGE = "testMessage";
+ private static final String JSON_PAYLOAD = "{\"name\":\"Apache NiFi\"}";
- public UnitTestableConsumeMqtt(){
- super();
- }
+ private static final int MOST_ONE = 0;
+ private static final int LEAST_ONE = 1;
+ private static final int EXACTLY_ONCE = 2;
Review Comment:
I know it comes from an old commit, but could you please rename these QoS
constants to match the proper terminology?
```suggestion
private static final int AT_MOST_ONCE = 0;
private static final int AT_LEAST_ONCE = 1;
private static final int EXACTLY_ONCE = 2;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]