This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new eb68ffad70 NIFI-10411 Add record processing feature to PublishMQTT 
processor
eb68ffad70 is described below

commit eb68ffad7091c15c864f60410399d0ac6990d7f8
Author: Nandor Soma Abonyi <[email protected]>
AuthorDate: Tue Aug 30 14:14:21 2022 +0200

    NIFI-10411 Add record processing feature to PublishMQTT processor
    
    This closes #6373.
    
    Signed-off-by: Peter Turcsanyi <[email protected]>
---
 .../nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml  |   5 +
 .../apache/nifi/processors/mqtt/ConsumeMQTT.java   |  42 +-
 .../apache/nifi/processors/mqtt/PublishMQTT.java   | 139 ++++-
 .../mqtt/common/AbstractMQTTProcessor.java         |  31 +-
 .../nifi/processors/mqtt/TestConsumeMQTT.java      | 636 +++++++++++++++++++--
 .../nifi/processors/mqtt/TestPublishMQTT.java      | 370 +++++++++++-
 .../processors/mqtt/common/MqttTestClient.java     |  22 +-
 .../nifi/processors/mqtt/common/MqttTestUtil.java  |  44 ++
 .../mqtt/common/TestConsumeMqttCommon.java         | 587 -------------------
 .../mqtt/common/TestPublishMqttCommon.java         | 140 -----
 10 files changed, 1155 insertions(+), 861 deletions(-)

diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
index 3e9bed91ef..6810b8837a 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
@@ -93,5 +93,10 @@
             <artifactId>nifi-schema-registry-service-api</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
