This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 9e8c914630 NIFI-14696 Improved ConsumeKinesisStream handling of Record 
Schema Differences (#10053)
9e8c914630 is described below

commit 9e8c914630bbc04c6fc0e5ab1514dde05f535a30
Author: Dariusz Seweryn <[email protected]>
AuthorDate: Tue Aug 5 22:59:33 2025 +0200

    NIFI-14696 Improved ConsumeKinesisStream handling of Record Schema 
Differences (#10053)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../property/SchemaDifferenceHandlingStrategy.java |  49 ++++
 .../aws/kinesis/stream/ConsumeKinesisStream.java   |  17 +-
 .../record/AbstractKinesisRecordProcessor.java     |  46 ++--
 .../stream/record/KinesisRecordProcessorRaw.java   |  16 +-
 .../record/KinesisRecordProcessorRecord.java       | 268 ++++++++++++++-------
 .../stream/record/StateHandlerStrategy.java        | 105 ++++++++
 .../record/TestAbstractKinesisRecordProcessor.java |   6 +-
 .../record/TestKinesisRecordProcessorRecord.java   | 245 +++++++++++++++----
 8 files changed, 597 insertions(+), 155 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/property/SchemaDifferenceHandlingStrategy.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/property/SchemaDifferenceHandlingStrategy.java
new file mode 100644
index 0000000000..ef9c647c31
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/property/SchemaDifferenceHandlingStrategy.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.property;
+
+import org.apache.nifi.components.DescribedValue;
+
+public enum SchemaDifferenceHandlingStrategy implements DescribedValue {
+    CREATE_FLOW_FILE("Create FlowFile", "Create a new FlowFile for each record 
with a different schema. The previous FlowFile will be completed with the 
records that have the previous schema." +
+            " Emitted FlowFiles will contain continuous record sequences."),
+    GROUP_RECORDS("Group Records By Schema", "Group records with the same 
schema into a single FlowFile. If a record with a different schema is 
encountered, a new FlowFile will be created for" +
+            " the new schema. Emitted FlowFiles may contain non-sequential 
records. This strategy is useful when the schema changes frequently and highest 
performance is required.");
+
+    private final String displayName;
+    private final String description;
+
+    SchemaDifferenceHandlingStrategy(final String displayName, final String 
description) {
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return name();
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/ConsumeKinesisStream.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/ConsumeKinesisStream.java
index 7d1b8738c5..9cf3dd8e9e 100644
--- 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/ConsumeKinesisStream.java
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/ConsumeKinesisStream.java
@@ -49,6 +49,7 @@ import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import 
org.apache.nifi.processors.aws.kinesis.property.SchemaDifferenceHandlingStrategy;
 import org.apache.nifi.processors.aws.kinesis.property.OutputStrategy;
 import 
org.apache.nifi.processors.aws.kinesis.stream.pause.StandardRecordProcessorBlocker;
 import 
org.apache.nifi.processors.aws.kinesis.stream.record.AbstractKinesisRecordProcessor;
@@ -319,6 +320,15 @@ public class ConsumeKinesisStream extends 
AbstractAwsAsyncProcessor<KinesisAsync
             .dependsOn(RECORD_WRITER)
             .build();
 
+    public static final PropertyDescriptor 
FLOW_FILE_HANDLING_ON_SCHEMA_CHANGE_STRATEGY = new PropertyDescriptor.Builder()
+            .name("FlowFile Handling On Schema Difference")
+            .description("The strategy used when records in a Kinesis Stream 
change their schema in a single batch.")
+            .required(true)
+            .defaultValue(SchemaDifferenceHandlingStrategy.CREATE_FLOW_FILE)
+            .allowableValues(SchemaDifferenceHandlingStrategy.class)
+            .dependsOn(RECORD_WRITER)
+            .build();
+
     public static final Relationship REL_PARSE_FAILURE = new 
Relationship.Builder()
             .name("parse.failure")
             .description("If a message from Kinesis cannot be parsed using the 
configured Record Reader" +
@@ -333,6 +343,7 @@ public class ConsumeKinesisStream extends 
AbstractAwsAsyncProcessor<KinesisAsync
             RECORD_READER,
             RECORD_WRITER,
             OUTPUT_STRATEGY,
+            FLOW_FILE_HANDLING_ON_SCHEMA_CHANGE_STRATEGY,
             REGION,
             ENDPOINT_OVERRIDE,
             DYNAMODB_ENDPOINT_OVERRIDE,
@@ -727,12 +738,14 @@ public class ConsumeKinesisStream extends 
AbstractAwsAsyncProcessor<KinesisAsync
                 final RecordConverter recordConverter = 
OutputStrategy.USE_WRAPPER == outputStrategy
                         ? new RecordConverterWrapper()
                         : new RecordConverterIdentity();
+                final SchemaDifferenceHandlingStrategy 
schemaDifferenceHandlingStrategy = 
context.getProperty(FLOW_FILE_HANDLING_ON_SCHEMA_CHANGE_STRATEGY)
+                        
.asAllowableValue(SchemaDifferenceHandlingStrategy.class);
                 return new KinesisRecordProcessorRecord(
                         sessionFactory, getLogger(), getStreamName(context), 
getEndpointPrefix(context),
                         getKinesisEndpoint(context).orElse(null), 
getCheckpointIntervalMillis(context),
                         getRetryWaitMillis(context), getNumRetries(context), 
getDateTimeFormatter(context),
-                        getReaderFactory(context), getWriterFactory(context), 
recordConverter, recordProcessorBlocker
-                );
+                        getReaderFactory(context), getWriterFactory(context), 
recordConverter, recordProcessorBlocker,
+                        schemaDifferenceHandlingStrategy);
             } else {
                 return new KinesisRecordProcessorRaw(
                         sessionFactory, getLogger(), getStreamName(context), 
getEndpointPrefix(context),
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/AbstractKinesisRecordProcessor.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/AbstractKinesisRecordProcessor.java
index e2da8e9297..014b91f756 100644
--- 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/AbstractKinesisRecordProcessor.java
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/AbstractKinesisRecordProcessor.java
@@ -133,8 +133,10 @@ public abstract class AbstractKinesisRecordProcessor 
implements ShardRecordProce
                 final StopWatch stopWatch = new StopWatch(true);
                 session = sessionFactory.createSession();
 
+                final BatchProcessingContext batchProcessingContext = new 
BatchProcessingContext(session, flowFiles, stopWatch);
                 startProcessingRecords();
-                final int recordsTransformed = 
processRecordsWithRetries(records, flowFiles, session, stopWatch);
+                final int recordsTransformed = 
processRecordsWithRetries(records, batchProcessingContext);
+                finishProcessingRecords(batchProcessingContext);
                 transferTo(ConsumeKinesisStream.REL_SUCCESS, session, 
records.size(), recordsTransformed, flowFiles);
 
                 session.commitAsync(() -> {
@@ -157,14 +159,14 @@ public abstract class AbstractKinesisRecordProcessor 
implements ShardRecordProce
         processingRecords = true;
     }
 
-    private int processRecordsWithRetries(final List<KinesisClientRecord> 
records, final List<FlowFile> flowFiles,
-                                           final ProcessSession session, final 
StopWatch stopWatch) {
+    void finishProcessingRecords(final BatchProcessingContext 
batchProcessingContext) { }
+
+    private int processRecordsWithRetries(final List<KinesisClientRecord> 
records, final BatchProcessingContext batchProcessingContext) {
         int recordsTransformed = 0;
-        for (int r = 0; r < records.size(); r++) {
-            final KinesisClientRecord kinesisRecord = records.get(r);
+        for (final KinesisClientRecord kinesisRecord : records) {
             boolean processedSuccessfully = false;
             for (int i = 0; !processedSuccessfully && i < numRetries; i++) {
-                processedSuccessfully = attemptProcessRecord(flowFiles, 
kinesisRecord, r == records.size() - 1, session, stopWatch);
+                processedSuccessfully = attemptProcessRecord(kinesisRecord, 
batchProcessingContext);
             }
 
             if (processedSuccessfully) {
@@ -177,12 +179,13 @@ public abstract class AbstractKinesisRecordProcessor 
implements ShardRecordProce
         return recordsTransformed;
     }
 
-    private boolean attemptProcessRecord(final List<FlowFile> flowFiles, final 
KinesisClientRecord kinesisRecord, final boolean lastRecord,
-                                         final ProcessSession session, final 
StopWatch stopWatch) {
+    private boolean attemptProcessRecord(final KinesisClientRecord 
kinesisRecord, final BatchProcessingContext batchProcessingContext) {
         boolean processedSuccessfully = false;
         try {
-            processRecord(flowFiles, kinesisRecord, lastRecord, session, 
stopWatch);
+            processRecord(kinesisRecord, batchProcessingContext);
             processedSuccessfully = true;
+        } catch (final KinesisBatchUnrecoverableException e) {
+            throw e;
         } catch (final Exception e) {
             log.error("Caught Exception while processing Kinesis record {}", 
kinesisRecord, e);
 
@@ -200,16 +203,11 @@ public abstract class AbstractKinesisRecordProcessor 
implements ShardRecordProce
     /**
      * Process an individual {@link Record} and serialise to {@link FlowFile}
      *
-     * @param flowFiles {@link List} of {@link FlowFile}s to be output after 
all processing is complete
      * @param kinesisRecord the Kinesis {@link Record} to be processed
-     * @param lastRecord whether this is the last {@link Record} to be 
processed in this batch
-     * @param session {@link ProcessSession} into which {@link FlowFile}s will 
be transferred
-     * @param stopWatch {@link StopWatch} tracking how much time has been 
spent processing the current batch
-     *
+     * @param batchProcessingContext the {@link BatchProcessingContext} for 
the current batch of records being processed, containing the session, flow 
files and stopwatch
      * @throws RuntimeException if there are any unhandled Exceptions that 
should be retried
      */
-    abstract void processRecord(final List<FlowFile> flowFiles, final 
KinesisClientRecord kinesisRecord, final boolean lastRecord,
-                                final ProcessSession session, final StopWatch 
stopWatch);
+    abstract void processRecord(final KinesisClientRecord kinesisRecord, final 
BatchProcessingContext batchProcessingContext) throws RuntimeException;
 
     void reportProvenance(final ProcessSession session, final FlowFile 
flowFile, final String partitionKey,
                  final String sequenceNumber, final StopWatch stopWatch) {
@@ -345,4 +343,20 @@ public abstract class AbstractKinesisRecordProcessor 
implements ShardRecordProce
     void setProcessingRecords(final boolean processingRecords) {
         this.processingRecords = processingRecords;
     }
+
+    protected static class KinesisBatchUnrecoverableException extends 
RuntimeException {
+        public KinesisBatchUnrecoverableException(final String message, final 
Throwable cause) {
+            super(message, cause);
+        }
+    }
+
+    /**
+     * contains:
+     * <ol>
+     *      <li>{@link ProcessSession} into which {@link FlowFile}s will be 
transferred
+     *      <li>{@link List} of {@link FlowFile}s to be output after all 
processing is complete
+     *      <li>{@link StopWatch} tracking how much time has been 
processing the current batch.
+     * </ol>
+     */
+    protected record BatchProcessingContext(ProcessSession session, 
List<FlowFile> flowFiles, StopWatch stopWatch) { }
 }
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRaw.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRaw.java
index 3d39305c5e..6f91db60fd 100644
--- 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRaw.java
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRaw.java
@@ -21,13 +21,11 @@ import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import 
org.apache.nifi.processors.aws.kinesis.stream.pause.RecordProcessorBlocker;
-import org.apache.nifi.util.StopWatch;
 import software.amazon.kinesis.retrieval.KinesisClientRecord;
 
 import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.time.format.DateTimeFormatter;
-import java.util.List;
 import java.util.Map;
 
 public class KinesisRecordProcessorRaw extends AbstractKinesisRecordProcessor {
@@ -40,29 +38,29 @@ public class KinesisRecordProcessorRaw extends 
AbstractKinesisRecordProcessor {
     }
 
     @Override
-    void processRecord(final List<FlowFile> flowFiles, final 
KinesisClientRecord kinesisRecord, final boolean lastRecord,
-                       final ProcessSession session, final StopWatch 
stopWatch) {
+    void processRecord(final KinesisClientRecord kinesisRecord, final 
BatchProcessingContext batchProcessingContext) {
         final String partitionKey = kinesisRecord.partitionKey();
         final String sequenceNumber = kinesisRecord.sequenceNumber();
         final Instant approximateArrivalTimestamp = 
kinesisRecord.approximateArrivalTimestamp();
         final ByteBuffer dataBuffer = kinesisRecord.data();
-        byte[] data = dataBuffer != null ? new byte[dataBuffer.remaining()] : 
new byte[0];
+        final byte[] data = dataBuffer != null ? new 
byte[dataBuffer.remaining()] : new byte[0];
         if (dataBuffer != null) {
             dataBuffer.get(data);
         }
 
-        FlowFile flowFile = session.create();
+        final ProcessSession session = batchProcessingContext.session();
+        final FlowFile flowFile = session.create();
         session.write(flowFile, out -> out.write(data));
 
         if (getLogger().isDebugEnabled()) {
             getLogger().debug("Sequence No: {}, Partition Key: {}, Data: {}", 
sequenceNumber, partitionKey, BASE_64_ENCODER.encodeToString(data));
         }
 
-        reportProvenance(session, flowFile, partitionKey, sequenceNumber, 
stopWatch);
+        reportProvenance(session, flowFile, partitionKey, sequenceNumber, 
batchProcessingContext.stopWatch());
 
         final Map<String, String> attributes = 
getDefaultAttributes(sequenceNumber, partitionKey, approximateArrivalTimestamp);
-        flowFile = session.putAllAttributes(flowFile, attributes);
+        final FlowFile attributedFlowFile = session.putAllAttributes(flowFile, 
attributes);
 
-        flowFiles.add(flowFile);
+        batchProcessingContext.flowFiles().add(attributedFlowFile);
     }
 }
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java
index be52be9d4a..43fa88efa0 100644
--- 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java
@@ -21,6 +21,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
+import 
org.apache.nifi.processors.aws.kinesis.property.SchemaDifferenceHandlingStrategy;
 import org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream;
 import 
org.apache.nifi.processors.aws.kinesis.stream.pause.RecordProcessorBlocker;
 import 
org.apache.nifi.processors.aws.kinesis.stream.record.converter.RecordConverter;
@@ -34,10 +35,10 @@ import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.PushBackRecordSet;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.util.StopWatch;
 import software.amazon.kinesis.retrieval.KinesisClientRecord;
 
 import java.io.ByteArrayInputStream;
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -49,20 +50,19 @@ import java.util.List;
 import java.util.Map;
 
 public class KinesisRecordProcessorRecord extends 
AbstractKinesisRecordProcessor {
-    final RecordReaderFactory readerFactory;
-    final RecordSetWriterFactory writerFactory;
-    final Map<String, String> schemaRetrievalVariables;
-
-    private RecordSetWriter writer;
-    private OutputStream outputStream;
+    private final RecordReaderFactory readerFactory;
+    private final RecordSetWriterFactory writerFactory;
+    private final Map<String, String> schemaRetrievalVariables;
     private final RecordConverter recordConverter;
+    private final StateHandlerStrategy stateHandlerStrategy;
 
     public KinesisRecordProcessorRecord(final ProcessSessionFactory 
sessionFactory, final ComponentLog log, final String streamName,
                                         final String endpointPrefix, final 
String kinesisEndpoint,
                                         final long checkpointIntervalMillis, 
final long retryWaitMillis,
                                         final int numRetries, final 
DateTimeFormatter dateTimeFormatter,
                                         final RecordReaderFactory 
readerFactory, final RecordSetWriterFactory writerFactory,
-                                        final RecordConverter recordConverter, 
final RecordProcessorBlocker recordProcessorBlocker) {
+                                        final RecordConverter recordConverter, 
final RecordProcessorBlocker recordProcessorBlocker,
+                                        final SchemaDifferenceHandlingStrategy 
schemaDifferenceHandlingStrategy) {
         super(sessionFactory, log, streamName, endpointPrefix, 
kinesisEndpoint, checkpointIntervalMillis, retryWaitMillis,
                 numRetries, dateTimeFormatter, recordProcessorBlocker);
         this.readerFactory = readerFactory;
@@ -70,56 +70,63 @@ public class KinesisRecordProcessorRecord extends 
AbstractKinesisRecordProcessor
 
         schemaRetrievalVariables = 
Collections.singletonMap(KINESIS_RECORD_SCHEMA_KEY, streamName);
         this.recordConverter = recordConverter;
+        this.stateHandlerStrategy = new 
StateHandlerStrategy(schemaDifferenceHandlingStrategy, 
this::initializeFlowFileState, this::completeFlowFileState);
     }
 
     @Override
     void startProcessingRecords() {
         super.startProcessingRecords();
-        outputStream = null;
-        writer = null;
+        FlowFileState flowFileState;
+        while ((flowFileState = stateHandlerStrategy.pop()) != null) {
+            // this may happen if the previous processing has not been 
completed successfully, close the leftover state
+            closeSafe(flowFileState.asClosable(), "FlowFile State");
+        }
     }
 
     @Override
-    void processRecord(final List<FlowFile> flowFiles, final 
KinesisClientRecord kinesisRecord, final boolean lastRecord,
-                       final ProcessSession session, final StopWatch 
stopWatch) {
-        boolean firstOutputRecord = true;
-        int recordCount = 0;
-        final ByteBuffer dataBuffer = kinesisRecord.data();
-        byte[] data = dataBuffer != null ? new byte[dataBuffer.remaining()] : 
new byte[0];
-        if (dataBuffer != null) {
-            dataBuffer.get(data);
+    void finishProcessingRecords(final BatchProcessingContext 
batchProcessingContext) {
+        super.finishProcessingRecords(batchProcessingContext);
+        final List<FlowFile> flowFiles = batchProcessingContext.flowFiles();
+        FlowFileState flowFileState;
+        while ((flowFileState = stateHandlerStrategy.pop()) != null) {
+            if (!flowFiles.contains(flowFileState.flowFile)) {
+                // this is unexpected, flowFiles have been altered outside this 
class after the start of processing
+                throw new IllegalStateException("%s is not available in 
provided FlowFiles [%d]".formatted(flowFileState.flowFile, flowFiles.size()));
+            }
+            try {
+                completeFlowFileState(flowFileState, batchProcessingContext);
+            } catch (final FlowFileCompletionException e) {
+                handleFlowFileCompletionException(e, batchProcessingContext);
+            }
         }
+    }
+
+    @Override
+    void processRecord(final KinesisClientRecord kinesisRecord, final 
BatchProcessingContext batchProcessingContext) {
+        final byte[] data = getData(kinesisRecord);
 
-        FlowFile flowFile = null;
         try (final InputStream in = new ByteArrayInputStream(data);
              final RecordReader reader = 
readerFactory.createRecordReader(schemaRetrievalVariables, in, data.length, 
getLogger())
         ) {
             Record intermediateRecord;
             final PushBackRecordSet recordSet = new 
PushBackRecordSet(reader.createRecordSet());
             while ((intermediateRecord = recordSet.next()) != null) {
-                Record outputRecord = 
recordConverter.convert(intermediateRecord, kinesisRecord, getStreamName(), 
getKinesisShardId());
-                if (flowFiles.isEmpty()) {
-                    flowFile = session.create();
-                    flowFiles.add(flowFile);
-
-                    // initialize the writer when the first record is read.
-                    createWriter(flowFile, session, outputRecord);
-                }
-
-                final WriteResult writeResult = writer.write(outputRecord);
-                recordCount += writeResult.getRecordCount();
-
-                // complete the FlowFile if there are no more incoming Kinesis 
Records and no more records in this RecordSet
-                if (lastRecord && !recordSet.isAnotherRecord()) {
-                    completeFlowFile(flowFiles, session, recordCount, 
writeResult, kinesisRecord, stopWatch);
+                FlowFileState flowFileState;
+                final Record outputRecord = 
recordConverter.convert(intermediateRecord, kinesisRecord, getStreamName(), 
getKinesisShardId());
+                try {
+                    flowFileState = 
stateHandlerStrategy.getOrCreate(outputRecord, batchProcessingContext);
+                } catch (final FlowFileCompletionException e) {
+                    stateHandlerStrategy.drop(e.flowFileState.recordSchema);
+                    handleFlowFileCompletionException(e, 
batchProcessingContext);
+                    flowFileState = stateHandlerStrategy.create(outputRecord, 
batchProcessingContext);
                 }
-                firstOutputRecord = false;
+                flowFileState.write(outputRecord, kinesisRecord);
             }
         } catch (final MalformedRecordException | IOException | 
SchemaNotFoundException e) {
             // write raw Kinesis Record to the parse failure relationship
             getLogger().error("Failed to parse message from Kinesis Stream 
using configured Record Reader and Writer due to {}",
                     e.getLocalizedMessage(), e);
-            outputRawRecordOnException(firstOutputRecord, flowFile, flowFiles, 
session, data, kinesisRecord, e);
+            outputRawRecordOnException(batchProcessingContext.session(), data, 
kinesisRecord, e);
         }
 
         if (getLogger().isDebugEnabled()) {
@@ -128,63 +135,93 @@ public class KinesisRecordProcessorRecord extends 
AbstractKinesisRecordProcessor
         }
     }
 
-    private void createWriter(final FlowFile flowFile, final ProcessSession 
session, final Record outputRecord)
-            throws IOException, SchemaNotFoundException {
-
-        final RecordSchema readerSchema = outputRecord.getSchema();
-        final RecordSchema writeSchema = 
writerFactory.getSchema(schemaRetrievalVariables, readerSchema);
-        outputStream = session.write(flowFile);
-        writer = writerFactory.createWriter(getLogger(), writeSchema, 
outputStream, flowFile);
-        writer.beginRecordSet();
+    private void handleFlowFileCompletionException(final 
FlowFileCompletionException e, final BatchProcessingContext 
batchProcessingContext) {
+        if (!e.flowFileState.containsDataFromExactlyOneKinesisRecord()) {
+            throw new KinesisBatchUnrecoverableException("Not all 
KinesisClientRecords contained in FlowFile contents can be routed to Failure 
relationship", e);
+        }
+        dropFlowFileState(e.flowFileState, batchProcessingContext);
+        final KinesisClientRecord failedKinesisRecord = 
e.flowFileState.lastSuccessfulWriteInfo.kinesisRecord;
+        final byte[] failedRecordData = getData(failedKinesisRecord);
+        outputRawRecordOnException(batchProcessingContext.session(), 
failedRecordData, failedKinesisRecord, e);
     }
 
-    private void completeFlowFile(final List<FlowFile> flowFiles, final 
ProcessSession session, final int recordCount,
-                                  final WriteResult writeResult, final 
KinesisClientRecord lastRecord, final StopWatch stopWatch)
-            throws IOException {
+    private static byte[] getData(final KinesisClientRecord kinesisRecord) {
+        final ByteBuffer dataBuffer = kinesisRecord.data();
+        final byte[] data = dataBuffer != null ? new 
byte[dataBuffer.remaining()] : new byte[0];
+        if (dataBuffer != null) {
+            dataBuffer.get(data);
+        }
+        return data;
+    }
 
+    /**
+     * Initializes the FlowFile state for the current processing. This 
includes creating a new FlowFile, initializing the RecordSetWriter, and setting 
up the output stream.
+     * In case of an exception during initialization, the FlowFile is removed 
from the session and the resources are closed properly.
+     */
+    private FlowFileState initializeFlowFileState(final Record record, final 
BatchProcessingContext batchProcessingContext) throws IOException, 
SchemaNotFoundException {
+        final ProcessSession session = batchProcessingContext.session();
+        FlowFile flowFile = null;
+        OutputStream outputStream = null;
+        RecordSetWriter writer = null;
         try {
-            writer.finishRecordSet();
-        } catch (IOException e) {
-            getLogger().error("Failed to finish record output due to {}", 
e.getLocalizedMessage(), e);
-            session.remove(flowFiles.get(0));
-            flowFiles.remove(0);
-            throw e;
-        } finally {
-            try {
-                writer.close();
-                outputStream.close();
-            } catch (final IOException e) {
-                getLogger().warn("Failed to close Record Writer due to {}", 
e.getLocalizedMessage(), e);
+            flowFile = session.create();
+            final RecordSchema newReadSchema = record.getSchema();
+            final RecordSchema writeSchema = 
writerFactory.getSchema(schemaRetrievalVariables, newReadSchema);
+            outputStream = session.write(flowFile);
+            writer = writerFactory.createWriter(getLogger(), writeSchema, 
outputStream, flowFile);
+            writer.beginRecordSet();
+            batchProcessingContext.flowFiles().add(flowFile);
+        } catch (final Exception e) {
+            if (flowFile != null) {
+                session.remove(flowFile);
             }
+            closeSafe(writer, "Record Writer");
+            closeSafe(outputStream, "Output Stream");
+            throw e;
         }
+        return new FlowFileState(flowFile, writer, outputStream, 
record.getSchema(), getLogger());
+    }
 
-        reportProvenance(session, flowFiles.get(0), null, null, stopWatch);
+    private void completeFlowFileState(final FlowFileState flowFileState, 
final BatchProcessingContext batchProcessingContext)
+            throws FlowFileCompletionException {
+        final ProcessSession session = batchProcessingContext.session();
+        final List<FlowFile> flowFiles = batchProcessingContext.flowFiles();
+        if (flowFileState.isFlowFileEmpty()) {
+            dropFlowFileState(flowFileState, batchProcessingContext);
+            return;
+        }
+        try {
+            flowFileState.writer.finishRecordSet();
+            closeSafe(flowFileState.asClosable(), "FlowFile State");
+            reportProvenance(session, flowFileState.flowFile, null, null, 
batchProcessingContext.stopWatch());
 
-        final Map<String, String> attributes = 
getDefaultAttributes(lastRecord);
-        attributes.put("record.count", String.valueOf(recordCount));
-        attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
-        attributes.putAll(writeResult.getAttributes());
-        flowFiles.set(0, session.putAllAttributes(flowFiles.get(0), 
attributes));
+            final Map<String, String> attributes = 
getDefaultAttributes(flowFileState.lastSuccessfulWriteInfo.kinesisRecord);
+            attributes.put("record.count", 
String.valueOf(flowFileState.lastSuccessfulWriteInfo.writeResult.getRecordCount()));
+            attributes.put(CoreAttributes.MIME_TYPE.key(), 
flowFileState.writer.getMimeType());
+            
attributes.putAll(flowFileState.lastSuccessfulWriteInfo.writeResult.getAttributes());
+            final int flowFileIndex = 
flowFiles.indexOf(flowFileState.flowFile);
+            flowFiles.set(flowFileIndex, 
session.putAllAttributes(flowFileState.flowFile, attributes));
+        } catch (final IOException e) {
+            dropFlowFileState(flowFileState, batchProcessingContext);
+            final String message = "Failed to complete a FlowFile containing 
records from Stream Name: %s, Shard Id: %s, Sequence/Subsequence No range: 
[%s/%d, %s/%d)".formatted(
+                    getStreamName(),
+                    getKinesisShardId(),
+                    
flowFileState.firstSuccessfulWriteInfo.kinesisRecord.sequenceNumber(),
+                    
flowFileState.firstSuccessfulWriteInfo.kinesisRecord.subSequenceNumber(),
+                    
flowFileState.lastSuccessfulWriteInfo.kinesisRecord.sequenceNumber(),
+                    
flowFileState.lastSuccessfulWriteInfo.kinesisRecord.subSequenceNumber()
+            );
+            throw new FlowFileCompletionException(message, e, flowFileState);
+        }
+    }
 
-        writer = null;
-        outputStream = null;
+    private void dropFlowFileState(final FlowFileState flowFileState, final 
BatchProcessingContext batchProcessingContext) {
+        closeSafe(flowFileState.asClosable(), "FlowFile State");
+        batchProcessingContext.session().remove(flowFileState.flowFile);
+        batchProcessingContext.flowFiles().remove(flowFileState.flowFile);
     }
 
-    private void outputRawRecordOnException(final boolean firstOutputRecord, 
final FlowFile flowFile,
-                                            final List<FlowFile> flowFiles, 
final ProcessSession session,
-                                            final byte[] data, final 
KinesisClientRecord kinesisRecord, final Exception e) {
-        if (firstOutputRecord && flowFile != null) {
-            session.remove(flowFile);
-            flowFiles.remove(0);
-            if (writer != null) {
-                try {
-                    writer.close();
-                    outputStream.close();
-                } catch (IOException ioe) {
-                    getLogger().warn("Failed to close Record Writer due to 
{}", ioe.getLocalizedMessage(), ioe);
-                }
-            }
-        }
+    private void outputRawRecordOnException(final ProcessSession session, 
final byte[] data, final KinesisClientRecord kinesisRecord, final Exception e) {
         FlowFile failed = session.create();
         session.write(failed, o -> o.write(data));
         final Map<String, String> attributes = 
getDefaultAttributes(kinesisRecord);
@@ -200,4 +237,71 @@ public class KinesisRecordProcessorRecord extends 
AbstractKinesisRecordProcessor
         final Instant approximateArrivalTimestamp = 
kinesisRecord.approximateArrivalTimestamp();
         return getDefaultAttributes(sequenceNumber, partitionKey, 
approximateArrivalTimestamp);
     }
+
    /**
     * Convenience overload that closes the given resource quietly, logging through this processor's logger.
     */
    private void closeSafe(final Closeable closeable, final String closeableName) {
        closeSafe(closeable, closeableName, getLogger());
    }
+
+    private static void closeSafe(final Closeable closeable, final String 
closeableName, final ComponentLog logger) {
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (final IOException e) {
+                logger.warn("Failed to close {}", closeableName, e);
+            }
+        }
+    }
+
    /** Pairs a Kinesis record with the WriteResult produced when its content was written to the record set. */
    record SuccessfulWriteInfo(KinesisClientRecord kinesisRecord, WriteResult writeResult) { }
+
    /**
     * Mutable holder for one in-progress output FlowFile: the FlowFile itself, its open writer and stream,
     * the read schema it was created for, and bookkeeping about the first and last successful record writes.
     */
    static class FlowFileState {
        private final FlowFile flowFile;
        private final RecordSetWriter writer;
        private final OutputStream outputStream;
        private final RecordSchema recordSchema;
        private final ComponentLog componentLog;
        // Info about the first record successfully written to this FlowFile; null until the first write.
        private SuccessfulWriteInfo firstSuccessfulWriteInfo;
        // Info about the most recent successful write; null means nothing has been written yet.
        private SuccessfulWriteInfo lastSuccessfulWriteInfo;

        private FlowFileState(final FlowFile flowFile, final RecordSetWriter writer, final OutputStream outputStream, final RecordSchema recordSchema, final ComponentLog componentLog) {
            this.flowFile = flowFile;
            this.writer = writer;
            this.outputStream = outputStream;
            this.recordSchema = recordSchema;
            this.componentLog = componentLog;
        }

        // True when no record has been successfully written to this FlowFile.
        private boolean isFlowFileEmpty() {
            return lastSuccessfulWriteInfo == null;
        }

        @SuppressWarnings("BooleanMethodIsAlwaysInverted")
        private boolean containsDataFromExactlyOneKinesisRecord() {
            // Identity comparison is intentional: first and last writes must refer to the same KinesisClientRecord instance.
            return !isFlowFileEmpty() && firstSuccessfulWriteInfo.kinesisRecord == lastSuccessfulWriteInfo.kinesisRecord;
        }

        // Writes one output record, recording the first and most recent successful write for attribute/range reporting.
        private void write(final Record outputRecord, final KinesisClientRecord kinesisRecord) throws IOException {
            final WriteResult writeResult = writer.write(outputRecord);
            firstSuccessfulWriteInfo = firstSuccessfulWriteInfo == null
                    ? new SuccessfulWriteInfo(kinesisRecord, writeResult)
                    : firstSuccessfulWriteInfo;
            lastSuccessfulWriteInfo = new SuccessfulWriteInfo(kinesisRecord, writeResult);
        }

        // Adapts the writer and stream into a single Closeable that closes both quietly (writer first).
        private Closeable asClosable() {
            return () -> {
                closeSafe(writer, "Record Writer", componentLog);
                closeSafe(outputStream, "Output Stream", componentLog);
            };
        }
    }
+
    /**
     * Thrown when an output FlowFile could not be finalized. Carries the associated FlowFileState so the
     * caller can decide whether its contents can still be routed to the failure relationship.
     */
    static class FlowFileCompletionException extends Exception {
        private final FlowFileState flowFileState;

        private FlowFileCompletionException(final String message, final Throwable cause, final FlowFileState flowFileState) {
            super(message, cause);
            this.flowFileState = flowFileState;
        }
    }
 }
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/StateHandlerStrategy.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/StateHandlerStrategy.java
new file mode 100644
index 0000000000..a69c68e959
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/StateHandlerStrategy.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.stream.record;
+
+import 
org.apache.nifi.processors.aws.kinesis.property.SchemaDifferenceHandlingStrategy;
+import 
org.apache.nifi.processors.aws.kinesis.stream.record.AbstractKinesisRecordProcessor.BatchProcessingContext;
+import 
org.apache.nifi.processors.aws.kinesis.stream.record.KinesisRecordProcessorRecord.FlowFileCompletionException;
+import 
org.apache.nifi.processors.aws.kinesis.stream.record.KinesisRecordProcessorRecord.FlowFileState;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * StateHandlerStrategy is responsible for managing the state of FlowFiles 
created for Records processed in a single batch. In general this class decides 
what should happen when a new RecordSchema
+ * is encountered — whether previous State should be completed and new state 
created or only a new state should be created.
+ */
+class StateHandlerStrategy {
+
+    private final SchemaDifferenceHandlingStrategy strategy;
+    private final StateInitializerAction stateInitializerAction;
+    private final StateFinalizerAction stateFinalizerAction;
+    private final Map<RecordSchema, FlowFileState> activeStateMap = new 
HashMap<>();
+
+    StateHandlerStrategy(final SchemaDifferenceHandlingStrategy strategy, 
final StateInitializerAction stateInitializerAction, final StateFinalizerAction 
stateFinalizerAction) {
+        this.strategy = strategy;
+        this.stateInitializerAction = stateInitializerAction;
+        this.stateFinalizerAction = stateFinalizerAction;
+    }
+
+    FlowFileState getOrCreate(final Record record, final 
BatchProcessingContext flowFileContext) throws FlowFileCompletionException, 
IOException, SchemaNotFoundException {
+        final FlowFileState previousState = 
activeStateMap.get(record.getSchema());
+        if (previousState != null) {
+            return previousState;
+        }
+        if (strategy == SchemaDifferenceHandlingStrategy.CREATE_FLOW_FILE) {
+            // for create flow file strategy we need to complete the possible 
previous state before creating a new one
+            completeAllAvailableFlowFileStates(flowFileContext);
+        }
+        return create(record, flowFileContext);
+    }
+
+    private void completeAllAvailableFlowFileStates(BatchProcessingContext 
flowFileContext) throws FlowFileCompletionException {
+        FlowFileState previousStateForDifferentSchema;
+        while ((previousStateForDifferentSchema = pop()) != null) {
+            stateFinalizerAction.complete(previousStateForDifferentSchema, 
flowFileContext);
+        }
+    }
+
+    FlowFileState create(final Record record, final BatchProcessingContext 
flowFileContext) throws IOException, SchemaNotFoundException {
+        final FlowFileState previousState = 
activeStateMap.get(record.getSchema());
+        if (previousState != null) {
+            throw new IllegalStateException(
+                "FlowFile state already exists for schema: " + 
record.getSchema() + ". This should not happen in a batch processing context."
+            );
+        }
+        if (strategy == SchemaDifferenceHandlingStrategy.CREATE_FLOW_FILE && 
!activeStateMap.isEmpty()) {
+            throw new IllegalStateException(
+                "An uncompleted FlowFileState found while using 
SchemaDifferenceHandlingStrategy: " +
+                        SchemaDifferenceHandlingStrategy.CREATE_FLOW_FILE + ". 
Cannot create a new state until previous is completed or dropped."
+            );
+        }
+        final FlowFileState newState = stateInitializerAction.init(record, 
flowFileContext);
+        activeStateMap.put(record.getSchema(), newState);
+        return newState;
+    }
+
+    FlowFileState pop() {
+        final Iterator<Map.Entry<RecordSchema, FlowFileState>> iterator = 
activeStateMap.entrySet().iterator();
+        if (!iterator.hasNext()) {
+            return null;
+        }
+        return activeStateMap.remove(iterator.next().getKey());
+    }
+
+    void drop(final RecordSchema recordSchema) {
+        activeStateMap.remove(recordSchema);
+    }
+
+    interface StateInitializerAction {
+        FlowFileState init(Record record, BatchProcessingContext 
flowFileContext) throws IOException, SchemaNotFoundException;
+    }
+
+    interface StateFinalizerAction {
+        void complete(FlowFileState flowFile, BatchProcessingContext 
flowFileContext) throws FlowFileCompletionException;
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestAbstractKinesisRecordProcessor.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestAbstractKinesisRecordProcessor.java
index 5d24264538..c6fca27f00 100644
--- 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestAbstractKinesisRecordProcessor.java
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestAbstractKinesisRecordProcessor.java
@@ -16,14 +16,11 @@
  */
 package org.apache.nifi.processors.aws.kinesis.stream.record;
 
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream;
 import 
org.apache.nifi.processors.aws.kinesis.stream.pause.StandardRecordProcessorBlocker;
 import org.apache.nifi.util.MockProcessSession;
 import org.apache.nifi.util.SharedSessionState;
-import org.apache.nifi.util.StopWatch;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.AfterEach;
@@ -40,7 +37,6 @@ import software.amazon.kinesis.retrieval.KinesisClientRecord;
 import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
 
 import java.time.format.DateTimeFormatter;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -77,7 +73,7 @@ public class TestAbstractKinesisRecordProcessor {
         fixture = new AbstractKinesisRecordProcessor(processSessionFactory, 
runner.getLogger(), "kinesis-test",
                 "endpoint-prefix", null, 10_000L, 1L, 2, DATE_TIME_FORMATTER, 
NOOP_RECORD_PROCESSOR_BLOCKER) {
             @Override
-            void processRecord(List<FlowFile> flowFiles, KinesisClientRecord 
kinesisRecord, boolean lastRecord, ProcessSession session, StopWatch stopWatch) 
{
+            void processRecord(KinesisClientRecord kinesisRecord, 
BatchProcessingContext batchProcessingContext) {
                 // intentionally blank
             }
         };
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestKinesisRecordProcessorRecord.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestKinesisRecordProcessorRecord.java
index 7c8baf57c7..fa8377b346 100644
--- 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestKinesisRecordProcessorRecord.java
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestKinesisRecordProcessorRecord.java
@@ -16,10 +16,12 @@
  */
 package org.apache.nifi.processors.aws.kinesis.stream.record;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.json.JsonRecordSetWriter;
 import org.apache.nifi.json.JsonTreeReader;
 import org.apache.nifi.processor.ProcessSessionFactory;
+import 
org.apache.nifi.processors.aws.kinesis.property.SchemaDifferenceHandlingStrategy;
 import org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream;
 import 
org.apache.nifi.processors.aws.kinesis.stream.pause.StandardRecordProcessorBlocker;
 import 
org.apache.nifi.processors.aws.kinesis.stream.record.converter.RecordConverterIdentity;
@@ -36,8 +38,14 @@ import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import software.amazon.kinesis.exceptions.InvalidStateException;
 import software.amazon.kinesis.exceptions.ShutdownException;
 import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
@@ -56,6 +64,7 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -103,11 +112,13 @@ public class TestKinesisRecordProcessorRecord {
         runner.setProperty(writer, "output-grouping", "output-oneline");
         runner.enableControllerService(writer);
         runner.setProperty(ConsumeKinesisStream.RECORD_WRITER, 
"record-writer");
+    }
 
    /**
     * Builds the standard test fixture for the given schema difference handling strategy.
     */
    private KinesisRecordProcessorRecord defaultFixtureWithStrategy(final SchemaDifferenceHandlingStrategy strategy) {
        // default test fixture will try operations twice with very little wait in between
        return new KinesisRecordProcessorRecord(processSessionFactory, runner.getLogger(), "kinesis-test",
                "endpoint-prefix", null, 10_000L, 1L, 2, DATE_TIME_FORMATTER,
                reader, writer, new RecordConverterIdentity(), NOOP_RECORD_PROCESSOR_BLOCKER, strategy);
    }
 
     @AfterEach
@@ -116,8 +127,10 @@ public class TestKinesisRecordProcessorRecord {
         reset(checkpointer, kinesisRecord, processSessionFactory);
     }
 
-    @Test
-    public void testProcessRecordsEmpty() {
+    @ParameterizedTest
+    @EnumSource(SchemaDifferenceHandlingStrategy.class)
+    public void testProcessRecordsEmpty(final SchemaDifferenceHandlingStrategy 
strategy) {
+        fixture = defaultFixtureWithStrategy(strategy);
         final ProcessRecordsInput processRecordsInput = 
ProcessRecordsInput.builder()
                 .records(Collections.emptyList())
                 .checkpointer(checkpointer)
@@ -137,27 +150,33 @@ public class TestKinesisRecordProcessorRecord {
         session.assertNotRolledBack();
     }
 
-    @Test
-    public void testProcessRecordsNoCheckpoint() {
-        processMultipleRecordsAssertProvenance(false, false);
+    private static Stream<Arguments.ArgumentSet> testProcessRecordsArgs() {
+        final List<Pair<String, Boolean>> endpointOverriden = List.of(
+                Pair.of("Overriden Endpoint", true),
+                Pair.of("Default Endpoint", false)
+        );
+        final List<Pair<String, Boolean>> usingWrapper = List.of(
+                Pair.of("Using Schema Wrapper", true),
+                Pair.of("Default Schema", false)
+        );
+        final List<Pair<String, SchemaDifferenceHandlingStrategy>> 
schemaChangeStrategy = Arrays.stream(SchemaDifferenceHandlingStrategy.values())
+                .map(strategy -> Pair.of(strategy.name(), strategy))
+                .toList();
+        return endpointOverriden.stream()
+                .flatMap(endpoint -> schemaChangeStrategy.stream()
+                        .flatMap(strategy -> usingWrapper.stream()
+                                .map(wrapper -> Arguments.argumentSet(
+                                        String.format("%s, %s, %s", 
endpoint.getLeft(), wrapper.getLeft(), strategy.getLeft()),
+                                        endpoint.getRight(),
+                                        wrapper.getRight(),
+                                        strategy.getRight()
+                                ))));
     }
 
-    @Test
-    public void testProcessRecordsWithEndpointOverride() {
-        processMultipleRecordsAssertProvenance(true, false);
-    }
-
-    @Test
-    public void testProcessWrappedRecordsNoCheckpoint() {
-        processMultipleRecordsAssertProvenance(false, true);
-    }
-
-    @Test
-    public void testProcessWrappedRecordsWithEndpointOverride() {
-        processMultipleRecordsAssertProvenance(true, true);
-    }
-
-    private void processMultipleRecordsAssertProvenance(final boolean 
endpointOverridden, final boolean useWrapper) {
+    @ParameterizedTest
+    @MethodSource("testProcessRecordsArgs")
+    void processMultipleRecordsAssertProvenance(final boolean 
endpointOverridden, final boolean useWrapper, final 
SchemaDifferenceHandlingStrategy strategy) {
+        fixture = defaultFixtureWithStrategy(strategy);
         final Instant referenceInstant = 
Instant.parse("2021-01-01T00:00:00.000Z");
         final Date firstDate = Date.from(referenceInstant.minus(1, 
ChronoUnit.MINUTES));
         final Date secondDate = Date.from(referenceInstant);
@@ -189,7 +208,7 @@ public class TestKinesisRecordProcessorRecord {
         final String transitUriPrefix = endpointOverridden ? 
"https://another-endpoint.com:8443"; : "http://endpoint-prefix.amazonaws.com";;
         fixture = new KinesisRecordProcessorRecord(processSessionFactory, 
runner.getLogger(), "kinesis-test",
                 "endpoint-prefix", transitUriPrefix, 10_000L, 1L, 2, 
DATE_TIME_FORMATTER,
-                reader, writer, useWrapper ? new RecordConverterWrapper() : 
new RecordConverterIdentity(), NOOP_RECORD_PROCESSOR_BLOCKER);
+                reader, writer, useWrapper ? new RecordConverterWrapper() : 
new RecordConverterIdentity(), NOOP_RECORD_PROCESSOR_BLOCKER, strategy);
 
         // skip checkpoint
         fixture.setNextCheckpointTimeInMillis(System.currentTimeMillis() + 
10_000L);
@@ -233,8 +252,10 @@ public class TestKinesisRecordProcessorRecord {
                 
"shardedSequenceNumber":"200000000000000000000","partitionKey":"partition-2","approximateArrival":1609459200000},"value":{"record":"2"}}""";
     }
 
-    @Test
-    public void testProcessPoisonPillRecordButNoRawOutputWithCheckpoint() 
throws ShutdownException, InvalidStateException {
+    @ParameterizedTest
+    @EnumSource(SchemaDifferenceHandlingStrategy.class)
+    public void testProcessPoisonPillRecordButNoRawOutputWithCheckpoint(final 
SchemaDifferenceHandlingStrategy strategy) throws ShutdownException, 
InvalidStateException {
+        fixture = defaultFixtureWithStrategy(strategy);
         final ProcessRecordsInput processRecordsInput = 
ProcessRecordsInput.builder()
                 .records(Arrays.asList(
                         
KinesisClientRecord.builder().approximateArrivalTimestamp(null)
@@ -284,22 +305,70 @@ public class TestKinesisRecordProcessorRecord {
         session.assertNotRolledBack();
     }
 
-    @Test
-    public void testProcessUnparsableRecordWithRawOutputWithCheckpoint() 
throws ShutdownException, InvalidStateException {
    /**
     * Supplies argument sets pairing every SchemaDifferenceHandlingStrategy with an input list that places
     * one unparsable record (a bare mock with no data) at the beginning, middle, or end of two parsable records.
     */
    private static Stream<Arguments.ArgumentSet> unparsableRecordsLists() {
        final KinesisClientRecord unparsableRecordMock = mock(KinesisClientRecord.class);
        final KinesisClientRecord parsableRecord1 = KinesisClientRecord.builder().approximateArrivalTimestamp(null)
                .partitionKey("partition-1")
                .sequenceNumber("1")
                .data(ByteBuffer.wrap("{\"record\":\"1\"}".getBytes(StandardCharsets.UTF_8)))
                .build();
        final KinesisClientRecord parsableRecord3 = KinesisClientRecord.builder().approximateArrivalTimestamp(null)
                .partitionKey("partition-3")
                .sequenceNumber("3")
                .data(ByteBuffer.wrap("{\"record\":\"3\"}".getBytes(StandardCharsets.UTF_8)))
                .build();
        // Same three records in each list; only the position of the unparsable one varies.
        final List<List<KinesisClientRecord>> recordLists = List.of(
                Arrays.asList(
                        unparsableRecordMock,
                        parsableRecord1,
                        parsableRecord3
                ),
                Arrays.asList(
                        parsableRecord1,
                        unparsableRecordMock,
                        parsableRecord3
                ),
                Arrays.asList(
                        parsableRecord1,
                        parsableRecord3,
                        unparsableRecordMock
                )
        );
        return Arrays.stream(SchemaDifferenceHandlingStrategy.values())
                .flatMap(strategy -> recordLists.stream().map(recordList -> Pair.of(strategy, recordList)))
                .map(pair -> {
                    final SchemaDifferenceHandlingStrategy strategy = pair.getLeft();
                    final List<KinesisClientRecord> inputRecords = pair.getRight();
                    // Derive a human-readable position label for the argument set display name.
                    String position = "Middle";
                    if (inputRecords.getFirst() == unparsableRecordMock) {
                        position = "Beginning";
                    }
                    if (inputRecords.getLast() == unparsableRecordMock) {
                        position = "End";
                    }
                    return Arguments.argumentSet(
                            String.format("Strategy=%s, Unparsable Record Position=%s", strategy, position),
                            inputRecords,
                            unparsableRecordMock,
                            strategy
                    );
                })
                .peek(it -> {
                    // Data buffers are consumed when read and the mock accumulates interactions,
                    // so rewind/reset here to give every argument set a fresh starting point.
                    parsableRecord1.data().rewind();
                    parsableRecord3.data().rewind();
                    Mockito.reset(unparsableRecordMock);
                });
    }
+
+    @ParameterizedTest
+    @MethodSource("unparsableRecordsLists")
+    void testProcessUnparsableRecordWithRawOutputWithCheckpoint(
+            final List<KinesisClientRecord> inputRecords,
+            final KinesisClientRecord kinesisRecord,
+            final SchemaDifferenceHandlingStrategy strategy) throws 
ShutdownException, InvalidStateException {
+        fixture = defaultFixtureWithStrategy(strategy);
         final ProcessRecordsInput processRecordsInput = 
ProcessRecordsInput.builder()
-                .records(Arrays.asList(
-                        
KinesisClientRecord.builder().approximateArrivalTimestamp(null)
-                                .partitionKey("partition-1")
-                                .sequenceNumber("1")
-                                
.data(ByteBuffer.wrap("{\"record\":\"1\"}".getBytes(StandardCharsets.UTF_8)))
-                                .build(),
-                        kinesisRecord,
-                        
KinesisClientRecord.builder().approximateArrivalTimestamp(null)
-                                .partitionKey("partition-3")
-                                .sequenceNumber("3")
-                                
.data(ByteBuffer.wrap("{\"record\":\"3\"}".getBytes(StandardCharsets.UTF_8)))
-                                .build()
-                ))
+                .records(inputRecords)
                 .checkpointer(checkpointer)
                 .cacheEntryTime(Instant.now().minus(1, ChronoUnit.MINUTES))
                 .cacheExitTime(Instant.now().minus(1, ChronoUnit.SECONDS))
@@ -343,6 +412,100 @@ public class TestKinesisRecordProcessorRecord {
         session.assertNotRolledBack();
     }
 
+    @ParameterizedTest
+    @EnumSource(SchemaDifferenceHandlingStrategy.class)
+    void testProcessMultipleRecordInSingleKinesisRecord(final 
SchemaDifferenceHandlingStrategy strategy) throws ShutdownException, 
InvalidStateException {
+        fixture = defaultFixtureWithStrategy(strategy);
+        
testFlowFileContents(List.of("[{\"record\":1},{\"record\":2},{\"record\":3}]"), 
List.of(EmittedFlowFile.of("{\"record\":1}\n{\"record\":2}\n{\"record\":3}", 
3)));
+    }
+
+    @Nested
+    class StateHandlerStrategyTest {
+
+        private static List<String> inputJsonRecords() {
+            return List.of(
+                    "{\"colA\":1}", // inferred to ["colA":int]
+                    "{\"colA\":%d}".formatted(Long.MAX_VALUE), // inferred to 
["colA":long]
+                    "{\"colA\":3}"
+            );
+        }
+
+        @Test
+        void testProcessIncompatibleSchemaKinesisRecordsStrategyGrouping() 
throws ShutdownException, InvalidStateException {
+            fixture = 
defaultFixtureWithStrategy(SchemaDifferenceHandlingStrategy.GROUP_RECORDS);
+            final List<String> inputJsonRecords = inputJsonRecords();
+            testFlowFileContents(
+                    inputJsonRecords,
+                    List.of(
+                            EmittedFlowFile.of(String.join("\n", 
inputJsonRecords.get(0), inputJsonRecords.get(2)), 2),
+                            EmittedFlowFile.of(inputJsonRecords.get(1), 1)
+                    )
+            );
+        }
+
+        @Test
+        void testProcessIncompatibleSchemaKinesisRecordsStrategyRolling() 
throws ShutdownException, InvalidStateException {
+            fixture = 
defaultFixtureWithStrategy(SchemaDifferenceHandlingStrategy.CREATE_FLOW_FILE);
+            final List<String> inputJsonRecords = inputJsonRecords();
+            testFlowFileContents(
+                    inputJsonRecords,
+                    List.of(
+                            EmittedFlowFile.of(inputJsonRecords.get(0), 1),
+                            EmittedFlowFile.of(inputJsonRecords.get(1), 1),
+                            EmittedFlowFile.of(inputJsonRecords.get(2), 1)
+                    )
+            );
+        }
+    }
+
+    void testFlowFileContents(final List<String> kinesisRecordJsonContents, 
final List<EmittedFlowFile> emittedFlowFiles) throws ShutdownException, 
InvalidStateException {
+        final List<KinesisClientRecord> kinesisRecords = 
kinesisRecordJsonContents.stream()
+                .map(json -> KinesisClientRecord.builder()
+                        .approximateArrivalTimestamp(null)
+                        .partitionKey("partition-1")
+                        .sequenceNumber("1")
+                        
.data(ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8)))
+                        .build())
+                .toList();
+        final ProcessRecordsInput processRecordsInput = 
ProcessRecordsInput.builder()
+                .records(kinesisRecords)
+                .checkpointer(checkpointer)
+                .cacheEntryTime(Instant.now().minus(1, ChronoUnit.MINUTES))
+                .cacheExitTime(Instant.now().minus(1, ChronoUnit.SECONDS))
+                .millisBehindLatest(100L)
+                .build();
+
+        
when(kinesisRecord.data()).thenReturn(ByteBuffer.wrap("invalid-json".getBytes(StandardCharsets.UTF_8)));
+        when(kinesisRecord.partitionKey()).thenReturn("unparsable-partition");
+        when(kinesisRecord.sequenceNumber()).thenReturn("unparsable-sequence");
+        when(kinesisRecord.approximateArrivalTimestamp()).thenReturn(null);
+
+        fixture.setKinesisShardId("test-shard");
+
+        when(processSessionFactory.createSession()).thenReturn(session);
+        fixture.processRecords(processRecordsInput);
+        verify(processSessionFactory, times(1)).createSession();
+
+        // check non-poison pill records are output successfully
+        session.assertTransferCount(ConsumeKinesisStream.REL_SUCCESS, 
emittedFlowFiles.size());
+        final List<MockFlowFile> flowFiles = 
session.getFlowFilesForRelationship(ConsumeKinesisStream.REL_SUCCESS);
+        for (int i = 0; i < emittedFlowFiles.size(); i++) {
+            final EmittedFlowFile emittedFlowFile = emittedFlowFiles.get(i);
+            assertFlowFile(flowFiles.get(i), null, "partition-1", "1", 
"test-shard", emittedFlowFile.content, emittedFlowFile.recordCount);
+        }
+
+        verify(checkpointer, times(1)).checkpoint();
+
+        session.assertCommitted();
+        session.assertNotRolledBack();
+    }
+
+    private record EmittedFlowFile(String content, int recordCount) {
+        public static EmittedFlowFile of(final String content, final int 
recordCount) {
+            return new EmittedFlowFile(content, recordCount);
+        }
+    }
+
     private void assertFlowFile(final MockFlowFile flowFile, final Date 
approxTimestamp, final String partitionKey,
                                 final String sequenceNumber, final String 
shard, final String content, final int recordCount) {
         if (approxTimestamp != null) {

Reply via email to