tpalfy commented on code in PR #6987: URL: https://github.com/apache/nifi/pull/6987#discussion_r1152948717
########## nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/StateTrackingFlowFileReader.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.jms.processors.ioconcept.reader; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.jms.processors.ioconcept.reader.record.RecordSupplier; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.util.StopWatch; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Optional.ofNullable; + +public class StateTrackingFlowFileReader implements FlowFileReader { + + public static final String ATTR_READ_FAILED_INDEX_SUFFIX = ".read.failed.index"; + + private final String identifier; + private final RecordSupplier recordSupplier; + private final ComponentLog logger; + + public StateTrackingFlowFileReader(String identifier, RecordSupplier recordSupplier, ComponentLog logger) { + this.identifier = identifier; + this.recordSupplier = recordSupplier; + this.logger = logger; + } + + @Override + public void read(ProcessSession 
session, FlowFile flowFile, MessageHandler messageHandler, FlowFileReaderCallback flowFileReaderCallback) { + final StopWatch stopWatch = new StopWatch(true); + final AtomicInteger processedRecords = new AtomicInteger(); + + final String publishFailedIndexAttributeName = identifier + ATTR_READ_FAILED_INDEX_SUFFIX; + + try { + final Long previousProcessFailedAt = ofNullable(flowFile.getAttribute(publishFailedIndexAttributeName)).map(Long::valueOf).orElse(null); + + session.read(flowFile, in -> recordSupplier.process(flowFile, in, processedRecords, previousProcessFailedAt, logger, messageHandler)); + + FlowFile successFlowFile = flowFile; + + final boolean isRecover = previousProcessFailedAt != null; + if (isRecover) { + successFlowFile = session.removeAttribute(flowFile, publishFailedIndexAttributeName); + } + + flowFileReaderCallback.onSuccess(successFlowFile, processedRecords.get(), isRecover, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + } catch (Exception e) { + logger.error("An error happened while processing records. 
Routing to failure.", e); + + final FlowFile failedFlowFile = session.putAttribute(flowFile, publishFailedIndexAttributeName, String.valueOf(processedRecords.get())); + + flowFileReaderCallback.onFailure(failedFlowFile, processedRecords.get(), stopWatch.getElapsed(TimeUnit.MILLISECONDS), e); + } + } + +} Review Comment: ```suggestion public class ProgressTrackingFlowFileContentProcessor implements FlowFileContentProcessor { public static final String PROCESS_FAILED_INDEX_ATTRIBUTE_SUFFIX = ".process.failed.index"; private final String processFailedIndexAttributeName; private final RecordProcessor recordProcessor; private final ComponentLog logger; public ProgressTrackingFlowFileContentProcessor(String processFailedIndexAttributePrefix, RecordProcessor recordProcessor, ComponentLog logger) { this.processFailedIndexAttributeName = processFailedIndexAttributePrefix + PROCESS_FAILED_INDEX_ATTRIBUTE_SUFFIX; this.recordProcessor = recordProcessor; this.logger = logger; } @Override public void processContent(ProcessSession session, FlowFile flowFile, FlowFileContentHandler flowFileContentHandler, FlowFileContentProcessorCallback flowFileContentProcessorCallback) { final StopWatch stopWatch = new StopWatch(true); final AtomicInteger nrOfProcessedRecordsHolder = new AtomicInteger(); try { final Long previousProcessFailedAt = ofNullable(flowFile.getAttribute(processFailedIndexAttributeName)).map(Long::valueOf).orElse(null); session.read(flowFile, in -> { int nrOfProcessedRecords = recordProcessor.processRecords(flowFile, in, previousProcessFailedAt, logger, flowFileContentHandler); nrOfProcessedRecordsHolder.set(nrOfProcessedRecords); }); FlowFile successFlowFile = flowFile; final boolean isReattempt = previousProcessFailedAt != null; if (isReattempt) { successFlowFile = session.removeAttribute(flowFile, processFailedIndexAttributeName); } flowFileContentProcessorCallback.onSuccess(successFlowFile, nrOfProcessedRecordsHolder.get(), isReattempt, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS)); } catch (Exception e) { logger.error("An error happened while processing records. Routing to failure.", e); final FlowFile failedFlowFile = session.putAttribute(flowFile, processFailedIndexAttributeName, String.valueOf(nrOfProcessedRecordsHolder.get())); flowFileContentProcessorCallback.onFailure(failedFlowFile, nrOfProcessedRecordsHolder.get(), stopWatch.getElapsed(TimeUnit.MILLISECONDS), e); } } } ``` ########## nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/FlowFileReader.java: ########## @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.jms.processors.ioconcept.reader; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessSession; + +public interface FlowFileReader { + void read(ProcessSession session, FlowFile flowFile, MessageHandler messageHandler, FlowFileReaderCallback flowFileReaderCallback); +} Review Comment: ```suggestion public interface FlowFileContentProcessor { void processContent(ProcessSession session, FlowFile flowFile, FlowFileContentHandler flowFileContentHandler, FlowFileContentProcessorCallback flowFileContentProcessorCallback); } ``` ########## nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/AttributeSource.java: ########## @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.jms.processors.ioconcept.writer; + +import java.util.Map; + +public interface AttributeSource<T> { + Map<String, String> getAttributes(T message); Review Comment: ```suggestion Map<String, String> getAttributes(T contentObject); ``` ########## nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java: ########## @@ -82,79 +87,62 @@ private MessageConsumer createMessageConsumer(final Session session, final Strin } } - /** * Receives a message from the broker. It is the consumerCallback's responsibility to acknowledge the received message. */ - public void consume(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriptionName, final String messageSelector, - final String charset, final ConsumerCallback consumerCallback) { - this.jmsTemplate.execute(new SessionCallback<Void>() { - @Override - public Void doInJms(final Session session) throws JMSException { - - final MessageConsumer msgConsumer = createMessageConsumer(session, destinationName, durable, shared, subscriptionName, messageSelector); - try { - final Message message = msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout()); - - // If there is no message, there's nothing for us to do. We can simply close the consumer and return. 
- if (message == null) { - JmsUtils.closeMessageConsumer(msgConsumer); - return null; - } - - String messageType; - byte[] messageBody; - - try { - if (message instanceof TextMessage) { - messageType = TextMessage.class.getSimpleName(); - messageBody = MessageBodyToBytesConverter.toBytes((TextMessage) message, Charset.forName(charset)); - } else if (message instanceof BytesMessage) { - messageType = BytesMessage.class.getSimpleName(); - messageBody = MessageBodyToBytesConverter.toBytes((BytesMessage) message); - } else if (message instanceof ObjectMessage) { - messageType = ObjectMessage.class.getSimpleName(); - messageBody = MessageBodyToBytesConverter.toBytes((ObjectMessage) message); - } else if (message instanceof StreamMessage) { - messageType = StreamMessage.class.getSimpleName(); - messageBody = MessageBodyToBytesConverter.toBytes((StreamMessage) message); - } else if (message instanceof MapMessage) { - messageType = MapMessage.class.getSimpleName(); - messageBody = MessageBodyToBytesConverter.toBytes((MapMessage) message); - } else { - acknowledge(message, session); - - if (errorQueueName != null) { - processLog.error("Received unsupported JMS Message type [{}]; rerouting message to error queue [{}].", new Object[] {message, errorQueueName}); - jmsTemplate.send(errorQueueName, __ -> message); - } else { - processLog.error("Received unsupported JMS Message type [{}]; will skip this message.", new Object[] {message}); - } - - return null; - } - } catch (final MessageConversionException mce) { - processLog.error("Received a JMS Message [{}] but failed to obtain the content of the message; will acknowledge this message without creating a FlowFile for it.", - new Object[] {message}, mce); - acknowledge(message, session); - - if (errorQueueName != null) { - jmsTemplate.send(errorQueueName, __ -> message); - } - - return null; - } + public void consumeSingleMessage(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, 
final String subscriptionName, final String messageSelector, + final String charset, final Consumer<JMSResponse> singleMessageConsumer) { + doWithJmsTemplate(destinationName, durable, shared, subscriptionName, messageSelector, (session, messageConsumer) -> { + final JMSResponse response = receiveMessage(session, messageConsumer, charset, errorQueueName); + if (response != null) { + // Provide the JMSResponse to the processor to handle. It is the responsibility of the + // processor to handle acknowledgment of the message (if Client Acknowledge), and it is + // the responsibility of the processor to handle closing the Message Consumer. + // Both of these actions can be handled by calling the acknowledge() or reject() methods of + // the JMSResponse. + singleMessageConsumer.accept(response); + } + }); + } - final Map<String, String> messageHeaders = extractMessageHeaders(message); - final Map<String, String> messageProperties = extractMessageProperties(message); - final JMSResponse response = new JMSResponse(message, session.getAcknowledgeMode(), messageType, messageBody, messageHeaders, messageProperties, msgConsumer); + /** + * Receives a list of messages from the broker. It is the consumerCallback's responsibility to acknowledge the received message. 
+ */ + public void consumeMessageSet(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriptionName, final String messageSelector, + final String charset, final Consumer<List<JMSResponse>> messageSetConsumer) { + doWithJmsTemplate(destinationName, durable, shared, subscriptionName, messageSelector, new MessageReceiver() { Review Comment: ```suggestion processMessage(destinationName, durable, shared, subscriptionName, messageSelector, new MessageReceiver() { ``` ########## nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java: ########## @@ -82,79 +87,62 @@ private MessageConsumer createMessageConsumer(final Session session, final Strin } } - /** * Receives a message from the broker. It is the consumerCallback's responsibility to acknowledge the received message. */ - public void consume(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriptionName, final String messageSelector, - final String charset, final ConsumerCallback consumerCallback) { - this.jmsTemplate.execute(new SessionCallback<Void>() { - @Override - public Void doInJms(final Session session) throws JMSException { - - final MessageConsumer msgConsumer = createMessageConsumer(session, destinationName, durable, shared, subscriptionName, messageSelector); - try { - final Message message = msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout()); - - // If there is no message, there's nothing for us to do. We can simply close the consumer and return. 
- if (message == null) { - JmsUtils.closeMessageConsumer(msgConsumer); - return null; - } - - String messageType; - byte[] messageBody; - - try { - if (message instanceof TextMessage) { - messageType = TextMessage.class.getSimpleName(); - messageBody = MessageBodyToBytesConverter.toBytes((TextMessage) message, Charset.forName(charset)); - } else if (message instanceof BytesMessage) { - messageType = BytesMessage.class.getSimpleName(); - messageBody = MessageBodyToBytesConverter.toBytes((BytesMessage) message); - } else if (message instanceof ObjectMessage) { - messageType = ObjectMessage.class.getSimpleName(); - messageBody = MessageBodyToBytesConverter.toBytes((ObjectMessage) message); - } else if (message instanceof StreamMessage) { - messageType = StreamMessage.class.getSimpleName(); - messageBody = MessageBodyToBytesConverter.toBytes((StreamMessage) message); - } else if (message instanceof MapMessage) { - messageType = MapMessage.class.getSimpleName(); - messageBody = MessageBodyToBytesConverter.toBytes((MapMessage) message); - } else { - acknowledge(message, session); - - if (errorQueueName != null) { - processLog.error("Received unsupported JMS Message type [{}]; rerouting message to error queue [{}].", new Object[] {message, errorQueueName}); - jmsTemplate.send(errorQueueName, __ -> message); - } else { - processLog.error("Received unsupported JMS Message type [{}]; will skip this message.", new Object[] {message}); - } - - return null; - } - } catch (final MessageConversionException mce) { - processLog.error("Received a JMS Message [{}] but failed to obtain the content of the message; will acknowledge this message without creating a FlowFile for it.", - new Object[] {message}, mce); - acknowledge(message, session); - - if (errorQueueName != null) { - jmsTemplate.send(errorQueueName, __ -> message); - } - - return null; - } + public void consumeSingleMessage(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, 
final String subscriptionName, final String messageSelector, + final String charset, final Consumer<JMSResponse> singleMessageConsumer) { + doWithJmsTemplate(destinationName, durable, shared, subscriptionName, messageSelector, (session, messageConsumer) -> { + final JMSResponse response = receiveMessage(session, messageConsumer, charset, errorQueueName); + if (response != null) { + // Provide the JMSResponse to the processor to handle. It is the responsibility of the + // processor to handle acknowledgment of the message (if Client Acknowledge), and it is + // the responsibility of the processor to handle closing the Message Consumer. + // Both of these actions can be handled by calling the acknowledge() or reject() methods of + // the JMSResponse. + singleMessageConsumer.accept(response); + } + }); + } - final Map<String, String> messageHeaders = extractMessageHeaders(message); - final Map<String, String> messageProperties = extractMessageProperties(message); - final JMSResponse response = new JMSResponse(message, session.getAcknowledgeMode(), messageType, messageBody, messageHeaders, messageProperties, msgConsumer); + /** + * Receives a list of messages from the broker. It is the consumerCallback's responsibility to acknowledge the received message. 
+ */ + public void consumeMessageSet(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriptionName, final String messageSelector, + final String charset, final Consumer<List<JMSResponse>> messageSetConsumer) { + doWithJmsTemplate(destinationName, durable, shared, subscriptionName, messageSelector, new MessageReceiver() { + @Override + public void consume(Session session, MessageConsumer messageConsumer) throws JMSException { + final List<JMSResponse> jmsResponses = new ArrayList<>(); + int batchCounter = 0; + + JMSResponse response; + while ((response = receiveMessage(session, messageConsumer, charset, errorQueueName)) != null && batchCounter < MAX_MESSAGES_PER_FLOW_FILE) { + response.setBatchOrder(batchCounter); + jmsResponses.add(response); + batchCounter++; + } + if (!jmsResponses.isEmpty()) { // Provide the JMSResponse to the processor to handle. It is the responsibility of the // processor to handle acknowledgment of the message (if Client Acknowledge), and it is // the responsibility of the processor to handle closing the Message Consumer. // Both of these actions can be handled by calling the acknowledge() or reject() methods of // the JMSResponse. 
- consumerCallback.accept(response); + messageSetConsumer.accept(jmsResponses); + } + } + }); + } + + private void doWithJmsTemplate(String destinationName, boolean durable, boolean shared, String subscriptionName, String messageSelector, MessageReceiver messageReceiver) { Review Comment: ```suggestion private void processMessage(String destinationName, boolean durable, boolean shared, String subscriptionName, String messageSelector, MessageProcessor messageProcessor) { ``` ########## nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java: ########## @@ -286,13 +333,18 @@ public void acknowledge() throws JMSException { public void reject() { JmsUtils.closeMessageConsumer(messageConsumer); } + + public Integer getBatchOrder() { + return batchOrder; + } + + public void setBatchOrder(Integer batchOrder) { + this.batchOrder = batchOrder; + } } - /** - * Callback to be invoked while executing inJMS call (the call within the - * live JMS session) - */ - static interface ConsumerCallback { - void accept(JMSResponse response); + interface MessageReceiver { + void consume(Session session, MessageConsumer messageConsumer) throws JMSException; Review Comment: ```suggestion interface MessageProcessor { void processMessage(Session session, MessageConsumer messageConsumer) throws JMSException; ``` ########## nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/FlowFileReaderCallback.java: ########## @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.jms.processors.ioconcept.reader; + +import org.apache.nifi.flowfile.FlowFile; + +public interface FlowFileReaderCallback { Review Comment: ```suggestion public interface FlowFileContentProcessorCallback { ``` ########## nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/record/RecordWriter.java: ########## @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.jms.processors.ioconcept.writer.record; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.jms.processors.ioconcept.writer.AttributeSource; +import org.apache.nifi.jms.processors.ioconcept.writer.FlowFileWriter; +import org.apache.nifi.jms.processors.ioconcept.writer.FlowFileWriterCallback; +import org.apache.nifi.jms.processors.ioconcept.writer.Marshaller; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.SchemaValidationException; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.nifi.jms.processors.ioconcept.writer.record.OutputStrategy.USE_APPENDER; +import static org.apache.nifi.jms.processors.ioconcept.writer.record.OutputStrategy.USE_VALUE; +import static org.apache.nifi.jms.processors.ioconcept.writer.record.OutputStrategy.USE_WRAPPER; + +public class RecordWriter<T> implements FlowFileWriter<T> { + + private final static String RECORD_COUNT_KEY = "record.count"; + + private final RecordReaderFactory readerFactory; + private final RecordSetWriterFactory writerFactory; + private final Marshaller<T> marshaller; + private final AttributeSource<T> attributeSource; + 
private final OutputStrategy outputStrategy; + private final ComponentLog logger; + + public RecordWriter(RecordReaderFactory readerFactory, + RecordSetWriterFactory writerFactory, + Marshaller<T> marshaller, + AttributeSource<T> attributeSource, + OutputStrategy outputStrategy, + ComponentLog logger) { + this.readerFactory = readerFactory; + this.writerFactory = writerFactory; + this.marshaller = marshaller; + this.attributeSource = attributeSource; + this.outputStrategy = outputStrategy; + this.logger = logger; + } + + @Override + public void write(ProcessSession session, List<T> messages, FlowFileWriterCallback<T> flowFileWriterCallback) { + FlowFile flowFile = session.create(); + + final Map<String, String> attributes = new HashMap<>(); + final AtomicInteger recordCount = new AtomicInteger(); + + final List<T> processedMessages = new ArrayList<>(); + final List<T> failedMessages = new ArrayList<>(); + + RecordSetWriter writer = null; + boolean isWriterInitialized = false; + + try { + for (T message : messages) { + if (message == null) { + break; + } + + final byte[] recordBytes = marshaller.marshall(message); + try (final InputStream in = new ByteArrayInputStream(recordBytes)) { + final RecordReader reader; + + // parse incoming message which may contain multiple messages + try { + reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger); + } catch (final IOException ioe) { + logger.error("Failed to parse message due to comms failure. 
Will roll back session and try again momentarily."); + flowFileWriterCallback.onFailure(flowFile, processedMessages, failedMessages, ioe); + closeWriter(writer); + return; + } catch (final Exception e) { + logger.error("Failed to parse message, sending to the parse failure relationship", e); + failedMessages.add(message); + flowFileWriterCallback.onParseFailure(flowFile, message, e); + continue; + } + + // write messages as records into FlowFile + try { + Record record; + while ((record = reader.nextRecord()) != null) { + + if (attributeSource != null && !outputStrategy.equals(USE_VALUE)) { + final Map<String, String> additionalAttributes = attributeSource.getAttributes(message); + if (outputStrategy.equals(USE_APPENDER)) { + record = RecordUtils.append(record, additionalAttributes, "_"); + } else if (outputStrategy.equals(USE_WRAPPER)){ + record = RecordUtils.wrap(record, "value", additionalAttributes, "_"); + } + } + + if (!isWriterInitialized) { + final RecordSchema recordSchema = record.getSchema(); + final OutputStream rawOut = session.write(flowFile); + + RecordSchema writeSchema; + try { + writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema); + } catch (final Exception e) { + logger.error("Failed to obtain Schema for FlowFile, sending to the parse failure relationship", e); + failedMessages.add(message); + flowFileWriterCallback.onParseFailure(flowFile, message, e); + continue; + } + + writer = writerFactory.createWriter(logger, writeSchema, rawOut, flowFile); + writer.beginRecordSet(); + } + + try { + writer.write(record); + isWriterInitialized = true; + processedMessages.add(message); + } catch (final RuntimeException re) { + logger.error("Failed to write message using the configured Record Writer, sending to the parse failure relationship", re); + failedMessages.add(message); + flowFileWriterCallback.onParseFailure(flowFile, message, re); + } + } + } catch (final IOException | MalformedRecordException | SchemaValidationException 
e) { + logger.error("Failed to write message, sending to the parse failure relationship", e); + failedMessages.add(message); + flowFileWriterCallback.onParseFailure(flowFile, message, e); + } + } catch (Exception e) { + logger.error("Failed to write message, sending to the parse failure relationship", e); + failedMessages.add(message); + flowFileWriterCallback.onParseFailure(flowFile, message, e); + } + } + + if (writer != null) { + final WriteResult writeResult = writer.finishRecordSet(); + attributes.put(RECORD_COUNT_KEY, String.valueOf(writeResult.getRecordCount())); + attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); + attributes.putAll(writeResult.getAttributes()); + recordCount.set(writeResult.getRecordCount()); + } + + } catch (final Exception e) { + flowFileWriterCallback.onFailure(flowFile, processedMessages, failedMessages, e); + } finally { + closeWriter(writer); + } + + if (recordCount.get() == 0) { + session.remove(flowFile); + return; + } + + session.putAllAttributes(flowFile, attributes); + + final int count = recordCount.get(); + logger.info("Successfully processed {} records for {}", count, flowFile); + + flowFileWriterCallback.onSuccess(flowFile, processedMessages, failedMessages); + } Review Comment: ```suggestion public void writeToFlowFile(ProcessSession session, List<T> contentObjects, FlowFileWriterCallback<T> flowFileWriterCallback) { FlowFile flowFile = session.create(); final Map<String, String> attributes = new HashMap<>(); final AtomicInteger recordCount = new AtomicInteger(); final List<T> processedMessages = new ArrayList<>(); final List<T> failedMessages = new ArrayList<>(); RecordSetWriter recordSetWriter = null; boolean isWriterInitialized = false; try { for (T contentOjbect : contentObjects) { if (contentOjbect == null) { break; } final byte[] contentBytes = marshaller.marshall(contentOjbect); try (final InputStream contentInputStream = new ByteArrayInputStream(contentBytes)) { final RecordReader recordReader; 
// parse incoming message which may contain multiple messages try { recordReader = readerFactory.createRecordReader(attributes, contentInputStream, contentBytes.length, logger); } catch (final IOException ioe) { logger.error("Failed to parse message due to comms failure. Will roll back session and try again momentarily."); flowFileWriterCallback.onFailure(flowFile, processedMessages, failedMessages, ioe); closeWriter(recordSetWriter); return; } catch (final Exception e) { logger.error("Failed to parse message, sending to the parse failure relationship", e); failedMessages.add(contentOjbect); flowFileWriterCallback.onParseFailure(flowFile, contentOjbect, e); continue; } // write messages as records into FlowFile try { Record record; while ((record = recordReader.nextRecord()) != null) { if (attributeSource != null && !outputStrategy.equals(USE_VALUE)) { final Map<String, String> additionalAttributes = attributeSource.getAttributes(contentOjbect); if (outputStrategy.equals(USE_APPENDER)) { record = RecordUtils.append(record, additionalAttributes, "_"); } else if (outputStrategy.equals(USE_WRAPPER)){ record = RecordUtils.wrap(record, "value", additionalAttributes, "_"); } } if (!isWriterInitialized) { final RecordSchema recordSchema = record.getSchema(); final OutputStream rawOut = session.write(flowFile); RecordSchema writeSchema; try { writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema); } catch (final Exception e) { logger.error("Failed to obtain Schema for FlowFile, sending to the parse failure relationship", e); failedMessages.add(contentOjbect); flowFileWriterCallback.onParseFailure(flowFile, contentOjbect, e); continue; } recordSetWriter = writerFactory.createWriter(logger, writeSchema, rawOut, flowFile); recordSetWriter.beginRecordSet(); } try { recordSetWriter.write(record); isWriterInitialized = true; processedMessages.add(contentOjbect); } catch (final RuntimeException re) { logger.error("Failed to write message using the 
configured Record Writer, sending to the parse failure relationship", re); failedMessages.add(contentOjbect); flowFileWriterCallback.onParseFailure(flowFile, contentOjbect, re); } } } catch (final IOException | MalformedRecordException | SchemaValidationException e) { logger.error("Failed to write message, sending to the parse failure relationship", e); failedMessages.add(contentOjbect); flowFileWriterCallback.onParseFailure(flowFile, contentOjbect, e); } } catch (Exception e) { logger.error("Failed to write message, sending to the parse failure relationship", e); failedMessages.add(contentOjbect); flowFileWriterCallback.onParseFailure(flowFile, contentOjbect, e); } } if (recordSetWriter != null) { final WriteResult writeResult = recordSetWriter.finishRecordSet(); attributes.put(RECORD_COUNT_KEY, String.valueOf(writeResult.getRecordCount())); attributes.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType()); attributes.putAll(writeResult.getAttributes()); recordCount.set(writeResult.getRecordCount()); } } catch (final Exception e) { flowFileWriterCallback.onFailure(flowFile, processedMessages, failedMessages, e); } finally { closeWriter(recordSetWriter); } if (recordCount.get() == 0) { session.remove(flowFile); return; } session.putAllAttributes(flowFile, attributes); final int count = recordCount.get(); logger.info("Successfully processed {} records for {}", count, flowFile); flowFileWriterCallback.onSuccess(flowFile, processedMessages, failedMessages); } ``` ########## nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java: ########## @@ -82,79 +87,62 @@ private MessageConsumer createMessageConsumer(final Session session, final Strin } } - /** * Receives a message from the broker. It is the consumerCallback's responsibility to acknowledge the received message. 
*/ - public void consume(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriptionName, final String messageSelector, - final String charset, final ConsumerCallback consumerCallback) { - this.jmsTemplate.execute(new SessionCallback<Void>() { - @Override - public Void doInJms(final Session session) throws JMSException { - - final MessageConsumer msgConsumer = createMessageConsumer(session, destinationName, durable, shared, subscriptionName, messageSelector); - try { - final Message message = msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout()); - - // If there is no message, there's nothing for us to do. We can simply close the consumer and return. - if (message == null) { - JmsUtils.closeMessageConsumer(msgConsumer); - return null; - } - - String messageType; - byte[] messageBody; - - try { - if (message instanceof TextMessage) { - messageType = TextMessage.class.getSimpleName(); - messageBody = MessageBodyToBytesConverter.toBytes((TextMessage) message, Charset.forName(charset)); - } else if (message instanceof BytesMessage) { - messageType = BytesMessage.class.getSimpleName(); - messageBody = MessageBodyToBytesConverter.toBytes((BytesMessage) message); - } else if (message instanceof ObjectMessage) { - messageType = ObjectMessage.class.getSimpleName(); - messageBody = MessageBodyToBytesConverter.toBytes((ObjectMessage) message); - } else if (message instanceof StreamMessage) { - messageType = StreamMessage.class.getSimpleName(); - messageBody = MessageBodyToBytesConverter.toBytes((StreamMessage) message); - } else if (message instanceof MapMessage) { - messageType = MapMessage.class.getSimpleName(); - messageBody = MessageBodyToBytesConverter.toBytes((MapMessage) message); - } else { - acknowledge(message, session); - - if (errorQueueName != null) { - processLog.error("Received unsupported JMS Message type [{}]; rerouting message to error queue [{}].", new Object[] {message, 
errorQueueName}); - jmsTemplate.send(errorQueueName, __ -> message); - } else { - processLog.error("Received unsupported JMS Message type [{}]; will skip this message.", new Object[] {message}); - } - - return null; - } - } catch (final MessageConversionException mce) { - processLog.error("Received a JMS Message [{}] but failed to obtain the content of the message; will acknowledge this message without creating a FlowFile for it.", - new Object[] {message}, mce); - acknowledge(message, session); - - if (errorQueueName != null) { - jmsTemplate.send(errorQueueName, __ -> message); - } - - return null; - } + public void consumeSingleMessage(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriptionName, final String messageSelector, + final String charset, final Consumer<JMSResponse> singleMessageConsumer) { + doWithJmsTemplate(destinationName, durable, shared, subscriptionName, messageSelector, (session, messageConsumer) -> { Review Comment: ```suggestion processMessage(destinationName, durable, shared, subscriptionName, messageSelector, (session, messageConsumer) -> { ``` ########## nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/FlowFileWriterCallback.java: ########## @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.jms.processors.ioconcept.writer; + +import org.apache.nifi.flowfile.FlowFile; + +import java.util.List; + +public interface FlowFileWriterCallback<T> { + void onSuccess(FlowFile flowFile, List<T> processedMessages, List<T> failedMessages); + void onParseFailure(FlowFile flowFile, T message, Exception e); + void onFailure(FlowFile flowFile, List<T> processedMessages, List<T> failedMessages, Exception e); Review Comment: ```suggestion void onSuccess(FlowFile flowFile, List<T> processedContentObjects, List<T> failedContentObjects); void onParseFailure(FlowFile flowFile, T contentObject, Exception e); void onFailure(FlowFile flowFile, List<T> processedContentObjects, List<T> failedContentObjects, Exception e); ``` ########## nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java: ########## @@ -82,79 +87,62 @@ private MessageConsumer createMessageConsumer(final Session session, final Strin } } - /** * Receives a message from the broker. It is the consumerCallback's responsibility to acknowledge the received message. 
*/ - public void consume(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriptionName, final String messageSelector, - final String charset, final ConsumerCallback consumerCallback) { - this.jmsTemplate.execute(new SessionCallback<Void>() { - @Override - public Void doInJms(final Session session) throws JMSException { - - final MessageConsumer msgConsumer = createMessageConsumer(session, destinationName, durable, shared, subscriptionName, messageSelector); - try { - final Message message = msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout()); - - // If there is no message, there's nothing for us to do. We can simply close the consumer and return. - if (message == null) { - JmsUtils.closeMessageConsumer(msgConsumer); - return null; - } - - String messageType; - byte[] messageBody; - - try { - if (message instanceof TextMessage) { - messageType = TextMessage.class.getSimpleName(); - messageBody = MessageBodyToBytesConverter.toBytes((TextMessage) message, Charset.forName(charset)); - } else if (message instanceof BytesMessage) { - messageType = BytesMessage.class.getSimpleName(); - messageBody = MessageBodyToBytesConverter.toBytes((BytesMessage) message); - } else if (message instanceof ObjectMessage) { - messageType = ObjectMessage.class.getSimpleName(); - messageBody = MessageBodyToBytesConverter.toBytes((ObjectMessage) message); - } else if (message instanceof StreamMessage) { - messageType = StreamMessage.class.getSimpleName(); - messageBody = MessageBodyToBytesConverter.toBytes((StreamMessage) message); - } else if (message instanceof MapMessage) { - messageType = MapMessage.class.getSimpleName(); - messageBody = MessageBodyToBytesConverter.toBytes((MapMessage) message); - } else { - acknowledge(message, session); - - if (errorQueueName != null) { - processLog.error("Received unsupported JMS Message type [{}]; rerouting message to error queue [{}].", new Object[] {message, 
errorQueueName}); - jmsTemplate.send(errorQueueName, __ -> message); - } else { - processLog.error("Received unsupported JMS Message type [{}]; will skip this message.", new Object[] {message}); - } - - return null; - } - } catch (final MessageConversionException mce) { - processLog.error("Received a JMS Message [{}] but failed to obtain the content of the message; will acknowledge this message without creating a FlowFile for it.", - new Object[] {message}, mce); - acknowledge(message, session); - - if (errorQueueName != null) { - jmsTemplate.send(errorQueueName, __ -> message); - } - - return null; - } + public void consumeSingleMessage(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriptionName, final String messageSelector, + final String charset, final Consumer<JMSResponse> singleMessageConsumer) { + doWithJmsTemplate(destinationName, durable, shared, subscriptionName, messageSelector, (session, messageConsumer) -> { + final JMSResponse response = receiveMessage(session, messageConsumer, charset, errorQueueName); + if (response != null) { + // Provide the JMSResponse to the processor to handle. It is the responsibility of the + // processor to handle acknowledgment of the message (if Client Acknowledge), and it is + // the responsibility of the processor to handle closing the Message Consumer. + // Both of these actions can be handled by calling the acknowledge() or reject() methods of + // the JMSResponse. + singleMessageConsumer.accept(response); + } + }); + } - final Map<String, String> messageHeaders = extractMessageHeaders(message); - final Map<String, String> messageProperties = extractMessageProperties(message); - final JMSResponse response = new JMSResponse(message, session.getAcknowledgeMode(), messageType, messageBody, messageHeaders, messageProperties, msgConsumer); + /** + * Receives a list of messages from the broker. 
It is the consumerCallback's responsibility to acknowledge the received message. + */ + public void consumeMessageSet(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriptionName, final String messageSelector, + final String charset, final Consumer<List<JMSResponse>> messageSetConsumer) { + doWithJmsTemplate(destinationName, durable, shared, subscriptionName, messageSelector, new MessageReceiver() { + @Override + public void consume(Session session, MessageConsumer messageConsumer) throws JMSException { + final List<JMSResponse> jmsResponses = new ArrayList<>(); + int batchCounter = 0; + + JMSResponse response; + while ((response = receiveMessage(session, messageConsumer, charset, errorQueueName)) != null && batchCounter < MAX_MESSAGES_PER_FLOW_FILE) { + response.setBatchOrder(batchCounter); + jmsResponses.add(response); + batchCounter++; + } + if (!jmsResponses.isEmpty()) { // Provide the JMSResponse to the processor to handle. It is the responsibility of the // processor to handle acknowledgment of the message (if Client Acknowledge), and it is // the responsibility of the processor to handle closing the Message Consumer. // Both of these actions can be handled by calling the acknowledge() or reject() methods of // the JMSResponse. 
- consumerCallback.accept(response); + messageSetConsumer.accept(jmsResponses); + } + } + }); + } + + private void doWithJmsTemplate(String destinationName, boolean durable, boolean shared, String subscriptionName, String messageSelector, MessageReceiver messageReceiver) { + this.jmsTemplate.execute(new SessionCallback<Void>() { + @Override + public Void doInJms(final Session session) throws JMSException { + + final MessageConsumer messageConsumer = createMessageConsumer(session, destinationName, durable, shared, subscriptionName, messageSelector); + try { + messageReceiver.consume(session, messageConsumer); Review Comment: ```suggestion messageProcessor.consume(session, messageConsumer); ``` ########## nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/FlowFileWriter.java: ########## @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.jms.processors.ioconcept.writer; + +import org.apache.nifi.processor.ProcessSession; + +import java.util.List; + +public interface FlowFileWriter<T> { + void write(ProcessSession session, List<T> messages, FlowFileWriterCallback<T> flowFileWriterCallback); Review Comment: ```suggestion void writeToFlowFile(ProcessSession session, List<T> contentObjects, FlowFileWriterCallback<T> flowFileWriterCallback); ``` ########## nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java: ########## @@ -199,36 +221,63 @@ protected void rendezvousWithJms(ProcessContext context, ProcessSession processS } } - switch (context.getProperty(MESSAGE_BODY).getValue()) { - case TEXT_MESSAGE: - try { - publisher.publish(destinationName, this.extractTextMessageBody(flowFile, processSession, charset), attributesToSend); - } catch(Exception e) { - publisher.setValid(false); - throw e; - } - break; - case BYTES_MESSAGE: - default: - try { - publisher.publish(destinationName, this.extractMessageBody(flowFile, processSession), attributesToSend); - } catch(Exception e) { - publisher.setValid(false); - throw e; - } - break; + if (context.getProperty(RECORD_READER).isSet()) { + final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + + final FlowFileReader flowFileReader = new StateTrackingFlowFileReader( + getIdentifier(), + new RecordSupplier(readerFactory, writerFactory), + getLogger() + ); + + flowFileReader.read( + processSession, + flowFile, + content -> publisher.publish(destinationName, content, attributesToSend), + new FlowFileReaderCallback() { + @Override + public void onSuccess(FlowFile flowFile, int processedRecords, boolean isRecover, long transmissionMillis) { + final String eventTemplate = 
isRecover ? PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER : PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS; + processSession.getProvenanceReporter().send( + flowFile, + destinationName, + String.format(eventTemplate, processedRecords), + transmissionMillis); + + processSession.transfer(flowFile, REL_SUCCESS); + } + + @Override + public void onFailure(FlowFile flowFile, int processedRecords, long transmissionMillis, Exception e) { + processSession.getProvenanceReporter().send( + flowFile, + destinationName, + String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE, processedRecords), + transmissionMillis); + + handleException(context, processSession, publisher, flowFile, e); + } + } + ); Review Comment: ```suggestion final FlowFileContentProcessor flowFileContentProcessor = new ProgressTrackingFlowFileContentProcessor( getIdentifier(), new RecordProcessor(readerFactory, writerFactory), getLogger() ); final FlowFileContentProcessorCallback flowFileContentProcessorCallback = new FlowFileContentProcessorCallback() { @Override public void onSuccess(FlowFile flowFile, int processedRecords, boolean isRecover, long transmissionMillis) { final String eventTemplate = isRecover ? 
PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER : PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS; processSession.getProvenanceReporter().send( flowFile, destinationName, String.format(eventTemplate, processedRecords), transmissionMillis); processSession.transfer(flowFile, REL_SUCCESS); } @Override public void onFailure(FlowFile flowFile, int processedRecords, long transmissionMillis, Exception e) { processSession.getProvenanceReporter().send( flowFile, destinationName, String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE, processedRecords), transmissionMillis); handleException(context, processSession, publisher, flowFile, e); } }; flowFileContentProcessor.processContent( processSession, flowFile, content -> publisher.publish(destinationName, content, attributesToSend), flowFileContentProcessorCallback ); ``` ########## nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/Marshaller.java: ########## @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.jms.processors.ioconcept.writer; + +public interface Marshaller<T> { + byte[] marshall(T message); Review Comment: ```suggestion byte[] marshall(T contentObject); ``` ########## nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/record/RecordSupplier.java: ########## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.jms.processors.ioconcept.reader.record; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.jms.processors.ioconcept.reader.MessageHandler; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.atomic.AtomicInteger; + +public class RecordSupplier { + + private final RecordReaderFactory readerFactory; + private final RecordSetWriterFactory writerFactory; + + public RecordSupplier(RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory) { + this.readerFactory = readerFactory; + this.writerFactory = writerFactory; + } + + public void process(FlowFile flowfile, InputStream in, AtomicInteger processedRecords, Long processFromIndex, ComponentLog logger, MessageHandler messageHandler) throws IOException { + + try (final RecordReader reader = readerFactory.createRecordReader(flowfile, in, logger)) { + final RecordSet recordSet = reader.createRecordSet(); + + final RecordSchema schema = writerFactory.getSchema(flowfile.getAttributes(), recordSet.getSchema()); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); + + Record record; + while ((record = recordSet.next()) != null) { + if (processFromIndex != null && processedRecords.get() < processFromIndex) { + processedRecords.getAndIncrement(); + 
continue; + } + + baos.reset(); + + try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos, flowfile)) { + writer.write(record); + writer.flush(); + } + + final byte[] messageContent = baos.toByteArray(); + + messageHandler.handle(messageContent); + + processedRecords.getAndIncrement(); + } + } catch (SchemaNotFoundException | MalformedRecordException e) { + throw new ProcessException("An error happened during creating components for serialization.", e); + } + } + +} Review Comment: ```suggestion public class RecordProcessor { private final RecordReaderFactory readerFactory; private final RecordSetWriterFactory writerFactory; public RecordProcessor(RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory) { this.readerFactory = readerFactory; this.writerFactory = writerFactory; } public int processRecords(FlowFile flowfile, InputStream in, Long processFromIndex, ComponentLog logger, FlowFileContentHandler flowFileContentHandler) throws IOException { int nrOfProcessedRecords = 0; try (final RecordReader reader = readerFactory.createRecordReader(flowfile, in, logger)) { final RecordSet recordSet = reader.createRecordSet(); final RecordSchema writerSchema = writerFactory.getSchema(flowfile.getAttributes(), recordSet.getSchema()); final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); Record record; while ((record = recordSet.next()) != null) { if (processFromIndex != null && nrOfProcessedRecords < processFromIndex) { nrOfProcessedRecords++; continue; } baos.reset(); try (final RecordSetWriter writer = writerFactory.createWriter(logger, writerSchema, baos, flowfile)) { writer.write(record); writer.flush(); } final byte[] rawContent = baos.toByteArray(); flowFileContentHandler.handle(rawContent); nrOfProcessedRecords++; } } catch (SchemaNotFoundException | MalformedRecordException e) { throw new ProcessException("An error happened during creating components for serialization.", e); } return nrOfProcessedRecords; } 
} ``` ########## nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/MessageHandler.java: ########## @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.jms.processors.ioconcept.reader; + +public interface MessageHandler { Review Comment: ```suggestion public interface FlowFileContentHandler { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