index 46d1452cea..c9e318ac86 100644
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
@@ -161,22 +161,6 @@ public class ConsumeMQTT extends AbstractMQTTProcessor 
implements MqttCallback {
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
-    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
-            .name("record-reader")
-            .displayName("Record Reader")
-            .description("The Record Reader to use for received messages")
-            .identifiesControllerService(RecordReaderFactory.class)
-            .required(false)
-            .build();
-
-    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
-            .name("record-writer")
-            .displayName("Record Writer")
-            .description("The Record Writer to use in order to serialize the 
data before writing it to a FlowFile")
-            .identifiesControllerService(RecordSetWriterFactory.class)
-            .required(false)
-            .build();
-
     public static final PropertyDescriptor ADD_ATTRIBUTES_AS_FIELDS = new 
PropertyDescriptor.Builder()
             .name("add-attributes-as-fields")
             .displayName("Add attributes as fields")
@@ -201,6 +185,16 @@ public class ConsumeMQTT extends AbstractMQTTProcessor 
implements MqttCallback {
                     + "character such as 'new line' use CTRL+Enter or 
Shift+Enter depending on the OS.")
             .build();
 
+    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BASE_RECORD_READER)
+            .description("The Record Reader to use for parsing received MQTT 
Messages into Records.")
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BASE_RECORD_WRITER)
+            .description("The Record Writer to use for serializing Records 
before writing them to a FlowFile.")
+            .build();
+
     private volatile int qos;
     private volatile String topicPrefix = "";
     private volatile String topicFilter;
@@ -229,10 +223,10 @@ public class ConsumeMQTT extends AbstractMQTTProcessor 
implements MqttCallback {
         innerDescriptorsList.add(PROP_TOPIC_FILTER);
         innerDescriptorsList.add(PROP_QOS);
         innerDescriptorsList.add(PROP_MAX_QUEUE_SIZE);
-        innerDescriptorsList.add(RECORD_READER);
-        innerDescriptorsList.add(RECORD_WRITER);
         innerDescriptorsList.add(ADD_ATTRIBUTES_AS_FIELDS);
         innerDescriptorsList.add(MESSAGE_DEMARCATOR);
+        innerDescriptorsList.add(RECORD_READER);
+        innerDescriptorsList.add(RECORD_WRITER);
         descriptors = Collections.unmodifiableList(innerDescriptorsList);
 
         final Set<Relationship> innerRelationshipsSet = new HashSet<>();
@@ -292,16 +286,10 @@ public class ConsumeMQTT extends AbstractMQTTProcessor 
implements MqttCallback {
         }
 
         final boolean readerIsSet = context.getProperty(RECORD_READER).isSet();
-        final boolean writerIsSet = context.getProperty(RECORD_WRITER).isSet();
-        if ((readerIsSet && !writerIsSet) || (!readerIsSet && writerIsSet)) {
-            results.add(new ValidationResult.Builder().subject("Reader and 
Writer").valid(false)
-                    .explanation("both Record Reader and Writer must be set 
when used.").build());
-        }
-
         final boolean demarcatorIsSet = 
context.getProperty(MESSAGE_DEMARCATOR).isSet();
         if (readerIsSet && demarcatorIsSet) {
             results.add(new ValidationResult.Builder().subject("Reader and 
Writer").valid(false)
-                    .explanation("Message Demarcator and Record Reader/Writer 
cannot be used at the same time.").build());
+                    .explanation("message Demarcator and Record Reader/Writer 
cannot be used at the same time.").build());
         }
 
         return results;
@@ -461,7 +449,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor 
implements MqttCallback {
     private FlowFile createFlowFileAndPopulateAttributes(ProcessSession 
session, ReceivedMqttMessage mqttMessage) {
         FlowFile messageFlowfile = session.create();
 
-        Map<String, String> attrs = new HashMap<>();
+        final Map<String, String> attrs = new HashMap<>();
         attrs.put(BROKER_ATTRIBUTE_KEY, clientProperties.getBroker());
         attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
         attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
@@ -476,7 +464,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor 
implements MqttCallback {
         final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        FlowFile flowFile = session.create();
+        final FlowFile flowFile = session.create();
         session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, 
clientProperties.getBroker());
 
         final Map<String, String> attributes = new HashMap<>();
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
index 2ea80b1ded..86fa051f71 100644
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
@@ -39,18 +39,30 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
 import org.apache.nifi.processors.mqtt.common.MqttCallback;
-import org.apache.nifi.processors.mqtt.common.MqttException;
 import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
 import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.StopWatch;
 
+import java.io.ByteArrayOutputStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Optional.ofNullable;
 
 @SupportsBatching
 @InputRequirement(Requirement.INPUT_REQUIRED)
@@ -86,6 +98,16 @@ public class PublishMQTT extends AbstractMQTTProcessor 
implements MqttCallback {
             .addValidator(RETAIN_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BASE_RECORD_READER)
+            .description("The Record Reader to use for parsing the incoming 
FlowFile into Records.")
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BASE_RECORD_WRITER)
+            .description("The Record Writer to use for serializing Records 
before publishing them as an MQTT Message.")
+            .build();
+
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("FlowFiles that are sent successfully to the 
destination are transferred to this relationship.")
@@ -103,6 +125,8 @@ public class PublishMQTT extends AbstractMQTTProcessor 
implements MqttCallback {
         innerDescriptorsList.add(PROP_TOPIC);
         innerDescriptorsList.add(PROP_QOS);
         innerDescriptorsList.add(PROP_RETAIN);
+        innerDescriptorsList.add(RECORD_READER);
+        innerDescriptorsList.add(RECORD_WRITER);
         descriptors = Collections.unmodifiableList(innerDescriptorsList);
 
         final Set<Relationship> innerRelationshipsSet = new HashSet<>();
@@ -111,9 +135,17 @@ public class PublishMQTT extends AbstractMQTTProcessor 
implements MqttCallback {
         relationships = Collections.unmodifiableSet(innerRelationshipsSet);
     }
 
+    static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE = 
"Publish failed after %d successfully published records.";
+    static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER = 
"Successfully finished publishing previously failed records. Total record 
count: %d";
+    static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS = 
"Successfully published all records. Total record count: %d";
+
+    static final String ATTR_PUBLISH_FAILED_INDEX_SUFFIX = 
".mqtt.publish.failed.index";
+    private String publishFailedIndexAttributeName;
+
     @Override
     protected void init(final ProcessorInitializationContext context) {
         logger = getLogger();
+        publishFailedIndexAttributeName = getIdentifier() + 
ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
     }
 
     @Override
@@ -162,30 +194,107 @@ public class PublishMQTT extends AbstractMQTTProcessor 
implements MqttCallback {
             return;
         }
 
-        // do the read
-        final byte[] messageContent = new byte[(int) flowfile.getSize()];
-        session.read(flowfile, in -> StreamUtils.fillBuffer(in, 
messageContent, true));
+        if (context.getProperty(RECORD_READER).isSet()) {
+            processRecordSet(context, session, flowfile, topic);
+        } else {
+            processStandardFlowFile(context, session, flowfile, topic);
+        }
+    }
 
-        int qos = 
context.getProperty(PROP_QOS).evaluateAttributeExpressions(flowfile).asInteger();
-        boolean retained = 
context.getProperty(PROP_RETAIN).evaluateAttributeExpressions(flowfile).asBoolean();
-        final StandardMqttMessage mqttMessage = new 
StandardMqttMessage(messageContent, qos, retained);
+    private void processRecordSet(ProcessContext context, ProcessSession 
session, final FlowFile flowfile, String topic) {
+        final StopWatch stopWatch = new StopWatch(true);
+        final AtomicInteger processedRecords = new AtomicInteger();
 
         try {
-            final StopWatch stopWatch = new StopWatch(true);
-            /*
-             * Underlying method waits for the message to publish (according 
to set QoS), so it executes synchronously:
-             *     MqttClient.java:361 aClient.publish(topic, message, null, 
null).waitForCompletion(getTimeToWait());
-             */
-            mqttClient.publish(topic, mqttMessage);
+            final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+            final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+            final Long previousProcessFailedAt = 
ofNullable(flowfile.getAttribute(publishFailedIndexAttributeName)).map(Long::valueOf).orElse(null);
+
+            session.read(flowfile, in -> {
+                try (final RecordReader reader = 
readerFactory.createRecordReader(flowfile, in, logger)) {
+                    final RecordSet recordSet = reader.createRecordSet();
+
+                    final RecordSchema schema = 
writerFactory.getSchema(flowfile.getAttributes(), recordSet.getSchema());
+
+                    final ByteArrayOutputStream baos = new 
ByteArrayOutputStream(1024);
+
+                    Record record;
+                    while ((record = recordSet.next()) != null) {
+                        if (previousProcessFailedAt != null && 
processedRecords.get() < previousProcessFailedAt) {
+                            processedRecords.getAndIncrement();
+                            continue;
+                        }
+
+                        baos.reset();
+
+                        try (final RecordSetWriter writer = 
writerFactory.createWriter(logger, schema, baos, flowfile)) {
+                            writer.write(record);
+                            writer.flush();
+                        }
+
+                        final byte[] messageContent = baos.toByteArray();
+
+                        publishMessage(context, flowfile, topic, 
messageContent);
+                        processedRecords.getAndIncrement();
+                    }
+                } catch (SchemaNotFoundException | MalformedRecordException e) 
{
+                    throw new ProcessException("An error happened during 
creating components for serialization.", e);
+                }
+            });
+
+            FlowFile successFlowFile = flowfile;
+
+            String provenanceEventDetails;
+            if (previousProcessFailedAt != null) {
+                successFlowFile = session.removeAttribute(flowfile, 
publishFailedIndexAttributeName);
+                provenanceEventDetails = 
String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER, 
processedRecords.get());
+            } else {
+                provenanceEventDetails = 
String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS, 
processedRecords.get());
+            }
+
+            session.getProvenanceReporter().send(flowfile, 
clientProperties.getBroker(), provenanceEventDetails, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            session.transfer(successFlowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            logger.error("An error happened during publishing records. Routing 
to failure.", e);
+
+            FlowFile failedFlowFile = session.putAttribute(flowfile, 
publishFailedIndexAttributeName, String.valueOf(processedRecords.get()));
+
+            if (processedRecords.get() > 0) {
+                session.getProvenanceReporter().send(
+                        failedFlowFile,
+                        clientProperties.getBroker(),
+                        
String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE, 
processedRecords.get()),
+                        stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            }
+
+            session.transfer(failedFlowFile, REL_FAILURE);
+        }
+    }
+
+    private void processStandardFlowFile(ProcessContext context, 
ProcessSession session, FlowFile flowfile, String topic) {
+        try {
+            final byte[] messageContent = new byte[(int) flowfile.getSize()];
+            session.read(flowfile, in -> StreamUtils.fillBuffer(in, 
messageContent, true));
 
+            final StopWatch stopWatch = new StopWatch(true);
+            publishMessage(context, flowfile, topic, messageContent);
             session.getProvenanceReporter().send(flowfile, 
clientProperties.getBroker(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
             session.transfer(flowfile, REL_SUCCESS);
-        } catch (MqttException me) {
-            logger.error("Failed to publish message.", me);
+        } catch (Exception e) {
+            logger.error("An error happened during publishing a message. 
Routing to failure.", e);
             session.transfer(flowfile, REL_FAILURE);
         }
     }
 
+    private void publishMessage(ProcessContext context, FlowFile flowfile, 
String topic, byte[] messageContent) {
+        int qos = 
context.getProperty(PROP_QOS).evaluateAttributeExpressions(flowfile).asInteger();
+        boolean retained = 
context.getProperty(PROP_RETAIN).evaluateAttributeExpressions(flowfile).asBoolean();
+        final StandardMqttMessage mqttMessage = new 
StandardMqttMessage(messageContent, qos, retained);
+
+        mqttClient.publish(topic, mqttMessage);
+    }
+
     private void initializeClient(ProcessContext context) {
         // NOTE: This method is called when isConnected returns false which 
can happen when the client is null, or when it is
         // non-null but not connected, so we need to handle each case and only 
create a new client when it is null
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
index 8b8c186360..0fd799b8a5 100644
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
@@ -32,6 +32,8 @@ import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.security.util.TlsException;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.ssl.SSLContextService;
 
 import java.net.URI;
@@ -42,6 +44,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.apache.commons.lang3.EnumUtils.isValidEnumIgnoreCase;
 import static org.apache.commons.lang3.StringUtils.EMPTY;
@@ -93,7 +96,7 @@ public abstract class AbstractMQTTProcessor extends 
AbstractSessionFactoryProces
     };
 
     private static String getSupportedSchemeList() {
-        return String.join(", ", 
Arrays.stream(MqttProtocolScheme.values()).map(value -> 
value.name().toLowerCase()).toArray(String[]::new));
+        return Arrays.stream(MqttProtocolScheme.values()).map(value -> 
value.name().toLowerCase()).collect(Collectors.joining(", "));
     }
 
     public static final Validator RETAIN_VALIDATOR = (subject, input, context) 
-> {
@@ -232,6 +235,20 @@ public abstract class AbstractMQTTProcessor extends 
AbstractSessionFactoryProces
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor BASE_RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor BASE_RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(false)
+            .build();
+
     public static List<PropertyDescriptor> getAbstractPropertyDescriptors() {
         final List<PropertyDescriptor> descriptors = new ArrayList<>();
         descriptors.add(PROP_BROKER_URI);
@@ -287,6 +304,13 @@ public abstract class AbstractMQTTProcessor extends 
AbstractSessionFactoryProces
             results.add(new 
ValidationResult.Builder().subject(PROP_BROKER_URI.getName()).valid(false).explanation("it
 is not valid URI syntax.").build());
         }
 
+        final boolean readerIsSet = 
validationContext.getProperty(BASE_RECORD_READER).isSet();
+        final boolean writerIsSet = 
validationContext.getProperty(BASE_RECORD_WRITER).isSet();
+        if ((readerIsSet && !writerIsSet) || (!readerIsSet && writerIsSet)) {
+            results.add(new ValidationResult.Builder().subject("Reader and 
Writer").valid(false)
+                    .explanation("both Record Reader and Writer must be set 
when used.").build());
+        }
+
         return results;
     }
 
@@ -366,9 +390,8 @@ public abstract class AbstractMQTTProcessor extends 
AbstractSessionFactoryProces
         
clientProperties.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger());
         
clientProperties.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger());
 
-        final PropertyValue sslProp = 
context.getProperty(PROP_SSL_CONTEXT_SERVICE);
-        if (sslProp.isSet()) {
-            final SSLContextService sslContextService = (SSLContextService) 
sslProp.asControllerService();
+        final SSLContextService sslContextService = 
context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        if (sslContextService != null) {
             
clientProperties.setTlsConfiguration(sslContextService.createTlsConfiguration());
         }
 
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
index 81ea6d7db1..3cb1c2fedf 100644
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
@@ -17,105 +17,575 @@
 
 package org.apache.nifi.processors.mqtt;
 
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
 import org.apache.nifi.processors.mqtt.common.MqttClient;
 import org.apache.nifi.processors.mqtt.common.MqttTestClient;
 import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
 import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
-import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.security.util.SslContextFactory;
 import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
-import org.apache.nifi.security.util.TlsConfiguration;
 import org.apache.nifi.security.util.TlsException;
 import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
 import javax.net.ssl.SSLContext;
+import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static 
org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetReaderService;
+import static 
org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetWriterService;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class TestConsumeMQTT extends TestConsumeMqttCommon {
-    private static TlsConfiguration tlsConfiguration;
+public class TestConsumeMQTT {
 
-    public MqttTestClient mqttTestClient;
+    private static final int PUBLISH_WAIT_MS = 0;
+    private static final String THIS_IS_NOT_JSON = "ThisIsNotAJSON";
+    private static final String BROKER_URI = "tcp://localhost:1883";
+    private static final String CLIENT_ID = "TestClient";
+    private static final String TOPIC_NAME = "testTopic";
+    private static final String INTERNAL_QUEUE_SIZE = "100";
 
-    public class UnitTestableConsumeMqtt extends ConsumeMQTT {
+    private static final String STRING_MESSAGE = "testMessage";
+    private static final String JSON_PAYLOAD = "{\"name\":\"Apache NiFi\"}";
 
-        public UnitTestableConsumeMqtt(){
-            super();
-        }
+    private static final int AT_MOST_ONCE = 0;
+    private static final int AT_LEAST_ONCE = 1;
+    private static final int EXACTLY_ONCE = 2;
+
+    private MqttTestClient mqttTestClient;
+    private TestRunner testRunner;
+
+    @AfterEach
+    public void cleanup() {
+        testRunner = null;
+        mqttTestClient = null;
+    }
+
+    @Test
+    public void testClientIDConfiguration() {
+        testRunner = initializeTestRunner();
+        testRunner.assertValid();
+
+        testRunner.setProperty(ConsumeMQTT.PROP_GROUPID, "group");
+        testRunner.assertNotValid();
+
+        testRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, "${hostname()}");
+        testRunner.assertValid();
+
+        testRunner.removeProperty(ConsumeMQTT.PROP_CLIENTID);
+        testRunner.assertValid();
+    }
+
+    @Test
+    public void testLastWillConfig() {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_MESSAGE, "lastWill 
message");
+        testRunner.assertNotValid();
+        testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_TOPIC, "lastWill 
topic");
+        testRunner.assertNotValid();
+        testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_QOS, "1");
+        testRunner.assertNotValid();
+        testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_RETAIN, "false");
+        testRunner.assertValid();
+    }
+
+
+    @Test
+    public void testQoS2() throws Exception {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
+
+        testRunner.assertValid();
+
+        final ConsumeMQTT consumeMQTT = (ConsumeMQTT) 
testRunner.getProcessor();
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT, testRunner.getProcessContext());
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        testRunner.run(1, false, false);
+
+        testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
+        assertProvenanceEvents(1);
+
+        final List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+        final MockFlowFile flowFile = flowFiles.get(0);
+
+        flowFile.assertContentEquals("testMessage");
+        flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, BROKER_URI);
+        flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, TOPIC_NAME);
+        flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
+        flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
+        flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
+    }
+
+    @Test
+    public void testQoS2NotCleanSession() throws Exception {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
+        testRunner.setProperty(ConsumeMQTT.PROP_CLEAN_SESSION, 
ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
+
+        testRunner.assertValid();
+
+        final ConsumeMQTT consumeMQTT = (ConsumeMQTT) 
testRunner.getProcessor();
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT, testRunner.getProcessContext());
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        consumeMQTT.onUnscheduled(testRunner.getProcessContext());
+
+        publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
+
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT, testRunner.getProcessContext());
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        testRunner.run(1, false, false);
+
+        testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
+        assertProvenanceEvents(1);
+
+        final List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+        final MockFlowFile flowFile = flowFiles.get(0);
+
+        flowFile.assertContentEquals("testMessage");
+        flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, BROKER_URI);
+        flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, TOPIC_NAME);
+        flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
+        flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
+        flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
+    }
+
+    @Test
+    public void testQoS1() throws Exception {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+        testRunner = initializeTestRunner(mqttTestClient);
 
-        @Override
-        protected MqttClient createMqttClient() {
-            mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Subscriber);
-            return mqttTestClient;
+        testRunner.setProperty(ConsumeMQTT.PROP_QOS, "1");
+
+        testRunner.assertValid();
+
+        final ConsumeMQTT consumeMQTT = (ConsumeMQTT) 
testRunner.getProcessor();
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT, testRunner.getProcessContext());
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        publishMessage(STRING_MESSAGE, AT_LEAST_ONCE);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        testRunner.run(1, false, false);
+
+        final List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+        assertTrue(flowFiles.size() > 0);
+        assertProvenanceEvents(flowFiles.size());
+        final MockFlowFile flowFile = flowFiles.get(0);
+
+        flowFile.assertContentEquals("testMessage");
+        flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, BROKER_URI);
+        flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, TOPIC_NAME);
+        flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "1");
+        flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
+        flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
+    }
+
+    @Test
+    public void testQoS1NotCleanSession() throws Exception {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.PROP_QOS, "1");
+        testRunner.setProperty(ConsumeMQTT.PROP_CLEAN_SESSION, 
ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
+
+        testRunner.assertValid();
+
+        final ConsumeMQTT consumeMQTT = (ConsumeMQTT) 
testRunner.getProcessor();
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT, testRunner.getProcessContext());
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        consumeMQTT.onUnscheduled(testRunner.getProcessContext());
+
+        publishMessage(STRING_MESSAGE, AT_LEAST_ONCE);
+
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT, testRunner.getProcessContext());
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        testRunner.run(1, false, false);
+
+        testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
+
+        final List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+        assertTrue(flowFiles.size() > 0);
+        assertProvenanceEvents(flowFiles.size());
+        final MockFlowFile flowFile = flowFiles.get(0);
+
+        flowFile.assertContentEquals("testMessage");
+        flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, BROKER_URI);
+        flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, TOPIC_NAME);
+        flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "1");
+        flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
+        flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
+    }
+
+    @Test
+    public void testQoS0() throws Exception {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.PROP_QOS, "0");
+
+        testRunner.assertValid();
+
+        final ConsumeMQTT consumeMQTT = (ConsumeMQTT) 
testRunner.getProcessor();
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT, testRunner.getProcessContext());
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        publishMessage(STRING_MESSAGE, AT_MOST_ONCE);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        testRunner.run(1, false, false);
+
+        final List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+        assertTrue(flowFiles.size() < 2);
+        assertProvenanceEvents(flowFiles.size());
+
+        if(flowFiles.size() == 1) {
+            MockFlowFile flowFile = flowFiles.get(0);
+
+            flowFile.assertContentEquals("testMessage");
+            flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, BROKER_URI);
+            flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, TOPIC_NAME);
+            flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "0");
+            flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, 
"false");
+            flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
         }
     }
 
-    @BeforeAll
-    public static void setTlsConfiguration() {
-        tlsConfiguration = new TemporaryKeyStoreBuilder().build();
+    @Test
+    public void testOnStoppedFinish() throws Exception {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
+
+        testRunner.assertValid();
+
+        final byte[] content = 
ByteBuffer.wrap("testMessage".getBytes()).array();
+        final ReceivedMqttMessage testMessage = new 
ReceivedMqttMessage(content, 2, false, TOPIC_NAME);
+
+        final ConsumeMQTT consumeMQTT = (ConsumeMQTT) 
testRunner.getProcessor();
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT, testRunner.getProcessContext());
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        consumeMQTT.processSessionFactory = 
testRunner.getProcessSessionFactory();
+
+        final Field f = ConsumeMQTT.class.getDeclaredField("mqttQueue");
+        f.setAccessible(true);
+        @SuppressWarnings("unchecked")
+        final LinkedBlockingQueue<ReceivedMqttMessage> queue = 
(LinkedBlockingQueue<ReceivedMqttMessage>) f.get(consumeMQTT);
+        queue.add(testMessage);
+
+        consumeMQTT.onUnscheduled(testRunner.getProcessContext());
+        consumeMQTT.onStopped(testRunner.getProcessContext());
+
+        testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
+        assertProvenanceEvents(1);
+
+        final List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+        final MockFlowFile flowFile = flowFiles.get(0);
+
+        flowFile.assertContentEquals("testMessage");
+        flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, BROKER_URI);
+        flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, TOPIC_NAME);
+        flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
+        flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
+        flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
+    }
+
+    @Test
+    public void testResizeBuffer() throws Exception {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
+        testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "2");
+
+        testRunner.assertValid();
+
+        final ConsumeMQTT consumeMQTT = (ConsumeMQTT) 
testRunner.getProcessor();
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT, testRunner.getProcessContext());
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
+        publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+        consumeMQTT.onUnscheduled(testRunner.getProcessContext());
+
+        testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "1");
+        testRunner.assertNotValid();
+
+        testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "3");
+        testRunner.assertValid();
+
+        testRunner.run(1);
+
+        testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 2);
+        assertProvenanceEvents(2);
+
+        final List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+        final MockFlowFile flowFile = flowFiles.get(0);
+
+        flowFile.assertContentEquals("testMessage");
+        flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, BROKER_URI);
+        flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, TOPIC_NAME);
+        flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
+        flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
+        flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
+    }
+
+    @Test
+    public void testConsumeRecordsWithAddedFields() throws Exception {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, 
createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, 
createJsonRecordSetWriterService(testRunner));
+
+        testRunner.assertValid();
+
+        final ConsumeMQTT consumeMQTT = (ConsumeMQTT) 
testRunner.getProcessor();
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT, testRunner.getProcessContext());
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        publishMessage(JSON_PAYLOAD, AT_MOST_ONCE);
+        publishMessage(THIS_IS_NOT_JSON, AT_MOST_ONCE);
+        publishMessage(JSON_PAYLOAD, AT_MOST_ONCE);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        testRunner.run(1, false, false);
+
+        final List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+        assertEquals(1, flowFiles.size());
+        assertEquals("[{\"name\":\"Apache 
NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false},"
+                        + "{\"name\":\"Apache 
NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false}]",
+                new String(flowFiles.get(0).toByteArray()));
+
+        final List<MockFlowFile> badFlowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
+        assertEquals(1, badFlowFiles.size());
+        assertEquals(THIS_IS_NOT_JSON, new 
String(badFlowFiles.get(0).toByteArray()));
+    }
+
+    @Test
+    public void testConsumeDemarcator() throws Exception {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.MESSAGE_DEMARCATOR, "\\n");
+        testRunner.assertValid();
+
+        final ConsumeMQTT consumeMQTT = (ConsumeMQTT) 
testRunner.getProcessor();
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT, testRunner.getProcessContext());
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        publishMessage(JSON_PAYLOAD, AT_MOST_ONCE);
+        publishMessage(THIS_IS_NOT_JSON, AT_MOST_ONCE);
+        publishMessage(JSON_PAYLOAD, AT_MOST_ONCE);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        testRunner.run(1, false, false);
+
+        final List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+        assertEquals(flowFiles.size(), 1);
+        assertEquals("{\"name\":\"Apache NiFi\"}\\n"
+                        + THIS_IS_NOT_JSON + "\\n"
+                        + "{\"name\":\"Apache NiFi\"}\\n",
+                new String(flowFiles.get(0).toByteArray()));
+
+        final List<MockFlowFile> badFlowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
+        assertEquals(0, badFlowFiles.size());
     }
 
