simonbence commented on code in PR #6987:
URL: https://github.com/apache/nifi/pull/6987#discussion_r1121505091


##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java:
##########
@@ -82,79 +87,62 @@ private MessageConsumer createMessageConsumer(final Session 
session, final Strin
         }
     }
 
-
     /**
      * Receives a message from the broker. It is the consumerCallback's 
responsibility to acknowledge the received message.
      */
-    public void consume(final String destinationName, String errorQueueName, 
final boolean durable, final boolean shared, final String subscriptionName, 
final String messageSelector,
-                        final String charset, final ConsumerCallback 
consumerCallback) {
-        this.jmsTemplate.execute(new SessionCallback<Void>() {
-            @Override
-            public Void doInJms(final Session session) throws JMSException {
-
-                final MessageConsumer msgConsumer = 
createMessageConsumer(session, destinationName, durable, shared, 
subscriptionName, messageSelector);
-                try {
-                    final Message message = 
msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
-
-                    // If there is no message, there's nothing for us to do. 
We can simply close the consumer and return.
-                    if (message == null) {
-                        JmsUtils.closeMessageConsumer(msgConsumer);
-                        return null;
-                    }
-
-                    String messageType;
-                    byte[] messageBody;
-
-                    try {
-                        if (message instanceof TextMessage) {
-                            messageType = TextMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((TextMessage) message, 
Charset.forName(charset));
-                        } else if (message instanceof BytesMessage) {
-                            messageType = BytesMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((BytesMessage) message);
-                        } else if (message instanceof ObjectMessage) {
-                            messageType = ObjectMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((ObjectMessage) message);
-                        } else if (message instanceof StreamMessage) {
-                            messageType = StreamMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((StreamMessage) message);
-                        } else if (message instanceof MapMessage) {
-                            messageType = MapMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((MapMessage) message);
-                        } else {
-                            acknowledge(message, session);
-
-                            if (errorQueueName != null) {
-                                processLog.error("Received unsupported JMS 
Message type [{}]; rerouting message to error queue [{}].", new Object[] 
{message, errorQueueName});
-                                jmsTemplate.send(errorQueueName, __ -> 
message);
-                            } else {
-                                processLog.error("Received unsupported JMS 
Message type [{}]; will skip this message.", new Object[] {message});
-                            }
-
-                            return null;
-                        }
-                    } catch (final MessageConversionException mce) {
-                        processLog.error("Received a JMS Message [{}] but 
failed to obtain the content of the message; will acknowledge this message 
without creating a FlowFile for it.",
-                            new Object[] {message}, mce);
-                        acknowledge(message, session);
-
-                        if (errorQueueName != null) {
-                            jmsTemplate.send(errorQueueName, __ -> message);
-                        }
-
-                        return null;
-                    }
+    public void consumeSingleMessage(final String destinationName, String 
errorQueueName, final boolean durable, final boolean shared, final String 
subscriptionName, final String messageSelector,
+                                     final String charset, final 
Consumer<JMSResponse> singleMessageConsumer) {
+        doWithJmsTemplate(destinationName, durable, shared, subscriptionName, 
messageSelector, (session, messageConsumer) -> {
+            final JMSResponse response = receiveMessage(session, 
messageConsumer, charset, errorQueueName);
+            if (response != null) {
+                // Provide the JMSResponse to the processor to handle. It is 
the responsibility of the
+                // processor to handle acknowledgment of the message (if 
Client Acknowledge), and it is
+                // the responsibility of the processor to handle closing the 
Message Consumer.
+                // Both of these actions can be handled by calling the 
acknowledge() or reject() methods of
+                // the JMSResponse.
+                singleMessageConsumer.accept(response);
+            }
+        });
+    }
 
-                    final Map<String, String> messageHeaders = 
extractMessageHeaders(message);
-                    final Map<String, String> messageProperties = 
extractMessageProperties(message);
-                    final JMSResponse response = new JMSResponse(message, 
session.getAcknowledgeMode(), messageType, messageBody, messageHeaders, 
messageProperties, msgConsumer);
+    /**
+     * Receives a list of messages from the broker. It is the 
consumerCallback's responsibility to acknowledge the received message.
+     */
+    public void consumeMessageSet(final String destinationName, String 
errorQueueName, final boolean durable, final boolean shared, final String 
subscriptionName, final String messageSelector,
+                        final String charset, final 
Consumer<List<JMSResponse>> messageSetConsumer) {
+        doWithJmsTemplate(destinationName, durable, shared, subscriptionName, 
messageSelector, new MessageReceiver() {
+            @Override
+            public void consume(Session session, MessageConsumer 
messageConsumer) throws JMSException {
+                final List<JMSResponse> jmsResponses = new ArrayList<>();
+                int batchCounter = 0;
+
+                JMSResponse response;
+                while ((response = receiveMessage(session, messageConsumer, 
charset, errorQueueName)) != null && batchCounter < MAX_MESSAGES_PER_FLOW_FILE) 
{
+                    response.setBatchOrder(batchCounter);
+                    jmsResponses.add(response);
+                    batchCounter++;
+                }
 
+                if (!jmsResponses.isEmpty()) {
                     // Provide the JMSResponse to the processor to handle. It 
is the responsibility of the
                     // processor to handle acknowledgment of the message (if 
Client Acknowledge), and it is
                     // the responsibility of the processor to handle closing 
the Message Consumer.
                     // Both of these actions can be handled by calling the 
acknowledge() or reject() methods of
                     // the JMSResponse.
-                    consumerCallback.accept(response);
+                    messageSetConsumer.accept(jmsResponses);
+                }
+            }
+        });
+    }
+
+    private void doWithJmsTemplate(String destinationName, boolean durable, 
boolean shared, String subscriptionName, String messageSelector, 
MessageReceiver messageReceiver) {

Review Comment:
   Minor: method name "doSomething" (doWithJmsTemplate and doInJms) is not 
really describing, I suggest to find more specific names



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java:
##########
@@ -82,79 +87,62 @@ private MessageConsumer createMessageConsumer(final Session 
session, final Strin
         }
     }
 
-
     /**
      * Receives a message from the broker. It is the consumerCallback's 
responsibility to acknowledge the received message.
      */
-    public void consume(final String destinationName, String errorQueueName, 
final boolean durable, final boolean shared, final String subscriptionName, 
final String messageSelector,
-                        final String charset, final ConsumerCallback 
consumerCallback) {
-        this.jmsTemplate.execute(new SessionCallback<Void>() {
-            @Override
-            public Void doInJms(final Session session) throws JMSException {
-
-                final MessageConsumer msgConsumer = 
createMessageConsumer(session, destinationName, durable, shared, 
subscriptionName, messageSelector);
-                try {
-                    final Message message = 
msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
-
-                    // If there is no message, there's nothing for us to do. 
We can simply close the consumer and return.
-                    if (message == null) {
-                        JmsUtils.closeMessageConsumer(msgConsumer);
-                        return null;
-                    }
-
-                    String messageType;
-                    byte[] messageBody;
-
-                    try {
-                        if (message instanceof TextMessage) {
-                            messageType = TextMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((TextMessage) message, 
Charset.forName(charset));
-                        } else if (message instanceof BytesMessage) {
-                            messageType = BytesMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((BytesMessage) message);
-                        } else if (message instanceof ObjectMessage) {
-                            messageType = ObjectMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((ObjectMessage) message);
-                        } else if (message instanceof StreamMessage) {
-                            messageType = StreamMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((StreamMessage) message);
-                        } else if (message instanceof MapMessage) {
-                            messageType = MapMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((MapMessage) message);
-                        } else {
-                            acknowledge(message, session);
-
-                            if (errorQueueName != null) {
-                                processLog.error("Received unsupported JMS 
Message type [{}]; rerouting message to error queue [{}].", new Object[] 
{message, errorQueueName});
-                                jmsTemplate.send(errorQueueName, __ -> 
message);
-                            } else {
-                                processLog.error("Received unsupported JMS 
Message type [{}]; will skip this message.", new Object[] {message});
-                            }
-
-                            return null;
-                        }
-                    } catch (final MessageConversionException mce) {
-                        processLog.error("Received a JMS Message [{}] but 
failed to obtain the content of the message; will acknowledge this message 
without creating a FlowFile for it.",
-                            new Object[] {message}, mce);
-                        acknowledge(message, session);
-
-                        if (errorQueueName != null) {
-                            jmsTemplate.send(errorQueueName, __ -> message);
-                        }
-
-                        return null;
-                    }
+    public void consumeSingleMessage(final String destinationName, String 
errorQueueName, final boolean durable, final boolean shared, final String 
subscriptionName, final String messageSelector,
+                                     final String charset, final 
Consumer<JMSResponse> singleMessageConsumer) {
+        doWithJmsTemplate(destinationName, durable, shared, subscriptionName, 
messageSelector, (session, messageConsumer) -> {
+            final JMSResponse response = receiveMessage(session, 
messageConsumer, charset, errorQueueName);
+            if (response != null) {
+                // Provide the JMSResponse to the processor to handle. It is 
the responsibility of the
+                // processor to handle acknowledgment of the message (if 
Client Acknowledge), and it is
+                // the responsibility of the processor to handle closing the 
Message Consumer.
+                // Both of these actions can be handled by calling the 
acknowledge() or reject() methods of
+                // the JMSResponse.
+                singleMessageConsumer.accept(response);
+            }
+        });
+    }
 
-                    final Map<String, String> messageHeaders = 
extractMessageHeaders(message);
-                    final Map<String, String> messageProperties = 
extractMessageProperties(message);
-                    final JMSResponse response = new JMSResponse(message, 
session.getAcknowledgeMode(), messageType, messageBody, messageHeaders, 
messageProperties, msgConsumer);
+    /**
+     * Receives a list of messages from the broker. It is the 
consumerCallback's responsibility to acknowledge the received message.
+     */
+    public void consumeMessageSet(final String destinationName, String 
errorQueueName, final boolean durable, final boolean shared, final String 
subscriptionName, final String messageSelector,
+                        final String charset, final 
Consumer<List<JMSResponse>> messageSetConsumer) {
+        doWithJmsTemplate(destinationName, durable, shared, subscriptionName, 
messageSelector, new MessageReceiver() {
+            @Override
+            public void consume(Session session, MessageConsumer 
messageConsumer) throws JMSException {
+                final List<JMSResponse> jmsResponses = new ArrayList<>();
+                int batchCounter = 0;
+
+                JMSResponse response;
+                while ((response = receiveMessage(session, messageConsumer, 
charset, errorQueueName)) != null && batchCounter < MAX_MESSAGES_PER_FLOW_FILE) 
{
+                    response.setBatchOrder(batchCounter);
+                    jmsResponses.add(response);
+                    batchCounter++;
+                }
 
+                if (!jmsResponses.isEmpty()) {
                     // Provide the JMSResponse to the processor to handle. It 
is the responsibility of the
                     // processor to handle acknowledgment of the message (if 
Client Acknowledge), and it is
                     // the responsibility of the processor to handle closing 
the Message Consumer.
                     // Both of these actions can be handled by calling the 
acknowledge() or reject() methods of
                     // the JMSResponse.
-                    consumerCallback.accept(response);
+                    messageSetConsumer.accept(jmsResponses);
+                }
+            }
+        });
+    }
+
+    private void doWithJmsTemplate(String destinationName, boolean durable, 
boolean shared, String subscriptionName, String messageSelector, 
MessageReceiver messageReceiver) {
+        this.jmsTemplate.execute(new SessionCallback<Void>() {
+            @Override
+            public Void doInJms(final Session session) throws JMSException {
+
+                final MessageConsumer msgConsumer = 
createMessageConsumer(session, destinationName, durable, shared, 
subscriptionName, messageSelector);

Review Comment:
   Minor: abbreviations are to be avoided



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java:
##########
@@ -305,6 +326,97 @@ public void accept(final JMSResponse response) {
         }
     }
 
+    private void processSingleMessage(ProcessSession processSession, 
JMSConsumer consumer, String destinationName, String errorQueueName,
+                                      boolean durable, boolean shared, String 
subscriptionName, String messageSelector, String charset) {
+
+        consumer.consumeSingleMessage(destinationName, errorQueueName, 
durable, shared, subscriptionName, messageSelector, charset, response -> {
+            if (response == null) {
+                return;
+            }
+
+            try {
+                final FlowFile flowFile = 
createFlowFileFromMessage(processSession, destinationName, response);
+
+                processSession.getProvenanceReporter().receive(flowFile, 
destinationName);
+                processSession.transfer(flowFile, REL_SUCCESS);
+                processSession.commitAsync(
+                        () -> withLog(() -> acknowledge(response)),
+                        __ -> withLog(() -> response.reject()));
+            } catch (final Throwable t) {
+                response.reject();
+                throw t;
+            }
+        });
+    }
+
+    private FlowFile createFlowFileFromMessage(ProcessSession processSession, 
String destinationName, JMSResponse response) {
+        FlowFile flowFile = processSession.create();
+        flowFile = processSession.write(flowFile, out -> 
out.write(response.getMessageBody()));
+
+        final Map<String, String> jmsHeaders = response.getMessageHeaders();
+        final Map<String, String> jmsProperties = 
response.getMessageProperties();
+
+        flowFile = updateFlowFileAttributesWithJMSAttributes(jmsHeaders, 
flowFile, processSession);
+        flowFile = updateFlowFileAttributesWithJMSAttributes(jmsProperties, 
flowFile, processSession);
+        flowFile = processSession.putAttribute(flowFile, 
JMS_SOURCE_DESTINATION_NAME, destinationName);
+
+        flowFile = processSession.putAttribute(flowFile, JMS_MESSAGETYPE, 
response.getMessageType());
+
+        return flowFile;
+    }
+
+    private void processMessageSet(ProcessContext context, ProcessSession 
session, JMSConsumer consumer, String destinationName,String errorQueueName,
+                                   boolean durable, boolean shared, String 
subscriptionName, String messageSelector, String charset) {
+
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final OutputStrategy outputStrategy = 
OutputStrategy.valueOf(context.getProperty(OUTPUT_STRATEGY).getValue());
+
+        consumer.consumeMessageSet(destinationName, errorQueueName, durable, 
shared, subscriptionName, messageSelector, charset, jmsResponses -> {
+            final MessageConsumer<JMSResponse> messageConsumer = new 
MessageConsumer.Builder<JMSResponse>()
+                    .withFlowFileWriter((new 
RecordWriter.Builder<JMSResponse>()
+                            .withReaderFactory(readerFactory)
+                            .withWriterFactory(writerFactory)
+                            .withSerializer(message -> 
message.getMessageBody() == null ? new byte[0] : message.getMessageBody())
+                            .withOutputStrategy(outputStrategy)
+                            .withLogger(getLogger()).build()))
+                    .withAttributeSupplier(message -> 
mergeJmsAttributes(message.getMessageHeaders(), message.getMessageProperties()))
+                    
.withEventReporter(StandardRecordReceivedEventReporter.of(destinationName))
+                    .build();
+
+            messageConsumer.consumeMessages(
+                    session,
+                    jmsResponses,
+                    new MessageConsumerCallback<>() {
+                        @Override
+                        public void onSuccess(FlowFile flowFile, 
List<JMSResponse> processedMessages, List<JMSResponse> failedMessages) {
+                            session.transfer(flowFile, REL_SUCCESS);

Review Comment:
   I think it would make sense to add some logging to the callback methods



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java:
##########
@@ -38,19 +38,24 @@
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 import java.nio.charset.Charset;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 
 /**
  * Generic consumer of messages from JMS compliant messaging system.
  */
 class JMSConsumer extends JMSWorker {
 
+    private final static int MAX_MESSAGES_PER_FLOW_FILE = 10000;

Review Comment:
   Minor: I would use this as a default, but not as a hardcoded value. I 
suggest to provide option to parametrize



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/strategy/consumer/AttributeSupplier.java:
##########
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.strategy.consumer;
+
+import java.util.Map;
+
+public interface AttributeSupplier<T> {

Review Comment:
   Minor: I have no strong reasons againsts this naming, but it is quite 
similar to the Supplier functional interface. Have you considered 
AttributeSource as name?



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/strategy/consumer/MessageConsumer.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.strategy.consumer;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+import java.util.List;
+
+public class MessageConsumer<T> {

Review Comment:
   Most of the design is flexible and the cogs are hidden behind interfaces. 
This crucial class however looks to be out in the wild. I would suggest to move 
up MessageConsumer.Builder to a top level class; make it return an interface 
and hide this implementation (preferably: package private and final to avoid 
leaking)



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/strategy/consumer/MessageConsumer.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.strategy.consumer;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+import java.util.List;
+
+public class MessageConsumer<T> {
+
+    private final FlowFileWriter<T> flowFileWriter;
+    private final AttributeSupplier<T> attributeSupplier;
+    private final EventReporter eventReporter;
+
+    private MessageConsumer(FlowFileWriter<T> flowFileWriter, 
AttributeSupplier<T> attributeSupplier, EventReporter eventReporter) {
+        this.flowFileWriter = flowFileWriter;
+        this.attributeSupplier = attributeSupplier;
+        this.eventReporter = eventReporter;
+    }
+
+    public void consumeMessages(ProcessSession session, List<T> messages, 
MessageConsumerCallback<T> messageConsumerCallback) {
+        flowFileWriter.write(session, messages, attributeSupplier, new 
MessageConsumerCallback<T>() {
+            @Override
+            public void onSuccess(FlowFile flowFile, List<T> 
processedMessages, List<T> failedMessages) {
+                eventReporter.reportSuccessEvent(session, flowFile);
+                messageConsumerCallback.onSuccess(flowFile, processedMessages, 
failedMessages);
+            }
+
+            @Override
+            public void onParseFailure(FlowFile flowFile, T message, Exception 
e) {
+                eventReporter.reportParseFailureEvent(session, flowFile);

Review Comment:
   Should not the event reporter get a reference to the exception in error 
cases?



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/strategy/consumer/record/RecordUtils.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.strategy.consumer.record;
+
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.Tuple;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RecordUtils {
+
+    public static Record append(final Record originalRecord, final Map<String, 
String> decoratorValues, final String decoratorPrefix) {
+        final List<RecordField> originalFields = 
originalRecord.getSchema().getFields();
+
+        final List<RecordField> mergedFields = new ArrayList<>(originalFields);
+        decoratorValues.forEach((key, value) -> mergedFields.add(new 
RecordField(decoratorPrefix + key, RecordFieldType.STRING.getDataType())));

Review Comment:
   I think a logic to check if an "original field" with the same name is in 
place should be useful. The handling strategy might be something from argument 
or simply a logging even



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java:
##########
@@ -305,6 +326,97 @@ public void accept(final JMSResponse response) {
         }
     }
 
+    private void processSingleMessage(ProcessSession processSession, 
JMSConsumer consumer, String destinationName, String errorQueueName,
+                                      boolean durable, boolean shared, String 
subscriptionName, String messageSelector, String charset) {
+
+        consumer.consumeSingleMessage(destinationName, errorQueueName, 
durable, shared, subscriptionName, messageSelector, charset, response -> {
+            if (response == null) {
+                return;
+            }
+
+            try {
+                final FlowFile flowFile = 
createFlowFileFromMessage(processSession, destinationName, response);
+
+                processSession.getProvenanceReporter().receive(flowFile, 
destinationName);
+                processSession.transfer(flowFile, REL_SUCCESS);
+                processSession.commitAsync(
+                        () -> withLog(() -> acknowledge(response)),
+                        __ -> withLog(() -> response.reject()));
+            } catch (final Throwable t) {
+                response.reject();
+                throw t;
+            }
+        });
+    }
+
+    private FlowFile createFlowFileFromMessage(ProcessSession processSession, 
String destinationName, JMSResponse response) {
+        FlowFile flowFile = processSession.create();
+        flowFile = processSession.write(flowFile, out -> 
out.write(response.getMessageBody()));
+
+        final Map<String, String> jmsHeaders = response.getMessageHeaders();
+        final Map<String, String> jmsProperties = 
response.getMessageProperties();
+
+        flowFile = updateFlowFileAttributesWithJMSAttributes(jmsHeaders, 
flowFile, processSession);
+        flowFile = updateFlowFileAttributesWithJMSAttributes(jmsProperties, 
flowFile, processSession);
+        flowFile = processSession.putAttribute(flowFile, 
JMS_SOURCE_DESTINATION_NAME, destinationName);
+
+        flowFile = processSession.putAttribute(flowFile, JMS_MESSAGETYPE, 
response.getMessageType());
+
+        return flowFile;
+    }
+
+    private void processMessageSet(ProcessContext context, ProcessSession 
session, JMSConsumer consumer, String destinationName,String errorQueueName,
+                                   boolean durable, boolean shared, String 
subscriptionName, String messageSelector, String charset) {
+
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final OutputStrategy outputStrategy = 
OutputStrategy.valueOf(context.getProperty(OUTPUT_STRATEGY).getValue());
+
+        consumer.consumeMessageSet(destinationName, errorQueueName, durable, 
shared, subscriptionName, messageSelector, charset, jmsResponses -> {

Review Comment:
   It might make sense to move this declaration to the "onScheduled" hook



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java:
##########
@@ -36,7 +36,7 @@
 /**
  * Generic publisher of messages to JMS compliant messaging system.
  */
-final class JMSPublisher extends JMSWorker {
+class JMSPublisher extends JMSWorker {

Review Comment:
   Is there any particular reason for removing the final? I did not find any 
subclass for this. In general it is not considered a good approach to remove 
these kind of restrictios without the need of using it.



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/strategy/consumer/record/OutputStrategy.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.strategy.consumer.record;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Enumeration of supported JMS Output Strategies
+ */
+public enum OutputStrategy implements DescribedValue {
+    USE_VALUE("USE_VALUE", "Use Content as Value", "Write only the JMS Message 
to the FlowFile record."),
+
+    USE_WRAPPER("USE_WRAPPER", "Use Wrapper", "Write the JMS Attributes into 
the FlowFile record on a separate leaf. (See processor usage for more 
information.)"),

Review Comment:
   This looks to be something non-trivial in the impelmentation. Using "value" 
for actual message and "_" for metadata, hardcoded is not looking flexible from 
user's perspective (also it does not look too well documented)



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/strategy/consumer/record/RecordWriter.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.strategy.consumer.record;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.jms.processors.strategy.consumer.AttributeSupplier;
+import org.apache.nifi.jms.processors.strategy.consumer.FlowFileWriter;
+import 
org.apache.nifi.jms.processors.strategy.consumer.MessageConsumerCallback;
+import org.apache.nifi.jms.processors.strategy.consumer.Serializer;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SchemaValidationException;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.nifi.jms.processors.strategy.consumer.record.OutputStrategy.USE_APPENDER;
+import static 
org.apache.nifi.jms.processors.strategy.consumer.record.OutputStrategy.USE_VALUE;
+import static 
org.apache.nifi.jms.processors.strategy.consumer.record.OutputStrategy.USE_WRAPPER;
+
+public class RecordWriter<T> implements FlowFileWriter<T> {
+
+    private final static String RECORD_COUNT_KEY = "record.count";
+
+    private final RecordReaderFactory readerFactory;
+    private final RecordSetWriterFactory writerFactory;
+    private final Serializer<T> serializer;
+    private final OutputStrategy outputStrategy;
+    private final ComponentLog logger;
+
+    public RecordWriter(RecordReaderFactory readerFactory,
+                        RecordSetWriterFactory writerFactory,
+                        Serializer<T> serializer,
+                        OutputStrategy outputStrategy,
+                        ComponentLog logger) {
+        this.readerFactory = readerFactory;
+        this.writerFactory = writerFactory;
+        this.serializer = serializer;
+        this.outputStrategy = outputStrategy;
+        this.logger = logger;
+    }
+
+    @Override
+    public void write(ProcessSession session, List<T> messages, 
AttributeSupplier<T> attributeSupplier, MessageConsumerCallback<T> 
messageConsumerCallback) {
+        consumeMessagesIntoRecords(session, messages, attributeSupplier, 
messageConsumerCallback);
+    }
+
+    private void consumeMessagesIntoRecords(ProcessSession session, List<T> 
messages, AttributeSupplier<T> attributeSupplier, MessageConsumerCallback<T> 
messageConsumerCallback) {

Review Comment:
   I am not sure this extra layer of call gives us any benefit



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java:
##########
@@ -375,4 +507,11 @@ private FlowFile 
updateFlowFileAttributesWithJMSAttributes(Map<String, String> j
         flowFile = processSession.putAllAttributes(flowFile, attributes);
         return flowFile;
     }
+
+    private Map<String, String> mergeJmsAttributes(Map<String, String> 
headers, Map<String, String> properties) {
+        final Map<String, String> jmsAttributes = new HashMap<>(headers);

Review Comment:
   Minor: I would add some logic in order to check if there is any overlap in 
the keys. If there is, I think the right action would be to log it in a warning 
level.



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/strategy/consumer/record/StandardRecordReceivedEventReporter.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.strategy.consumer.record;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.jms.processors.strategy.consumer.EventReporter;
+import org.apache.nifi.processor.ProcessSession;
+
+public class StandardRecordReceivedEventReporter implements EventReporter {

Review Comment:
   Instead of standard, this could have the name ProvenanceEverntReporter



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/strategy/consumer/record/RecordUtils.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.strategy.consumer.record;
+
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.Tuple;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RecordUtils {
+
+    public static Record append(final Record originalRecord, final Map<String, 
String> decoratorValues, final String decoratorPrefix) {
+        final List<RecordField> originalFields = 
originalRecord.getSchema().getFields();
+
+        final List<RecordField> mergedFields = new ArrayList<>(originalFields);
+        decoratorValues.forEach((key, value) -> mergedFields.add(new 
RecordField(decoratorPrefix + key, RecordFieldType.STRING.getDataType())));
+
+        final RecordSchema mergedSchema = new SimpleRecordSchema(mergedFields);
+
+        final Map<String, Object> recordValues = new HashMap<>();
+        
originalFields.stream().map(RecordField::getFieldName).forEach(fieldName -> 
recordValues.put(fieldName, originalRecord.getValue(fieldName)));
+        decoratorValues.forEach((key, value) -> 
recordValues.put(decoratorPrefix + key, value));
+
+        return new MapRecord(mergedSchema, recordValues);
+    }
+
+    public static MapRecord wrap(final Record originalRecord, final 
Map<String, String> decoratorValues, final String decoratorKey)

Review Comment:
   I suggest to add parametrization for the key for the "standard record" as 
well



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/strategy/consumer/Serializer.java:
##########
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.strategy.consumer;
+
+public interface Serializer<T> {

Review Comment:
   As "serialize" is a term which has a fairly well-defined meaning in Java, I 
think a different naming might be considered. Usually "marshallig" is 
considered having a pretty overlapping meaning for example



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/strategy/publisher/record/RecordBasedFlowFileReader.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.strategy.publisher.record;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.jms.processors.strategy.publisher.EventReporter;
+import org.apache.nifi.jms.processors.strategy.publisher.FlowFileReader;
+import 
org.apache.nifi.jms.processors.strategy.publisher.MessagePublisherCallback;
+import org.apache.nifi.jms.processors.strategy.publisher.MessageHandler;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.util.StopWatch;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Optional.ofNullable;
+
+public class RecordBasedFlowFileReader implements FlowFileReader {
+
+    public static final String ATTR_PUBLISH_FAILED_INDEX_SUFFIX = 
".publish.failed.index";
+
+    private final String identifier;
+    private final RecordSupplier recordSupplier;
+    private final EventReporter eventReporter;
+    private final ComponentLog logger;
+
+    public RecordBasedFlowFileReader(String identifier, RecordSupplier 
recordSupplier, EventReporter eventReporter, ComponentLog logger) {
+        this.identifier = identifier;
+        this.recordSupplier = recordSupplier;
+        this.eventReporter = eventReporter;
+        this.logger = logger;
+    }
+
+    @Override
+    public void processFlowFileContent(ProcessSession session, FlowFile 
flowFile, MessageHandler messageHandler, MessagePublisherCallback 
messagePublisherCallback) {
+        final StopWatch stopWatch = new StopWatch(true);
+        final AtomicInteger processedRecords = new AtomicInteger();
+
+        final String publishFailedIndexAttributeName = identifier + 
ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+
+        try {
+            final Long previousProcessFailedAt = 
ofNullable(flowFile.getAttribute(publishFailedIndexAttributeName)).map(Long::valueOf).orElse(null);
+
+            session.read(flowFile, in -> recordSupplier.process(flowFile, in, 
processedRecords, previousProcessFailedAt, logger, messageHandler));
+
+            FlowFile successFlowFile = flowFile;
+
+            if (previousProcessFailedAt != null) {
+                successFlowFile = session.removeAttribute(flowFile, 
publishFailedIndexAttributeName);
+                eventReporter.reportRecoverEvent(session, flowFile, 
processedRecords.get(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            } else {
+                eventReporter.reportSuccessEvent(session, flowFile, 
processedRecords.get(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            }
+
+            messagePublisherCallback.onSuccess(successFlowFile);
+        } catch (Exception e) {
+            logger.error("An error happened during publishing records. Routing 
to failure.", e);
+
+            final FlowFile failedFlowFile = session.putAttribute(flowFile, 
publishFailedIndexAttributeName, String.valueOf(processedRecords.get()));
+
+            if (processedRecords.get() > 0) {
+                eventReporter.reportFailureEvent(session, failedFlowFile, 
processedRecords.get(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            }
+
+            messagePublisherCallback.onFailure(failedFlowFile, e);
+        }
+    }
+
+    public static final class RecordBasedFlowFileReaderBuilder {
+        private String identifier;
+        private RecordReaderFactory readerFactory;
+        private RecordSetWriterFactory writerFactory;
+        private EventReporter eventReporter;

Review Comment:
   If majority of the usage depends on the same settings (like there is one 
event reporter implementation and that looks to be the generally used), it 
might worth to use a default and having the withX method in place for possible 
override if needed



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/strategy/publisher/FlowFileReader.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.strategy.publisher;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+public interface FlowFileReader {
+    void processFlowFileContent(ProcessSession session, FlowFile flowFile, 
MessageHandler messageHandler, MessagePublisherCallback 
messagePublisherCallback);

Review Comment:
   I bit too much concepts meet here: process, handler, publish(er). I would 
suggest to remediate this to a more graspable naming



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/strategy/publisher/record/RecordSupplier.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.strategy.publisher.record;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.jms.processors.strategy.publisher.MessageHandler;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class RecordSupplier {

Review Comment:
   At this point, this is not a supplier, but a "processor", or the actual 
handler.
   Is there any reason to accumulate the _number of_ processed records as a 
side effect instead of using return value or return object?



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/strategy/consumer/EventReporter.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.strategy.consumer;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+public interface EventReporter {

Review Comment:
   I think the to EventReporter should be separated by name. It is a bit 
misleading



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/strategy/publisher/record/StandardRecordsPublishedEventReporter.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.strategy.publisher.record;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.jms.processors.strategy.publisher.EventReporter;
+import org.apache.nifi.processor.ProcessSession;
+
+public class StandardRecordsPublishedEventReporter implements EventReporter {

Review Comment:
   I suggest to use Provenance in the naming, just like in the other reporter



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/strategy/consumer/record/StandardRecordReceivedEventReporter.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.strategy.consumer.record;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.jms.processors.strategy.consumer.EventReporter;
+import org.apache.nifi.processor.ProcessSession;
+
+public class StandardRecordReceivedEventReporter implements EventReporter {
+
+    private final String transitUri;
+
+    public static StandardRecordReceivedEventReporter of(String transitUri) {
+        return new StandardRecordReceivedEventReporter(transitUri);
+    }
+
+    public StandardRecordReceivedEventReporter(String transitUri) {

Review Comment:
   Minor: this can be even hidden from the outside world



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/strategy/consumer/record/RecordWriter.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.strategy.consumer.record;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.jms.processors.strategy.consumer.AttributeSupplier;
+import org.apache.nifi.jms.processors.strategy.consumer.FlowFileWriter;
+import 
org.apache.nifi.jms.processors.strategy.consumer.MessageConsumerCallback;
+import org.apache.nifi.jms.processors.strategy.consumer.Serializer;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SchemaValidationException;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.nifi.jms.processors.strategy.consumer.record.OutputStrategy.USE_APPENDER;
+import static 
org.apache.nifi.jms.processors.strategy.consumer.record.OutputStrategy.USE_VALUE;
+import static 
org.apache.nifi.jms.processors.strategy.consumer.record.OutputStrategy.USE_WRAPPER;
+
+public class RecordWriter<T> implements FlowFileWriter<T> {

Review Comment:
   NiFi already has a "RecordSetWriter", which is a distinct thing, but 
somewhat misleading. The concept reminds me the JdbcTemplate (the basic purpose 
seems to be similar as well: moving boilerplate around an external operation 
into a separate place to reuse). As of this I think it would worth to think 
about a different naming approach, like RecordWritingTemplate and the interface 
according (not 100% sure, needs some more thinking)



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/strategy/publisher/record/RecordBasedFlowFileReader.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.strategy.publisher.record;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.jms.processors.strategy.publisher.EventReporter;
+import org.apache.nifi.jms.processors.strategy.publisher.FlowFileReader;
+import 
org.apache.nifi.jms.processors.strategy.publisher.MessagePublisherCallback;
+import org.apache.nifi.jms.processors.strategy.publisher.MessageHandler;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.util.StopWatch;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Optional.ofNullable;
+
+public class RecordBasedFlowFileReader implements FlowFileReader {
+
+    public static final String ATTR_PUBLISH_FAILED_INDEX_SUFFIX = 
".publish.failed.index";
+
+    private final String identifier;
+    private final RecordSupplier recordSupplier;
+    private final EventReporter eventReporter;
+    private final ComponentLog logger;
+
+    public RecordBasedFlowFileReader(String identifier, RecordSupplier 
recordSupplier, EventReporter eventReporter, ComponentLog logger) {
+        this.identifier = identifier;
+        this.recordSupplier = recordSupplier;
+        this.eventReporter = eventReporter;
+        this.logger = logger;
+    }
+
+    @Override
+    public void processFlowFileContent(ProcessSession session, FlowFile 
flowFile, MessageHandler messageHandler, MessagePublisherCallback 
messagePublisherCallback) {
+        final StopWatch stopWatch = new StopWatch(true);
+        final AtomicInteger processedRecords = new AtomicInteger();
+
+        final String publishFailedIndexAttributeName = identifier + 
ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+
+        try {
+            final Long previousProcessFailedAt = 
ofNullable(flowFile.getAttribute(publishFailedIndexAttributeName)).map(Long::valueOf).orElse(null);
+
+            session.read(flowFile, in -> recordSupplier.process(flowFile, in, 
processedRecords, previousProcessFailedAt, logger, messageHandler));
+
+            FlowFile successFlowFile = flowFile;
+
+            if (previousProcessFailedAt != null) {
+                successFlowFile = session.removeAttribute(flowFile, 
publishFailedIndexAttributeName);
+                eventReporter.reportRecoverEvent(session, flowFile, 
processedRecords.get(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            } else {
+                eventReporter.reportSuccessEvent(session, flowFile, 
processedRecords.get(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            }
+
+            messagePublisherCallback.onSuccess(successFlowFile);
+        } catch (Exception e) {
+            logger.error("An error happened during publishing records. Routing 
to failure.", e);
+
+            final FlowFile failedFlowFile = session.putAttribute(flowFile, 
publishFailedIndexAttributeName, String.valueOf(processedRecords.get()));
+
+            if (processedRecords.get() > 0) {
+                eventReporter.reportFailureEvent(session, failedFlowFile, 
processedRecords.get(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            }
+
+            messagePublisherCallback.onFailure(failedFlowFile, e);
+        }
+    }
+
+    public static final class RecordBasedFlowFileReaderBuilder {
+        private String identifier;
+        private RecordReaderFactory readerFactory;
+        private RecordSetWriterFactory writerFactory;
+        private EventReporter eventReporter;
+        private ComponentLog logger;
+
+        private RecordBasedFlowFileReaderBuilder() {
+        }
+
+        public static RecordBasedFlowFileReaderBuilder 
aRecordBasedFlowFileReader() {
+            return new RecordBasedFlowFileReaderBuilder();
+        }
+
+        public RecordBasedFlowFileReaderBuilder withIdentifier(String 
identifier) {
+            this.identifier = identifier;
+            return this;
+        }
+
+        public RecordBasedFlowFileReaderBuilder 
withReaderFactory(RecordReaderFactory readerFactory) {
+            this.readerFactory = readerFactory;
+            return this;
+        }
+
+        public RecordBasedFlowFileReaderBuilder 
withWriterFactory(RecordSetWriterFactory writerFactory) {
+            this.writerFactory = writerFactory;
+            return this;
+        }
+
+        public RecordBasedFlowFileReaderBuilder 
withEventReporter(EventReporter eventReporter) {
+            this.eventReporter = eventReporter;
+            return this;
+        }
+
+        public RecordBasedFlowFileReaderBuilder withLogger(ComponentLog 
logger) {
+            this.logger = logger;
+            return this;
+        }
+
+        public RecordBasedFlowFileReader build() {
+            return new RecordBasedFlowFileReader(identifier, new 
RecordSupplier(readerFactory, writerFactory), eventReporter, logger);

Review Comment:
   I miss the value checks from these builders. In order to ensure everything 
is set, either run-time check (validation during build) or compile-time check 
(using wizard-pattern for example) is suggested



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/strategy/publisher/record/RecordBasedFlowFileReader.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.strategy.publisher.record;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.jms.processors.strategy.publisher.EventReporter;
+import org.apache.nifi.jms.processors.strategy.publisher.FlowFileReader;
+import 
org.apache.nifi.jms.processors.strategy.publisher.MessagePublisherCallback;
+import org.apache.nifi.jms.processors.strategy.publisher.MessageHandler;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.util.StopWatch;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Optional.ofNullable;
+
+public class RecordBasedFlowFileReader implements FlowFileReader {
+
+    public static final String ATTR_PUBLISH_FAILED_INDEX_SUFFIX = 
".publish.failed.index";
+
+    private final String identifier;
+    private final RecordSupplier recordSupplier;
+    private final EventReporter eventReporter;
+    private final ComponentLog logger;
+
+    public RecordBasedFlowFileReader(String identifier, RecordSupplier 
recordSupplier, EventReporter eventReporter, ComponentLog logger) {
+        this.identifier = identifier;
+        this.recordSupplier = recordSupplier;
+        this.eventReporter = eventReporter;
+        this.logger = logger;
+    }
+
+    @Override
+    public void processFlowFileContent(ProcessSession session, FlowFile 
flowFile, MessageHandler messageHandler, MessagePublisherCallback 
messagePublisherCallback) {
+        final StopWatch stopWatch = new StopWatch(true);
+        final AtomicInteger processedRecords = new AtomicInteger();
+
+        final String publishFailedIndexAttributeName = identifier + 
ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+
+        try {
+            final Long previousProcessFailedAt = 
ofNullable(flowFile.getAttribute(publishFailedIndexAttributeName)).map(Long::valueOf).orElse(null);
+
+            session.read(flowFile, in -> recordSupplier.process(flowFile, in, 
processedRecords, previousProcessFailedAt, logger, messageHandler));
+
+            FlowFile successFlowFile = flowFile;
+
+            if (previousProcessFailedAt != null) {
+                successFlowFile = session.removeAttribute(flowFile, 
publishFailedIndexAttributeName);
+                eventReporter.reportRecoverEvent(session, flowFile, 
processedRecords.get(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            } else {
+                eventReporter.reportSuccessEvent(session, flowFile, 
processedRecords.get(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            }
+
+            messagePublisherCallback.onSuccess(successFlowFile);
+        } catch (Exception e) {
+            logger.error("An error happened during publishing records. Routing 
to failure.", e);
+
+            final FlowFile failedFlowFile = session.putAttribute(flowFile, 
publishFailedIndexAttributeName, String.valueOf(processedRecords.get()));
+
+            if (processedRecords.get() > 0) {
+                eventReporter.reportFailureEvent(session, failedFlowFile, 
processedRecords.get(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            }
+
+            messagePublisherCallback.onFailure(failedFlowFile, e);
+        }
+    }
+
+    public static final class RecordBasedFlowFileReaderBuilder {
+        private String identifier;
+        private RecordReaderFactory readerFactory;
+        private RecordSetWriterFactory writerFactory;
+        private EventReporter eventReporter;
+        private ComponentLog logger;
+
+        private RecordBasedFlowFileReaderBuilder() {
+        }
+
+        public static RecordBasedFlowFileReaderBuilder 
aRecordBasedFlowFileReader() {

Review Comment:
   Please find a more describing name :)



##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/strategy/consumer/record/RecordWriter.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.strategy.consumer.record;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.jms.processors.strategy.consumer.AttributeSupplier;
+import org.apache.nifi.jms.processors.strategy.consumer.FlowFileWriter;
+import 
org.apache.nifi.jms.processors.strategy.consumer.MessageConsumerCallback;
+import org.apache.nifi.jms.processors.strategy.consumer.Serializer;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SchemaValidationException;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.nifi.jms.processors.strategy.consumer.record.OutputStrategy.USE_APPENDER;
+import static 
org.apache.nifi.jms.processors.strategy.consumer.record.OutputStrategy.USE_VALUE;
+import static 
org.apache.nifi.jms.processors.strategy.consumer.record.OutputStrategy.USE_WRAPPER;
+
+public class RecordWriter<T> implements FlowFileWriter<T> {
+
+    private final static String RECORD_COUNT_KEY = "record.count";
+
+    private final RecordReaderFactory readerFactory;
+    private final RecordSetWriterFactory writerFactory;
+    private final Serializer<T> serializer;
+    private final OutputStrategy outputStrategy;
+    private final ComponentLog logger;
+
+    public RecordWriter(RecordReaderFactory readerFactory,
+                        RecordSetWriterFactory writerFactory,
+                        Serializer<T> serializer,
+                        OutputStrategy outputStrategy,
+                        ComponentLog logger) {
+        this.readerFactory = readerFactory;
+        this.writerFactory = writerFactory;
+        this.serializer = serializer;
+        this.outputStrategy = outputStrategy;
+        this.logger = logger;
+    }
+
+    @Override
+    public void write(ProcessSession session, List<T> messages, 
AttributeSupplier<T> attributeSupplier, MessageConsumerCallback<T> 
messageConsumerCallback) {
+        consumeMessagesIntoRecords(session, messages, attributeSupplier, 
messageConsumerCallback);
+    }
+
+    private void consumeMessagesIntoRecords(ProcessSession session, List<T> 
messages, AttributeSupplier<T> attributeSupplier, MessageConsumerCallback<T> 
messageConsumerCallback) {
+        FlowFile flowFile = session.create();
+
+        final Map<String, String> attributes = new HashMap<>();
+        final AtomicInteger recordCount = new AtomicInteger();
+
+        final List<T> processedMessages = new ArrayList<>();
+        final List<T> failedMessages = new ArrayList<>();
+
+        RecordSetWriter writer = null;
+        boolean isWriterInitialized = false;
+
+        try {
+            for (T message : messages) {
+                if (message == null) {
+                    break;
+                }
+
+                final byte[] recordBytes = serializer.serialize(message);
+                try (final InputStream in = new 
ByteArrayInputStream(recordBytes)) {
+                    final RecordReader reader;
+
+                    // parse incoming message which may contain multiple 
messages
+                    try {
+                        reader = readerFactory.createRecordReader(attributes, 
in, recordBytes.length, logger);
+                    } catch (final IOException ioe) {
+                        logger.error("Failed to parse message due to comms 
failure. Will roll back session and try again momentarily.");
+                        messageConsumerCallback.onFailure(flowFile, 
processedMessages, failedMessages, ioe);
+                        closeWriter(writer);
+                        return;
+                    } catch (final Exception e) {
+                        logger.error("Failed to parse message, sending to the 
parse failure relationship", e);
+                        failedMessages.add(message);
+                        messageConsumerCallback.onParseFailure(flowFile, 
message, e);
+                        continue;
+                    }
+
+                    // write messages as records into FlowFile
+                    try {
+                        Record record;
+                        while ((record = reader.nextRecord()) != null) {
+
+                            if (attributeSupplier != null && 
!outputStrategy.equals(USE_VALUE)) {
+                                final Map<String, String> additionalAttributes 
= attributeSupplier.supply(message);
+                                if (outputStrategy.equals(USE_APPENDER)) {

Review Comment:
   I am fond of providing the possibility to have effect on the output like 
this, but introducing branching logic as "strategy" does not look the best 
approach. Limiting the possible values using enum in the builder/property is 
straightforward, but in best case, the builder should translate these enums 
into instances of some OutputStrategy interface instances (whatever they are 
different implementations of the same implementation with different 
parametrizations). As such, the logic stored here in branches should be the 
responsibility of these implementations thus hidden from this calling logic. In 
this case I would shift toward having AppengingOutputStrategy, 
WrappingOutputStrategy and ValueOnlyOutputStrategy. All the logic from 
RecordUtils#append and RecordUtils#wrap could be moved into the respective 
classes and also: this would be an answer for the hardcoded "_" and "value" 
literals.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to