This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 32df0fa484 NIFI-11137 Add record support to Consume/PublishJMS (#6987)
32df0fa484 is described below

commit 32df0fa4849325d742d36e57b686cb88bd65d2b6
Author: Nandor Soma Abonyi <[email protected]>
AuthorDate: Mon Apr 3 21:42:27 2023 +0200

    NIFI-11137 Add record support to Consume/PublishJMS (#6987)
---
 .../nifi-jms-bundle/nifi-jms-processors/pom.xml    |  22 ++
 .../nifi/jms/processors/AbstractJMSProcessor.java  |  16 ++
 .../org/apache/nifi/jms/processors/ConsumeJMS.java | 204 ++++++++++++++---
 .../apache/nifi/jms/processors/JMSConsumer.java    | 194 ++++++++++------
 .../apache/nifi/jms/processors/JMSPublisher.java   |   2 +-
 .../org/apache/nifi/jms/processors/PublishJMS.java | 140 +++++++++---
 .../ioconcept/reader/FlowFileReader.java           |  24 ++
 .../ioconcept/reader/FlowFileReaderCallback.java   |  24 ++
 .../ioconcept/reader/MessageHandler.java           |  21 ++
 .../reader/StateTrackingFlowFileReader.java        |  73 ++++++
 .../reader/record/ProvenanceEventTemplates.java    |  25 +++
 .../ioconcept/reader/record/RecordSupplier.java    |  82 +++++++
 .../ioconcept/writer/AttributeSource.java          |  23 ++
 .../ioconcept/writer/FlowFileWriter.java           |  25 +++
 .../ioconcept/writer/FlowFileWriterCallback.java   |  27 +++
 .../processors/ioconcept/writer/Marshaller.java    |  21 ++
 .../ioconcept/writer/record/OutputStrategy.java    |  57 +++++
 .../ioconcept/writer/record/RecordUtils.java       |  78 +++++++
 .../ioconcept/writer/record/RecordWriter.java      | 205 +++++++++++++++++
 .../apache/nifi/jms/processors/ConsumeJMSIT.java   | 199 +++++++++++++++++
 .../jms/processors/JMSPublisherConsumerIT.java     | 164 ++++++--------
 .../apache/nifi/jms/processors/PublishJMSIT.java   | 248 ++++++++++++++++++++-
 .../nifi/jms/processors/helpers/JMSTestUtil.java   |  44 ++++
 23 files changed, 1679 insertions(+), 239 deletions(-)

diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/pom.xml 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/pom.xml
index b05909b360..ebfe0f6dbc 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/pom.xml
@@ -58,6 +58,14 @@
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.activemq</groupId>
             <artifactId>activemq-client</artifactId>
@@ -83,6 +91,20 @@
             <version>2.0.0-SNAPSHOT</version>
             <scope>test</scope>
         </dependency>
+
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-services</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index aa8fa4dd64..470260cfc9 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -37,6 +37,8 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.springframework.jms.connection.CachingConnectionFactory;
 import org.springframework.jms.connection.SingleConnectionFactory;
 import 
org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
@@ -151,6 +153,20 @@ public abstract class AbstractJMSProcessor<T extends 
JMSWorker> extends Abstract
                     .collect(Collectors.toList())
     );
 
+    static final PropertyDescriptor BASE_RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+
+    static final PropertyDescriptor BASE_RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .dependsOn(BASE_RECORD_READER)
+            .required(true)
+            .build();
 
     private volatile IJMSConnectionFactoryProvider connectionFactoryProvider;
     private volatile BlockingQueue<T> workerPool;
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index 6e99da3807..72d1cae292 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -32,13 +32,18 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
-import org.apache.nifi.jms.processors.JMSConsumer.ConsumerCallback;
 import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse;
+import org.apache.nifi.jms.processors.ioconcept.writer.FlowFileWriter;
+import org.apache.nifi.jms.processors.ioconcept.writer.FlowFileWriterCallback;
+import org.apache.nifi.jms.processors.ioconcept.writer.record.OutputStrategy;
+import org.apache.nifi.jms.processors.ioconcept.writer.record.RecordWriter;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.springframework.jms.connection.CachingConnectionFactory;
 import org.springframework.jms.connection.SingleConnectionFactory;
 import org.springframework.jms.core.JmsTemplate;
@@ -48,6 +53,7 @@ import javax.jms.Session;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -55,6 +61,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
 /**
  * Consuming JMS processor which upon each invocation of
@@ -88,19 +95,24 @@ import java.util.concurrent.TimeUnit;
         expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
 @SeeAlso(value = { PublishJMS.class, JMSConnectionFactoryProvider.class })
 public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
+
     public static final String JMS_MESSAGETYPE = "jms.messagetype";
 
+    private final static String COUNTER_PARSE_FAILURES = "Parse Failures";
+    private final static String COUNTER_RECORDS_RECEIVED = "Records Received";
+    private final static String COUNTER_RECORDS_PROCESSED = "Records 
Processed";
+
     static final AllowableValue AUTO_ACK = new 
AllowableValue(String.valueOf(Session.AUTO_ACKNOWLEDGE),
-            "AUTO_ACKNOWLEDGE (" + String.valueOf(Session.AUTO_ACKNOWLEDGE) + 
")",
+            "AUTO_ACKNOWLEDGE (" + Session.AUTO_ACKNOWLEDGE + ")",
             "Automatically acknowledges a client's receipt of a message, 
regardless if NiFi session has been commited. "
                     + "Can result in data loss in the event where NiFi 
abruptly stopped before session was commited.");
 
     static final AllowableValue CLIENT_ACK = new 
AllowableValue(String.valueOf(Session.CLIENT_ACKNOWLEDGE),
-            "CLIENT_ACKNOWLEDGE (" + 
String.valueOf(Session.CLIENT_ACKNOWLEDGE) + ")",
+            "CLIENT_ACKNOWLEDGE (" + Session.CLIENT_ACKNOWLEDGE + ")",
             "(DEFAULT) Manually acknowledges a client's receipt of a message 
after NiFi Session was commited, thus ensuring no data loss");
 
     static final AllowableValue DUPS_OK = new 
AllowableValue(String.valueOf(Session.DUPS_OK_ACKNOWLEDGE),
-            "DUPS_OK_ACKNOWLEDGE (" + 
String.valueOf(Session.DUPS_OK_ACKNOWLEDGE) + ")",
+            "DUPS_OK_ACKNOWLEDGE (" + Session.DUPS_OK_ACKNOWLEDGE + ")",
             "This acknowledgment mode instructs the session to lazily 
acknowledge the delivery of messages. May result in both data "
                     + "duplication and data loss while achieving the best 
throughput.");
 
@@ -170,11 +182,38 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
+    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BASE_RECORD_READER)
+            .description("The Record Reader to use for parsing received JMS 
Messages into Records.")
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BASE_RECORD_WRITER)
+            .description("The Record Writer to use for serializing Records 
before writing them to a FlowFile.")
+            .build();
+
+    static final PropertyDescriptor OUTPUT_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("output-strategy")
+            .displayName("Output Strategy")
+            .description("The format used to output the JMS message into a 
FlowFile record.")
+            .dependsOn(RECORD_READER)
+            .required(true)
+            .defaultValue(OutputStrategy.USE_VALUE.getValue())
+            .allowableValues(OutputStrategy.class)
+            .build();
+
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("All FlowFiles that are received from the JMS 
Destination are routed to this relationship")
             .build();
 
+    public static final Relationship REL_PARSE_FAILURE = new 
Relationship.Builder()
+            .name("parse.failure")
+            .description("If a message cannot be parsed using the configured 
Record Reader, the contents of the "
+                    + "message will be routed to this Relationship as its own 
individual FlowFile.")
+            .autoTerminateDefault(true) // to make sure flow are still valid 
after upgrades
+            .build();
+
     private final static Set<Relationship> relationships;
 
     private final static List<PropertyDescriptor> propertyDescriptors;
@@ -205,6 +244,10 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
         _propertyDescriptors.add(TIMEOUT);
         _propertyDescriptors.add(ERROR_QUEUE);
 
+        _propertyDescriptors.add(RECORD_READER);
+        _propertyDescriptors.add(RECORD_WRITER);
+        _propertyDescriptors.add(OUTPUT_STRATEGY);
+
         _propertyDescriptors.addAll(JNDI_JMS_CF_PROPERTIES);
         _propertyDescriptors.addAll(JMS_CF_PROPERTIES);
 
@@ -212,6 +255,7 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
 
         Set<Relationship> _relationships = new HashSet<>();
         _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_PARSE_FAILURE);
         relationships = Collections.unmodifiableSet(_relationships);
     }
 
@@ -268,35 +312,11 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
         final String charset = 
context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
 
         try {
-            consumer.consume(destinationName, errorQueueName, durable, shared, 
subscriptionName, messageSelector, charset, new ConsumerCallback() {
-                @Override
-                public void accept(final JMSResponse response) {
-                    if (response == null) {
-                        return;
-                    }
-
-                    try {
-                        FlowFile flowFile = processSession.create();
-                        flowFile = processSession.write(flowFile, out -> 
out.write(response.getMessageBody()));
-
-                        final Map<String, String> jmsHeaders = 
response.getMessageHeaders();
-                        final Map<String, String> jmsProperties = 
response.getMessageProperties();
-
-                        flowFile = 
ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, 
processSession);
-                        flowFile = 
ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, 
flowFile, processSession);
-                        flowFile = processSession.putAttribute(flowFile, 
JMS_SOURCE_DESTINATION_NAME, destinationName);
-
-                        
processSession.getProvenanceReporter().receive(flowFile, destinationName);
-                        processSession.putAttribute(flowFile, JMS_MESSAGETYPE, 
response.getMessageType());
-                        processSession.transfer(flowFile, REL_SUCCESS);
-
-                        processSession.commitAsync(() -> 
acknowledge(response), throwable -> response.reject());
-                    } catch (final Throwable t) {
-                        response.reject();
-                        throw t;
-                    }
-                }
-            });
+            if (context.getProperty(RECORD_READER).isSet()) {
+                processMessageSet(context, processSession, consumer, 
destinationName, errorQueueName, durable, shared, subscriptionName, 
messageSelector, charset);
+            } else {
+                processSingleMessage(processSession, consumer, 
destinationName, errorQueueName, durable, shared, subscriptionName, 
messageSelector, charset);
+            }
         } catch(Exception e) {
             getLogger().error("Error while trying to process JMS message", e);
             consumer.setValid(false);
@@ -305,6 +325,92 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
         }
     }
 
+    private void processSingleMessage(ProcessSession processSession, 
JMSConsumer consumer, String destinationName, String errorQueueName,
+                                      boolean durable, boolean shared, String 
subscriptionName, String messageSelector, String charset) {
+
+        consumer.consumeSingleMessage(destinationName, errorQueueName, 
durable, shared, subscriptionName, messageSelector, charset, response -> {
+            if (response == null) {
+                return;
+            }
+
+            try {
+                final FlowFile flowFile = 
createFlowFileFromMessage(processSession, destinationName, response);
+
+                processSession.getProvenanceReporter().receive(flowFile, 
destinationName);
+                processSession.transfer(flowFile, REL_SUCCESS);
+                processSession.commitAsync(
+                        () -> withLog(() -> acknowledge(response)),
+                        __ -> withLog(() -> response.reject()));
+            } catch (final Throwable t) {
+                response.reject();
+                throw t;
+            }
+        });
+    }
+
+    private FlowFile createFlowFileFromMessage(ProcessSession processSession, 
String destinationName, JMSResponse response) {
+        FlowFile flowFile = processSession.create();
+        flowFile = processSession.write(flowFile, out -> 
out.write(response.getMessageBody()));
+
+        final Map<String, String> jmsHeaders = response.getMessageHeaders();
+        final Map<String, String> jmsProperties = 
response.getMessageProperties();
+
+        flowFile = 
updateFlowFileAttributesWithJMSAttributes(mergeJmsAttributes(jmsHeaders, 
jmsProperties), flowFile, processSession);
+        flowFile = processSession.putAttribute(flowFile, 
JMS_SOURCE_DESTINATION_NAME, destinationName);
+        flowFile = processSession.putAttribute(flowFile, JMS_MESSAGETYPE, 
response.getMessageType());
+
+        return flowFile;
+    }
+
+    private void processMessageSet(ProcessContext context, ProcessSession 
session, JMSConsumer consumer, String destinationName,String errorQueueName,
+                                   boolean durable, boolean shared, String 
subscriptionName, String messageSelector, String charset) {
+
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final OutputStrategy outputStrategy = 
OutputStrategy.valueOf(context.getProperty(OUTPUT_STRATEGY).getValue());
+
+        final FlowFileWriter<JMSResponse> flowFileWriter = new RecordWriter<>(
+                readerFactory,
+                writerFactory,
+                message -> message.getMessageBody() == null ? new byte[0] : 
message.getMessageBody(),
+                message -> mergeJmsAttributes(message.getMessageHeaders(), 
message.getMessageProperties()),
+                outputStrategy,
+                getLogger()
+        );
+
+        consumer.consumeMessageSet(destinationName, errorQueueName, durable, 
shared, subscriptionName, messageSelector, charset, jmsResponses -> {
+            flowFileWriter.write(session, jmsResponses, new 
FlowFileWriterCallback<>() {
+                @Override
+                public void onSuccess(FlowFile flowFile, List<JMSResponse> 
processedMessages, List<JMSResponse> failedMessages) {
+                    session.getProvenanceReporter().receive(flowFile, 
destinationName);
+                    session.adjustCounter(COUNTER_RECORDS_RECEIVED, 
processedMessages.size() + failedMessages.size(), false);
+                    session.adjustCounter(COUNTER_RECORDS_PROCESSED, 
processedMessages.size(), false);
+
+                    session.transfer(flowFile, REL_SUCCESS);
+                    session.commitAsync(
+                            () -> withLog(() -> acknowledge(processedMessages, 
failedMessages)),
+                            __ -> withLog(() -> reject(processedMessages, 
failedMessages))
+                    );
+                }
+
+                @Override
+                public void onParseFailure(FlowFile flowFile, JMSResponse 
message, Exception e) {
+                    session.adjustCounter(COUNTER_PARSE_FAILURES, 1, false);
+
+                    final FlowFile failedMessage = 
createFlowFileFromMessage(session, destinationName, message);
+                    session.transfer(failedMessage, REL_PARSE_FAILURE);
+                }
+
+                @Override
+                public void onFailure(FlowFile flowFile, List<JMSResponse> 
processedMessages, List<JMSResponse> failedMessages, Exception e) {
+                    reject(processedMessages, failedMessages);
+                    // It would be nicer to call rollback and yield here, but 
we are rethrowing the exception to have the same error handling with 
processSingleMessage.
+                    throw new ProcessException(e);
+                }
+            });
+        });
+    }
+
     private void acknowledge(final JMSResponse response) {
         try {
             response.acknowledge();
@@ -314,6 +420,26 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
         }
     }
 
+    private void acknowledge(final List<JMSResponse> processedMessages, final 
List<JMSResponse> failedMessages) {
+        acknowledge(findLastBatchedJmsResponse(processedMessages, 
failedMessages));
+    }
+
+    private void reject(final List<JMSResponse> processedMessages, final 
List<JMSResponse> failedMessages) {
+        findLastBatchedJmsResponse(processedMessages, failedMessages).reject();
+    }
+
+    private void withLog(Runnable runnable) {
+        try {
+            runnable.run();
+        } catch (Exception e) {
+            getLogger().error("An error happened during commitAsync callback", 
e);
+            throw e;
+        }
+    }
+
+    private JMSResponse findLastBatchedJmsResponse(List<JMSResponse> 
processedMessages, List<JMSResponse> failedMessages) {
+        return Stream.of(processedMessages, 
failedMessages).flatMap(Collection::stream).max(Comparator.comparing(JMSResponse::getBatchOrder)).get();
+    }
 
     /**
      * Will create an instance of {@link JMSConsumer}
@@ -375,4 +501,16 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
         flowFile = processSession.putAllAttributes(flowFile, attributes);
         return flowFile;
     }
+
+    private Map<String, String> mergeJmsAttributes(Map<String, String> 
headers, Map<String, String> properties) {
+        final Map<String, String> jmsAttributes = new HashMap<>(headers);
+        properties.forEach((key, value) -> {
+            if (jmsAttributes.containsKey(key)) {
+                getLogger().warn("JMS Header and Property name collides as an 
attribute. JMS Property will override the JMS Header attribute. 
attributeName=[{}]", key);
+            }
+            jmsAttributes.put(key, value);
+        });
+
+        return jmsAttributes;
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
index 4d856ca157..141b01d9c7 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
@@ -38,19 +38,24 @@ import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 import java.nio.charset.Charset;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 
 /**
  * Generic consumer of messages from JMS compliant messaging system.
  */
 class JMSConsumer extends JMSWorker {
 
+    private final static int MAX_MESSAGES_PER_FLOW_FILE = 10000;
+
     JMSConsumer(CachingConnectionFactory connectionFactory, JmsTemplate 
jmsTemplate, ComponentLog logger) {
         super(connectionFactory, jmsTemplate, logger);
-        logger.debug("Created Message Consumer for '{}'", new Object[] 
{jmsTemplate});
+        logger.debug("Created Message Consumer for '{}'", jmsTemplate);
     }
 
 
@@ -82,79 +87,62 @@ class JMSConsumer extends JMSWorker {
         }
     }
 
-
     /**
      * Receives a message from the broker. It is the consumerCallback's 
responsibility to acknowledge the received message.
      */
-    public void consume(final String destinationName, String errorQueueName, 
final boolean durable, final boolean shared, final String subscriptionName, 
final String messageSelector,
-                        final String charset, final ConsumerCallback 
consumerCallback) {
-        this.jmsTemplate.execute(new SessionCallback<Void>() {
-            @Override
-            public Void doInJms(final Session session) throws JMSException {
-
-                final MessageConsumer msgConsumer = 
createMessageConsumer(session, destinationName, durable, shared, 
subscriptionName, messageSelector);
-                try {
-                    final Message message = 
msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
-
-                    // If there is no message, there's nothing for us to do. 
We can simply close the consumer and return.
-                    if (message == null) {
-                        JmsUtils.closeMessageConsumer(msgConsumer);
-                        return null;
-                    }
-
-                    String messageType;
-                    byte[] messageBody;
-
-                    try {
-                        if (message instanceof TextMessage) {
-                            messageType = TextMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((TextMessage) message, 
Charset.forName(charset));
-                        } else if (message instanceof BytesMessage) {
-                            messageType = BytesMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((BytesMessage) message);
-                        } else if (message instanceof ObjectMessage) {
-                            messageType = ObjectMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((ObjectMessage) message);
-                        } else if (message instanceof StreamMessage) {
-                            messageType = StreamMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((StreamMessage) message);
-                        } else if (message instanceof MapMessage) {
-                            messageType = MapMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((MapMessage) message);
-                        } else {
-                            acknowledge(message, session);
-
-                            if (errorQueueName != null) {
-                                processLog.error("Received unsupported JMS 
Message type [{}]; rerouting message to error queue [{}].", new Object[] 
{message, errorQueueName});
-                                jmsTemplate.send(errorQueueName, __ -> 
message);
-                            } else {
-                                processLog.error("Received unsupported JMS 
Message type [{}]; will skip this message.", new Object[] {message});
-                            }
-
-                            return null;
-                        }
-                    } catch (final MessageConversionException mce) {
-                        processLog.error("Received a JMS Message [{}] but 
failed to obtain the content of the message; will acknowledge this message 
without creating a FlowFile for it.",
-                            new Object[] {message}, mce);
-                        acknowledge(message, session);
-
-                        if (errorQueueName != null) {
-                            jmsTemplate.send(errorQueueName, __ -> message);
-                        }
-
-                        return null;
-                    }
+    public void consumeSingleMessage(final String destinationName, String 
errorQueueName, final boolean durable, final boolean shared, final String 
subscriptionName, final String messageSelector,
+                                     final String charset, final 
Consumer<JMSResponse> singleMessageConsumer) {
+        doWithJmsTemplate(destinationName, durable, shared, subscriptionName, 
messageSelector, (session, messageConsumer) -> {
+            final JMSResponse response = receiveMessage(session, 
messageConsumer, charset, errorQueueName);
+            if (response != null) {
+                // Provide the JMSResponse to the processor to handle. It is 
the responsibility of the
+                // processor to handle acknowledgment of the message (if 
Client Acknowledge), and it is
+                // the responsibility of the processor to handle closing the 
Message Consumer.
+                // Both of these actions can be handled by calling the 
acknowledge() or reject() methods of
+                // the JMSResponse.
+                singleMessageConsumer.accept(response);
+            }
+        });
+    }
 
-                    final Map<String, String> messageHeaders = 
extractMessageHeaders(message);
-                    final Map<String, String> messageProperties = 
extractMessageProperties(message);
-                    final JMSResponse response = new JMSResponse(message, 
session.getAcknowledgeMode(), messageType, messageBody, messageHeaders, 
messageProperties, msgConsumer);
+    /**
+     * Receives a list of messages from the broker. It is the 
consumerCallback's responsibility to acknowledge the received message.
+     */
+    public void consumeMessageSet(final String destinationName, String 
errorQueueName, final boolean durable, final boolean shared, final String 
subscriptionName, final String messageSelector,
+                        final String charset, final 
Consumer<List<JMSResponse>> messageSetConsumer) {
+        doWithJmsTemplate(destinationName, durable, shared, subscriptionName, 
messageSelector, new MessageReceiver() {
+            @Override
+            public void consume(Session session, MessageConsumer 
messageConsumer) throws JMSException {
+                final List<JMSResponse> jmsResponses = new ArrayList<>();
+                int batchCounter = 0;
+
+                JMSResponse response;
+                while ((response = receiveMessage(session, messageConsumer, 
charset, errorQueueName)) != null && batchCounter < MAX_MESSAGES_PER_FLOW_FILE) 
{
+                    response.setBatchOrder(batchCounter);
+                    jmsResponses.add(response);
+                    batchCounter++;
+                }
 
+                if (!jmsResponses.isEmpty()) {
                     // Provide the JMSResponse to the processor to handle. It 
is the responsibility of the
                     // processor to handle acknowledgment of the message (if 
Client Acknowledge), and it is
                     // the responsibility of the processor to handle closing 
the Message Consumer.
                     // Both of these actions can be handled by calling the 
acknowledge() or reject() methods of
                     // the JMSResponse.
-                    consumerCallback.accept(response);
+                    messageSetConsumer.accept(jmsResponses);
+                }
+            }
+        });
+    }
+
+    private void doWithJmsTemplate(String destinationName, boolean durable, 
boolean shared, String subscriptionName, String messageSelector, 
MessageReceiver messageReceiver) {
+        this.jmsTemplate.execute(new SessionCallback<Void>() {
+            @Override
+            public Void doInJms(final Session session) throws JMSException {
+
+                final MessageConsumer messageConsumer = 
createMessageConsumer(session, destinationName, durable, shared, 
subscriptionName, messageSelector);
+                try {
+                    messageReceiver.consume(session, messageConsumer);
                 } catch (Exception e) {
                     // We need to call recover to ensure that in the event of
                     // abrupt end or exception the current session will stop 
message
@@ -166,7 +154,7 @@ class JMSConsumer extends JMSWorker {
                         processLog.debug("Failed to recover JMS session while 
handling initial error. The recover error is: ", e1);
                     }
 
-                    JmsUtils.closeMessageConsumer(msgConsumer);
+                    JmsUtils.closeMessageConsumer(messageConsumer);
                     throw e;
                 }
 
@@ -175,6 +163,64 @@ class JMSConsumer extends JMSWorker {
         }, true);
     }
 
+    private JMSResponse receiveMessage(Session session, MessageConsumer 
msgConsumer, String charset, String errorQueueName) throws JMSException {
+        final Message message = 
msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
+
+        // If there is no message, there's nothing for us to do. We can simply 
close the consumer and return.
+        if (message == null) {
+            JmsUtils.closeMessageConsumer(msgConsumer);
+            return null;
+        }
+
+        String messageType;
+        byte[] messageBody;
+
+        try {
+            if (message instanceof TextMessage) {
+                messageType = TextMessage.class.getSimpleName();
+                messageBody = 
MessageBodyToBytesConverter.toBytes((TextMessage) message, 
Charset.forName(charset));
+            } else if (message instanceof BytesMessage) {
+                messageType = BytesMessage.class.getSimpleName();
+                messageBody = 
MessageBodyToBytesConverter.toBytes((BytesMessage) message);
+            } else if (message instanceof ObjectMessage) {
+                messageType = ObjectMessage.class.getSimpleName();
+                messageBody = 
MessageBodyToBytesConverter.toBytes((ObjectMessage) message);
+            } else if (message instanceof StreamMessage) {
+                messageType = StreamMessage.class.getSimpleName();
+                messageBody = 
MessageBodyToBytesConverter.toBytes((StreamMessage) message);
+            } else if (message instanceof MapMessage) {
+                messageType = MapMessage.class.getSimpleName();
+                messageBody = MessageBodyToBytesConverter.toBytes((MapMessage) 
message);
+            } else {
+                acknowledge(message, session);
+
+                if (errorQueueName != null) {
+                    processLog.error("Received unsupported JMS Message type 
[{}]; rerouting message to error queue [{}].", message, errorQueueName);
+                    jmsTemplate.send(errorQueueName, __ -> message);
+                } else {
+                    processLog.error("Received unsupported JMS Message type 
[{}]; will skip this message.", message);
+                }
+
+                return null;
+            }
+        } catch (final MessageConversionException mce) {
+            processLog.error("Received a JMS Message [{}] but failed to obtain 
the content of the message; will acknowledge this message without creating a 
FlowFile for it.",
+                new Object[] {message}, mce);
+            acknowledge(message, session);
+
+            if (errorQueueName != null) {
+                jmsTemplate.send(errorQueueName, __ -> message);
+            }
+
+            return null;
+        }
+
+        final Map<String, String> messageHeaders = 
extractMessageHeaders(message);
+        final Map<String, String> messageProperties = 
extractMessageProperties(message);
+
+        return new JMSResponse(message, session.getAcknowledgeMode(), 
messageType, messageBody, messageHeaders, messageProperties, msgConsumer);
+    }
+
     private void acknowledge(final Message message, final Session session) 
throws JMSException {
         if (message != null && session.getAcknowledgeMode() == 
Session.CLIENT_ACKNOWLEDGE) {
             message.acknowledge();
@@ -245,6 +291,7 @@ class JMSConsumer extends JMSWorker {
         private final Map<String, String> messageHeaders;
         private final Map<String, String> messageProperties;
         private final MessageConsumer messageConsumer;
+        private Integer batchOrder;
 
         JMSResponse(final Message message, final int acknowledgementMode, 
final String messageType, final byte[] messageBody, final Map<String, String> 
messageHeaders,
                     final Map<String, String> messageProperties, final 
MessageConsumer msgConsumer) {
@@ -286,13 +333,18 @@ class JMSConsumer extends JMSWorker {
         public void reject() {
             JmsUtils.closeMessageConsumer(messageConsumer);
         }
+
+        public Integer getBatchOrder() {
+            return batchOrder;
+        }
+
+        public void setBatchOrder(Integer batchOrder) {
+            this.batchOrder = batchOrder;
+        }
     }
 
-    /**
-     * Callback to be invoked while executing inJMS call (the call within the
-     * live JMS session)
-     */
-    static interface ConsumerCallback {
-        void accept(JMSResponse response);
+    interface MessageReceiver {
+        void consume(Session session, MessageConsumer messageConsumer) throws 
JMSException;
     }
+
 }
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
index 3a65bababd..67a49a9dd6 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
@@ -36,7 +36,7 @@ import java.util.Map.Entry;
 /**
  * Generic publisher of messages to JMS compliant messaging system.
  */
-final class JMSPublisher extends JMSWorker {
+class JMSPublisher extends JMSWorker {
 
     JMSPublisher(CachingConnectionFactory connectionFactory, JmsTemplate 
jmsTemplate, ComponentLog processLog) {
         super(connectionFactory, jmsTemplate, processLog);
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
index 2a236adfb1..03158f4cbc 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
@@ -31,12 +31,18 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
+import org.apache.nifi.jms.processors.ioconcept.reader.FlowFileReader;
+import org.apache.nifi.jms.processors.ioconcept.reader.FlowFileReaderCallback;
+import 
org.apache.nifi.jms.processors.ioconcept.reader.StateTrackingFlowFileReader;
+import org.apache.nifi.jms.processors.ioconcept.reader.record.RecordSupplier;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.springframework.jms.connection.CachingConnectionFactory;
 import org.springframework.jms.core.JmsTemplate;
@@ -55,6 +61,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
 
+import static 
org.apache.nifi.jms.processors.ioconcept.reader.record.ProvenanceEventTemplates.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
+import static 
org.apache.nifi.jms.processors.ioconcept.reader.record.ProvenanceEventTemplates.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
+import static 
org.apache.nifi.jms.processors.ioconcept.reader.record.ProvenanceEventTemplates.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
+
 /**
  * An implementation of JMS Message publishing {@link Processor} which upon 
each
  * invocation of {@link #onTrigger(ProcessContext, ProcessSession)} method will
@@ -122,6 +132,16 @@ public class PublishJMS extends 
AbstractJMSProcessor<JMSPublisher> {
             .required(true)
             .build();
 
+    static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BASE_RECORD_READER)
+            .description("The Record Reader to use for parsing the incoming 
FlowFile into Records.")
+            .build();
+
+    static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BASE_RECORD_WRITER)
+            .description("The Record Writer to use for serializing Records 
before publishing them as an JMS Message.")
+            .build();
+
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("All FlowFiles that are sent to the JMS destination 
are routed to this relationship")
@@ -154,12 +174,14 @@ public class PublishJMS extends 
AbstractJMSProcessor<JMSPublisher> {
         _propertyDescriptors.add(ALLOW_ILLEGAL_HEADER_CHARS);
         _propertyDescriptors.add(ATTRIBUTES_AS_HEADERS_REGEX);
 
+        _propertyDescriptors.add(RECORD_READER);
+        _propertyDescriptors.add(RECORD_WRITER);
+
         _propertyDescriptors.addAll(JNDI_JMS_CF_PROPERTIES);
         _propertyDescriptors.addAll(JMS_CF_PROPERTIES);
 
         propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
 
-
         Set<Relationship> _relationships = new HashSet<>();
         _relationships.add(REL_SUCCESS);
         _relationships.add(REL_FAILURE);
@@ -173,7 +195,7 @@ public class PublishJMS extends 
AbstractJMSProcessor<JMSPublisher> {
      * as JMS headers on the newly constructed message. For the list of
      * available message headers please see {@link JmsHeaders}. <br>
      * <br>
-     * Upon success the incoming {@link FlowFile} is transferred to 
the'success'
+     * Upon success the incoming {@link FlowFile} is transferred to the 
'success'
      * {@link Relationship} and upon failure FlowFile is penalized and
      * transferred to the 'failure' {@link Relationship}
      */
@@ -182,12 +204,12 @@ public class PublishJMS extends 
AbstractJMSProcessor<JMSPublisher> {
         FlowFile flowFile = processSession.get();
         if (flowFile != null) {
             try {
-                String destinationName = 
context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue();
-                String charset = 
context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
-                Boolean allowIllegalChars = 
context.getProperty(ALLOW_ILLEGAL_HEADER_CHARS).asBoolean();
-                String attributeHeaderRegex = 
context.getProperty(ATTRIBUTES_AS_HEADERS_REGEX).getValue();
+                final String destinationName = 
context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue();
+                final String charset = 
context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
+                final Boolean allowIllegalChars = 
context.getProperty(ALLOW_ILLEGAL_HEADER_CHARS).asBoolean();
+                final String attributeHeaderRegex = 
context.getProperty(ATTRIBUTES_AS_HEADERS_REGEX).getValue();
 
-                Map<String,String> attributesToSend = new HashMap<>();
+                final Map<String,String> attributesToSend = new HashMap<>();
                 // REGEX Attributes
                 final Pattern pattern = Pattern.compile(attributeHeaderRegex);
                 for (final Map.Entry<String, String> entry : 
flowFile.getAttributes().entrySet()) {
@@ -199,36 +221,63 @@ public class PublishJMS extends 
AbstractJMSProcessor<JMSPublisher> {
                     }
                 }
 
-                switch (context.getProperty(MESSAGE_BODY).getValue()) {
-                    case TEXT_MESSAGE:
-                        try {
-                            publisher.publish(destinationName, 
this.extractTextMessageBody(flowFile, processSession, charset), 
attributesToSend);
-                        } catch(Exception e) {
-                            publisher.setValid(false);
-                            throw e;
-                        }
-                        break;
-                    case BYTES_MESSAGE:
-                    default:
-                        try {
-                            publisher.publish(destinationName, 
this.extractMessageBody(flowFile, processSession), attributesToSend);
-                        } catch(Exception e) {
-                            publisher.setValid(false);
-                            throw e;
-                        }
-                        break;
+                if (context.getProperty(RECORD_READER).isSet()) {
+                    final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+                    final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+                    final FlowFileReader flowFileReader = new 
StateTrackingFlowFileReader(
+                            getIdentifier(),
+                            new RecordSupplier(readerFactory, writerFactory),
+                            getLogger()
+                    );
+
+                    flowFileReader.read(
+                            processSession,
+                            flowFile,
+                            content -> publisher.publish(destinationName, 
content, attributesToSend),
+                            new FlowFileReaderCallback() {
+                                @Override
+                                public void onSuccess(FlowFile flowFile, int 
processedRecords, boolean isRecover, long transmissionMillis) {
+                                    final String eventTemplate = isRecover ? 
PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER : 
PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
+                                    
processSession.getProvenanceReporter().send(
+                                            flowFile,
+                                            destinationName,
+                                            String.format(eventTemplate, 
processedRecords),
+                                            transmissionMillis);
+
+                                    processSession.transfer(flowFile, 
REL_SUCCESS);
+                                }
+
+                                @Override
+                                public void onFailure(FlowFile flowFile, int 
processedRecords, long transmissionMillis, Exception e) {
+                                    
processSession.getProvenanceReporter().send(
+                                            flowFile,
+                                            destinationName,
+                                            
String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE, processedRecords),
+                                            transmissionMillis);
+
+                                    handleException(context, processSession, 
publisher, flowFile, e);
+                                }
+                            }
+                    );
+                } else {
+                    processStandardFlowFile(context, processSession, 
publisher, flowFile, destinationName, charset, attributesToSend);
+                    processSession.transfer(flowFile, REL_SUCCESS);
+                    processSession.getProvenanceReporter().send(flowFile, 
destinationName);
                 }
-                processSession.transfer(flowFile, REL_SUCCESS);
-                processSession.getProvenanceReporter().send(flowFile, 
destinationName);
             } catch (Exception e) {
-                processSession.transfer(flowFile, REL_FAILURE);
-                getLogger().error("Failed while sending message to JMS via " + 
publisher, e);
-                context.yield();
-                publisher.setValid(false);
+                handleException(context, processSession, publisher, flowFile, 
e);
             }
         }
     }
 
+    private void handleException(ProcessContext context, ProcessSession 
processSession, JMSPublisher publisher, FlowFile flowFile, Exception e) {
+        processSession.transfer(flowFile, REL_FAILURE);
+        this.getLogger().error("Failed while sending message to JMS via " + 
publisher, e);
+        context.yield();
+        publisher.setValid(false);
+    }
+
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return propertyDescriptors;
@@ -250,6 +299,34 @@ public class PublishJMS extends 
AbstractJMSProcessor<JMSPublisher> {
         return new JMSPublisher(connectionFactory, jmsTemplate, 
this.getLogger());
     }
 
+    private void processStandardFlowFile(ProcessContext context, 
ProcessSession processSession, JMSPublisher publisher, FlowFile flowFile,
+                                         String destinationName, String 
charset, Map<String,String> attributesToSend) {
+        publishMessage(context, processSession, publisher, flowFile, 
destinationName, charset, attributesToSend);
+    }
+
+    private void publishMessage(ProcessContext context, ProcessSession 
processSession, JMSPublisher publisher, FlowFile flowFile,
+                                String destinationName, String charset, 
Map<String,String> attributesToSend) {
+        switch (context.getProperty(MESSAGE_BODY).getValue()) {
+            case TEXT_MESSAGE:
+                try {
+                    publisher.publish(destinationName, 
this.extractTextMessageBody(flowFile, processSession, charset), 
attributesToSend);
+                } catch(Exception e) {
+                    publisher.setValid(false);
+                    throw e;
+                }
+                break;
+            case BYTES_MESSAGE:
+            default:
+                try {
+                    publisher.publish(destinationName, 
this.extractMessageBody(flowFile, processSession), attributesToSend);
+                } catch(Exception e) {
+                    publisher.setValid(false);
+                    throw e;
+                }
+                break;
+        }
+    }
+
     /**
      * Extracts contents of the {@link FlowFile} as byte array.
      */
@@ -264,4 +341,5 @@ public class PublishJMS extends 
AbstractJMSProcessor<JMSPublisher> {
         session.read(flowFile, in -> IOUtils.copy(in, writer, 
Charset.forName(charset)));
         return writer.toString();
     }
+
 }
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/FlowFileReader.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/FlowFileReader.java
new file mode 100644
index 0000000000..14529f3460
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/FlowFileReader.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.reader;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+public interface FlowFileReader {
+    void read(ProcessSession session, FlowFile flowFile, MessageHandler 
messageHandler, FlowFileReaderCallback flowFileReaderCallback);
+}
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/FlowFileReaderCallback.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/FlowFileReaderCallback.java
new file mode 100644
index 0000000000..eb9d612806
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/FlowFileReaderCallback.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.reader;
+
+import org.apache.nifi.flowfile.FlowFile;
+
+public interface FlowFileReaderCallback {
+    void onSuccess(FlowFile flowFile, int processedRecords, boolean isRecover, 
long transmissionMillis);
+    void onFailure(FlowFile flowFile, int processedRecords, long 
transmissionMillis, Exception e);
+}
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/MessageHandler.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/MessageHandler.java
new file mode 100644
index 0000000000..6c5e2a8d7b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/MessageHandler.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.reader;
+
+public interface MessageHandler {
+    void handle(byte[] content);
+}
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/StateTrackingFlowFileReader.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/StateTrackingFlowFileReader.java
new file mode 100644
index 0000000000..e2d0f77486
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/StateTrackingFlowFileReader.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.reader;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.jms.processors.ioconcept.reader.record.RecordSupplier;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.util.StopWatch;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Optional.ofNullable;
+
+public class StateTrackingFlowFileReader implements FlowFileReader {
+
+    public static final String ATTR_READ_FAILED_INDEX_SUFFIX = 
".read.failed.index";
+
+    private final String identifier;
+    private final RecordSupplier recordSupplier;
+    private final ComponentLog logger;
+
+    public StateTrackingFlowFileReader(String identifier, RecordSupplier 
recordSupplier, ComponentLog logger) {
+        this.identifier = identifier;
+        this.recordSupplier = recordSupplier;
+        this.logger = logger;
+    }
+
+    @Override
+    public void read(ProcessSession session, FlowFile flowFile, MessageHandler 
messageHandler, FlowFileReaderCallback flowFileReaderCallback) {
+        final StopWatch stopWatch = new StopWatch(true);
+        final AtomicInteger processedRecords = new AtomicInteger();
+
+        final String publishFailedIndexAttributeName = identifier + 
ATTR_READ_FAILED_INDEX_SUFFIX;
+
+        try {
+            final Long previousProcessFailedAt = 
ofNullable(flowFile.getAttribute(publishFailedIndexAttributeName)).map(Long::valueOf).orElse(null);
+
+            session.read(flowFile, in -> recordSupplier.process(flowFile, in, 
processedRecords, previousProcessFailedAt, logger, messageHandler));
+
+            FlowFile successFlowFile = flowFile;
+
+            final boolean isRecover = previousProcessFailedAt != null;
+            if (isRecover) {
+                successFlowFile = session.removeAttribute(flowFile, 
publishFailedIndexAttributeName);
+            }
+
+            flowFileReaderCallback.onSuccess(successFlowFile, 
processedRecords.get(), isRecover, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+        } catch (Exception e) {
+            logger.error("An error happened while processing records. Routing 
to failure.", e);
+
+            final FlowFile failedFlowFile = session.putAttribute(flowFile, 
publishFailedIndexAttributeName, String.valueOf(processedRecords.get()));
+
+            flowFileReaderCallback.onFailure(failedFlowFile, 
processedRecords.get(), stopWatch.getElapsed(TimeUnit.MILLISECONDS), e);
+        }
+    }
+
+}
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/record/ProvenanceEventTemplates.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/record/ProvenanceEventTemplates.java
new file mode 100644
index 0000000000..d02dcb9198
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/record/ProvenanceEventTemplates.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.reader.record;
+
+public class ProvenanceEventTemplates {
+
+    public static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE = 
"Publish failed after %d successfully published records.";
+    public static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER = 
"Successfully finished publishing previously failed records. Total record 
count: %d";
+    public static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS = 
"Successfully published all records. Total record count: %d";
+
+}
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/record/RecordSupplier.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/record/RecordSupplier.java
new file mode 100644
index 0000000000..95172c5d98
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/record/RecordSupplier.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.reader.record;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.jms.processors.ioconcept.reader.MessageHandler;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class RecordSupplier {
+
+    private final RecordReaderFactory readerFactory;
+    private final RecordSetWriterFactory writerFactory;
+
+    public RecordSupplier(RecordReaderFactory readerFactory, 
RecordSetWriterFactory writerFactory) {
+        this.readerFactory = readerFactory;
+        this.writerFactory = writerFactory;
+    }
+
+    public void process(FlowFile flowfile, InputStream in, AtomicInteger 
processedRecords, Long processFromIndex, ComponentLog logger, MessageHandler 
messageHandler) throws IOException {
+
+        try (final RecordReader reader = 
readerFactory.createRecordReader(flowfile, in, logger)) {
+            final RecordSet recordSet = reader.createRecordSet();
+
+            final RecordSchema schema = 
writerFactory.getSchema(flowfile.getAttributes(), recordSet.getSchema());
+
+            final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+
+            Record record;
+            while ((record = recordSet.next()) != null) {
+                if (processFromIndex != null && processedRecords.get() < 
processFromIndex) {
+                    processedRecords.getAndIncrement();
+                    continue;
+                }
+
+                baos.reset();
+
+                try (final RecordSetWriter writer = 
writerFactory.createWriter(logger, schema, baos, flowfile)) {
+                    writer.write(record);
+                    writer.flush();
+                }
+
+                final byte[] messageContent = baos.toByteArray();
+
+                messageHandler.handle(messageContent);
+
+                processedRecords.getAndIncrement();
+            }
+        } catch (SchemaNotFoundException | MalformedRecordException e) {
+            throw new ProcessException("An error happened during creating 
components for serialization.", e);
+        }
+    }
+
+}
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/AttributeSource.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/AttributeSource.java
new file mode 100644
index 0000000000..390a8aa25b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/AttributeSource.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.writer;
+
+import java.util.Map;
+
+public interface AttributeSource<T> {
+    Map<String, String> getAttributes(T message);
+}
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/FlowFileWriter.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/FlowFileWriter.java
new file mode 100644
index 0000000000..85efcc1739
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/FlowFileWriter.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.writer;
+
+import org.apache.nifi.processor.ProcessSession;
+
+import java.util.List;
+
+public interface FlowFileWriter<T> {
+    void write(ProcessSession session, List<T> messages, 
FlowFileWriterCallback<T> flowFileWriterCallback);
+}
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/FlowFileWriterCallback.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/FlowFileWriterCallback.java
new file mode 100644
index 0000000000..7a8c464351
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/FlowFileWriterCallback.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.writer;
+
+import org.apache.nifi.flowfile.FlowFile;
+
+import java.util.List;
+
+public interface FlowFileWriterCallback<T> {
+    void onSuccess(FlowFile flowFile, List<T> processedMessages, List<T> 
failedMessages);
+    void onParseFailure(FlowFile flowFile, T message, Exception e);
+    void onFailure(FlowFile flowFile, List<T> processedMessages, List<T> 
failedMessages, Exception e);
+}
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/Marshaller.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/Marshaller.java
new file mode 100644
index 0000000000..40c3342b06
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/Marshaller.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.writer;
+
+public interface Marshaller<T> {
+    byte[] marshall(T message);
+}
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/record/OutputStrategy.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/record/OutputStrategy.java
new file mode 100644
index 0000000000..6c8f6c8cc2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/record/OutputStrategy.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.writer.record;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Enumeration of supported Output Strategies
+ */
+public enum OutputStrategy implements DescribedValue {
+    USE_VALUE("USE_VALUE", "Use Content as Value", "Write only the message to 
the FlowFile record."),
+
+    USE_WRAPPER("USE_WRAPPER", "Use Wrapper", "Write the additional attributes 
into the FlowFile record on a separate leaf. (See processor usage for more 
information.)"),
+
+    USE_APPENDER("USE_APPENDER", "Use Appender", "Write the additional 
attributes into the FlowFile record prefixed with \"_\". (See processor usage 
for more information.)");
+
+    private final String value;
+
+    private final String displayName;
+
+    private final String description;
+
+    OutputStrategy(final String value, final String displayName, final String 
description) {
+        this.value = value;
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return value;
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/record/RecordUtils.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/record/RecordUtils.java
new file mode 100644
index 0000000000..114d8b9c03
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/record/RecordUtils.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.writer.record;
+
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.Tuple;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RecordUtils {
+
+    public static Record append(final Record originalRecord, final Map<String, 
String> decoratorValues, final String decoratorPrefix) {
+        final List<RecordField> originalFields = 
originalRecord.getSchema().getFields();
+
+        final List<RecordField> mergedFields = new ArrayList<>(originalFields);
+        decoratorValues.forEach((key, value) -> mergedFields.add(new 
RecordField(decoratorPrefix + key, RecordFieldType.STRING.getDataType())));
+
+        final RecordSchema mergedSchema = new SimpleRecordSchema(mergedFields);
+
+        final Map<String, Object> recordValues = new HashMap<>();
+        
originalFields.stream().map(RecordField::getFieldName).forEach(fieldName -> 
recordValues.put(fieldName, originalRecord.getValue(fieldName)));
+        decoratorValues.forEach((key, value) -> 
recordValues.put(decoratorPrefix + key, value));
+
+        return new MapRecord(mergedSchema, recordValues);
+    }
+
+    public static MapRecord wrap(final Record originalRecord, final String 
originalRecordKey, final Map<String, String> decoratorValues, final String 
decoratorKey)
+            throws IOException, MalformedRecordException {
+
+        // create schema
+        final Tuple<RecordField, Object> originalRecordLeaf = 
wrapStandardRecord(originalRecord, originalRecordKey);
+        final Tuple<RecordField, Object> decoratorLeaf = 
wrapDecoratorValues(decoratorValues, decoratorKey);
+        final RecordSchema rootRecordSchema = new 
SimpleRecordSchema(Arrays.asList(originalRecordLeaf.getKey(), 
decoratorLeaf.getKey()));
+
+        // assign values
+        final Map<String, Object> recordValues = new HashMap<>();
+        recordValues.put(originalRecordLeaf.getKey().getFieldName(), 
originalRecordLeaf.getValue());
+        recordValues.put(decoratorLeaf.getKey().getFieldName(), 
decoratorLeaf.getValue());
+        return new MapRecord(rootRecordSchema, recordValues);
+    }
+
+    private static Tuple<RecordField, Object> wrapStandardRecord(final Record 
record, final String recordKey) {
+        final RecordSchema recordSchema = (record == null) ? null : 
record.getSchema();
+        final RecordField recordField = new RecordField(recordKey, 
RecordFieldType.RECORD.getRecordDataType(recordSchema));
+        return new Tuple<>(recordField, record);
+    }
+
+    private static Tuple<RecordField, Object> wrapDecoratorValues(final 
Map<String, String> decoratorValues, final String decoratorKey) {
+        final RecordField recordField = new RecordField(decoratorKey, 
RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
+        return new Tuple<>(recordField, decoratorValues);
+    }
+
+}
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/record/RecordWriter.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/record/RecordWriter.java
new file mode 100644
index 0000000000..0bea02da8d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/record/RecordWriter.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.writer.record;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.jms.processors.ioconcept.writer.AttributeSource;
+import org.apache.nifi.jms.processors.ioconcept.writer.FlowFileWriter;
+import org.apache.nifi.jms.processors.ioconcept.writer.FlowFileWriterCallback;
+import org.apache.nifi.jms.processors.ioconcept.writer.Marshaller;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SchemaValidationException;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.nifi.jms.processors.ioconcept.writer.record.OutputStrategy.USE_APPENDER;
+import static 
org.apache.nifi.jms.processors.ioconcept.writer.record.OutputStrategy.USE_VALUE;
+import static 
org.apache.nifi.jms.processors.ioconcept.writer.record.OutputStrategy.USE_WRAPPER;
+
+public class RecordWriter<T> implements FlowFileWriter<T> {
+
+    private final static String RECORD_COUNT_KEY = "record.count";
+
+    private final RecordReaderFactory readerFactory;
+    private final RecordSetWriterFactory writerFactory;
+    private final Marshaller<T> marshaller;
+    private final AttributeSource<T> attributeSource;
+    private final OutputStrategy outputStrategy;
+    private final ComponentLog logger;
+
+    public RecordWriter(RecordReaderFactory readerFactory,
+                        RecordSetWriterFactory writerFactory,
+                        Marshaller<T> marshaller,
+                        AttributeSource<T> attributeSource,
+                        OutputStrategy outputStrategy,
+                        ComponentLog logger) {
+        this.readerFactory = readerFactory;
+        this.writerFactory = writerFactory;
+        this.marshaller = marshaller;
+        this.attributeSource = attributeSource;
+        this.outputStrategy = outputStrategy;
+        this.logger = logger;
+    }
+
+    @Override
+    public void write(ProcessSession session, List<T> messages, 
FlowFileWriterCallback<T> flowFileWriterCallback) {
+        FlowFile flowFile = session.create();
+
+        final Map<String, String> attributes = new HashMap<>();
+        final AtomicInteger recordCount = new AtomicInteger();
+
+        final List<T> processedMessages = new ArrayList<>();
+        final List<T> failedMessages = new ArrayList<>();
+
+        RecordSetWriter writer = null;
+        boolean isWriterInitialized = false;
+
+        try {
+            for (T message : messages) {
+                if (message == null) {
+                    break;
+                }
+
+                final byte[] recordBytes = marshaller.marshall(message);
+                try (final InputStream in = new 
ByteArrayInputStream(recordBytes)) {
+                    final RecordReader reader;
+
+                    // parse incoming message which may contain multiple 
messages
+                    try {
+                        reader = readerFactory.createRecordReader(attributes, 
in, recordBytes.length, logger);
+                    } catch (final IOException ioe) {
+                        logger.error("Failed to parse message due to comms 
failure. Will roll back session and try again momentarily.");
+                        flowFileWriterCallback.onFailure(flowFile, 
processedMessages, failedMessages, ioe);
+                        closeWriter(writer);
+                        return;
+                    } catch (final Exception e) {
+                        logger.error("Failed to parse message, sending to the 
parse failure relationship", e);
+                        failedMessages.add(message);
+                        flowFileWriterCallback.onParseFailure(flowFile, 
message, e);
+                        continue;
+                    }
+
+                    // write messages as records into FlowFile
+                    try {
+                        Record record;
+                        while ((record = reader.nextRecord()) != null) {
+
+                            if (attributeSource != null && 
!outputStrategy.equals(USE_VALUE)) {
+                                final Map<String, String> additionalAttributes 
= attributeSource.getAttributes(message);
+                                if (outputStrategy.equals(USE_APPENDER)) {
+                                    record = RecordUtils.append(record, 
additionalAttributes, "_");
+                                } else if (outputStrategy.equals(USE_WRAPPER)){
+                                    record = RecordUtils.wrap(record, "value", 
additionalAttributes, "_");
+                                }
+                            }
+
+                            if (!isWriterInitialized) {
+                                final RecordSchema recordSchema = 
record.getSchema();
+                                final OutputStream rawOut = 
session.write(flowFile);
+
+                                RecordSchema writeSchema;
+                                try {
+                                    writeSchema = 
writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+                                } catch (final Exception e) {
+                                    logger.error("Failed to obtain Schema for 
FlowFile, sending to the parse failure relationship", e);
+                                    failedMessages.add(message);
+                                    
flowFileWriterCallback.onParseFailure(flowFile, message, e);
+                                    continue;
+                                }
+
+                                writer = writerFactory.createWriter(logger, 
writeSchema, rawOut, flowFile);
+                                writer.beginRecordSet();
+                            }
+
+                            try {
+                                writer.write(record);
+                                isWriterInitialized = true;
+                                processedMessages.add(message);
+                            } catch (final RuntimeException re) {
+                                logger.error("Failed to write message using 
the configured Record Writer, sending to the parse failure relationship", re);
+                                failedMessages.add(message);
+                                
flowFileWriterCallback.onParseFailure(flowFile, message, re);
+                            }
+                        }
+                    } catch (final IOException | MalformedRecordException | 
SchemaValidationException e) {
+                        logger.error("Failed to write message, sending to the 
parse failure relationship", e);
+                        failedMessages.add(message);
+                        flowFileWriterCallback.onParseFailure(flowFile, 
message, e);
+                    }
+                } catch (Exception e) {
+                    logger.error("Failed to write message, sending to the 
parse failure relationship", e);
+                    failedMessages.add(message);
+                    flowFileWriterCallback.onParseFailure(flowFile, message, 
e);
+                }
+            }
+
+            if (writer != null) {
+                final WriteResult writeResult = writer.finishRecordSet();
+                attributes.put(RECORD_COUNT_KEY, 
String.valueOf(writeResult.getRecordCount()));
+                attributes.put(CoreAttributes.MIME_TYPE.key(), 
writer.getMimeType());
+                attributes.putAll(writeResult.getAttributes());
+                recordCount.set(writeResult.getRecordCount());
+            }
+
+        } catch (final Exception e) {
+            flowFileWriterCallback.onFailure(flowFile, processedMessages, 
failedMessages, e);
+        } finally {
+            closeWriter(writer);
+        }
+
+        if (recordCount.get() == 0) {
+            session.remove(flowFile);
+            return;
+        }
+
+        session.putAllAttributes(flowFile, attributes);
+
+        final int count = recordCount.get();
+        logger.info("Successfully processed {} records for {}", count, 
flowFile);
+
+        flowFileWriterCallback.onSuccess(flowFile, processedMessages, 
failedMessages);
+    }
+
+    private void closeWriter(final RecordSetWriter writer) {
+        try {
+            if (writer != null) {
+                writer.close();
+            }
+        } catch (final Exception ioe) {
+            logger.warn("Failed to close Record Writer", ioe);
+        }
+    }
+
+}
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
index e61d1a38cc..de3e97761a 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
@@ -16,6 +16,10 @@
  */
 package org.apache.nifi.jms.processors;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
@@ -27,6 +31,7 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.jms.cf.JMSConnectionFactoryProperties;
 import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
 import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
+import org.apache.nifi.jms.processors.ioconcept.writer.record.OutputStrategy;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -46,6 +51,7 @@ import org.springframework.jms.support.JmsHeaders;
 
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
 import javax.jms.MapMessage;
 import javax.jms.Message;
@@ -64,7 +70,10 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static java.util.Arrays.asList;
 import static 
org.apache.nifi.jms.processors.helpers.AssertionUtils.assertCausedBy;
+import static 
org.apache.nifi.jms.processors.helpers.JMSTestUtil.createJsonRecordSetReaderService;
+import static 
org.apache.nifi.jms.processors.helpers.JMSTestUtil.createJsonRecordSetWriterService;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -77,6 +86,8 @@ import static org.mockito.Mockito.when;
 
 public class ConsumeJMSIT {
 
+    private static final String JMS_DESTINATION_ATTRIBUTE_NAME = 
"jms_destination";
+
     @Test
     public void validateSuccessfulConsumeAndTransferToSuccess() throws 
Exception {
         final String destinationName = "cooQueue";
@@ -478,6 +489,194 @@ public class ConsumeJMSIT {
         }
     }
 
+    @Test
+    public void testConsumeRecords() throws InitializationException {
+        String destination = "testConsumeRecords";
+        ArrayNode expectedRecordSet = createTestJsonInput();
+
+        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
+        try {
+            jmsTemplate.send(destination, session -> 
session.createTextMessage(expectedRecordSet.get(0).toString()));
+            jmsTemplate.send(destination, session -> 
session.createTextMessage(expectedRecordSet.get(1).toString()));
+            jmsTemplate.send(destination, session -> 
session.createTextMessage(expectedRecordSet.get(2).toString()));
+
+            TestRunner testRunner = 
initializeTestRunner(jmsTemplate.getConnectionFactory(), destination);
+            testRunner.setProperty(ConsumeJMS.RECORD_READER, 
createJsonRecordSetReaderService(testRunner));
+            testRunner.setProperty(ConsumeJMS.RECORD_WRITER, 
createJsonRecordSetWriterService(testRunner));
+
+            testRunner.run(1, false);
+
+            List<MockFlowFile> successFlowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeJMS.REL_SUCCESS);
+            assertEquals(1, successFlowFiles.size());
+            assertEquals(expectedRecordSet.toString(), new 
String(successFlowFiles.get(0).toByteArray()));
+
+            List<MockFlowFile> parseFailedFlowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeJMS.REL_PARSE_FAILURE);
+            assertEquals(0, parseFailedFlowFiles.size());
+        } finally {
+            ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
+        }
+    }
+
+    @Test
+    public void testConsumeMalformedRecords() throws InitializationException {
+        String destination = "testConsumeRecords";
+        ArrayNode expectedRecordSet = createTestJsonInput();
+        String expectedParseFailedContent1 = "this is not a json";
+        String expectedParseFailedContent2 = "this is still not a json";
+
+        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
+        try {
+            jmsTemplate.send(destination, session -> 
session.createTextMessage(expectedRecordSet.get(0).toString()));
+            jmsTemplate.send(destination, session -> 
session.createTextMessage(expectedParseFailedContent1));
+            jmsTemplate.send(destination, session -> 
session.createTextMessage(expectedRecordSet.get(1).toString()));
+            jmsTemplate.send(destination, session -> 
session.createTextMessage(expectedParseFailedContent2));
+            jmsTemplate.send(destination, session -> 
session.createTextMessage(expectedRecordSet.get(2).toString()));
+
+            TestRunner testRunner = 
initializeTestRunner(jmsTemplate.getConnectionFactory(), destination);
+            testRunner.setProperty(ConsumeJMS.RECORD_READER, 
createJsonRecordSetReaderService(testRunner));
+            testRunner.setProperty(ConsumeJMS.RECORD_WRITER, 
createJsonRecordSetWriterService(testRunner));
+            testRunner.setRelationshipAvailable(ConsumeJMS.REL_PARSE_FAILURE);
+
+            testRunner.run(1, false);
+
+            // checking whether the processor was able to construct a valid 
recordSet from the properly formatted messages
+            List<MockFlowFile> successFlowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeJMS.REL_SUCCESS);
+            assertEquals(1, successFlowFiles.size());
+            assertEquals(expectedRecordSet.toString(), new 
String(successFlowFiles.get(0).toByteArray()));
+
+            // and checking whether it creates separate FlowFiles for the 
malformed messages
+            List<MockFlowFile> parseFailedFlowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeJMS.REL_PARSE_FAILURE);
+            assertEquals(2, parseFailedFlowFiles.size());
+            assertEquals(expectedParseFailedContent1, new 
String(parseFailedFlowFiles.get(0).toByteArray()));
+            assertEquals(expectedParseFailedContent2, new 
String(parseFailedFlowFiles.get(1).toByteArray()));
+        } finally {
+            ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
+        }
+    }
+
+    @Test
+    public void testConsumeRecordsWithAppenderOutputStrategy() throws 
InitializationException, JsonProcessingException {
+        String destination = "testConsumeRecordsWithAppenderOutputStrategy";
+        ArrayNode inputRecordSet = createTestJsonInput();
+
+        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
+        try {
+            jmsTemplate.send(destination, session -> 
session.createTextMessage(inputRecordSet.get(0).toString()));
+            jmsTemplate.send(destination, session -> 
session.createTextMessage(inputRecordSet.get(1).toString()));
+            jmsTemplate.send(destination, session -> 
session.createTextMessage(inputRecordSet.get(2).toString()));
+
+            TestRunner testRunner = 
initializeTestRunner(jmsTemplate.getConnectionFactory(), destination);
+            testRunner.setProperty(ConsumeJMS.RECORD_READER, 
createJsonRecordSetReaderService(testRunner));
+            testRunner.setProperty(ConsumeJMS.RECORD_WRITER, 
createJsonRecordSetWriterService(testRunner));
+            testRunner.setProperty(ConsumeJMS.OUTPUT_STRATEGY, 
OutputStrategy.USE_APPENDER.getValue());
+
+            testRunner.run(1, false);
+
+            List<MockFlowFile> successFlowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeJMS.REL_SUCCESS);
+            assertEquals(1, successFlowFiles.size());
+            JsonNode flowFileContentAsJson = deserializeToJsonNode(new 
String(successFlowFiles.get(0).toByteArray()));
+            // checking that the output contains at least a part of the 
original input
+            assertEquals(inputRecordSet.get(0).get("firstAttribute").asText(), 
flowFileContentAsJson.get(0).get("firstAttribute").asText());
+            assertEquals(inputRecordSet.get(1).get("firstAttribute").asText(), 
flowFileContentAsJson.get(1).get("firstAttribute").asText());
+            assertEquals(inputRecordSet.get(2).get("firstAttribute").asText(), 
flowFileContentAsJson.get(2).get("firstAttribute").asText());
+            // checking jms_destination attribute exists with the given value
+            // this attribute has been chosen because it is deterministic; 
others vary based on host, time, etc.
+            // not nice, but stubbing all attributes would be uglier with the 
current code structure
+            assertEquals(destination, flowFileContentAsJson.get(0).get("_" + 
JMS_DESTINATION_ATTRIBUTE_NAME).asText());
+            assertEquals(destination, flowFileContentAsJson.get(1).get("_" + 
JMS_DESTINATION_ATTRIBUTE_NAME).asText());
+            assertEquals(destination, flowFileContentAsJson.get(2).get("_" + 
JMS_DESTINATION_ATTRIBUTE_NAME).asText());
+
+            List<MockFlowFile> parseFailedFlowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeJMS.REL_PARSE_FAILURE);
+            assertEquals(0, parseFailedFlowFiles.size());
+        } finally {
+            ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
+        }
+    }
+
+    @Test
+    public void testConsumeRecordsWithWrapperOutputStrategy() throws 
InitializationException, JsonProcessingException {
+        String destination = "testConsumeRecordsWithWrapperOutputStrategy";
+        String valueKey = "value";
+        String attributeKey = "_";
+        ArrayNode inputRecordSet = createTestJsonInput();
+
+        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
+        try {
+            jmsTemplate.send(destination, session -> 
session.createTextMessage(inputRecordSet.get(0).toString()));
+            jmsTemplate.send(destination, session -> 
session.createTextMessage(inputRecordSet.get(1).toString()));
+            jmsTemplate.send(destination, session -> 
session.createTextMessage(inputRecordSet.get(2).toString()));
+
+            TestRunner testRunner = 
initializeTestRunner(jmsTemplate.getConnectionFactory(), destination);
+            testRunner.setProperty(ConsumeJMS.RECORD_READER, 
createJsonRecordSetReaderService(testRunner));
+            testRunner.setProperty(ConsumeJMS.RECORD_WRITER, 
createJsonRecordSetWriterService(testRunner));
+            testRunner.setProperty(ConsumeJMS.OUTPUT_STRATEGY, 
OutputStrategy.USE_WRAPPER.getValue());
+
+            testRunner.run(1, false);
+
+            List<MockFlowFile> successFlowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeJMS.REL_SUCCESS);
+            assertEquals(1, successFlowFiles.size());
+            JsonNode flowFileContentAsJson = deserializeToJsonNode(new 
String(successFlowFiles.get(0).toByteArray()));
+            // checking that the original json is equal to the leaf
+            assertEquals(inputRecordSet.get(0), 
flowFileContentAsJson.get(0).get(valueKey));
+            assertEquals(inputRecordSet.get(1), 
flowFileContentAsJson.get(1).get(valueKey));
+            assertEquals(inputRecordSet.get(2), 
flowFileContentAsJson.get(2).get(valueKey));
+            // checking that the attribute leaf contains at least the 
jms_destination attribute
+            // this attribute has been chosen because it is deterministic; 
others vary based on host, time, etc.
+            // not nice, but stubbing all attributes would be uglier with the 
current code structure
+            assertEquals(destination, 
flowFileContentAsJson.get(0).get(attributeKey).get(JMS_DESTINATION_ATTRIBUTE_NAME).asText());
+            assertEquals(destination, 
flowFileContentAsJson.get(1).get(attributeKey).get(JMS_DESTINATION_ATTRIBUTE_NAME).asText());
+            assertEquals(destination, 
flowFileContentAsJson.get(2).get(attributeKey).get(JMS_DESTINATION_ATTRIBUTE_NAME).asText());
+
+            List<MockFlowFile> parseFailedFlowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeJMS.REL_PARSE_FAILURE);
+            assertEquals(0, parseFailedFlowFiles.size());
+        } finally {
+            ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
+        }
+    }
+
+    private static ArrayNode createTestJsonInput() {
+        final ObjectMapper mapper = new ObjectMapper();
+
+        return mapper.createArrayNode().addAll(asList(
+                mapper.createObjectNode()
+                        .put("recordId", 1)
+                        .put("firstAttribute", "foo")
+                        .put("secondAttribute", false),
+                mapper.createObjectNode()
+                        .put("recordId", 2)
+                        .put("firstAttribute", "bar")
+                        .put("secondAttribute", true),
+                mapper.createObjectNode()
+                        .put("recordId", 3)
+                        .put("firstAttribute", "foobar")
+                        .put("secondAttribute", false)
+        ));
+    }
+
+    private JsonNode deserializeToJsonNode(String rawJson) throws 
JsonProcessingException {
+        ObjectMapper objectMapper = new ObjectMapper();
+        return objectMapper.readTree(rawJson);
+    }
+
+    private TestRunner initializeTestRunner(ConnectionFactory 
connectionFactory, String destinationName) throws InitializationException {
+        return initializeTestRunner(new ConsumeJMS(), connectionFactory, 
destinationName);
+    }
+
+    private TestRunner initializeTestRunner(ConsumeJMS processor, 
ConnectionFactory connectionFactory, String destinationName) throws 
InitializationException {
+        TestRunner runner = TestRunners.newTestRunner(processor);
+        JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
+        when(cs.getIdentifier()).thenReturn("cfProvider");
+        when(cs.getConnectionFactory()).thenReturn(connectionFactory);
+        runner.addControllerService("cfProvider", cs);
+        runner.enableControllerService(cs);
+
+        runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+        runner.setProperty(ConsumeJMS.DESTINATION, destinationName);
+        runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.QUEUE);
+
+        return runner;
+    }
+
     private static void publishAMessage(ActiveMQConnectionFactory cf, final 
String destinationName, String messageContent) throws JMSException {
         // Publish a message.
         try (Connection conn = cf.createConnection();
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
index b681bc09e8..e78734ac15 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
@@ -16,11 +16,27 @@
  */
 package org.apache.nifi.jms.processors;
 
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse;
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.springframework.jms.connection.CachingConnectionFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.support.JmsHeaders;
 
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -31,29 +47,12 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.nifi.jms.processors.JMSConsumer.ConsumerCallback;
-import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse;
-import org.apache.nifi.logging.ComponentLog;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-import org.springframework.jms.connection.CachingConnectionFactory;
-import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.core.MessageCreator;
-import org.springframework.jms.support.JmsHeaders;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
 
 public class JMSPublisherConsumerIT {
 
@@ -69,7 +68,7 @@ public class JMSPublisherConsumerIT {
             return message;
         };
 
-        ConsumerCallback responseChecker = response -> {
+        Consumer<JMSResponse> responseChecker = response -> {
             assertEquals(
                 "stringAsObject",
                 SerializationUtils.deserialize(response.getMessageBody())
@@ -123,7 +122,7 @@ public class JMSPublisherConsumerIT {
             expected = byteArrayOutputStream.toByteArray();
         }
 
-        ConsumerCallback responseChecker = response -> {
+        Consumer<JMSResponse> responseChecker = response -> {
             byte[] actual = response.getMessageBody();
 
             assertArrayEquals(
@@ -175,7 +174,7 @@ public class JMSPublisherConsumerIT {
     }
 
     private void testMapMessage(String destinationName, MessageCreator 
messageCreator, String expectedJson) {
-        ConsumerCallback responseChecker = response -> {
+        Consumer<JMSResponse> responseChecker = response -> {
             ObjectMapper objectMapper = new ObjectMapper();
 
             try {
@@ -191,7 +190,7 @@ public class JMSPublisherConsumerIT {
         testMessage(destinationName, messageCreator, responseChecker);
     }
 
-    private void testMessage(String destinationName, MessageCreator 
messageCreator, ConsumerCallback responseChecker) {
+    private void testMessage(String destinationName, MessageCreator 
messageCreator, Consumer<JMSResponse> responseChecker) {
         JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
 
         AtomicBoolean callbackInvoked = new AtomicBoolean();
@@ -200,7 +199,7 @@ public class JMSPublisherConsumerIT {
             jmsTemplate.send(destinationName, messageCreator);
 
             JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
-            consumer.consume(destinationName, null, false, false, null, null, 
"UTF-8", response -> {
+            consumer.consumeSingleMessage(destinationName, null, false, false, 
null, null, "UTF-8", response -> {
                 callbackInvoked.set(true);
                 responseChecker.accept(response);
             });
@@ -282,11 +281,8 @@ public class JMSPublisherConsumerIT {
             });
 
             JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
-            consumer.consume(destinationName, null, false, false, null, null, 
"UTF-8", new ConsumerCallback() {
-                @Override
-                public void accept(JMSResponse response) {
-                    // noop
-                }
+            consumer.consumeSingleMessage(destinationName, null, false, false, 
null, null, "UTF-8", response -> {
+                // noop
             });
         } finally {
             ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
@@ -312,15 +308,12 @@ public class JMSPublisherConsumerIT {
 
             JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
             final AtomicBoolean callbackInvoked = new AtomicBoolean();
-            consumer.consume(destinationName, null, false, false, null, null, 
"UTF-8", new ConsumerCallback() {
-                @Override
-                public void accept(JMSResponse response) {
-                    callbackInvoked.set(true);
-                    assertEquals("hello from the other side", new 
String(response.getMessageBody()));
-                    assertEquals("fooQueue", 
response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
-                    assertEquals("foo", 
response.getMessageProperties().get("foo"));
-                    assertEquals("false", 
response.getMessageProperties().get("bar"));
-                }
+            consumer.consumeSingleMessage(destinationName, null, false, false, 
null, null, "UTF-8", response -> {
+                callbackInvoked.set(true);
+                assertEquals("hello from the other side", new 
String(response.getMessageBody()));
+                assertEquals("fooQueue", 
response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
+                assertEquals("foo", 
response.getMessageProperties().get("foo"));
+                assertEquals("false", 
response.getMessageProperties().get("bar"));
             });
             assertTrue(callbackInvoked.get());
 
@@ -348,13 +341,6 @@ public class JMSPublisherConsumerIT {
 
             final AtomicInteger msgCount = new AtomicInteger(0);
 
-            final ConsumerCallback callback = new ConsumerCallback() {
-                @Override
-                public void accept(JMSResponse response) {
-                    msgCount.incrementAndGet();
-                }
-            };
-
             final Thread[] threads = new Thread[4];
             for (int i = 0; i < 4; i++) {
                 final Thread t = new Thread(() -> {
@@ -364,7 +350,8 @@ public class JMSPublisherConsumerIT {
                         JMSConsumer consumer = new 
JMSConsumer((CachingConnectionFactory) consumeTemplate.getConnectionFactory(), 
consumeTemplate, mock(ComponentLog.class));
 
                         for (int j = 0; j < messagesPerThreadCount && 
msgCount.get() < totalMessageCount; j++) {
-                            consumer.consume(destinationName, null, false, 
false, null, null, "UTF-8", callback);
+                            consumer.consumeSingleMessage(destinationName, 
null, false, false, null, null, "UTF-8",
+                                    response -> msgCount.incrementAndGet());
                         }
                     } finally {
                         ((CachingConnectionFactory) 
consumeTemplate.getConnectionFactory()).destroy();
@@ -404,13 +391,10 @@ public class JMSPublisherConsumerIT {
             JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
             final AtomicBoolean callbackInvoked = new AtomicBoolean();
             try {
-                consumer.consume(destinationName, null, false, false, null, 
null, "UTF-8", new ConsumerCallback() {
-                    @Override
-                    public void accept(JMSResponse response) {
-                        callbackInvoked.set(true);
-                        assertEquals("1", new 
String(response.getMessageBody()));
-                        throw new RuntimeException("intentional to avoid 
explicit ack");
-                    }
+                consumer.consumeSingleMessage(destinationName, null, false, 
false, null, null, "UTF-8", response -> {
+                    callbackInvoked.set(true);
+                    assertEquals("1", new String(response.getMessageBody()));
+                    throw new RuntimeException("intentional to avoid explicit 
ack");
                 });
             } catch (Exception e) {
                 // expected
@@ -421,17 +405,14 @@ public class JMSPublisherConsumerIT {
 
             // should receive the same message, but will process it 
successfully
             while (!callbackInvoked.get()) {
-                consumer.consume(destinationName, null, false, false, null, 
null, "UTF-8", new ConsumerCallback() {
-                    @Override
-                    public void accept(JMSResponse response) {
-                        if (response == null) {
-                            return;
-                        }
-
-                        callbackInvoked.set(true);
-                        assertEquals("1", new 
String(response.getMessageBody()));
-                        acknowledge(response);
+                consumer.consumeSingleMessage(destinationName, null, false, 
false, null, null, "UTF-8", response -> {
+                    if (response == null) {
+                        return;
                     }
+
+                    callbackInvoked.set(true);
+                    assertEquals("1", new String(response.getMessageBody()));
+                    acknowledge(response);
                 });
             }
 
@@ -441,17 +422,14 @@ public class JMSPublisherConsumerIT {
             // receiving next message and fail again
             try {
                 while (!callbackInvoked.get()) {
-                    consumer.consume(destinationName, null, false, false, 
null, null, "UTF-8", new ConsumerCallback() {
-                        @Override
-                        public void accept(JMSResponse response) {
-                            if (response == null) {
-                                return;
-                            }
-
-                            callbackInvoked.set(true);
-                            assertEquals("2", new 
String(response.getMessageBody()));
-                            throw new RuntimeException("intentional to avoid 
explicit ack");
+                    consumer.consumeSingleMessage(destinationName, null, 
false, false, null, null, "UTF-8", response -> {
+                        if (response == null) {
+                            return;
                         }
+
+                        callbackInvoked.set(true);
+                        assertEquals("2", new 
String(response.getMessageBody()));
+                        throw new RuntimeException("intentional to avoid 
explicit ack");
                     });
                 }
             } catch (Exception e) {
@@ -463,17 +441,14 @@ public class JMSPublisherConsumerIT {
             // should receive the same message, but will process it 
successfully
             try {
                 while (!callbackInvoked.get()) {
-                    consumer.consume(destinationName, null, false, false, 
null, null, "UTF-8", new ConsumerCallback() {
-                        @Override
-                        public void accept(JMSResponse response) {
-                            if (response == null) {
-                                return;
-                            }
-
-                            callbackInvoked.set(true);
-                            assertEquals("2", new 
String(response.getMessageBody()));
-                            acknowledge(response);
+                    consumer.consumeSingleMessage(destinationName, null, 
false, false, null, null, "UTF-8", response -> {
+                        if (response == null) {
+                            return;
                         }
+
+                        callbackInvoked.set(true);
+                        assertEquals("2", new 
String(response.getMessageBody()));
+                        acknowledge(response);
                     });
                 }
             } catch (Exception e) {
@@ -514,12 +489,9 @@ public class JMSPublisherConsumerIT {
 
             JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
             AtomicBoolean callbackInvoked = new AtomicBoolean();
-            consumer.consume(destinationName, null, false, false, null, 
messageSelector, "UTF-8", new ConsumerCallback() {
-                @Override
-                public void accept(JMSResponse response) {
-                    callbackInvoked.set(true);
-                    assertEquals("msg1", new 
String(response.getMessageBody()));
-                }
+            consumer.consumeSingleMessage(destinationName, null, false, false, 
null, messageSelector, "UTF-8", response -> {
+                callbackInvoked.set(true);
+                assertEquals("msg1", new String(response.getMessageBody()));
             });
             assertTrue(callbackInvoked.get());
 
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
index 802f4956b2..43c51d6916 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.jms.processors;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
@@ -28,12 +30,17 @@ import 
org.apache.nifi.jms.processors.helpers.ConnectionFactoryInvocationHandler
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.mockito.Mockito;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.support.JmsHeaders;
 
@@ -48,21 +55,42 @@ import java.lang.reflect.Proxy;
 import java.net.URI;
 import java.net.UnknownHostException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static java.util.Arrays.asList;
+import static org.apache.nifi.jms.processors.PublishJMS.REL_FAILURE;
+import static org.apache.nifi.jms.processors.PublishJMS.REL_SUCCESS;
 import static 
org.apache.nifi.jms.processors.helpers.AssertionUtils.assertCausedBy;
+import static 
org.apache.nifi.jms.processors.helpers.JMSTestUtil.createJsonRecordSetReaderService;
+import static 
org.apache.nifi.jms.processors.helpers.JMSTestUtil.createJsonRecordSetWriterService;
+import static 
org.apache.nifi.jms.processors.ioconcept.reader.record.ProvenanceEventTemplates.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
+import static 
org.apache.nifi.jms.processors.ioconcept.reader.record.ProvenanceEventTemplates.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
+import static 
org.apache.nifi.jms.processors.ioconcept.reader.record.ProvenanceEventTemplates.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
+import static 
org.apache.nifi.jms.processors.ioconcept.reader.StateTrackingFlowFileReader.ATTR_READ_FAILED_INDEX_SUFFIX;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class PublishJMSIT {
 
+    TestRunner testRunner;
+
+    @AfterEach
+    public void cleanup() {
+        if (testRunner != null) {
+            testRunner.run(1, true, false); // Run once just so that we can 
trigger the shutdown of the Connection Factory
+            testRunner = null;
+        }
+    }
+
     @Test
     @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
     public void validateSuccessfulPublishAndTransferToSuccess() throws 
Exception {
@@ -91,7 +119,7 @@ public class PublishJMSIT {
         runner.enqueue("Hey dude!".getBytes(), attributes);
         runner.run(1, false); // Run once but don't shut down because we want 
the Connection Factory left in tact so that we can use it.
 
-        final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
+        final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
         assertNotNull(successFF);
 
         JmsTemplate jmst = new JmsTemplate(cf);
@@ -134,7 +162,7 @@ public class PublishJMSIT {
         runner.enqueue("Hey dude!".getBytes(), attributes);
         runner.run(1, false); // Run once but don't shut down because we want 
the Connection Factory left in tact so that we can use it.
 
-        final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
+        final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
         assertNotNull(successFF);
 
         JmsTemplate jmst = new JmsTemplate(cf);
@@ -169,8 +197,8 @@ public class PublishJMSIT {
         runner.run();
         Thread.sleep(200);
 
-        
assertTrue(runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).isEmpty());
-        
assertNotNull(runner.getFlowFilesForRelationship(PublishJMS.REL_FAILURE).get(0));
+        assertTrue(runner.getFlowFilesForRelationship(REL_SUCCESS).isEmpty());
+        assertNotNull(runner.getFlowFilesForRelationship(REL_FAILURE).get(0));
     }
 
     @Test
@@ -198,7 +226,7 @@ public class PublishJMSIT {
         runner.enqueue("Hey dude!".getBytes(), attributes);
         runner.run(1, false);
 
-        final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
+        final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
         assertNotNull(successFF);
 
         JmsTemplate jmst = new JmsTemplate(cf);
@@ -256,7 +284,7 @@ public class PublishJMSIT {
         runner.enqueue("Hey dude!".getBytes(), attributes);
         runner.run(1, false); // Run once but don't shut down because we want 
the Connection Factory left intact so that we can use it.
 
-        final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
+        final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
         assertNotNull(successFF);
 
         JmsTemplate jmst = new JmsTemplate(cf);
@@ -316,7 +344,7 @@ public class PublishJMSIT {
         runner.enqueue("Hey dude!".getBytes(), attributes);
         runner.run(1, false);
 
-        final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
+        final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
         assertNotNull(successFF);
 
         JmsTemplate jmst = new JmsTemplate(cf);
@@ -542,4 +570,210 @@ public class PublishJMSIT {
 
         assertTrue(((MockProcessContext) 
runner.getProcessContext()).isYieldCalled(), "In case of an exception, the 
processor should be yielded.");
     }
+
+    @Test
+    public void testPublishRecords() throws InitializationException {
+        ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        String destination = "testPublishRecords";
+        testRunner = initializeTestRunner(cf, destination);
+        testRunner.setProperty(PublishJMS.RECORD_READER, 
createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(PublishJMS.RECORD_WRITER, 
createJsonRecordSetWriterService(testRunner));
+        testRunner.assertValid();
+
+        final ArrayNode testInput = createTestJsonInput();
+
+        testRunner.enqueue(testInput.toString().getBytes());
+
+        testRunner.run(1, false); // Run once but don't shut down because we 
want the Connection Factory left intact so that we can use it.
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS,
 3));
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(cf, destination, testInput.get(0).toString());
+        verifyPublishedMessage(cf, destination, testInput.get(1).toString());
+        verifyPublishedMessage(cf, destination, testInput.get(2).toString());
+
+        final List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        final MockFlowFile successfulFlowFile = flowFiles.get(0);
+        final String publishFailedIndexAttributeName = 
testRunner.getProcessor().getIdentifier() + ATTR_READ_FAILED_INDEX_SUFFIX;
+        
assertFalse(successfulFlowFile.getAttributes().containsKey(publishFailedIndexAttributeName),
 "Failed attribute should not be present on the FlowFile");
+    }
+
+    @Test
+    public void testPublishRecordsFailed() throws InitializationException {
+        PublishJMS processor = new PublishJMS() {
+            @Override
+            protected void rendezvousWithJms(ProcessContext context, 
ProcessSession processSession, JMSPublisher publisher) throws ProcessException {
+                JMSPublisher spiedPublisher = Mockito.spy(publisher);
+                Mockito.doCallRealMethod()
+                        .doThrow(new RuntimeException("Second publish 
failed."))
+                        .when(spiedPublisher).publish(any(), 
any(byte[].class), any());
+                super.rendezvousWithJms(context, processSession, 
spiedPublisher);
+            }
+        };
+
+        ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        String destination = "testPublishRecords";
+        testRunner = initializeTestRunner(processor, cf, destination);
+        testRunner.setProperty(PublishJMS.RECORD_READER, 
createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(PublishJMS.RECORD_WRITER, 
createJsonRecordSetWriterService(testRunner));
+        testRunner.assertValid();
+
+        final ArrayNode testInput = createTestJsonInput();
+
+        testRunner.enqueue(testInput.toString().getBytes());
+
+        testRunner.run(1, false); // Run once but don't shut down because we 
want the Connection Factory left intact so that we can use it.
+
+        testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
+        
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE,
 1));
+
+        verifyPublishedMessage(cf, destination, testInput.get(0).toString());
+
+        List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(REL_FAILURE);
+        assertEquals(1, flowFiles.size());
+
+        final MockFlowFile failedFlowFile = flowFiles.get(0);
+        final String publishFailedIndexAttributeName = 
testRunner.getProcessor().getIdentifier() + ATTR_READ_FAILED_INDEX_SUFFIX;
+        assertEquals("1", 
failedFlowFile.getAttribute(publishFailedIndexAttributeName), "Only one record 
is expected to be published successfully.");
+    }
+
+    @Test
+    public void 
testContinuePublishRecordsAndFailAgainWhenPreviousPublishFailed() throws 
InitializationException {
+        PublishJMS processor = new PublishJMS() {
+            @Override
+            protected void rendezvousWithJms(ProcessContext context, 
ProcessSession processSession, JMSPublisher publisher) throws ProcessException {
+                JMSPublisher spiedPublisher = Mockito.spy(publisher);
+                Mockito.doCallRealMethod()
+                        .doThrow(new RuntimeException("Second publish 
failed."))
+                        .when(spiedPublisher).publish(any(), 
any(byte[].class), any());
+                super.rendezvousWithJms(context, processSession, 
spiedPublisher);
+            }
+        };
+
+        ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        String destination = "testPublishRecords";
+        testRunner = initializeTestRunner(processor, cf, destination);
+        testRunner.setProperty(PublishJMS.RECORD_READER, 
createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(PublishJMS.RECORD_WRITER, 
createJsonRecordSetWriterService(testRunner));
+        testRunner.assertValid();
+
+        final String publishFailedIndexAttributeName = 
testRunner.getProcessor().getIdentifier() + ATTR_READ_FAILED_INDEX_SUFFIX;
+        final ArrayNode testInput = createTestJsonInput();
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(publishFailedIndexAttributeName, "1");
+        testRunner.enqueue(testInput.toString().getBytes(), attributes);
+
+        testRunner.run(1, false); // Run once but don't shut down because we 
want the Connection Factory left intact so that we can use it.
+
+        testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
+        
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE,
 2));
+
+        verifyPublishedMessage(cf, destination, testInput.get(1).toString());
+
+        final List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(REL_FAILURE);
+        assertEquals(1, flowFiles.size());
+
+        final MockFlowFile failedFlowFile = flowFiles.get(0);
+        assertEquals("2", 
failedFlowFile.getAttribute(publishFailedIndexAttributeName), "Only one record 
is expected to be published successfully.");
+    }
+
+    @Test
+    public void 
testContinuePublishRecordsSuccessfullyWhenPreviousPublishFailed() throws 
InitializationException {
+        ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        String destination = "testPublishRecords";
+        testRunner = initializeTestRunner(cf, destination);
+        testRunner.setProperty(PublishJMS.RECORD_READER, 
createJsonRecordSetReaderService(testRunner));
+        testRunner.setProperty(PublishJMS.RECORD_WRITER, 
createJsonRecordSetWriterService(testRunner));
+        testRunner.assertValid();
+
+        final String publishFailedIndexAttributeName = 
testRunner.getProcessor().getIdentifier() + ATTR_READ_FAILED_INDEX_SUFFIX;
+        final ArrayNode testInput = createTestJsonInput();
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(publishFailedIndexAttributeName, "1");
+        testRunner.enqueue(testInput.toString().getBytes(), attributes);
+
+        testRunner.run(1, false); // Run once but don't shut down because we 
want the Connection Factory left intact so that we can use it.
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER,
 3));
+
+        verifyPublishedMessage(cf, destination, testInput.get(1).toString());
+        verifyPublishedMessage(cf, destination, testInput.get(2).toString());
+
+        final List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        final MockFlowFile successfulFlowFile = flowFiles.get(0);
+        
assertNull(successfulFlowFile.getAttribute(publishFailedIndexAttributeName),
+                publishFailedIndexAttributeName + " is expected to be removed 
after all remaining records have been published successfully.");
+    }
+
+    private TestRunner initializeTestRunner(ConnectionFactory 
connectionFactory, String destinationName) throws InitializationException {
+        PublishJMS processor = new PublishJMS();
+        return initializeTestRunner(processor, connectionFactory, 
destinationName);
+    }
+
+    private TestRunner initializeTestRunner(PublishJMS processor, 
ConnectionFactory connectionFactory, String destinationName) throws 
InitializationException {
+        TestRunner runner = TestRunners.newTestRunner(processor);
+        JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
+        when(cs.getIdentifier()).thenReturn("cfProvider");
+        when(cs.getConnectionFactory()).thenReturn(connectionFactory);
+
+        runner.addControllerService("cfProvider", cs);
+        runner.enableControllerService(cs);
+
+        runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+        runner.setProperty(PublishJMS.DESTINATION, destinationName);
+
+        return runner;
+    }
+
+    private void verifyPublishedMessage(ConnectionFactory connectionFactory, 
String destinationName, String content) {
+        JmsTemplate jmst = new JmsTemplate(connectionFactory);
+        BytesMessage message = (BytesMessage) jmst.receive(destinationName);
+
+        byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message);
+        assertEquals(content, new String(messageBytes));
+    }
+
+    private ProvenanceEventRecord assertProvenanceEvent() {
+        final List<ProvenanceEventRecord> provenanceEvents = 
testRunner.getProvenanceEvents();
+        assertNotNull(provenanceEvents);
+        assertEquals(1, provenanceEvents.size());
+
+        final ProvenanceEventRecord event = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.SEND, event.getEventType());
+
+        return event;
+    }
+
+    private void assertProvenanceEvent(String expectedDetails) {
+        final ProvenanceEventRecord event = assertProvenanceEvent();
+        assertEquals(expectedDetails, event.getDetails());
+    }
+
+    private static ArrayNode createTestJsonInput() {
+        final ObjectMapper mapper = new ObjectMapper();
+
+        return mapper.createArrayNode().addAll(asList(
+                mapper.createObjectNode()
+                        .put("recordId", 1)
+                        .put("firstAttribute", "foo")
+                        .put("secondAttribute", false),
+                mapper.createObjectNode()
+                        .put("recordId", 2)
+                        .put("firstAttribute", "bar")
+                        .put("secondAttribute", true),
+                mapper.createObjectNode()
+                        .put("recordId", 3)
+                        .put("firstAttribute", "foobar")
+                        .put("secondAttribute", false)
+        ));
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/JMSTestUtil.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/JMSTestUtil.java
new file mode 100644
index 0000000000..d6c95cbae4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/JMSTestUtil.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.helpers;
+
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.util.TestRunner;
+
+public class JMSTestUtil {
+
+    public static String createJsonRecordSetReaderService(TestRunner 
testRunner) throws InitializationException {
+        final String id = "record-reader";
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        testRunner.addControllerService(id, jsonReader);
+        testRunner.setProperty(jsonReader, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
+        testRunner.enableControllerService(jsonReader);
+        return id;
+    }
+
+    public static String createJsonRecordSetWriterService(TestRunner 
testRunner) throws InitializationException {
+        final String id = "record-writer";
+        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+        testRunner.addControllerService(id, jsonWriter);
+        testRunner.setProperty(jsonWriter, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
+        testRunner.enableControllerService(jsonWriter);
+        return id;
+    }
+}


Reply via email to