-    @BeforeEach
-    public void init() {
-        PUBLISH_WAIT_MS = 0;
+    @Test
+    public void testConsumeRecordsWithoutAddedFields() throws Exception {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, 
createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, 
createJsonRecordSetWriterService(testRunner));
+        testRunner.setProperty(ConsumeMQTT.ADD_ATTRIBUTES_AS_FIELDS, "false");
+
+        testRunner.assertValid();
+
+        final ConsumeMQTT consumeMQTT = (ConsumeMQTT) 
testRunner.getProcessor();
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT, testRunner.getProcessContext());
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        publishMessage(JSON_PAYLOAD, AT_LEAST_ONCE);
+        publishMessage(THIS_IS_NOT_JSON, AT_LEAST_ONCE);
+        publishMessage(JSON_PAYLOAD, AT_LEAST_ONCE);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        testRunner.run(1, false, false);
+
+        final List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+        assertEquals(1, flowFiles.size());
+        assertEquals("[{\"name\":\"Apache NiFi\"},{\"name\":\"Apache 
NiFi\"}]", new String(flowFiles.get(0).toByteArray()));
+
+        final List<MockFlowFile> badFlowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
+        assertEquals(1, badFlowFiles.size());
+        assertEquals(THIS_IS_NOT_JSON, new 
String(badFlowFiles.get(0).toByteArray()));
+    }
 
