tpalfy commented on code in PR #6987:
URL: https://github.com/apache/nifi/pull/6987#discussion_r1152948717


##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/StateTrackingFlowFileReader.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.reader;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.jms.processors.ioconcept.reader.record.RecordSupplier;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.util.StopWatch;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Optional.ofNullable;
+
+public class StateTrackingFlowFileReader implements FlowFileReader {
+
+    public static final String ATTR_READ_FAILED_INDEX_SUFFIX = 
".read.failed.index";
+
+    private final String identifier;
+    private final RecordSupplier recordSupplier;
+    private final ComponentLog logger;
+
+    public StateTrackingFlowFileReader(String identifier, RecordSupplier 
recordSupplier, ComponentLog logger) {
+        this.identifier = identifier;
+        this.recordSupplier = recordSupplier;
+        this.logger = logger;
+    }
+
+    @Override
+    public void read(ProcessSession session, FlowFile flowFile, MessageHandler 
messageHandler, FlowFileReaderCallback flowFileReaderCallback) {
+        final StopWatch stopWatch = new StopWatch(true);
+        final AtomicInteger processedRecords = new AtomicInteger();
+
+        final String publishFailedIndexAttributeName = identifier + 
ATTR_READ_FAILED_INDEX_SUFFIX;
+
+        try {
+            final Long previousProcessFailedAt = 
ofNullable(flowFile.getAttribute(publishFailedIndexAttributeName)).map(Long::valueOf).orElse(null);
+
+            session.read(flowFile, in -> recordSupplier.process(flowFile, in, 
processedRecords, previousProcessFailedAt, logger, messageHandler));
+
+            FlowFile successFlowFile = flowFile;
+
+            final boolean isRecover = previousProcessFailedAt != null;
+            if (isRecover) {
+                successFlowFile = session.removeAttribute(flowFile, 
publishFailedIndexAttributeName);
+            }
+
+            flowFileReaderCallback.onSuccess(successFlowFile, 
processedRecords.get(), isRecover, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+        } catch (Exception e) {
+            logger.error("An error happened while processing records. Routing 
to failure.", e);
+
+            final FlowFile failedFlowFile = session.putAttribute(flowFile, 
publishFailedIndexAttributeName, String.valueOf(processedRecords.get()));
+
+            flowFileReaderCallback.onFailure(failedFlowFile, 
processedRecords.get(), stopWatch.getElapsed(TimeUnit.MILLISECONDS), e);
+        }
+    }
+
+}

Review Comment:
   ```suggestion
   public class ProgressTrackingFlowFileContentProcessor implements 
FlowFileContentProcessor {
   
       public static final String PROCESS_FAILED_INDEX_ATTRIBUTE_SUFFIX = 
".process.failed.index";
   
       private final String processFailedIndexAttributeName;
       private final RecordProcessor recordProcessor;
       private final ComponentLog logger;
   
       public ProgressTrackingFlowFileContentProcessor(String 
processFailedIndexAttributePrefix, RecordProcessor recordProcessor, 
ComponentLog logger) {
           this.processFailedIndexAttributeName = 
processFailedIndexAttributePrefix + PROCESS_FAILED_INDEX_ATTRIBUTE_SUFFIX;
           this.recordProcessor = recordProcessor;
           this.logger = logger;
       }
   
       @Override
       public void processContent(ProcessSession session, FlowFile flowFile, 
FlowFileContentHandler flowFileContentHandler, FlowFileContentProcessorCallback 
flowFileContentProcessorCallback) {
           final StopWatch stopWatch = new StopWatch(true);
           
           final AtomicInteger nrOfProcessedRecordsHolder = new AtomicInteger();
           
           try {
               final Long previousProcessFailedAt = 
ofNullable(flowFile.getAttribute(processFailedIndexAttributeName)).map(Long::valueOf).orElse(null);
   
               session.read(flowFile, in -> {
                   int nrOfProcessedRecords = 
recordProcessor.processRecords(flowFile, in, previousProcessFailedAt, logger, 
flowFileContentHandler);
                   nrOfProcessedRecordsHolder.set(nrOfProcessedRecords);
               });
   
               FlowFile successFlowFile = flowFile;
   
               final boolean isReattempt = previousProcessFailedAt != null;
               if (isReattempt) {
                   successFlowFile = session.removeAttribute(flowFile, 
processFailedIndexAttributeName);
               }
   
               flowFileContentProcessorCallback.onSuccess(successFlowFile, 
nrOfProcessedRecordsHolder.get(), isReattempt, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
           } catch (Exception e) {
               logger.error("An error happened while processing records. 
Routing to failure.", e);
   
               final FlowFile failedFlowFile = session.putAttribute(flowFile, 
processFailedIndexAttributeName, 
String.valueOf(nrOfProcessedRecordsHolder.get()));
   
               flowFileContentProcessorCallback.onFailure(failedFlowFile, 
nrOfProcessedRecordsHolder.get(), stopWatch.getElapsed(TimeUnit.MILLISECONDS), 
e);
           }
       }
   
   }
   ```



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/FlowFileReader.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.reader;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+public interface FlowFileReader {
+    void read(ProcessSession session, FlowFile flowFile, MessageHandler 
messageHandler, FlowFileReaderCallback flowFileReaderCallback);
+}

Review Comment:
   ```suggestion
   public interface FlowFileContentProcessor {
       void processContent(ProcessSession session, FlowFile flowFile, 
FlowFileContentHandler flowFileContentHandler, FlowFileContentProcessorCallback 
flowFileContentProcessorCallback);
   }
   ```



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/AttributeSource.java:
##########
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.writer;
+
+import java.util.Map;
+
+public interface AttributeSource<T> {
+    Map<String, String> getAttributes(T message);

Review Comment:
   ```suggestion
       Map<String, String> getAttributes(T contentObject);
   ```



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java:
##########
@@ -82,79 +87,62 @@ private MessageConsumer createMessageConsumer(final Session 
session, final Strin
         }
     }
 
-
     /**
      * Receives a message from the broker. It is the consumerCallback's 
responsibility to acknowledge the received message.
      */
-    public void consume(final String destinationName, String errorQueueName, 
final boolean durable, final boolean shared, final String subscriptionName, 
final String messageSelector,
-                        final String charset, final ConsumerCallback 
consumerCallback) {
-        this.jmsTemplate.execute(new SessionCallback<Void>() {
-            @Override
-            public Void doInJms(final Session session) throws JMSException {
-
-                final MessageConsumer msgConsumer = 
createMessageConsumer(session, destinationName, durable, shared, 
subscriptionName, messageSelector);
-                try {
-                    final Message message = 
msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
-
-                    // If there is no message, there's nothing for us to do. 
We can simply close the consumer and return.
-                    if (message == null) {
-                        JmsUtils.closeMessageConsumer(msgConsumer);
-                        return null;
-                    }
-
-                    String messageType;
-                    byte[] messageBody;
-
-                    try {
-                        if (message instanceof TextMessage) {
-                            messageType = TextMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((TextMessage) message, 
Charset.forName(charset));
-                        } else if (message instanceof BytesMessage) {
-                            messageType = BytesMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((BytesMessage) message);
-                        } else if (message instanceof ObjectMessage) {
-                            messageType = ObjectMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((ObjectMessage) message);
-                        } else if (message instanceof StreamMessage) {
-                            messageType = StreamMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((StreamMessage) message);
-                        } else if (message instanceof MapMessage) {
-                            messageType = MapMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((MapMessage) message);
-                        } else {
-                            acknowledge(message, session);
-
-                            if (errorQueueName != null) {
-                                processLog.error("Received unsupported JMS 
Message type [{}]; rerouting message to error queue [{}].", new Object[] 
{message, errorQueueName});
-                                jmsTemplate.send(errorQueueName, __ -> 
message);
-                            } else {
-                                processLog.error("Received unsupported JMS 
Message type [{}]; will skip this message.", new Object[] {message});
-                            }
-
-                            return null;
-                        }
-                    } catch (final MessageConversionException mce) {
-                        processLog.error("Received a JMS Message [{}] but 
failed to obtain the content of the message; will acknowledge this message 
without creating a FlowFile for it.",
-                            new Object[] {message}, mce);
-                        acknowledge(message, session);
-
-                        if (errorQueueName != null) {
-                            jmsTemplate.send(errorQueueName, __ -> message);
-                        }
-
-                        return null;
-                    }
+    public void consumeSingleMessage(final String destinationName, String 
errorQueueName, final boolean durable, final boolean shared, final String 
subscriptionName, final String messageSelector,
+                                     final String charset, final 
Consumer<JMSResponse> singleMessageConsumer) {
+        doWithJmsTemplate(destinationName, durable, shared, subscriptionName, 
messageSelector, (session, messageConsumer) -> {
+            final JMSResponse response = receiveMessage(session, 
messageConsumer, charset, errorQueueName);
+            if (response != null) {
+                // Provide the JMSResponse to the processor to handle. It is 
the responsibility of the
+                // processor to handle acknowledgment of the message (if 
Client Acknowledge), and it is
+                // the responsibility of the processor to handle closing the 
Message Consumer.
+                // Both of these actions can be handled by calling the 
acknowledge() or reject() methods of
+                // the JMSResponse.
+                singleMessageConsumer.accept(response);
+            }
+        });
+    }
 
-                    final Map<String, String> messageHeaders = 
extractMessageHeaders(message);
-                    final Map<String, String> messageProperties = 
extractMessageProperties(message);
-                    final JMSResponse response = new JMSResponse(message, 
session.getAcknowledgeMode(), messageType, messageBody, messageHeaders, 
messageProperties, msgConsumer);
+    /**
+     * Receives a list of messages from the broker. It is the 
consumerCallback's responsibility to acknowledge the received message.
+     */
+    public void consumeMessageSet(final String destinationName, String 
errorQueueName, final boolean durable, final boolean shared, final String 
subscriptionName, final String messageSelector,
+                        final String charset, final 
Consumer<List<JMSResponse>> messageSetConsumer) {
+        doWithJmsTemplate(destinationName, durable, shared, subscriptionName, 
messageSelector, new MessageReceiver() {

Review Comment:
   ```suggestion
           processMessage(destinationName, durable, shared, subscriptionName, 
messageSelector, new MessageReceiver() {
   ```



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java:
##########
@@ -82,79 +87,62 @@ private MessageConsumer createMessageConsumer(final Session 
session, final Strin
         }
     }
 
-
     /**
      * Receives a message from the broker. It is the consumerCallback's 
responsibility to acknowledge the received message.
      */
-    public void consume(final String destinationName, String errorQueueName, 
final boolean durable, final boolean shared, final String subscriptionName, 
final String messageSelector,
-                        final String charset, final ConsumerCallback 
consumerCallback) {
-        this.jmsTemplate.execute(new SessionCallback<Void>() {
-            @Override
-            public Void doInJms(final Session session) throws JMSException {
-
-                final MessageConsumer msgConsumer = 
createMessageConsumer(session, destinationName, durable, shared, 
subscriptionName, messageSelector);
-                try {
-                    final Message message = 
msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
-
-                    // If there is no message, there's nothing for us to do. 
We can simply close the consumer and return.
-                    if (message == null) {
-                        JmsUtils.closeMessageConsumer(msgConsumer);
-                        return null;
-                    }
-
-                    String messageType;
-                    byte[] messageBody;
-
-                    try {
-                        if (message instanceof TextMessage) {
-                            messageType = TextMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((TextMessage) message, 
Charset.forName(charset));
-                        } else if (message instanceof BytesMessage) {
-                            messageType = BytesMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((BytesMessage) message);
-                        } else if (message instanceof ObjectMessage) {
-                            messageType = ObjectMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((ObjectMessage) message);
-                        } else if (message instanceof StreamMessage) {
-                            messageType = StreamMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((StreamMessage) message);
-                        } else if (message instanceof MapMessage) {
-                            messageType = MapMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((MapMessage) message);
-                        } else {
-                            acknowledge(message, session);
-
-                            if (errorQueueName != null) {
-                                processLog.error("Received unsupported JMS 
Message type [{}]; rerouting message to error queue [{}].", new Object[] 
{message, errorQueueName});
-                                jmsTemplate.send(errorQueueName, __ -> 
message);
-                            } else {
-                                processLog.error("Received unsupported JMS 
Message type [{}]; will skip this message.", new Object[] {message});
-                            }
-
-                            return null;
-                        }
-                    } catch (final MessageConversionException mce) {
-                        processLog.error("Received a JMS Message [{}] but 
failed to obtain the content of the message; will acknowledge this message 
without creating a FlowFile for it.",
-                            new Object[] {message}, mce);
-                        acknowledge(message, session);
-
-                        if (errorQueueName != null) {
-                            jmsTemplate.send(errorQueueName, __ -> message);
-                        }
-
-                        return null;
-                    }
+    public void consumeSingleMessage(final String destinationName, String 
errorQueueName, final boolean durable, final boolean shared, final String 
subscriptionName, final String messageSelector,
+                                     final String charset, final 
Consumer<JMSResponse> singleMessageConsumer) {
+        doWithJmsTemplate(destinationName, durable, shared, subscriptionName, 
messageSelector, (session, messageConsumer) -> {
+            final JMSResponse response = receiveMessage(session, 
messageConsumer, charset, errorQueueName);
+            if (response != null) {
+                // Provide the JMSResponse to the processor to handle. It is 
the responsibility of the
+                // processor to handle acknowledgment of the message (if 
Client Acknowledge), and it is
+                // the responsibility of the processor to handle closing the 
Message Consumer.
+                // Both of these actions can be handled by calling the 
acknowledge() or reject() methods of
+                // the JMSResponse.
+                singleMessageConsumer.accept(response);
+            }
+        });
+    }
 
-                    final Map<String, String> messageHeaders = 
extractMessageHeaders(message);
-                    final Map<String, String> messageProperties = 
extractMessageProperties(message);
-                    final JMSResponse response = new JMSResponse(message, 
session.getAcknowledgeMode(), messageType, messageBody, messageHeaders, 
messageProperties, msgConsumer);
+    /**
+     * Receives a list of messages from the broker. It is the 
consumerCallback's responsibility to acknowledge the received message.
+     */
+    public void consumeMessageSet(final String destinationName, String 
errorQueueName, final boolean durable, final boolean shared, final String 
subscriptionName, final String messageSelector,
+                        final String charset, final 
Consumer<List<JMSResponse>> messageSetConsumer) {
+        doWithJmsTemplate(destinationName, durable, shared, subscriptionName, 
messageSelector, new MessageReceiver() {
+            @Override
+            public void consume(Session session, MessageConsumer 
messageConsumer) throws JMSException {
+                final List<JMSResponse> jmsResponses = new ArrayList<>();
+                int batchCounter = 0;
+
+                JMSResponse response;
+                while ((response = receiveMessage(session, messageConsumer, 
charset, errorQueueName)) != null && batchCounter < MAX_MESSAGES_PER_FLOW_FILE) 
{
+                    response.setBatchOrder(batchCounter);
+                    jmsResponses.add(response);
+                    batchCounter++;
+                }
 
+                if (!jmsResponses.isEmpty()) {
                     // Provide the JMSResponse to the processor to handle. It 
is the responsibility of the
                     // processor to handle acknowledgment of the message (if 
Client Acknowledge), and it is
                     // the responsibility of the processor to handle closing 
the Message Consumer.
                     // Both of these actions can be handled by calling the 
acknowledge() or reject() methods of
                     // the JMSResponse.
-                    consumerCallback.accept(response);
+                    messageSetConsumer.accept(jmsResponses);
+                }
+            }
+        });
+    }
+
+    private void doWithJmsTemplate(String destinationName, boolean durable, 
boolean shared, String subscriptionName, String messageSelector, 
MessageReceiver messageReceiver) {

Review Comment:
   ```suggestion
       private void processMessage(String destinationName, boolean durable, 
boolean shared, String subscriptionName, String messageSelector, 
MessageProcessor messageProcessor) {
   ```



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java:
##########
@@ -286,13 +333,18 @@ public void acknowledge() throws JMSException {
         public void reject() {
             JmsUtils.closeMessageConsumer(messageConsumer);
         }
+
+        public Integer getBatchOrder() {
+            return batchOrder;
+        }
+
+        public void setBatchOrder(Integer batchOrder) {
+            this.batchOrder = batchOrder;
+        }
     }
 
-    /**
-     * Callback to be invoked while executing inJMS call (the call within the
-     * live JMS session)
-     */
-    static interface ConsumerCallback {
-        void accept(JMSResponse response);
+    interface MessageReceiver {
+        void consume(Session session, MessageConsumer messageConsumer) throws 
JMSException;

Review Comment:
   ```suggestion
       interface MessageProcessor {
           void processMessage(Session session, MessageConsumer 
messageConsumer) throws JMSException;
   ```



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/FlowFileReaderCallback.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.reader;
+
+import org.apache.nifi.flowfile.FlowFile;
+
+public interface FlowFileReaderCallback {

Review Comment:
   ```suggestion
   public interface FlowFileContentProcessorCallback {
   ```



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/record/RecordWriter.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.writer.record;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.jms.processors.ioconcept.writer.AttributeSource;
+import org.apache.nifi.jms.processors.ioconcept.writer.FlowFileWriter;
+import org.apache.nifi.jms.processors.ioconcept.writer.FlowFileWriterCallback;
+import org.apache.nifi.jms.processors.ioconcept.writer.Marshaller;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SchemaValidationException;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.nifi.jms.processors.ioconcept.writer.record.OutputStrategy.USE_APPENDER;
+import static 
org.apache.nifi.jms.processors.ioconcept.writer.record.OutputStrategy.USE_VALUE;
+import static 
org.apache.nifi.jms.processors.ioconcept.writer.record.OutputStrategy.USE_WRAPPER;
+
+public class RecordWriter<T> implements FlowFileWriter<T> {
+
+    private final static String RECORD_COUNT_KEY = "record.count";
+
+    private final RecordReaderFactory readerFactory;
+    private final RecordSetWriterFactory writerFactory;
+    private final Marshaller<T> marshaller;
+    private final AttributeSource<T> attributeSource;
+    private final OutputStrategy outputStrategy;
+    private final ComponentLog logger;
+
+    public RecordWriter(RecordReaderFactory readerFactory,
+                        RecordSetWriterFactory writerFactory,
+                        Marshaller<T> marshaller,
+                        AttributeSource<T> attributeSource,
+                        OutputStrategy outputStrategy,
+                        ComponentLog logger) {
+        this.readerFactory = readerFactory;
+        this.writerFactory = writerFactory;
+        this.marshaller = marshaller;
+        this.attributeSource = attributeSource;
+        this.outputStrategy = outputStrategy;
+        this.logger = logger;
+    }
+
+    @Override
+    public void write(ProcessSession session, List<T> messages, 
FlowFileWriterCallback<T> flowFileWriterCallback) {
+        FlowFile flowFile = session.create();
+
+        final Map<String, String> attributes = new HashMap<>();
+        final AtomicInteger recordCount = new AtomicInteger();
+
+        final List<T> processedMessages = new ArrayList<>();
+        final List<T> failedMessages = new ArrayList<>();
+
+        RecordSetWriter writer = null;
+        boolean isWriterInitialized = false;
+
+        try {
+            for (T message : messages) {
+                if (message == null) {
+                    break;
+                }
+
+                final byte[] recordBytes = marshaller.marshall(message);
+                try (final InputStream in = new 
ByteArrayInputStream(recordBytes)) {
+                    final RecordReader reader;
+
+                    // parse incoming message which may contain multiple 
messages
+                    try {
+                        reader = readerFactory.createRecordReader(attributes, 
in, recordBytes.length, logger);
+                    } catch (final IOException ioe) {
+                        logger.error("Failed to parse message due to comms 
failure. Will roll back session and try again momentarily.");
+                        flowFileWriterCallback.onFailure(flowFile, 
processedMessages, failedMessages, ioe);
+                        closeWriter(writer);
+                        return;
+                    } catch (final Exception e) {
+                        logger.error("Failed to parse message, sending to the 
parse failure relationship", e);
+                        failedMessages.add(message);
+                        flowFileWriterCallback.onParseFailure(flowFile, 
message, e);
+                        continue;
+                    }
+
+                    // write messages as records into FlowFile
+                    try {
+                        Record record;
+                        while ((record = reader.nextRecord()) != null) {
+
+                            if (attributeSource != null && 
!outputStrategy.equals(USE_VALUE)) {
+                                final Map<String, String> additionalAttributes 
= attributeSource.getAttributes(message);
+                                if (outputStrategy.equals(USE_APPENDER)) {
+                                    record = RecordUtils.append(record, 
additionalAttributes, "_");
+                                } else if (outputStrategy.equals(USE_WRAPPER)){
+                                    record = RecordUtils.wrap(record, "value", 
additionalAttributes, "_");
+                                }
+                            }
+
+                            if (!isWriterInitialized) {
+                                final RecordSchema recordSchema = 
record.getSchema();
+                                final OutputStream rawOut = 
session.write(flowFile);
+
+                                RecordSchema writeSchema;
+                                try {
+                                    writeSchema = 
writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+                                } catch (final Exception e) {
+                                    logger.error("Failed to obtain Schema for 
FlowFile, sending to the parse failure relationship", e);
+                                    failedMessages.add(message);
+                                    
flowFileWriterCallback.onParseFailure(flowFile, message, e);
+                                    continue;
+                                }
+
+                                writer = writerFactory.createWriter(logger, 
writeSchema, rawOut, flowFile);
+                                writer.beginRecordSet();
+                            }
+
+                            try {
+                                writer.write(record);
+                                isWriterInitialized = true;
+                                processedMessages.add(message);
+                            } catch (final RuntimeException re) {
+                                logger.error("Failed to write message using 
the configured Record Writer, sending to the parse failure relationship", re);
+                                failedMessages.add(message);
+                                
flowFileWriterCallback.onParseFailure(flowFile, message, re);
+                            }
+                        }
+                    } catch (final IOException | MalformedRecordException | 
SchemaValidationException e) {
+                        logger.error("Failed to write message, sending to the 
parse failure relationship", e);
+                        failedMessages.add(message);
+                        flowFileWriterCallback.onParseFailure(flowFile, 
message, e);
+                    }
+                } catch (Exception e) {
+                    logger.error("Failed to write message, sending to the 
parse failure relationship", e);
+                    failedMessages.add(message);
+                    flowFileWriterCallback.onParseFailure(flowFile, message, 
e);
+                }
+            }
+
+            if (writer != null) {
+                final WriteResult writeResult = writer.finishRecordSet();
+                attributes.put(RECORD_COUNT_KEY, 
String.valueOf(writeResult.getRecordCount()));
+                attributes.put(CoreAttributes.MIME_TYPE.key(), 
writer.getMimeType());
+                attributes.putAll(writeResult.getAttributes());
+                recordCount.set(writeResult.getRecordCount());
+            }
+
+        } catch (final Exception e) {
+            flowFileWriterCallback.onFailure(flowFile, processedMessages, 
failedMessages, e);
+        } finally {
+            closeWriter(writer);
+        }
+
+        if (recordCount.get() == 0) {
+            session.remove(flowFile);
+            return;
+        }
+
+        session.putAllAttributes(flowFile, attributes);
+
+        final int count = recordCount.get();
+        logger.info("Successfully processed {} records for {}", count, 
flowFile);
+
+        flowFileWriterCallback.onSuccess(flowFile, processedMessages, 
failedMessages);
+    }

Review Comment:
   ```suggestion
       public void writeToFlowFile(ProcessSession session, List<T> 
contentObjects, FlowFileWriterCallback<T> flowFileWriterCallback) {
           FlowFile flowFile = session.create();
   
           final Map<String, String> attributes = new HashMap<>();
           final AtomicInteger recordCount = new AtomicInteger();
   
           final List<T> processedMessages = new ArrayList<>();
           final List<T> failedMessages = new ArrayList<>();
   
           RecordSetWriter recordSetWriter = null;
           boolean isWriterInitialized = false;
   
           try {
               for (T contentObject : contentObjects) {
                   if (contentObject == null) {
                       break;
                   }
   
                   final byte[] contentBytes = 
marshaller.marshall(contentObject);
                   try (final InputStream contentInputStream = new 
ByteArrayInputStream(contentBytes)) {
                       final RecordReader recordReader;
   
                       // parse incoming message which may contain multiple 
messages
                       try {
                           recordReader = 
readerFactory.createRecordReader(attributes, contentInputStream, 
contentBytes.length, logger);
                       } catch (final IOException ioe) {
                           logger.error("Failed to parse message due to comms 
failure. Will roll back session and try again momentarily.");
                           flowFileWriterCallback.onFailure(flowFile, 
processedMessages, failedMessages, ioe);
                           closeWriter(recordSetWriter);
                           return;
                       } catch (final Exception e) {
                           logger.error("Failed to parse message, sending to 
the parse failure relationship", e);
                           failedMessages.add(contentObject);
                           flowFileWriterCallback.onParseFailure(flowFile, 
contentObject, e);
                           continue;
                       }
   
                       // write messages as records into FlowFile
                       try {
                           Record record;
                           while ((record = recordReader.nextRecord()) != null) 
{
   
                               if (attributeSource != null && 
!outputStrategy.equals(USE_VALUE)) {
                                   final Map<String, String> 
additionalAttributes = attributeSource.getAttributes(contentObject);
                                   if (outputStrategy.equals(USE_APPENDER)) {
                                       record = RecordUtils.append(record, 
additionalAttributes, "_");
                                   } else if 
(outputStrategy.equals(USE_WRAPPER)){
                                       record = RecordUtils.wrap(record, 
"value", additionalAttributes, "_");
                                   }
                               }
   
                               if (!isWriterInitialized) {
                                   final RecordSchema recordSchema = 
record.getSchema();
                                   final OutputStream rawOut = 
session.write(flowFile);
   
                                   RecordSchema writeSchema;
                                   try {
                                       writeSchema = 
writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
                                   } catch (final Exception e) {
                                       logger.error("Failed to obtain Schema 
for FlowFile, sending to the parse failure relationship", e);
                                       failedMessages.add(contentObject);
                                       
flowFileWriterCallback.onParseFailure(flowFile, contentObject, e);
                                       continue;
                                   }
   
                                   recordSetWriter = 
writerFactory.createWriter(logger, writeSchema, rawOut, flowFile);
                                   recordSetWriter.beginRecordSet();
                               }
   
                               try {
                                   recordSetWriter.write(record);
                                   isWriterInitialized = true;
                                   processedMessages.add(contentObject);
                               } catch (final RuntimeException re) {
                                   logger.error("Failed to write message using 
the configured Record Writer, sending to the parse failure relationship", re);
                                   failedMessages.add(contentObject);
                                   
flowFileWriterCallback.onParseFailure(flowFile, contentObject, re);
                               }
                           }
                       } catch (final IOException | MalformedRecordException | 
SchemaValidationException e) {
                           logger.error("Failed to write message, sending to 
the parse failure relationship", e);
                           failedMessages.add(contentObject);
                           flowFileWriterCallback.onParseFailure(flowFile, 
contentObject, e);
                       }
                   } catch (Exception e) {
                       logger.error("Failed to write message, sending to the 
parse failure relationship", e);
                       failedMessages.add(contentObject);
                       flowFileWriterCallback.onParseFailure(flowFile, 
contentObject, e);
                   }
               }
   
               if (recordSetWriter != null) {
                   final WriteResult writeResult = 
recordSetWriter.finishRecordSet();
                   attributes.put(RECORD_COUNT_KEY, 
String.valueOf(writeResult.getRecordCount()));
                   attributes.put(CoreAttributes.MIME_TYPE.key(), 
recordSetWriter.getMimeType());
                   attributes.putAll(writeResult.getAttributes());
                   recordCount.set(writeResult.getRecordCount());
               }
   
           } catch (final Exception e) {
               flowFileWriterCallback.onFailure(flowFile, processedMessages, 
failedMessages, e);
           } finally {
               closeWriter(recordSetWriter);
           }
   
           if (recordCount.get() == 0) {
               session.remove(flowFile);
               return;
           }
   
           session.putAllAttributes(flowFile, attributes);
   
           final int count = recordCount.get();
           logger.info("Successfully processed {} records for {}", count, 
flowFile);
   
           flowFileWriterCallback.onSuccess(flowFile, processedMessages, 
failedMessages);
       }
   ```



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java:
##########
@@ -82,79 +87,62 @@ private MessageConsumer createMessageConsumer(final Session 
session, final Strin
         }
     }
 
-
     /**
      * Receives a message from the broker. It is the consumerCallback's 
responsibility to acknowledge the received message.
      */
-    public void consume(final String destinationName, String errorQueueName, 
final boolean durable, final boolean shared, final String subscriptionName, 
final String messageSelector,
-                        final String charset, final ConsumerCallback 
consumerCallback) {
-        this.jmsTemplate.execute(new SessionCallback<Void>() {
-            @Override
-            public Void doInJms(final Session session) throws JMSException {
-
-                final MessageConsumer msgConsumer = 
createMessageConsumer(session, destinationName, durable, shared, 
subscriptionName, messageSelector);
-                try {
-                    final Message message = 
msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
-
-                    // If there is no message, there's nothing for us to do. 
We can simply close the consumer and return.
-                    if (message == null) {
-                        JmsUtils.closeMessageConsumer(msgConsumer);
-                        return null;
-                    }
-
-                    String messageType;
-                    byte[] messageBody;
-
-                    try {
-                        if (message instanceof TextMessage) {
-                            messageType = TextMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((TextMessage) message, 
Charset.forName(charset));
-                        } else if (message instanceof BytesMessage) {
-                            messageType = BytesMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((BytesMessage) message);
-                        } else if (message instanceof ObjectMessage) {
-                            messageType = ObjectMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((ObjectMessage) message);
-                        } else if (message instanceof StreamMessage) {
-                            messageType = StreamMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((StreamMessage) message);
-                        } else if (message instanceof MapMessage) {
-                            messageType = MapMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((MapMessage) message);
-                        } else {
-                            acknowledge(message, session);
-
-                            if (errorQueueName != null) {
-                                processLog.error("Received unsupported JMS 
Message type [{}]; rerouting message to error queue [{}].", new Object[] 
{message, errorQueueName});
-                                jmsTemplate.send(errorQueueName, __ -> 
message);
-                            } else {
-                                processLog.error("Received unsupported JMS 
Message type [{}]; will skip this message.", new Object[] {message});
-                            }
-
-                            return null;
-                        }
-                    } catch (final MessageConversionException mce) {
-                        processLog.error("Received a JMS Message [{}] but 
failed to obtain the content of the message; will acknowledge this message 
without creating a FlowFile for it.",
-                            new Object[] {message}, mce);
-                        acknowledge(message, session);
-
-                        if (errorQueueName != null) {
-                            jmsTemplate.send(errorQueueName, __ -> message);
-                        }
-
-                        return null;
-                    }
+    public void consumeSingleMessage(final String destinationName, String 
errorQueueName, final boolean durable, final boolean shared, final String 
subscriptionName, final String messageSelector,
+                                     final String charset, final 
Consumer<JMSResponse> singleMessageConsumer) {
+        doWithJmsTemplate(destinationName, durable, shared, subscriptionName, 
messageSelector, (session, messageConsumer) -> {

Review Comment:
   ```suggestion
           processMessage(destinationName, durable, shared, subscriptionName, 
messageSelector, (session, messageConsumer) -> {
   ```



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/FlowFileWriterCallback.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.writer;
+
+import org.apache.nifi.flowfile.FlowFile;
+
+import java.util.List;
+
+public interface FlowFileWriterCallback<T> {
+    void onSuccess(FlowFile flowFile, List<T> processedMessages, List<T> 
failedMessages);
+    void onParseFailure(FlowFile flowFile, T message, Exception e);
+    void onFailure(FlowFile flowFile, List<T> processedMessages, List<T> 
failedMessages, Exception e);

Review Comment:
   ```suggestion
       void onSuccess(FlowFile flowFile, List<T> processedContentObjects, 
List<T> failedContentObjects);
       void onParseFailure(FlowFile flowFile, T contentObject, Exception e);
       void onFailure(FlowFile flowFile, List<T> processedContentObjects, 
List<T> failedContentObjects, Exception e);
   ```



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java:
##########
@@ -82,79 +87,62 @@ private MessageConsumer createMessageConsumer(final Session 
session, final Strin
         }
     }
 
-
     /**
      * Receives a message from the broker. It is the consumerCallback's 
responsibility to acknowledge the received message.
      */
-    public void consume(final String destinationName, String errorQueueName, 
final boolean durable, final boolean shared, final String subscriptionName, 
final String messageSelector,
-                        final String charset, final ConsumerCallback 
consumerCallback) {
-        this.jmsTemplate.execute(new SessionCallback<Void>() {
-            @Override
-            public Void doInJms(final Session session) throws JMSException {
-
-                final MessageConsumer msgConsumer = 
createMessageConsumer(session, destinationName, durable, shared, 
subscriptionName, messageSelector);
-                try {
-                    final Message message = 
msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
-
-                    // If there is no message, there's nothing for us to do. 
We can simply close the consumer and return.
-                    if (message == null) {
-                        JmsUtils.closeMessageConsumer(msgConsumer);
-                        return null;
-                    }
-
-                    String messageType;
-                    byte[] messageBody;
-
-                    try {
-                        if (message instanceof TextMessage) {
-                            messageType = TextMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((TextMessage) message, 
Charset.forName(charset));
-                        } else if (message instanceof BytesMessage) {
-                            messageType = BytesMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((BytesMessage) message);
-                        } else if (message instanceof ObjectMessage) {
-                            messageType = ObjectMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((ObjectMessage) message);
-                        } else if (message instanceof StreamMessage) {
-                            messageType = StreamMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((StreamMessage) message);
-                        } else if (message instanceof MapMessage) {
-                            messageType = MapMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((MapMessage) message);
-                        } else {
-                            acknowledge(message, session);
-
-                            if (errorQueueName != null) {
-                                processLog.error("Received unsupported JMS 
Message type [{}]; rerouting message to error queue [{}].", new Object[] 
{message, errorQueueName});
-                                jmsTemplate.send(errorQueueName, __ -> 
message);
-                            } else {
-                                processLog.error("Received unsupported JMS 
Message type [{}]; will skip this message.", new Object[] {message});
-                            }
-
-                            return null;
-                        }
-                    } catch (final MessageConversionException mce) {
-                        processLog.error("Received a JMS Message [{}] but 
failed to obtain the content of the message; will acknowledge this message 
without creating a FlowFile for it.",
-                            new Object[] {message}, mce);
-                        acknowledge(message, session);
-
-                        if (errorQueueName != null) {
-                            jmsTemplate.send(errorQueueName, __ -> message);
-                        }
-
-                        return null;
-                    }
+    public void consumeSingleMessage(final String destinationName, String 
errorQueueName, final boolean durable, final boolean shared, final String 
subscriptionName, final String messageSelector,
+                                     final String charset, final 
Consumer<JMSResponse> singleMessageConsumer) {
+        doWithJmsTemplate(destinationName, durable, shared, subscriptionName, 
messageSelector, (session, messageConsumer) -> {
+            final JMSResponse response = receiveMessage(session, 
messageConsumer, charset, errorQueueName);
+            if (response != null) {
+                // Provide the JMSResponse to the processor to handle. It is 
the responsibility of the
+                // processor to handle acknowledgment of the message (if 
Client Acknowledge), and it is
+                // the responsibility of the processor to handle closing the 
Message Consumer.
+                // Both of these actions can be handled by calling the 
acknowledge() or reject() methods of
+                // the JMSResponse.
+                singleMessageConsumer.accept(response);
+            }
+        });
+    }
 
-                    final Map<String, String> messageHeaders = 
extractMessageHeaders(message);
-                    final Map<String, String> messageProperties = 
extractMessageProperties(message);
-                    final JMSResponse response = new JMSResponse(message, 
session.getAcknowledgeMode(), messageType, messageBody, messageHeaders, 
messageProperties, msgConsumer);
+    /**
+     * Receives a list of messages from the broker. It is the 
consumerCallback's responsibility to acknowledge the received message.
+     */
+    public void consumeMessageSet(final String destinationName, String 
errorQueueName, final boolean durable, final boolean shared, final String 
subscriptionName, final String messageSelector,
+                        final String charset, final 
Consumer<List<JMSResponse>> messageSetConsumer) {
+        doWithJmsTemplate(destinationName, durable, shared, subscriptionName, 
messageSelector, new MessageReceiver() {
+            @Override
+            public void consume(Session session, MessageConsumer 
messageConsumer) throws JMSException {
+                final List<JMSResponse> jmsResponses = new ArrayList<>();
+                int batchCounter = 0;
+
+                JMSResponse response;
+                while ((response = receiveMessage(session, messageConsumer, 
charset, errorQueueName)) != null && batchCounter < MAX_MESSAGES_PER_FLOW_FILE) 
{
+                    response.setBatchOrder(batchCounter);
+                    jmsResponses.add(response);
+                    batchCounter++;
+                }
 
+                if (!jmsResponses.isEmpty()) {
                     // Provide the JMSResponse to the processor to handle. It 
is the responsibility of the
                     // processor to handle acknowledgment of the message (if 
Client Acknowledge), and it is
                     // the responsibility of the processor to handle closing 
the Message Consumer.
                     // Both of these actions can be handled by calling the 
acknowledge() or reject() methods of
                     // the JMSResponse.
-                    consumerCallback.accept(response);
+                    messageSetConsumer.accept(jmsResponses);
+                }
+            }
+        });
+    }
+
+    private void doWithJmsTemplate(String destinationName, boolean durable, 
boolean shared, String subscriptionName, String messageSelector, 
MessageReceiver messageReceiver) {
+        this.jmsTemplate.execute(new SessionCallback<Void>() {
+            @Override
+            public Void doInJms(final Session session) throws JMSException {
+
+                final MessageConsumer messageConsumer = 
createMessageConsumer(session, destinationName, durable, shared, 
subscriptionName, messageSelector);
+                try {
+                    messageReceiver.consume(session, messageConsumer);

Review Comment:
   ```suggestion
                       messageProcessor.processMessage(session, messageConsumer);
   ```



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/FlowFileWriter.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.writer;
+
+import org.apache.nifi.processor.ProcessSession;
+
+import java.util.List;
+
+public interface FlowFileWriter<T> {
+    void write(ProcessSession session, List<T> messages, FlowFileWriterCallback<T> flowFileWriterCallback);

Review Comment:
   ```suggestion
       void writeToFlowFile(ProcessSession session, List<T> contentObjects, FlowFileWriterCallback<T> flowFileWriterCallback);
   ```



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java:
##########
@@ -199,36 +221,63 @@ protected void rendezvousWithJms(ProcessContext context, ProcessSession processS
                     }
                 }
 
-                switch (context.getProperty(MESSAGE_BODY).getValue()) {
-                    case TEXT_MESSAGE:
-                        try {
-                            publisher.publish(destinationName, this.extractTextMessageBody(flowFile, processSession, charset), attributesToSend);
-                        } catch(Exception e) {
-                            publisher.setValid(false);
-                            throw e;
-                        }
-                        break;
-                    case BYTES_MESSAGE:
-                    default:
-                        try {
-                            publisher.publish(destinationName, this.extractMessageBody(flowFile, processSession), attributesToSend);
-                        } catch(Exception e) {
-                            publisher.setValid(false);
-                            throw e;
-                        }
-                        break;
+                if (context.getProperty(RECORD_READER).isSet()) {
+                    final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+                    final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+                    final FlowFileReader flowFileReader = new StateTrackingFlowFileReader(
+                            getIdentifier(),
+                            new RecordSupplier(readerFactory, writerFactory),
+                            getLogger()
+                    );
+
+                    flowFileReader.read(
+                            processSession,
+                            flowFile,
+                            content -> publisher.publish(destinationName, content, attributesToSend),
+                            new FlowFileReaderCallback() {
+                                @Override
+                                public void onSuccess(FlowFile flowFile, int processedRecords, boolean isRecover, long transmissionMillis) {
+                                    final String eventTemplate = isRecover ? PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER : PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
+                                    processSession.getProvenanceReporter().send(
+                                            flowFile,
+                                            destinationName,
+                                            String.format(eventTemplate, processedRecords),
+                                            transmissionMillis);
+
+                                    processSession.transfer(flowFile, REL_SUCCESS);
+                                }
+
+                                @Override
+                                public void onFailure(FlowFile flowFile, int processedRecords, long transmissionMillis, Exception e) {
+                                    processSession.getProvenanceReporter().send(
+                                            flowFile,
+                                            destinationName,
+                                            String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE, processedRecords),
+                                            transmissionMillis);
+
+                                    handleException(context, processSession, publisher, flowFile, e);
+                                }
+                            }
+                    );

Review Comment:
   ```suggestion
                       final FlowFileContentProcessor flowFileContentProcessor = new ProgressTrackingFlowFileContentProcessor(
                               getIdentifier(),
                               new RecordProcessor(readerFactory, writerFactory),
                               getLogger()
                       );
   
                       final FlowFileContentProcessorCallback flowFileContentProcessorCallback = new FlowFileContentProcessorCallback() {
                           @Override
                           public void onSuccess(FlowFile flowFile, int processedRecords, boolean isRecover, long transmissionMillis) {
                               final String eventTemplate = isRecover ? PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER : PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
                               processSession.getProvenanceReporter().send(
                                       flowFile,
                                       destinationName,
                                       String.format(eventTemplate, processedRecords),
                                       transmissionMillis);
   
                               processSession.transfer(flowFile, REL_SUCCESS);
                           }
   
                           @Override
                           public void onFailure(FlowFile flowFile, int processedRecords, long transmissionMillis, Exception e) {
                               processSession.getProvenanceReporter().send(
                                       flowFile,
                                       destinationName,
                                       String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE, processedRecords),
                                       transmissionMillis);
   
                               handleException(context, processSession, publisher, flowFile, e);
                           }
                       };
                       
                       flowFileContentProcessor.processContent(
                               processSession,
                               flowFile,
                               content -> publisher.publish(destinationName, content, attributesToSend),
                               flowFileContentProcessorCallback
                       );
   ```



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/Marshaller.java:
##########
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.writer;
+
+public interface Marshaller<T> {
+    byte[] marshall(T message);

Review Comment:
   ```suggestion
       byte[] marshall(T contentObject);
   ```



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/record/RecordSupplier.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.reader.record;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.jms.processors.ioconcept.reader.MessageHandler;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class RecordSupplier {
+
+    private final RecordReaderFactory readerFactory;
+    private final RecordSetWriterFactory writerFactory;
+
+    public RecordSupplier(RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory) {
+        this.readerFactory = readerFactory;
+        this.writerFactory = writerFactory;
+    }
+
+    public void process(FlowFile flowfile, InputStream in, AtomicInteger processedRecords, Long processFromIndex, ComponentLog logger, MessageHandler messageHandler) throws IOException {
+
+        try (final RecordReader reader = readerFactory.createRecordReader(flowfile, in, logger)) {
+            final RecordSet recordSet = reader.createRecordSet();
+
+            final RecordSchema schema = writerFactory.getSchema(flowfile.getAttributes(), recordSet.getSchema());
+
+            final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+
+            Record record;
+            while ((record = recordSet.next()) != null) {
+                if (processFromIndex != null && processedRecords.get() < processFromIndex) {
+                    processedRecords.getAndIncrement();
+                    continue;
+                }
+
+                baos.reset();
+
+                try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos, flowfile)) {
+                    writer.write(record);
+                    writer.flush();
+                }
+
+                final byte[] messageContent = baos.toByteArray();
+
+                messageHandler.handle(messageContent);
+
+                processedRecords.getAndIncrement();
+            }
+        } catch (SchemaNotFoundException | MalformedRecordException e) {
+            throw new ProcessException("An error happened during creating components for serialization.", e);
+        }
+    }
+
+}

Review Comment:
   ```suggestion
   public class RecordProcessor {
   
       private final RecordReaderFactory readerFactory;
       private final RecordSetWriterFactory writerFactory;
   
       public RecordProcessor(RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory) {
           this.readerFactory = readerFactory;
           this.writerFactory = writerFactory;
       }
   
       public int processRecords(FlowFile flowfile, InputStream in, Long processFromIndex, ComponentLog logger, FlowFileContentHandler flowFileContentHandler) throws IOException {
           int nrOfProcessedRecords = 0;
           
           try (final RecordReader reader = readerFactory.createRecordReader(flowfile, in, logger)) {
               final RecordSet recordSet = reader.createRecordSet();
   
               final RecordSchema writerSchema = writerFactory.getSchema(flowfile.getAttributes(), recordSet.getSchema());
   
               final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
   
               Record record;
               while ((record = recordSet.next()) != null) {
                   if (processFromIndex != null && nrOfProcessedRecords < processFromIndex) {
                       nrOfProcessedRecords++;
                       continue;
                   }
   
                   baos.reset();
   
                   try (final RecordSetWriter writer = writerFactory.createWriter(logger, writerSchema, baos, flowfile)) {
                       writer.write(record);
                       writer.flush();
                   }
   
                   final byte[] rawContent = baos.toByteArray();
   
                   flowFileContentHandler.handle(rawContent);
   
                   nrOfProcessedRecords++;
               }
           } catch (SchemaNotFoundException | MalformedRecordException e) {
               throw new ProcessException("An error happened during creating components for serialization.", e);
           }
           
           return nrOfProcessedRecords;
       }
   
   }
   ```



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/MessageHandler.java:
##########
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.reader;
+
+public interface MessageHandler {

Review Comment:
   ```suggestion
   public interface FlowFileContentHandler {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to