-        broker = "tcp://localhost:1883";
-        UnitTestableConsumeMqtt proc = new UnitTestableConsumeMqtt();
-        testRunner = TestRunners.newTestRunner(proc);
-        testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, broker);
-        testRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
-        testRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
-        testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");
+    @Test
+    public void testConsumeRecordsOnlyBadData() throws Exception {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, 
createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, 
createJsonRecordSetWriterService(testRunner));
+        testRunner.setProperty(ConsumeMQTT.ADD_ATTRIBUTES_AS_FIELDS, "false");
+
+        testRunner.assertValid();
+
+        final ConsumeMQTT consumeMQTT = (ConsumeMQTT) 
testRunner.getProcessor();
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT, testRunner.getProcessContext());
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        publishMessage(THIS_IS_NOT_JSON, EXACTLY_ONCE);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        testRunner.run(1, false, false);
+
+        final List<MockFlowFile> badFlowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
+        assertEquals(1, badFlowFiles.size());
+        assertEquals(THIS_IS_NOT_JSON, new 
String(badFlowFiles.get(0).toByteArray()));
     }
 
     @Test
     public void testSslContextService() throws InitializationException, 
TlsException {
-        String brokerURI = "ssl://localhost:8883";
-        TestRunner runner = TestRunners.newTestRunner(ConsumeMQTT.class);
-        runner.setVariable("brokerURI", brokerURI);
-        runner.setProperty(ConsumeMQTT.PROP_BROKER_URI, "${brokerURI}");
-        runner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
-        runner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
-        runner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");
+        testRunner = initializeTestRunner();
+        testRunner.setVariable("brokerURI",  "ssl://localhost:8883");
+        testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, "${brokerURI}");
 
         final SSLContextService sslContextService = 
mock(SSLContextService.class);
         final String identifier = SSLContextService.class.getSimpleName();
         when(sslContextService.getIdentifier()).thenReturn(identifier);
-        final SSLContext sslContext = 
SslContextFactory.createSslContext(tlsConfiguration);
+        final SSLContext sslContext = SslContextFactory.createSslContext(new 
TemporaryKeyStoreBuilder().build());
         when(sslContextService.createContext()).thenReturn(sslContext);
 
-        runner.addControllerService(identifier, sslContextService);
-        runner.enableControllerService(sslContextService);
-        runner.setProperty(ConsumeMQTT.PROP_SSL_CONTEXT_SERVICE, identifier);
+        testRunner.addControllerService(identifier, sslContextService);
+        testRunner.enableControllerService(sslContextService);
+        testRunner.setProperty(ConsumeMQTT.PROP_SSL_CONTEXT_SERVICE, 
identifier);
 
-        ConsumeMQTT processor = (ConsumeMQTT) runner.getProcessor();
-        processor.onScheduled(runner.getProcessContext());
+        final ConsumeMQTT processor = (ConsumeMQTT) testRunner.getProcessor();
+        processor.onScheduled(testRunner.getProcessContext());
     }
 
     @Test
     public void testMessageNotConsumedOnCommitFail() throws 
NoSuchFieldException, IllegalAccessException {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+        testRunner = initializeTestRunner(mqttTestClient);
+
         testRunner.run(1, false);
-        ConsumeMQTT processor = (ConsumeMQTT) testRunner.getProcessor();
-        ReceivedMqttMessage mock = mock(ReceivedMqttMessage.class);
+        final ConsumeMQTT processor = (ConsumeMQTT) testRunner.getProcessor();
+        final ReceivedMqttMessage mock = mock(ReceivedMqttMessage.class);
         when(mock.getPayload()).thenReturn(new byte[0]);
-        when(mock.getTopic()).thenReturn("testTopic");
-        BlockingQueue<ReceivedMqttMessage> mqttQueue = getMqttQueue(processor);
+        when(mock.getTopic()).thenReturn(TOPIC_NAME);
+        final BlockingQueue<ReceivedMqttMessage> mqttQueue = 
getMqttQueue(processor);
         mqttQueue.add(mock);
 
-        ProcessSession session = 
testRunner.getProcessSessionFactory().createSession();
+        final ProcessSession session = 
testRunner.getProcessSessionFactory().createSession();
 
         assertThrows(InvocationTargetException.class, () -> 
transferQueue(processor,
             (ProcessSession) 
Proxy.newProxyInstance(getClass().getClassLoader(), new 
Class[]{ProcessSession.class}, (proxy, method, args) -> {
@@ -128,8 +598,76 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon 
{
         assertTrue(mqttQueue.contains(mock));
     }
 
-    @Override
-    public void internalPublish(final StandardMqttMessage message, final 
String topicName) {
-        mqttTestClient.publish(topicName, message);
+    private TestRunner initializeTestRunner() {
+        if (mqttTestClient != null) {
+            throw new IllegalStateException("mqttTestClient should be null, 
using ConsumeMQTT's default client!");
+        }
+
+        final TestRunner testRunner = 
TestRunners.newTestRunner(ConsumeMQTT.class);
+
+        setCommonProperties(testRunner);
+
+        return testRunner;
+    }
+
+    private TestRunner initializeTestRunner(MqttTestClient mqttTestClient) {
+        final TestRunner testRunner = TestRunners.newTestRunner(new 
ConsumeMQTT() {
+            @Override
+            protected MqttClient createMqttClient() {
+                return mqttTestClient;
+            }
+        });
+
+        setCommonProperties(testRunner);
+
+        return testRunner;
+    }
+
+    private void setCommonProperties(TestRunner testRunner) {
+        testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, BROKER_URI);
+        testRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, CLIENT_ID);
+        testRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, TOPIC_NAME);
+        testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, 
INTERNAL_QUEUE_SIZE);
+    }
+
+    private static boolean isConnected(AbstractMQTTProcessor processor) throws 
NoSuchFieldException, IllegalAccessException {
+        final Field f = 
AbstractMQTTProcessor.class.getDeclaredField("mqttClient");
+        f.setAccessible(true);
+        final MqttClient mqttClient = (MqttClient) f.get(processor);
+        return mqttClient.isConnected();
+    }
+
+
+    public static void reconnect(ConsumeMQTT processor, ProcessContext 
context) throws IllegalAccessException, NoSuchMethodException, 
InvocationTargetException {
+        final Method method = 
ConsumeMQTT.class.getDeclaredMethod("initializeClient", ProcessContext.class);
+        method.setAccessible(true);
+        method.invoke(processor, context);
+    }
+
+    @SuppressWarnings("unchecked")
+    public static BlockingQueue<ReceivedMqttMessage> getMqttQueue(ConsumeMQTT 
consumeMQTT) throws IllegalAccessException, NoSuchFieldException {
+        final Field mqttQueueField = 
ConsumeMQTT.class.getDeclaredField("mqttQueue");
+        mqttQueueField.setAccessible(true);
+        return (BlockingQueue<ReceivedMqttMessage>) 
mqttQueueField.get(consumeMQTT);
+    }
+
+    public static void transferQueue(ConsumeMQTT consumeMQTT, ProcessSession 
session) throws NoSuchMethodException, InvocationTargetException, 
IllegalAccessException {
+        final Method transferQueue = 
ConsumeMQTT.class.getDeclaredMethod("transferQueue", ProcessSession.class);
+        transferQueue.setAccessible(true);
+        transferQueue.invoke(consumeMQTT, session);
+    }
+
+    private void assertProvenanceEvents(int count){
+        final List<ProvenanceEventRecord> provenanceEvents = 
testRunner.getProvenanceEvents();
+        assertNotNull(provenanceEvents);
+        assertEquals(count, provenanceEvents.size());
+        if (count > 0) {
+            assertEquals(ProvenanceEventType.RECEIVE, 
provenanceEvents.get(0).getEventType());
+        }
+    }
+
+    private void publishMessage(final String payload, final int qos) {
+        final StandardMqttMessage message = new 
StandardMqttMessage(payload.getBytes(StandardCharsets.UTF_8), qos, false);
+        mqttTestClient.publish(TOPIC_NAME, message);
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
index 41181c65ed..b9815339da 100644
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
@@ -17,52 +17,368 @@
 
 package org.apache.nifi.processors.mqtt;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.nifi.processors.mqtt.common.MqttClient;
-import org.apache.nifi.processors.mqtt.common.MqttException;
 import org.apache.nifi.processors.mqtt.common.MqttTestClient;
 import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
-import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
+import static java.util.Arrays.asList;
+import static 
org.apache.nifi.processors.mqtt.PublishMQTT.ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+import static 
org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
+import static 
org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
+import static 
org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_FAILURE;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static 
org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetReaderService;
+import static 
org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetWriterService;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
 
-public class TestPublishMQTT extends TestPublishMqttCommon {
+public class TestPublishMQTT {
 
-    @Override
-    public void verifyPublishedMessage(byte[] payload, int qos, boolean 
retain) {
-        StandardMqttMessage lastPublishedMessage = 
mqttTestClient.getLastPublishedMessage();
-        String lastPublishedTopic = mqttTestClient.getLastPublishedTopic();
+    private static final String BROKER_URI = "tcp://localhost:1883";
+    private static final String TOPIC = "testTopic";
+    private static final String RETAIN = "false";
+
+    private MqttTestClient mqttTestClient;
+    private TestRunner testRunner;
+
+    @AfterEach
+    public void cleanup() {
+        testRunner = null;
+        mqttTestClient = null;
+    }
+
+    @Test
+    public void testQoS0() {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "0");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        assertProvenanceEvent();
+
+        verifyPublishedMessage(testMessage.getBytes(), 0, false);
+    }
+
+    @Test
+    public void testQoS1() {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "1");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 1, false);
+    }
+
+    @Test
+    public void testQoS2NotCleanSession() {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        // Publisher executes synchronously so the only time whether its Clean 
or Not matters is when the processor stops in the middle of the publishing
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.setProperty(PublishMQTT.PROP_CLEAN_SESSION, 
ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, false);
+    }
+
+    @Test
+    public void testQoS2() {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, false);
+    }
+
+    @Test
+    public void testRetainQoS2() {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.setProperty(PublishMQTT.PROP_RETAIN, "true");
+
+        testRunner.assertValid();
+
+        final String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        assertProvenanceEvent();
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, true);
+    }
+
+    @Test
+    public void testPublishRecordSet() throws InitializationException {
+        mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Publisher);
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, 
createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, 
createJsonRecordSetWriterService(testRunner));
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.assertValid();
+
+        final ArrayNode testInput = createTestJsonInput();
+
+        testRunner.enqueue(testInput.toString().getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS,
 3));
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testInput.get(0).toString().getBytes(), 2, 
false);
+        verifyPublishedMessage(testInput.get(1).toString().getBytes(), 2, 
false);
+        verifyPublishedMessage(testInput.get(2).toString().getBytes(), 2, 
false);
+        verifyNoMorePublished();
+
+        final List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        final MockFlowFile successfulFlowFile = flowFiles.get(0);
+        final String publishFailedIndexAttributeName = 
testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+        
assertFalse(successfulFlowFile.getAttributes().containsKey(publishFailedIndexAttributeName),
 "Failed attribute should not be present on the FlowFile");
+    }
+
+    @Test
+    public void testPublishRecordSetFailed() throws InitializationException {
+        mqttTestClient = Mockito.spy(new 
MqttTestClient(MqttTestClient.ConnectType.Publisher));
+        Mockito.doCallRealMethod()
+                .doThrow(new RuntimeException("Second publish failed."))
+                .when(mqttTestClient).publish(any(), any());
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, 
createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, 
createJsonRecordSetWriterService(testRunner));
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.assertValid();
+
+        final ArrayNode testInput = createTestJsonInput();
+
+        testRunner.enqueue(testInput.toString().getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
+        
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE,
 1));
+
+        verify(mqttTestClient, Mockito.times(2)).publish(any(), any());
+        verifyPublishedMessage(testInput.get(0).toString().getBytes(), 2, 
false);
+        verifyNoMorePublished();
+
+        List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(REL_FAILURE);
+        assertEquals(1, flowFiles.size());
+
+        final MockFlowFile failedFlowFile = flowFiles.get(0);
+        final String publishFailedIndexAttributeName = 
testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+        assertEquals("1", 
failedFlowFile.getAttribute(publishFailedIndexAttributeName), "Only one record 
is expected to be published successfully.");
+    }
+
+    @Test
+    public void 
testContinuePublishRecordsAndFailAgainWhenPreviousPublishFailed() throws 
InitializationException {
+        mqttTestClient = Mockito.spy(new 
MqttTestClient(MqttTestClient.ConnectType.Publisher));
+        Mockito.doCallRealMethod()
+                .doThrow(new RuntimeException("Second publish failed."))
+                .when(mqttTestClient).publish(any(), any());
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, 
createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, 
createJsonRecordSetWriterService(testRunner));
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.assertValid();
+
+        final String publishFailedIndexAttributeName = 
testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+        final ArrayNode testInput = createTestJsonInput();
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(publishFailedIndexAttributeName, "1");
+        testRunner.enqueue(testInput.toString().getBytes(), attributes);
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
+        
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE,
 2));
+
+        verify(mqttTestClient, Mockito.times(2)).publish(any(), any());
+        verifyPublishedMessage(testInput.get(1).toString().getBytes(), 2, 
false);
+        verifyNoMorePublished();
+
+        final List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(REL_FAILURE);
+        assertEquals(1, flowFiles.size());
+
+        final MockFlowFile failedFlowFile = flowFiles.get(0);
+        assertEquals("2", 
failedFlowFile.getAttribute(publishFailedIndexAttributeName), "Only one record 
is expected to be published successfully.");
+    }
+
+    @Test
+    public void 
testContinuePublishRecordsSuccessfullyWhenPreviousPublishFailed() throws 
InitializationException {
+        mqttTestClient = Mockito.spy(new 
MqttTestClient(MqttTestClient.ConnectType.Publisher));
+        Mockito.doCallRealMethod().when(mqttTestClient).publish(any(), any());
+        testRunner = initializeTestRunner(mqttTestClient);
+
+        testRunner.setProperty(ConsumeMQTT.RECORD_READER, 
createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, 
createJsonRecordSetWriterService(testRunner));
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.assertValid();
+
+        final String publishFailedIndexAttributeName = 
testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+        final ArrayNode testInput = createTestJsonInput();
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(publishFailedIndexAttributeName, "1");
+        testRunner.enqueue(testInput.toString().getBytes(), attributes);
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER,
 3));
+
+        verify(mqttTestClient, Mockito.times(2)).publish(any(), any());
+        verifyPublishedMessage(testInput.get(1).toString().getBytes(), 2, 
false);
+        verifyPublishedMessage(testInput.get(2).toString().getBytes(), 2, 
false);
+        verifyNoMorePublished();
+
+        final List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        final MockFlowFile successfulFlowFile = flowFiles.get(0);
+        
assertNull(successfulFlowFile.getAttribute(publishFailedIndexAttributeName),
+                publishFailedIndexAttributeName + " is expected to be removed 
after all remaining records have been published successfully.");
+    }
+
+    private void verifyPublishedMessage(byte[] payload, int qos, boolean 
retain) {
+        final Pair<String, StandardMqttMessage> lastPublished = 
mqttTestClient.getLastPublished();
+        final String lastPublishedTopic = lastPublished.getLeft();
+        final StandardMqttMessage lastPublishedMessage = 
lastPublished.getRight();
         assertEquals(Arrays.toString(payload), 
Arrays.toString(lastPublishedMessage.getPayload()));
         assertEquals(qos, lastPublishedMessage.getQos());
         assertEquals(retain, lastPublishedMessage.isRetained());
-        assertEquals(topic, lastPublishedTopic);
+        assertEquals(TOPIC, lastPublishedTopic);
     }
 
-    private MqttTestClient mqttTestClient;
+    private void verifyNoMorePublished() {
+        assertNull(mqttTestClient.getLastPublished(), "TestClient's queue 
should be empty.");
+    }
+
+    private ProvenanceEventRecord assertProvenanceEvent() {
+        final List<ProvenanceEventRecord> provenanceEvents = 
testRunner.getProvenanceEvents();
+        assertNotNull(provenanceEvents);
+        assertEquals(1, provenanceEvents.size());
 
-    public class UnitTestablePublishMqtt extends PublishMQTT {
+        final ProvenanceEventRecord event = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.SEND, event.getEventType());
 
-        public UnitTestablePublishMqtt(){
-            super();
-        }
+        return event;
+    }
 
-        @Override
-        protected MqttClient createMqttClient() throws MqttException {
-            mqttTestClient = new 
MqttTestClient(MqttTestClient.ConnectType.Publisher);
-            return mqttTestClient;
-        }
+    private void assertProvenanceEvent(String expectedDetails) {
+        final ProvenanceEventRecord event = assertProvenanceEvent();
+        assertEquals(expectedDetails, event.getDetails());
     }
 
-    @BeforeEach
-    public void init() {
-        UnitTestablePublishMqtt proc = new UnitTestablePublishMqtt();
-        testRunner = TestRunners.newTestRunner(proc);
-        testRunner.setProperty(PublishMQTT.PROP_BROKER_URI, 
"tcp://localhost:1883");
-        testRunner.setProperty(PublishMQTT.PROP_RETAIN, "false");
-        topic = "testTopic";
-        testRunner.setProperty(PublishMQTT.PROP_TOPIC, topic);
+    private static ArrayNode createTestJsonInput() {
+        final ObjectMapper mapper = new ObjectMapper();
+
+        return mapper.createArrayNode().addAll(asList(
+                mapper.createObjectNode()
+                        .put("recordId", 1)
+                        .put("firstAttribute", "foo")
+                        .put("secondAttribute", false),
+                mapper.createObjectNode()
+                        .put("recordId", 2)
+                        .put("firstAttribute", "bar")
+                        .put("secondAttribute", true),
+                mapper.createObjectNode()
+                        .put("recordId", 3)
+                        .put("firstAttribute", "foobar")
+                        .put("secondAttribute", false)
+        ));
+    }
+
+    private TestRunner initializeTestRunner(MqttTestClient mqttTestClient) {
+        final TestRunner testRunner = TestRunners.newTestRunner(new 
PublishMQTT() {
+            @Override
+            protected MqttClient createMqttClient() {
+                return mqttTestClient;
+            }
+        });
+
+        testRunner.setProperty(PublishMQTT.PROP_BROKER_URI, BROKER_URI);
+        testRunner.setProperty(PublishMQTT.PROP_RETAIN, RETAIN);
+        testRunner.setProperty(PublishMQTT.PROP_TOPIC, TOPIC);
+
+        return testRunner;
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java
index 91997061bc..dcb87b612c 100644
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java
@@ -17,23 +17,26 @@
 
 package org.apache.nifi.processors.mqtt.common;
 
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.LinkedList;
+import java.util.Queue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class MqttTestClient implements MqttClient {
 
+    private final Queue<Pair<String, StandardMqttMessage>> publishedMessages = 
new LinkedList<>();
+
     public AtomicBoolean connected = new AtomicBoolean(false);
 
     public MqttCallback mqttCallback;
     public ConnectType type;
-    public enum ConnectType {Publisher, Subscriber}
 
-    private StandardMqttMessage lastPublishedMessage;
-    private String lastPublishedTopic;
+    public enum ConnectType {Publisher, Subscriber}
 
     public String subscribedTopic;
     public int subscribedQos;
 
-
     public MqttTestClient(ConnectType type) {
         this.type = type;
     }
@@ -62,8 +65,7 @@ public class MqttTestClient implements MqttClient {
     public void publish(String topic, StandardMqttMessage message) {
         switch (type) {
             case Publisher:
-                lastPublishedMessage = message;
-                lastPublishedTopic = topic;
+                publishedMessages.add(Pair.of(topic, message));
                 break;
             case Subscriber:
                 mqttCallback.messageArrived(new 
ReceivedMqttMessage(message.getPayload(), message.getQos(), 
message.isRetained(), topic));
@@ -82,11 +84,7 @@ public class MqttTestClient implements MqttClient {
         this.mqttCallback = callback;
     }
 
-    public StandardMqttMessage getLastPublishedMessage() {
-        return lastPublishedMessage;
-    }
-
-    public String getLastPublishedTopic() {
-        return lastPublishedTopic;
+    public Pair<String, StandardMqttMessage> getLastPublished() {
+        return publishedMessages.poll();
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestUtil.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestUtil.java
new file mode 100644
index 0000000000..d2e8de0382
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestUtil.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.common;
+
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.util.TestRunner;
+
+public class MqttTestUtil {
+
+    public static String createJsonRecordSetReaderService(TestRunner 
testRunner) throws InitializationException {
+        final String id = "record-reader";
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        testRunner.addControllerService(id, jsonReader);
+        testRunner.setProperty(jsonReader, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
+        testRunner.enableControllerService(jsonReader);
+        return id;
+    }
+
+    public static String createJsonRecordSetWriterService(TestRunner 
testRunner) throws InitializationException {
+        final String id = "record-writer";
+        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+        testRunner.addControllerService(id, jsonWriter);
+        testRunner.setProperty(jsonWriter, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
+        testRunner.enableControllerService(jsonWriter);
+        return id;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
deleted file mode 100644
index 7cbaa11c8d..0000000000
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
+++ /dev/null
@@ -1,587 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.mqtt.common;
-
-import org.apache.nifi.json.JsonRecordSetWriter;
-import org.apache.nifi.json.JsonTreeReader;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processors.mqtt.ConsumeMQTT;
-import org.apache.nifi.provenance.ProvenanceEventRecord;
-import org.apache.nifi.provenance.ProvenanceEventType;
-import org.apache.nifi.schema.access.SchemaAccessUtils;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.Test;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
-import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
-import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
-import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
-import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
-import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public abstract class TestConsumeMqttCommon {
-
-    public int PUBLISH_WAIT_MS = 1000;
-    public static final String THIS_IS_NOT_JSON = "ThisIsNotAJSON";
-
-    public TestRunner testRunner;
-    public String broker;
-
-    private static final String STRING_MESSAGE = "testMessage";
-    private static final String JSON_PAYLOAD = "{\"name\":\"Apache NiFi\"}";
-
-    private static final int MOST_ONE = 0;
-    private static final int LEAST_ONE = 1;
-    private static final int EXACTLY_ONCE = 2;
-
-    public abstract void internalPublish(StandardMqttMessage message, String 
topicName);
-
-    @Test
-    public void testClientIDConfiguration() {
-        TestRunner runner = TestRunners.newTestRunner(ConsumeMQTT.class);
-        runner.setProperty(ConsumeMQTT.PROP_BROKER_URI, 
"tcp://localhost:1883");
-        runner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
-        runner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
-        runner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");
-        runner.assertValid();
-
-        runner.setProperty(ConsumeMQTT.PROP_GROUPID, "group");
-        runner.assertNotValid();
-
-        runner.setProperty(ConsumeMQTT.PROP_CLIENTID, "${hostname()}");
-        runner.assertValid();
-
-        runner.removeProperty(ConsumeMQTT.PROP_CLIENTID);
-        runner.assertValid();
-    }
-
-    @Test
-    public void testLastWillConfig() {
-        testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_MESSAGE, "lastWill 
message");
-        testRunner.assertNotValid();
-        testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_TOPIC, "lastWill 
topic");
-        testRunner.assertNotValid();
-        testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_QOS, "1");
-        testRunner.assertNotValid();
-        testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_RETAIN, "false");
-        testRunner.assertValid();
-    }
-
-
-    @Test
-    public void testQoS2() throws Exception {
-        testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
-
-        testRunner.assertValid();
-
-        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
-        consumeMQTT.onScheduled(testRunner.getProcessContext());
-        reconnect(consumeMQTT, testRunner.getProcessContext());
-
-        Thread.sleep(PUBLISH_WAIT_MS);
-
-        assertTrue(isConnected(consumeMQTT));
-
-        publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
-
-        Thread.sleep(PUBLISH_WAIT_MS);
-
-        testRunner.run(1, false, false);
-
-        testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
-        assertProvenanceEvents(1);
-
-        List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
-        MockFlowFile flowFile = flowFiles.get(0);
-
-        flowFile.assertContentEquals("testMessage");
-        flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
-        flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
-        flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
-        flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
-        flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
-    }
-
-    @Test
-    public void testQoS2NotCleanSession() throws Exception {
-        testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
-        testRunner.setProperty(ConsumeMQTT.PROP_CLEAN_SESSION, 
ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
-
-        testRunner.assertValid();
-
-        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
-        consumeMQTT.onScheduled(testRunner.getProcessContext());
-        reconnect(consumeMQTT, testRunner.getProcessContext());
-
-        Thread.sleep(PUBLISH_WAIT_MS);
-
-        assertTrue(isConnected(consumeMQTT));
-
-        consumeMQTT.onUnscheduled(testRunner.getProcessContext());
-
-        publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
-
-        consumeMQTT.onScheduled(testRunner.getProcessContext());
-        reconnect(consumeMQTT, testRunner.getProcessContext());
-
-        Thread.sleep(PUBLISH_WAIT_MS);
-
-        assertTrue(isConnected(consumeMQTT));
-
-        testRunner.run(1, false, false);
-
-        testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
-        assertProvenanceEvents(1);
-
-        List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
-        MockFlowFile flowFile = flowFiles.get(0);
-
-        flowFile.assertContentEquals("testMessage");
-        flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
-        flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
-        flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
-        flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
-        flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
-    }
-
-
-    @Test
-    public void testQoS1() throws Exception {
-        testRunner.setProperty(ConsumeMQTT.PROP_QOS, "1");
-
-        testRunner.assertValid();
-
-        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
-        consumeMQTT.onScheduled(testRunner.getProcessContext());
-        reconnect(consumeMQTT, testRunner.getProcessContext());
-
-        Thread.sleep(PUBLISH_WAIT_MS);
-
-        assertTrue(isConnected(consumeMQTT));
-
-        publishMessage(STRING_MESSAGE, LEAST_ONE);
-
-        Thread.sleep(PUBLISH_WAIT_MS);
-
-        testRunner.run(1, false, false);
-
-        List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
-        assertTrue(flowFiles.size() > 0);
-        assertProvenanceEvents(flowFiles.size());
-        MockFlowFile flowFile = flowFiles.get(0);
-
-        flowFile.assertContentEquals("testMessage");
-        flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
-        flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
-        flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "1");
-        flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
-        flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
-    }
-
-    @Test
-    public void testQoS1NotCleanSession() throws Exception {
-        testRunner.setProperty(ConsumeMQTT.PROP_QOS, "1");
-        testRunner.setProperty(ConsumeMQTT.PROP_CLEAN_SESSION, 
ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
-
-        testRunner.assertValid();
-
-        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
-        consumeMQTT.onScheduled(testRunner.getProcessContext());
-        reconnect(consumeMQTT, testRunner.getProcessContext());
-
-        Thread.sleep(PUBLISH_WAIT_MS);
-
-        assertTrue(isConnected(consumeMQTT));
-
-        consumeMQTT.onUnscheduled(testRunner.getProcessContext());
-
-        publishMessage(STRING_MESSAGE, LEAST_ONE);
-
-        consumeMQTT.onScheduled(testRunner.getProcessContext());
-        reconnect(consumeMQTT, testRunner.getProcessContext());
-
-        Thread.sleep(PUBLISH_WAIT_MS);
-
-        assertTrue(isConnected(consumeMQTT));
-
-        testRunner.run(1, false, false);
-
-        testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
-
-        List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
-        assertTrue(flowFiles.size() > 0);
-        assertProvenanceEvents(flowFiles.size());
-        MockFlowFile flowFile = flowFiles.get(0);
-
-        flowFile.assertContentEquals("testMessage");
-        flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
-        flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
-        flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "1");
-        flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
-        flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
-    }
-
-    @Test
-    public void testQoS0() throws Exception {
-        testRunner.setProperty(ConsumeMQTT.PROP_QOS, "0");
-
-        testRunner.assertValid();
-
-        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
-        consumeMQTT.onScheduled(testRunner.getProcessContext());
-        reconnect(consumeMQTT, testRunner.getProcessContext());
-
-        Thread.sleep(PUBLISH_WAIT_MS);
-
-        assertTrue(isConnected(consumeMQTT));
-
-        publishMessage(STRING_MESSAGE, MOST_ONE);
-
-        Thread.sleep(PUBLISH_WAIT_MS);
-
-        testRunner.run(1, false, false);
-
-        List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
-        assertTrue(flowFiles.size() < 2);
-        assertProvenanceEvents(flowFiles.size());
-
-        if(flowFiles.size() == 1) {
-            MockFlowFile flowFile = flowFiles.get(0);
-
-            flowFile.assertContentEquals("testMessage");
-            flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
-            flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
-            flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "0");
-            flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, 
"false");
-            flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
-        }
-    }
-
-    @Test
-    public void testOnStoppedFinish() throws Exception {
-        testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
-
-        testRunner.assertValid();
-
-        final byte[] content = 
ByteBuffer.wrap("testMessage".getBytes()).array();
-        ReceivedMqttMessage testMessage = new ReceivedMqttMessage(content, 2, 
false, "testTopic");
-
-        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
-        consumeMQTT.onScheduled(testRunner.getProcessContext());
-        reconnect(consumeMQTT, testRunner.getProcessContext());
-
-        Thread.sleep(PUBLISH_WAIT_MS);
-
-        assertTrue(isConnected(consumeMQTT));
-
-        consumeMQTT.processSessionFactory = 
testRunner.getProcessSessionFactory();
-
-        Field f = ConsumeMQTT.class.getDeclaredField("mqttQueue");
-        f.setAccessible(true);
-        @SuppressWarnings("unchecked")
-        LinkedBlockingQueue<ReceivedMqttMessage> queue = 
(LinkedBlockingQueue<ReceivedMqttMessage>) f.get(consumeMQTT);
-        queue.add(testMessage);
-
-        consumeMQTT.onUnscheduled(testRunner.getProcessContext());
-        consumeMQTT.onStopped(testRunner.getProcessContext());
-
-        testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
-        assertProvenanceEvents(1);
-
-        List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
-        MockFlowFile flowFile = flowFiles.get(0);
-
-        flowFile.assertContentEquals("testMessage");
-        flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
-        flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
-        flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
-        flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
-        flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
-    }
-
-    @Test
-    public void testResizeBuffer() throws Exception {
-        testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
-        testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "2");
-
-        testRunner.assertValid();
-
-        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
-        consumeMQTT.onScheduled(testRunner.getProcessContext());
-        reconnect(consumeMQTT, testRunner.getProcessContext());
-
-        Thread.sleep(PUBLISH_WAIT_MS);
-
-        assertTrue(isConnected(consumeMQTT));
-
-        publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
-        publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
-
-        Thread.sleep(PUBLISH_WAIT_MS);
-        consumeMQTT.onUnscheduled(testRunner.getProcessContext());
-
-        testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "1");
-        testRunner.assertNotValid();
-
-        testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "3");
-        testRunner.assertValid();
-
-        testRunner.run(1);
-
-        testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 2);
-        assertProvenanceEvents(2);
-
-        List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
-        MockFlowFile flowFile = flowFiles.get(0);
-
-        flowFile.assertContentEquals("testMessage");
-        flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
-        flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
-        flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
-        flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
-        flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
-    }
-
-    @Test
-    public void testConsumeRecordsWithAddedFields() throws Exception {
-        testRunner.setProperty(ConsumeMQTT.RECORD_READER, "record-reader");
-        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, "record-writer");
-
-        final JsonTreeReader jsonReader = new JsonTreeReader();
-        testRunner.addControllerService("record-reader", jsonReader);
-        testRunner.setProperty(jsonReader, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
-        testRunner.enableControllerService(jsonReader);
-
-        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
-        testRunner.addControllerService("record-writer", jsonWriter);
-        testRunner.setProperty(jsonWriter, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
-        testRunner.enableControllerService(jsonWriter);
-
-        testRunner.assertValid();
-
-        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
-        consumeMQTT.onScheduled(testRunner.getProcessContext());
-        reconnect(consumeMQTT, testRunner.getProcessContext());
-
-        Thread.sleep(PUBLISH_WAIT_MS);
-
-        assertTrue(isConnected(consumeMQTT));
-
-        publishMessage(JSON_PAYLOAD, MOST_ONE);
-        publishMessage(THIS_IS_NOT_JSON, MOST_ONE);
-        publishMessage(JSON_PAYLOAD, MOST_ONE);
-
-        Thread.sleep(PUBLISH_WAIT_MS);
-
-        testRunner.run(1, false, false);
-
-        List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
-        assertEquals(1, flowFiles.size());
-        assertEquals("[{\"name\":\"Apache 
NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false},"
-                + "{\"name\":\"Apache 
NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false}]",
-                new String(flowFiles.get(0).toByteArray()));
-
-        List<MockFlowFile> badFlowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
-        assertEquals(1, badFlowFiles.size());
-        assertEquals(THIS_IS_NOT_JSON, new 
String(badFlowFiles.get(0).toByteArray()));
-
-        // clean runner by removing records reader/writer
-        testRunner.removeProperty(ConsumeMQTT.RECORD_READER);
-        testRunner.removeProperty(ConsumeMQTT.RECORD_WRITER);
-    }
-
-    @Test
-    public void testConsumeDemarcator() throws Exception {
-        testRunner.setProperty(ConsumeMQTT.MESSAGE_DEMARCATOR, "\\n");
-        testRunner.assertValid();
-
-        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
-        consumeMQTT.onScheduled(testRunner.getProcessContext());
-        reconnect(consumeMQTT, testRunner.getProcessContext());
-
-        Thread.sleep(PUBLISH_WAIT_MS);
-
-        assertTrue(isConnected(consumeMQTT));
-
-        publishMessage(JSON_PAYLOAD, MOST_ONE);
-        publishMessage(THIS_IS_NOT_JSON, MOST_ONE);
-        publishMessage(JSON_PAYLOAD, MOST_ONE);
-
-        Thread.sleep(PUBLISH_WAIT_MS);
-        Thread.sleep(PUBLISH_WAIT_MS);
-
-        testRunner.run(1, false, false);
-
-        List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
-        assertEquals(flowFiles.size(), 1);
-        assertEquals("{\"name\":\"Apache NiFi\"}\\n"
-                + THIS_IS_NOT_JSON + "\\n"
-                + "{\"name\":\"Apache NiFi\"}\\n",
-                new String(flowFiles.get(0).toByteArray()));
-
-        List<MockFlowFile> badFlowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
-        assertEquals(0, badFlowFiles.size());
-
-        // clean runner by removing message demarcator
-        testRunner.removeProperty(ConsumeMQTT.MESSAGE_DEMARCATOR);
-    }
-
-    @Test
-    public void testConsumeRecordsWithoutAddedFields() throws Exception {
-        testRunner.setProperty(ConsumeMQTT.RECORD_READER, "record-reader");
-        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, "record-writer");
-        testRunner.setProperty(ConsumeMQTT.ADD_ATTRIBUTES_AS_FIELDS, "false");
-
-        final JsonTreeReader jsonReader = new JsonTreeReader();
-        testRunner.addControllerService("record-reader", jsonReader);
-        testRunner.setProperty(jsonReader, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
-        testRunner.enableControllerService(jsonReader);
-
-        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
-        testRunner.addControllerService("record-writer", jsonWriter);
-        testRunner.setProperty(jsonWriter, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
-        testRunner.enableControllerService(jsonWriter);
-
-        testRunner.assertValid();
-
-        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
-        consumeMQTT.onScheduled(testRunner.getProcessContext());
-        reconnect(consumeMQTT, testRunner.getProcessContext());
-
-        Thread.sleep(PUBLISH_WAIT_MS);
-
-        assertTrue(isConnected(consumeMQTT));
-
-        publishMessage(JSON_PAYLOAD, LEAST_ONE);
-        publishMessage(THIS_IS_NOT_JSON, LEAST_ONE);
-        publishMessage(JSON_PAYLOAD, LEAST_ONE);
-
-        Thread.sleep(PUBLISH_WAIT_MS);
-
-        testRunner.run(1, false, false);
-
-        List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
-        assertEquals(1, flowFiles.size());
-        assertEquals("[{\"name\":\"Apache NiFi\"},{\"name\":\"Apache 
NiFi\"}]", new String(flowFiles.get(0).toByteArray()));
-
-        List<MockFlowFile> badFlowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
-        assertEquals(1, badFlowFiles.size());
-        assertEquals(THIS_IS_NOT_JSON, new 
String(badFlowFiles.get(0).toByteArray()));
-
-        // clean runner by removing records reader/writer
-        testRunner.removeProperty(ConsumeMQTT.RECORD_READER);
-        testRunner.removeProperty(ConsumeMQTT.RECORD_WRITER);
-    }
-
-    @Test
-    public void testConsumeRecordsOnlyBadData() throws Exception {
-        testRunner.setProperty(ConsumeMQTT.RECORD_READER, "record-reader");
-        testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, "record-writer");
-        testRunner.setProperty(ConsumeMQTT.ADD_ATTRIBUTES_AS_FIELDS, "false");
-
-        final JsonTreeReader jsonReader = new JsonTreeReader();
-        testRunner.addControllerService("record-reader", jsonReader);
-        testRunner.setProperty(jsonReader, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
-        testRunner.enableControllerService(jsonReader);
-
-        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
-        testRunner.addControllerService("record-writer", jsonWriter);
-        testRunner.setProperty(jsonWriter, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
-        testRunner.enableControllerService(jsonWriter);
-
-        testRunner.assertValid();
-
-        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
-        consumeMQTT.onScheduled(testRunner.getProcessContext());
-        reconnect(consumeMQTT, testRunner.getProcessContext());
-
-        Thread.sleep(PUBLISH_WAIT_MS);
-
-        assertTrue(isConnected(consumeMQTT));
-
-        publishMessage(THIS_IS_NOT_JSON, EXACTLY_ONCE);
-
-        Thread.sleep(PUBLISH_WAIT_MS);
-
-        testRunner.run(1, false, false);
-
-        List<MockFlowFile> badFlowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
-        assertEquals(1, badFlowFiles.size());
-        assertEquals(THIS_IS_NOT_JSON, new 
String(badFlowFiles.get(0).toByteArray()));
-
-        // clean runner by removing records reader/writer
-        testRunner.removeProperty(ConsumeMQTT.RECORD_READER);
-        testRunner.removeProperty(ConsumeMQTT.RECORD_WRITER);
-    }
-
-    private static boolean isConnected(AbstractMQTTProcessor processor) throws 
NoSuchFieldException, IllegalAccessException {
-        Field f = AbstractMQTTProcessor.class.getDeclaredField("mqttClient");
-        f.setAccessible(true);
-        MqttClient mqttClient = (MqttClient) f.get(processor);
-        return mqttClient.isConnected();
-    }
-
-
-    public static void reconnect(ConsumeMQTT processor, ProcessContext 
context) throws IllegalAccessException, NoSuchMethodException, 
InvocationTargetException {
-        Method method = 
ConsumeMQTT.class.getDeclaredMethod("initializeClient", ProcessContext.class);
-        method.setAccessible(true);
-        method.invoke(processor, context);
-    }
-
-    @SuppressWarnings("unchecked")
-    public static BlockingQueue<ReceivedMqttMessage> getMqttQueue(ConsumeMQTT 
consumeMQTT) throws IllegalAccessException, NoSuchFieldException {
-        Field mqttQueueField = ConsumeMQTT.class.getDeclaredField("mqttQueue");
-        mqttQueueField.setAccessible(true);
-        return (BlockingQueue<ReceivedMqttMessage>) 
mqttQueueField.get(consumeMQTT);
-    }
-
-    public static void transferQueue(ConsumeMQTT consumeMQTT, ProcessSession 
session) throws NoSuchMethodException, InvocationTargetException, 
IllegalAccessException {
-        Method transferQueue = 
ConsumeMQTT.class.getDeclaredMethod("transferQueue", ProcessSession.class);
-        transferQueue.setAccessible(true);
-        transferQueue.invoke(consumeMQTT, session);
-    }
-
-    private void assertProvenanceEvents(int count){
-        List<ProvenanceEventRecord> provenanceEvents = 
testRunner.getProvenanceEvents();
-        assertNotNull(provenanceEvents);
-        assertEquals(count, provenanceEvents.size());
-        if (count > 0) {
-            assertEquals(ProvenanceEventType.RECEIVE, 
provenanceEvents.get(0).getEventType());
-        }
-    }
-
-    private void publishMessage(final String payload, final int qos) {
-        final StandardMqttMessage message = new 
StandardMqttMessage(payload.getBytes(StandardCharsets.UTF_8), qos, false);
-        internalPublish(message, "testTopic");
-    }
-}
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java
deleted file mode 100644
index d82bc4da65..0000000000
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.mqtt.common;
-
-import org.apache.nifi.processors.mqtt.PublishMQTT;
-import org.apache.nifi.provenance.ProvenanceEventRecord;
-import org.apache.nifi.provenance.ProvenanceEventType;
-import org.apache.nifi.util.TestRunner;
-import org.junit.jupiter.api.Test;
-
-import java.util.List;
-
-import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
-import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-
-public abstract class TestPublishMqttCommon {
-
-    public TestRunner testRunner;
-    public String topic;
-
-    public abstract void verifyPublishedMessage(byte[] payload, int qos, 
boolean retain);
-
-    @Test
-    public void testQoS0() {
-        testRunner.setProperty(PublishMQTT.PROP_QOS, "0");
-
-        testRunner.assertValid();
-
-        String testMessage = "testMessage";
-        testRunner.enqueue(testMessage.getBytes());
-
-        testRunner.run();
-
-        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
-
-        testRunner.assertTransferCount(REL_SUCCESS, 1);
-        assertProvenanceEvents();
-
-        verifyPublishedMessage(testMessage.getBytes(), 0, false);
-    }
-
-    @Test
-    public void testQoS1() {
-        testRunner.setProperty(PublishMQTT.PROP_QOS, "1");
-
-        testRunner.assertValid();
-
-        String testMessage = "testMessage";
-        testRunner.enqueue(testMessage.getBytes());
-
-        testRunner.run();
-
-        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
-        assertProvenanceEvents();
-
-        testRunner.assertTransferCount(REL_SUCCESS, 1);
-        verifyPublishedMessage(testMessage.getBytes(), 1, false);
-    }
-
-    @Test
-    public void testQoS2NotCleanSession() {
-        // Publisher executes synchronously so the only time whether its Clean 
or Not matters is when the processor stops in the middle of the publishing
-        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
-        testRunner.setProperty(PublishMQTT.PROP_CLEAN_SESSION, 
ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
-
-        testRunner.assertValid();
-
-        String testMessage = "testMessage";
-        testRunner.enqueue(testMessage.getBytes());
-
-        testRunner.run();
-
-        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
-        assertProvenanceEvents();
-
-        testRunner.assertTransferCount(REL_SUCCESS, 1);
-        verifyPublishedMessage(testMessage.getBytes(), 2, false);
-    }
-
-    @Test
-    public void testQoS2() {
-        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
-
-        testRunner.assertValid();
-
-        String testMessage = "testMessage";
-        testRunner.enqueue(testMessage.getBytes());
-
-        testRunner.run();
-
-        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
-        assertProvenanceEvents();
-
-        testRunner.assertTransferCount(REL_SUCCESS, 1);
-        verifyPublishedMessage(testMessage.getBytes(), 2, false);
-    }
-
-    @Test
-    public void testRetainQoS2() {
-        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
-        testRunner.setProperty(PublishMQTT.PROP_RETAIN, "true");
-
-        testRunner.assertValid();
-
-        String testMessage = "testMessage";
-        testRunner.enqueue(testMessage.getBytes());
-
-        testRunner.run();
-
-        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
-        assertProvenanceEvents();
-
-        testRunner.assertTransferCount(REL_SUCCESS, 1);
-        verifyPublishedMessage(testMessage.getBytes(), 2, true);
-    }
-
-    private void assertProvenanceEvents(){
-        List<ProvenanceEventRecord> provenanceEvents = 
testRunner.getProvenanceEvents();
-        assertNotNull(provenanceEvents);
-        assertEquals(1, provenanceEvents.size());
-        assertEquals(ProvenanceEventType.SEND, 
provenanceEvents.get(0).getEventType());
-    }
-}

Reply via email to