This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
     new 9e8c914630 NIFI-14696 Improved ConsumeKinesisStream handling of Record Schema Differences (#10053)
9e8c914630 is described below
commit 9e8c914630bbc04c6fc0e5ab1514dde05f535a30
Author: Dariusz Seweryn <[email protected]>
AuthorDate: Tue Aug 5 22:59:33 2025 +0200
    NIFI-14696 Improved ConsumeKinesisStream handling of Record Schema Differences (#10053)
Signed-off-by: David Handermann <[email protected]>
---
.../property/SchemaDifferenceHandlingStrategy.java | 49 ++++
.../aws/kinesis/stream/ConsumeKinesisStream.java | 17 +-
.../record/AbstractKinesisRecordProcessor.java | 46 ++--
.../stream/record/KinesisRecordProcessorRaw.java | 16 +-
.../record/KinesisRecordProcessorRecord.java | 268 ++++++++++++++-------
.../stream/record/StateHandlerStrategy.java | 105 ++++++++
.../record/TestAbstractKinesisRecordProcessor.java | 6 +-
.../record/TestKinesisRecordProcessorRecord.java | 245 +++++++++++++++----
8 files changed, 597 insertions(+), 155 deletions(-)
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/property/SchemaDifferenceHandlingStrategy.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/property/SchemaDifferenceHandlingStrategy.java
new file mode 100644
index 0000000000..ef9c647c31
--- /dev/null
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/property/SchemaDifferenceHandlingStrategy.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.property;
+
+import org.apache.nifi.components.DescribedValue;
+
+public enum SchemaDifferenceHandlingStrategy implements DescribedValue {
+    CREATE_FLOW_FILE("Create FlowFile", "Create a new FlowFile for each record with a different schema. The previous FlowFile will be completed with the records that have the previous schema." +
+            " Emitted FlowFiles will contain continuous record sequences."),
+    GROUP_RECORDS("Group Records By Schema", "Group records with the same schema into a single FlowFile. If a record with a different schema is encountered, a new FlowFile will be created for" +
+            " the new schema. Emitted FlowFiles may contain non-sequential records. This strategy is useful when the schema changes frequently and highest performance is required.");
+
+ private final String displayName;
+ private final String description;
+
+    SchemaDifferenceHandlingStrategy(final String displayName, final String description) {
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return name();
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
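
Side note (illustrative, not part of the commit): because the enum implements DescribedValue, the processor can expose it directly as allowable values, with getValue() as the persisted property value and getDisplayName()/getDescription() driving what the UI shows. A minimal, self-contained sketch that prints what each constant contributes, using only the methods defined above:

    import org.apache.nifi.processors.aws.kinesis.property.SchemaDifferenceHandlingStrategy;

    class DescribeStrategies {
        public static void main(final String[] args) {
            // iterates the two strategies added above and prints their metadata
            for (final SchemaDifferenceHandlingStrategy strategy : SchemaDifferenceHandlingStrategy.values()) {
                System.out.printf("%s (%s): %s%n", strategy.getValue(), strategy.getDisplayName(), strategy.getDescription());
            }
        }
    }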
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/ConsumeKinesisStream.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/ConsumeKinesisStream.java
index 7d1b8738c5..9cf3dd8e9e 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/ConsumeKinesisStream.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/ConsumeKinesisStream.java
@@ -49,6 +49,7 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.kinesis.property.SchemaDifferenceHandlingStrategy;
import org.apache.nifi.processors.aws.kinesis.property.OutputStrategy;
import org.apache.nifi.processors.aws.kinesis.stream.pause.StandardRecordProcessorBlocker;
import org.apache.nifi.processors.aws.kinesis.stream.record.AbstractKinesisRecordProcessor;
@@ -319,6 +320,15 @@ public class ConsumeKinesisStream extends AbstractAwsAsyncProcessor<KinesisAsync
.dependsOn(RECORD_WRITER)
.build();
+    public static final PropertyDescriptor FLOW_FILE_HANDLING_ON_SCHEMA_CHANGE_STRATEGY = new PropertyDescriptor.Builder()
+            .name("FlowFile Handling On Schema Difference")
+            .description("The strategy used when records in a Kinesis Stream change their schema in a single batch.")
+            .required(true)
+            .defaultValue(SchemaDifferenceHandlingStrategy.CREATE_FLOW_FILE)
+            .allowableValues(SchemaDifferenceHandlingStrategy.class)
+            .dependsOn(RECORD_WRITER)
+            .build();
+
    public static final Relationship REL_PARSE_FAILURE = new Relationship.Builder()
            .name("parse.failure")
            .description("If a message from Kinesis cannot be parsed using the configured Record Reader" +
@@ -333,6 +343,7 @@ public class ConsumeKinesisStream extends AbstractAwsAsyncProcessor<KinesisAsync
RECORD_READER,
RECORD_WRITER,
OUTPUT_STRATEGY,
+ FLOW_FILE_HANDLING_ON_SCHEMA_CHANGE_STRATEGY,
REGION,
ENDPOINT_OVERRIDE,
DYNAMODB_ENDPOINT_OVERRIDE,
@@ -727,12 +738,14 @@ public class ConsumeKinesisStream extends AbstractAwsAsyncProcessor<KinesisAsync
            final RecordConverter recordConverter = OutputStrategy.USE_WRAPPER == outputStrategy
                    ? new RecordConverterWrapper()
                    : new RecordConverterIdentity();
+            final SchemaDifferenceHandlingStrategy schemaDifferenceHandlingStrategy = context.getProperty(FLOW_FILE_HANDLING_ON_SCHEMA_CHANGE_STRATEGY)
+                    .asAllowableValue(SchemaDifferenceHandlingStrategy.class);
            return new KinesisRecordProcessorRecord(
                    sessionFactory, getLogger(), getStreamName(context), getEndpointPrefix(context),
                    getKinesisEndpoint(context).orElse(null), getCheckpointIntervalMillis(context),
                    getRetryWaitMillis(context), getNumRetries(context), getDateTimeFormatter(context),
-                    getReaderFactory(context), getWriterFactory(context), recordConverter, recordProcessorBlocker
-            );
+                    getReaderFactory(context), getWriterFactory(context), recordConverter, recordProcessorBlocker,
+                    schemaDifferenceHandlingStrategy);
        } else {
            return new KinesisRecordProcessorRaw(
                    sessionFactory, getLogger(), getStreamName(context), getEndpointPrefix(context),
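
Usage note (a sketch, not part of the commit): the new descriptor is required, defaults to CREATE_FLOW_FILE, and only appears once a Record Writer is configured because of dependsOn(RECORD_WRITER). Assuming the standard NiFi TestRunners API, and that reader and writer controller services are registered as in the tests below, configuring it would look like:

    import org.apache.nifi.processors.aws.kinesis.property.SchemaDifferenceHandlingStrategy;
    import org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    class ConfigureStrategySketch {
        void configure() {
            final TestRunner runner = TestRunners.newTestRunner(ConsumeKinesisStream.class);
            // a Record Reader and Record Writer must also be set for the property to take effect
            runner.setProperty(ConsumeKinesisStream.FLOW_FILE_HANDLING_ON_SCHEMA_CHANGE_STRATEGY,
                    SchemaDifferenceHandlingStrategy.GROUP_RECORDS.getValue());
        }
    }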
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/AbstractKinesisRecordProcessor.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/AbstractKinesisRecordProcessor.java
index e2da8e9297..014b91f756 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/AbstractKinesisRecordProcessor.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/AbstractKinesisRecordProcessor.java
@@ -133,8 +133,10 @@ public abstract class AbstractKinesisRecordProcessor implements ShardRecordProce
                final StopWatch stopWatch = new StopWatch(true);
                session = sessionFactory.createSession();
+                final BatchProcessingContext batchProcessingContext = new BatchProcessingContext(session, flowFiles, stopWatch);
                startProcessingRecords();
-                final int recordsTransformed = processRecordsWithRetries(records, flowFiles, session, stopWatch);
+                final int recordsTransformed = processRecordsWithRetries(records, batchProcessingContext);
+                finishProcessingRecords(batchProcessingContext);
                transferTo(ConsumeKinesisStream.REL_SUCCESS, session, records.size(), recordsTransformed, flowFiles);
session.commitAsync(() -> {
@@ -157,14 +159,14 @@ public abstract class AbstractKinesisRecordProcessor implements ShardRecordProce
        processingRecords = true;
    }
-    private int processRecordsWithRetries(final List<KinesisClientRecord> records, final List<FlowFile> flowFiles,
-                                          final ProcessSession session, final StopWatch stopWatch) {
+    void finishProcessingRecords(final BatchProcessingContext batchProcessingContext) { }
+
+    private int processRecordsWithRetries(final List<KinesisClientRecord> records, final BatchProcessingContext batchProcessingContext) {
        int recordsTransformed = 0;
-        for (int r = 0; r < records.size(); r++) {
-            final KinesisClientRecord kinesisRecord = records.get(r);
+        for (final KinesisClientRecord kinesisRecord : records) {
            boolean processedSuccessfully = false;
            for (int i = 0; !processedSuccessfully && i < numRetries; i++) {
-                processedSuccessfully = attemptProcessRecord(flowFiles, kinesisRecord, r == records.size() - 1, session, stopWatch);
+                processedSuccessfully = attemptProcessRecord(kinesisRecord, batchProcessingContext);
            }
            if (processedSuccessfully) {
@@ -177,12 +179,13 @@ public abstract class AbstractKinesisRecordProcessor implements ShardRecordProce
        return recordsTransformed;
    }
-    private boolean attemptProcessRecord(final List<FlowFile> flowFiles, final KinesisClientRecord kinesisRecord, final boolean lastRecord,
-                                         final ProcessSession session, final StopWatch stopWatch) {
+    private boolean attemptProcessRecord(final KinesisClientRecord kinesisRecord, final BatchProcessingContext batchProcessingContext) {
        boolean processedSuccessfully = false;
        try {
-            processRecord(flowFiles, kinesisRecord, lastRecord, session, stopWatch);
+            processRecord(kinesisRecord, batchProcessingContext);
            processedSuccessfully = true;
+        } catch (final KinesisBatchUnrecoverableException e) {
+            throw e;
        } catch (final Exception e) {
            log.error("Caught Exception while processing Kinesis record {}", kinesisRecord, e);
@@ -200,16 +203,11 @@ public abstract class AbstractKinesisRecordProcessor implements ShardRecordProce
    /**
     * Process an individual {@link Record} and serialise to {@link FlowFile}
     *
-     * @param flowFiles {@link List} of {@link FlowFile}s to be output after all processing is complete
     * @param kinesisRecord the Kinesis {@link Record} to be processed
-     * @param lastRecord whether this is the last {@link Record} to be processed in this batch
-     * @param session {@link ProcessSession} into which {@link FlowFile}s will be transferred
-     * @param stopWatch {@link StopWatch} tracking how much time has been spent processing the current batch
-     *
+     * @param batchProcessingContext the {@link BatchProcessingContext} for the current batch of records being processed, containing the session, flow files and stopwatch
     * @throws RuntimeException if there are any unhandled Exceptions that should be retried
     */
-    abstract void processRecord(final List<FlowFile> flowFiles, final KinesisClientRecord kinesisRecord, final boolean lastRecord,
-                                final ProcessSession session, final StopWatch stopWatch);
+    abstract void processRecord(final KinesisClientRecord kinesisRecord, final BatchProcessingContext batchProcessingContext) throws RuntimeException;
    void reportProvenance(final ProcessSession session, final FlowFile flowFile, final String partitionKey,
                          final String sequenceNumber, final StopWatch stopWatch) {
@@ -345,4 +343,20 @@ public abstract class AbstractKinesisRecordProcessor implements ShardRecordProce
void setProcessingRecords(final boolean processingRecords) {
this.processingRecords = processingRecords;
}
+
+    protected static class KinesisBatchUnrecoverableException extends RuntimeException {
+        public KinesisBatchUnrecoverableException(final String message, final Throwable cause) {
+            super(message, cause);
+        }
+    }
+
+    /**
+     * contains:
+     * <ol>
+     *     <li>{@link ProcessSession} into which {@link FlowFile}s will be transferred
+     *     <li>{@link List} of {@link FlowFile}s to be output after all processing is complete
+     *     <li>{@link StopWatch} tracking how much time has been spent processing the current batch
+     * </ol>
+     */
+    protected record BatchProcessingContext(ProcessSession session, List<FlowFile> flowFiles, StopWatch stopWatch) { }
}
\ No newline at end of file
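
To make the control-flow change concrete: processRecords() now builds one BatchProcessingContext per batch and threads it through every record, finishProcessingRecords() gives subclasses a hook after the last record, and KinesisBatchUnrecoverableException is rethrown instead of retried. A simplified, self-contained model of that retry loop (plain Java; the names mirror the diff, but the bodies are stand-ins, not the real implementations):

    import java.util.List;

    class BatchFlowSketch {
        record Context(StringBuilder output) { }          // stands in for BatchProcessingContext

        static class UnrecoverableException extends RuntimeException { }

        static int processWithRetries(final List<String> records, final Context ctx, final int numRetries) {
            int transformed = 0;
            for (final String record : records) {
                boolean ok = false;
                for (int i = 0; !ok && i < numRetries; i++) {
                    try {
                        ctx.output().append(record).append('\n'); // stands in for processRecord(record, ctx)
                        ok = true;
                    } catch (final UnrecoverableException e) {
                        throw e;                                  // aborts the whole batch, never retried
                    } catch (final Exception e) {
                        // logged and retried in the real processor
                    }
                }
                if (ok) {
                    transformed++;
                }
            }
            return transformed;                                   // finishProcessingRecords(ctx) would run next
        }

        public static void main(final String[] args) {
            final Context ctx = new Context(new StringBuilder());
            System.out.println(processWithRetries(List.of("a", "b"), ctx, 2) + " records transformed");
        }
    }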
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRaw.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRaw.java
index 3d39305c5e..6f91db60fd 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRaw.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRaw.java
@@ -21,13 +21,11 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processors.aws.kinesis.stream.pause.RecordProcessorBlocker;
-import org.apache.nifi.util.StopWatch;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
-import java.util.List;
import java.util.Map;
public class KinesisRecordProcessorRaw extends AbstractKinesisRecordProcessor {
@@ -40,29 +38,29 @@ public class KinesisRecordProcessorRaw extends AbstractKinesisRecordProcessor {
}
@Override
-    void processRecord(final List<FlowFile> flowFiles, final KinesisClientRecord kinesisRecord, final boolean lastRecord,
-                       final ProcessSession session, final StopWatch stopWatch) {
+    void processRecord(final KinesisClientRecord kinesisRecord, final BatchProcessingContext batchProcessingContext) {
        final String partitionKey = kinesisRecord.partitionKey();
        final String sequenceNumber = kinesisRecord.sequenceNumber();
        final Instant approximateArrivalTimestamp = kinesisRecord.approximateArrivalTimestamp();
        final ByteBuffer dataBuffer = kinesisRecord.data();
-        byte[] data = dataBuffer != null ? new byte[dataBuffer.remaining()] : new byte[0];
+        final byte[] data = dataBuffer != null ? new byte[dataBuffer.remaining()] : new byte[0];
        if (dataBuffer != null) {
            dataBuffer.get(data);
        }
-        FlowFile flowFile = session.create();
+        final ProcessSession session = batchProcessingContext.session();
+        final FlowFile flowFile = session.create();
        session.write(flowFile, out -> out.write(data));
        if (getLogger().isDebugEnabled()) {
            getLogger().debug("Sequence No: {}, Partition Key: {}, Data: {}", sequenceNumber, partitionKey, BASE_64_ENCODER.encodeToString(data));
        }
-        reportProvenance(session, flowFile, partitionKey, sequenceNumber, stopWatch);
+        reportProvenance(session, flowFile, partitionKey, sequenceNumber, batchProcessingContext.stopWatch());
        final Map<String, String> attributes = getDefaultAttributes(sequenceNumber, partitionKey, approximateArrivalTimestamp);
-        flowFile = session.putAllAttributes(flowFile, attributes);
+        final FlowFile attributedFlowFile = session.putAllAttributes(flowFile, attributes);
-        flowFiles.add(flowFile);
+        batchProcessingContext.flowFiles().add(attributedFlowFile);
}
}
\ No newline at end of file
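
One detail worth calling out here (and in the getData() helper extracted in the next file): KinesisClientRecord.data() may return null, and reading a ByteBuffer advances its position, so the payload is copied out exactly once. A small self-contained illustration of that guarded copy, not part of the commit (the tests below call rewind() on shared buffers for the same reason):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    class BufferCopySketch {
        // mirrors the null-safe, single-pass copy used by the processors
        static byte[] copy(final ByteBuffer dataBuffer) {
            final byte[] data = dataBuffer != null ? new byte[dataBuffer.remaining()] : new byte[0];
            if (dataBuffer != null) {
                dataBuffer.get(data); // drains the buffer; a second get() would read nothing
            }
            return data;
        }

        public static void main(final String[] args) {
            final ByteBuffer buffer = ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8));
            System.out.println(new String(copy(buffer), StandardCharsets.UTF_8)); // payload
            System.out.println(copy(buffer).length);                              // 0: already drained
            System.out.println(copy(null).length);                                // 0: null-safe
        }
    }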
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java
index be52be9d4a..43fa88efa0 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java
@@ -21,6 +21,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processors.aws.kinesis.property.SchemaDifferenceHandlingStrategy;
import org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream;
import org.apache.nifi.processors.aws.kinesis.stream.pause.RecordProcessorBlocker;
import org.apache.nifi.processors.aws.kinesis.stream.record.converter.RecordConverter;
@@ -34,10 +35,10 @@ import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.util.StopWatch;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import java.io.ByteArrayInputStream;
+import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -49,20 +50,19 @@ import java.util.List;
import java.util.Map;
public class KinesisRecordProcessorRecord extends AbstractKinesisRecordProcessor {
- final RecordReaderFactory readerFactory;
- final RecordSetWriterFactory writerFactory;
- final Map<String, String> schemaRetrievalVariables;
-
- private RecordSetWriter writer;
- private OutputStream outputStream;
+ private final RecordReaderFactory readerFactory;
+ private final RecordSetWriterFactory writerFactory;
+ private final Map<String, String> schemaRetrievalVariables;
private final RecordConverter recordConverter;
+ private final StateHandlerStrategy stateHandlerStrategy;
    public KinesisRecordProcessorRecord(final ProcessSessionFactory sessionFactory, final ComponentLog log, final String streamName,
                                        final String endpointPrefix, final String kinesisEndpoint,
                                        final long checkpointIntervalMillis, final long retryWaitMillis,
                                        final int numRetries, final DateTimeFormatter dateTimeFormatter,
                                        final RecordReaderFactory readerFactory, final RecordSetWriterFactory writerFactory,
-                                        final RecordConverter recordConverter, final RecordProcessorBlocker recordProcessorBlocker) {
+                                        final RecordConverter recordConverter, final RecordProcessorBlocker recordProcessorBlocker,
+                                        final SchemaDifferenceHandlingStrategy schemaDifferenceHandlingStrategy) {
        super(sessionFactory, log, streamName, endpointPrefix, kinesisEndpoint, checkpointIntervalMillis, retryWaitMillis,
                numRetries, dateTimeFormatter, recordProcessorBlocker);
        this.readerFactory = readerFactory;
@@ -70,56 +70,63 @@ public class KinesisRecordProcessorRecord extends AbstractKinesisRecordProcessor
        schemaRetrievalVariables = Collections.singletonMap(KINESIS_RECORD_SCHEMA_KEY, streamName);
        this.recordConverter = recordConverter;
+        this.stateHandlerStrategy = new StateHandlerStrategy(schemaDifferenceHandlingStrategy, this::initializeFlowFileState, this::completeFlowFileState);
    }
@Override
void startProcessingRecords() {
super.startProcessingRecords();
- outputStream = null;
- writer = null;
+ FlowFileState flowFileState;
+ while ((flowFileState = stateHandlerStrategy.pop()) != null) {
+            // this may happen if the previous processing has not been completed successfully, close the leftover state
+ closeSafe(flowFileState.asClosable(), "FlowFile State");
+ }
}
@Override
-    void processRecord(final List<FlowFile> flowFiles, final KinesisClientRecord kinesisRecord, final boolean lastRecord,
-                       final ProcessSession session, final StopWatch stopWatch) {
-        boolean firstOutputRecord = true;
-        int recordCount = 0;
-        final ByteBuffer dataBuffer = kinesisRecord.data();
-        byte[] data = dataBuffer != null ? new byte[dataBuffer.remaining()] : new byte[0];
-        if (dataBuffer != null) {
-            dataBuffer.get(data);
+    void finishProcessingRecords(final BatchProcessingContext batchProcessingContext) {
+        super.finishProcessingRecords(batchProcessingContext);
+        final List<FlowFile> flowFiles = batchProcessingContext.flowFiles();
+        FlowFileState flowFileState;
+        while ((flowFileState = stateHandlerStrategy.pop()) != null) {
+            if (!flowFiles.contains(flowFileState.flowFile)) {
+                // this is unexpected: the flowFiles list was altered outside this class after processing started
+                throw new IllegalStateException("%s is not available in provided FlowFiles [%d]".formatted(flowFileState.flowFile, flowFiles.size()));
+            }
+            try {
+                completeFlowFileState(flowFileState, batchProcessingContext);
+            } catch (final FlowFileCompletionException e) {
+                handleFlowFileCompletionException(e, batchProcessingContext);
+            }
        }
+    }
+
+ @Override
+    void processRecord(final KinesisClientRecord kinesisRecord, final BatchProcessingContext batchProcessingContext) {
+        final byte[] data = getData(kinesisRecord);
-        FlowFile flowFile = null;
        try (final InputStream in = new ByteArrayInputStream(data);
             final RecordReader reader = readerFactory.createRecordReader(schemaRetrievalVariables, in, data.length, getLogger())
        ) {
            Record intermediateRecord;
            final PushBackRecordSet recordSet = new PushBackRecordSet(reader.createRecordSet());
            while ((intermediateRecord = recordSet.next()) != null) {
-                Record outputRecord = recordConverter.convert(intermediateRecord, kinesisRecord, getStreamName(), getKinesisShardId());
-                if (flowFiles.isEmpty()) {
-                    flowFile = session.create();
-                    flowFiles.add(flowFile);
-
-                    // initialize the writer when the first record is read.
-                    createWriter(flowFile, session, outputRecord);
-                }
-
-                final WriteResult writeResult = writer.write(outputRecord);
-                recordCount += writeResult.getRecordCount();
-
-                // complete the FlowFile if there are no more incoming Kinesis Records and no more records in this RecordSet
-                if (lastRecord && !recordSet.isAnotherRecord()) {
-                    completeFlowFile(flowFiles, session, recordCount, writeResult, kinesisRecord, stopWatch);
+                FlowFileState flowFileState;
+                final Record outputRecord = recordConverter.convert(intermediateRecord, kinesisRecord, getStreamName(), getKinesisShardId());
+                try {
+                    flowFileState = stateHandlerStrategy.getOrCreate(outputRecord, batchProcessingContext);
+                } catch (final FlowFileCompletionException e) {
+                    stateHandlerStrategy.drop(e.flowFileState.recordSchema);
+                    handleFlowFileCompletionException(e, batchProcessingContext);
+                    flowFileState = stateHandlerStrategy.create(outputRecord, batchProcessingContext);
                }
-                firstOutputRecord = false;
+                flowFileState.write(outputRecord, kinesisRecord);
            }
        } catch (final MalformedRecordException | IOException | SchemaNotFoundException e) {
            // write raw Kinesis Record to the parse failure relationship
            getLogger().error("Failed to parse message from Kinesis Stream using configured Record Reader and Writer due to {}", e.getLocalizedMessage(), e);
-            outputRawRecordOnException(firstOutputRecord, flowFile, flowFiles, session, data, kinesisRecord, e);
+            outputRawRecordOnException(batchProcessingContext.session(), data, kinesisRecord, e);
        }
}
if (getLogger().isDebugEnabled()) {
@@ -128,63 +135,93 @@ public class KinesisRecordProcessorRecord extends AbstractKinesisRecordProcessor
}
}
-    private void createWriter(final FlowFile flowFile, final ProcessSession session, final Record outputRecord)
-            throws IOException, SchemaNotFoundException {
-
-        final RecordSchema readerSchema = outputRecord.getSchema();
-        final RecordSchema writeSchema = writerFactory.getSchema(schemaRetrievalVariables, readerSchema);
-        outputStream = session.write(flowFile);
-        writer = writerFactory.createWriter(getLogger(), writeSchema, outputStream, flowFile);
-        writer.beginRecordSet();
+    private void handleFlowFileCompletionException(final FlowFileCompletionException e, final BatchProcessingContext batchProcessingContext) {
+        if (!e.flowFileState.containsDataFromExactlyOneKinesisRecord()) {
+            throw new KinesisBatchUnrecoverableException("Not all KinesisClientRecords contained in FlowFile contents can be routed to Failure relationship", e);
+        }
+        dropFlowFileState(e.flowFileState, batchProcessingContext);
+        final KinesisClientRecord failedKinesisRecord = e.flowFileState.lastSuccessfulWriteInfo.kinesisRecord;
+        final byte[] failedRecordData = getData(failedKinesisRecord);
+        outputRawRecordOnException(batchProcessingContext.session(), failedRecordData, failedKinesisRecord, e);
}
-    private void completeFlowFile(final List<FlowFile> flowFiles, final ProcessSession session, final int recordCount,
-                                  final WriteResult writeResult, final KinesisClientRecord lastRecord, final StopWatch stopWatch)
-            throws IOException {
+    private static byte[] getData(final KinesisClientRecord kinesisRecord) {
+        final ByteBuffer dataBuffer = kinesisRecord.data();
+        final byte[] data = dataBuffer != null ? new byte[dataBuffer.remaining()] : new byte[0];
+ if (dataBuffer != null) {
+ dataBuffer.get(data);
+ }
+ return data;
+ }
+ /**
+     * Initializes the FlowFile state for the current processing. This includes creating a new FlowFile, initializing the RecordSetWriter, and setting up the output stream.
+     * In case of an exception during initialization, the FlowFile is removed from the session and the resources are closed properly.
+     */
+    private FlowFileState initializeFlowFileState(final Record record, final BatchProcessingContext batchProcessingContext) throws IOException, SchemaNotFoundException {
+ final ProcessSession session = batchProcessingContext.session();
+ FlowFile flowFile = null;
+ OutputStream outputStream = null;
+ RecordSetWriter writer = null;
try {
- writer.finishRecordSet();
- } catch (IOException e) {
-            getLogger().error("Failed to finish record output due to {}", e.getLocalizedMessage(), e);
- session.remove(flowFiles.get(0));
- flowFiles.remove(0);
- throw e;
- } finally {
- try {
- writer.close();
- outputStream.close();
- } catch (final IOException e) {
-                getLogger().warn("Failed to close Record Writer due to {}", e.getLocalizedMessage(), e);
+ flowFile = session.create();
+ final RecordSchema newReadSchema = record.getSchema();
+            final RecordSchema writeSchema = writerFactory.getSchema(schemaRetrievalVariables, newReadSchema);
+ outputStream = session.write(flowFile);
+            writer = writerFactory.createWriter(getLogger(), writeSchema, outputStream, flowFile);
+ writer.beginRecordSet();
+ batchProcessingContext.flowFiles().add(flowFile);
+ } catch (final Exception e) {
+ if (flowFile != null) {
+ session.remove(flowFile);
}
+ closeSafe(writer, "Record Writer");
+ closeSafe(outputStream, "Output Stream");
+ throw e;
}
+        return new FlowFileState(flowFile, writer, outputStream, record.getSchema(), getLogger());
+ }
- reportProvenance(session, flowFiles.get(0), null, null, stopWatch);
+    private void completeFlowFileState(final FlowFileState flowFileState, final BatchProcessingContext batchProcessingContext)
+            throws FlowFileCompletionException {
+ final ProcessSession session = batchProcessingContext.session();
+ final List<FlowFile> flowFiles = batchProcessingContext.flowFiles();
+ if (flowFileState.isFlowFileEmpty()) {
+ dropFlowFileState(flowFileState, batchProcessingContext);
+ return;
+ }
+ try {
+ flowFileState.writer.finishRecordSet();
+ closeSafe(flowFileState.asClosable(), "FlowFile State");
+            reportProvenance(session, flowFileState.flowFile, null, null, batchProcessingContext.stopWatch());
-        final Map<String, String> attributes = getDefaultAttributes(lastRecord);
- attributes.put("record.count", String.valueOf(recordCount));
- attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
- attributes.putAll(writeResult.getAttributes());
-        flowFiles.set(0, session.putAllAttributes(flowFiles.get(0), attributes));
+            final Map<String, String> attributes = getDefaultAttributes(flowFileState.lastSuccessfulWriteInfo.kinesisRecord);
+            attributes.put("record.count", String.valueOf(flowFileState.lastSuccessfulWriteInfo.writeResult.getRecordCount()));
+            attributes.put(CoreAttributes.MIME_TYPE.key(), flowFileState.writer.getMimeType());
+            attributes.putAll(flowFileState.lastSuccessfulWriteInfo.writeResult.getAttributes());
+            final int flowFileIndex = flowFiles.indexOf(flowFileState.flowFile);
+            flowFiles.set(flowFileIndex, session.putAllAttributes(flowFileState.flowFile, attributes));
+ } catch (final IOException e) {
+ dropFlowFileState(flowFileState, batchProcessingContext);
+            final String message = "Failed to complete a FlowFile containing records from Stream Name: %s, Shard Id: %s, Sequence/Subsequence No range: [%s/%d, %s/%d)".formatted(
+                    getStreamName(),
+                    getKinesisShardId(),
+                    flowFileState.firstSuccessfulWriteInfo.kinesisRecord.sequenceNumber(),
+                    flowFileState.firstSuccessfulWriteInfo.kinesisRecord.subSequenceNumber(),
+                    flowFileState.lastSuccessfulWriteInfo.kinesisRecord.sequenceNumber(),
+                    flowFileState.lastSuccessfulWriteInfo.kinesisRecord.subSequenceNumber()
+ );
+ throw new FlowFileCompletionException(message, e, flowFileState);
+ }
+ }
- writer = null;
- outputStream = null;
+    private void dropFlowFileState(final FlowFileState flowFileState, final BatchProcessingContext batchProcessingContext) {
+ closeSafe(flowFileState.asClosable(), "FlowFile State");
+ batchProcessingContext.session().remove(flowFileState.flowFile);
+ batchProcessingContext.flowFiles().remove(flowFileState.flowFile);
}
-    private void outputRawRecordOnException(final boolean firstOutputRecord, final FlowFile flowFile,
-                                            final List<FlowFile> flowFiles, final ProcessSession session,
-                                            final byte[] data, final KinesisClientRecord kinesisRecord, final Exception e) {
- if (firstOutputRecord && flowFile != null) {
- session.remove(flowFile);
- flowFiles.remove(0);
- if (writer != null) {
- try {
- writer.close();
- outputStream.close();
- } catch (IOException ioe) {
-                    getLogger().warn("Failed to close Record Writer due to {}", ioe.getLocalizedMessage(), ioe);
- }
- }
- }
+    private void outputRawRecordOnException(final ProcessSession session, final byte[] data, final KinesisClientRecord kinesisRecord, final Exception e) {
FlowFile failed = session.create();
session.write(failed, o -> o.write(data));
        final Map<String, String> attributes = getDefaultAttributes(kinesisRecord);
@@ -200,4 +237,71 @@ public class KinesisRecordProcessorRecord extends AbstractKinesisRecordProcessor
        final Instant approximateArrivalTimestamp = kinesisRecord.approximateArrivalTimestamp();
        return getDefaultAttributes(sequenceNumber, partitionKey, approximateArrivalTimestamp);
}
+
+    private void closeSafe(final Closeable closeable, final String closeableName) {
+ closeSafe(closeable, closeableName, getLogger());
+ }
+
+    private static void closeSafe(final Closeable closeable, final String closeableName, final ComponentLog logger) {
+ if (closeable != null) {
+ try {
+ closeable.close();
+ } catch (final IOException e) {
+ logger.warn("Failed to close {}", closeableName, e);
+ }
+ }
+ }
+
+    record SuccessfulWriteInfo(KinesisClientRecord kinesisRecord, WriteResult writeResult) { }
+
+ static class FlowFileState {
+ private final FlowFile flowFile;
+ private final RecordSetWriter writer;
+ private final OutputStream outputStream;
+ private final RecordSchema recordSchema;
+ private final ComponentLog componentLog;
+ private SuccessfulWriteInfo firstSuccessfulWriteInfo;
+ private SuccessfulWriteInfo lastSuccessfulWriteInfo;
+
+        private FlowFileState(final FlowFile flowFile, final RecordSetWriter writer, final OutputStream outputStream, final RecordSchema recordSchema, final ComponentLog componentLog) {
+ this.flowFile = flowFile;
+ this.writer = writer;
+ this.outputStream = outputStream;
+ this.recordSchema = recordSchema;
+ this.componentLog = componentLog;
+ }
+
+ private boolean isFlowFileEmpty() {
+ return lastSuccessfulWriteInfo == null;
+ }
+
+ @SuppressWarnings("BooleanMethodIsAlwaysInverted")
+ private boolean containsDataFromExactlyOneKinesisRecord() {
+            return !isFlowFileEmpty() && firstSuccessfulWriteInfo.kinesisRecord == lastSuccessfulWriteInfo.kinesisRecord;
+ }
+
+        private void write(final Record outputRecord, final KinesisClientRecord kinesisRecord) throws IOException {
+ final WriteResult writeResult = writer.write(outputRecord);
+ firstSuccessfulWriteInfo = firstSuccessfulWriteInfo == null
+ ? new SuccessfulWriteInfo(kinesisRecord, writeResult)
+ : firstSuccessfulWriteInfo;
+            lastSuccessfulWriteInfo = new SuccessfulWriteInfo(kinesisRecord, writeResult);
+ }
+
+ private Closeable asClosable() {
+ return () -> {
+ closeSafe(writer, "Record Writer", componentLog);
+ closeSafe(outputStream, "Output Stream", componentLog);
+ };
+ }
+ }
+
+ static class FlowFileCompletionException extends Exception {
+ private final FlowFileState flowFileState;
+
+        private FlowFileCompletionException(final String message, final Throwable cause, final FlowFileState flowFileState) {
+ super(message, cause);
+ this.flowFileState = flowFileState;
+ }
+ }
}
\ No newline at end of file
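
The salvage decision in handleFlowFileCompletionException() above hinges on the first/last bookkeeping inside FlowFileState: a failed FlowFile can only be rerouted to parse.failure when everything it contains came from a single Kinesis record. A stripped-down, self-contained model of just that bookkeeping (plain Java; Object stands in for KinesisClientRecord):

    class FlowFileStateSketch {
        record WriteInfo(Object kinesisRecord) { }

        private WriteInfo first;
        private WriteInfo last;

        void write(final Object kinesisRecord) {
            final WriteInfo info = new WriteInfo(kinesisRecord);
            first = first == null ? info : first;   // remembered once
            last = info;                            // updated on every write
        }

        boolean containsDataFromExactlyOneKinesisRecord() {
            return last != null && first.kinesisRecord() == last.kinesisRecord();
        }

        public static void main(final String[] args) {
            final FlowFileStateSketch state = new FlowFileStateSketch();
            final Object recordA = new Object();
            state.write(recordA);
            state.write(recordA);                   // several records parsed from one Kinesis payload
            System.out.println(state.containsDataFromExactlyOneKinesisRecord()); // true: salvageable
            state.write(new Object());              // a second Kinesis record joins the FlowFile
            System.out.println(state.containsDataFromExactlyOneKinesisRecord()); // false: unrecoverable batch
        }
    }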
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/StateHandlerStrategy.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/StateHandlerStrategy.java
new file mode 100644
index 0000000000..a69c68e959
--- /dev/null
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/StateHandlerStrategy.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.stream.record;
+
+import org.apache.nifi.processors.aws.kinesis.property.SchemaDifferenceHandlingStrategy;
+import org.apache.nifi.processors.aws.kinesis.stream.record.AbstractKinesisRecordProcessor.BatchProcessingContext;
+import org.apache.nifi.processors.aws.kinesis.stream.record.KinesisRecordProcessorRecord.FlowFileCompletionException;
+import org.apache.nifi.processors.aws.kinesis.stream.record.KinesisRecordProcessorRecord.FlowFileState;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * StateHandlerStrategy is responsible for managing the state of FlowFiles created for Records processed in a single batch. In general this class decides what should happen when a new RecordSchema
+ * is encountered — whether the previous state should be completed and a new one created, or only a new state created.
+ */
+class StateHandlerStrategy {
+
+ private final SchemaDifferenceHandlingStrategy strategy;
+ private final StateInitializerAction stateInitializerAction;
+ private final StateFinalizerAction stateFinalizerAction;
+    private final Map<RecordSchema, FlowFileState> activeStateMap = new HashMap<>();
+
+    StateHandlerStrategy(final SchemaDifferenceHandlingStrategy strategy, final StateInitializerAction stateInitializerAction, final StateFinalizerAction stateFinalizerAction) {
+ this.strategy = strategy;
+ this.stateInitializerAction = stateInitializerAction;
+ this.stateFinalizerAction = stateFinalizerAction;
+ }
+
+    FlowFileState getOrCreate(final Record record, final BatchProcessingContext flowFileContext) throws FlowFileCompletionException, IOException, SchemaNotFoundException {
+        final FlowFileState previousState = activeStateMap.get(record.getSchema());
+        if (previousState != null) {
+            return previousState;
+        }
+        if (strategy == SchemaDifferenceHandlingStrategy.CREATE_FLOW_FILE) {
+            // for the create flow file strategy we need to complete any previous state before creating a new one
+ completeAllAvailableFlowFileStates(flowFileContext);
+ }
+ return create(record, flowFileContext);
+ }
+
+    private void completeAllAvailableFlowFileStates(BatchProcessingContext flowFileContext) throws FlowFileCompletionException {
+        FlowFileState previousStateForDifferentSchema;
+        while ((previousStateForDifferentSchema = pop()) != null) {
+            stateFinalizerAction.complete(previousStateForDifferentSchema, flowFileContext);
+ }
+ }
+
+    FlowFileState create(final Record record, final BatchProcessingContext flowFileContext) throws IOException, SchemaNotFoundException {
+        final FlowFileState previousState = activeStateMap.get(record.getSchema());
+        if (previousState != null) {
+            throw new IllegalStateException(
+                    "FlowFile state already exists for schema: " + record.getSchema() + ". This should not happen in a batch processing context."
+            );
+        }
+        if (strategy == SchemaDifferenceHandlingStrategy.CREATE_FLOW_FILE && !activeStateMap.isEmpty()) {
+            throw new IllegalStateException(
+                    "An uncompleted FlowFileState found while using SchemaDifferenceHandlingStrategy: " +
+                    SchemaDifferenceHandlingStrategy.CREATE_FLOW_FILE + ". Cannot create a new state until previous is completed or dropped."
+            );
+        }
+        final FlowFileState newState = stateInitializerAction.init(record, flowFileContext);
+ activeStateMap.put(record.getSchema(), newState);
+ return newState;
+ }
+
+ FlowFileState pop() {
+        final Iterator<Map.Entry<RecordSchema, FlowFileState>> iterator = activeStateMap.entrySet().iterator();
+ if (!iterator.hasNext()) {
+ return null;
+ }
+ return activeStateMap.remove(iterator.next().getKey());
+ }
+
+ void drop(final RecordSchema recordSchema) {
+ activeStateMap.remove(recordSchema);
+ }
+
+ interface StateInitializerAction {
+        FlowFileState init(Record record, BatchProcessingContext flowFileContext) throws IOException, SchemaNotFoundException;
+ }
+
+ interface StateFinalizerAction {
+        void complete(FlowFileState flowFile, BatchProcessingContext flowFileContext) throws FlowFileCompletionException;
+ }
+}
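
How the two strategies play out for a batch whose record schemas arrive as A, B, A: CREATE_FLOW_FILE completes every open state whenever a new schema appears, yielding three FlowFiles in stream order, while GROUP_RECORDS keeps one open state per schema and yields two. A self-contained toy model of that grouping decision (illustrative only; strings stand in for RecordSchema and for the records themselves):

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    class GroupingSketch {
        static List<List<String>> group(final List<String> schemas, final boolean createFlowFilePerChange) {
            final Map<String, List<String>> open = new LinkedHashMap<>();   // activeStateMap equivalent
            final List<List<String>> completed = new ArrayList<>();
            for (final String schema : schemas) {
                if (createFlowFilePerChange && !open.isEmpty() && !open.containsKey(schema)) {
                    completed.addAll(open.values());                        // complete all previous states
                    open.clear();
                }
                open.computeIfAbsent(schema, s -> new ArrayList<>()).add(schema);
            }
            completed.addAll(open.values());                                // finishProcessingRecords equivalent
            return completed;
        }

        public static void main(final String[] args) {
            System.out.println(group(List.of("A", "B", "A"), true));   // [[A], [B], [A]]
            System.out.println(group(List.of("A", "B", "A"), false));  // [[A, A], [B]]
        }
    }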
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestAbstractKinesisRecordProcessor.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestAbstractKinesisRecordProcessor.java
index 5d24264538..c6fca27f00 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestAbstractKinesisRecordProcessor.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestAbstractKinesisRecordProcessor.java
@@ -16,14 +16,11 @@
*/
package org.apache.nifi.processors.aws.kinesis.stream.record;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream;
import org.apache.nifi.processors.aws.kinesis.stream.pause.StandardRecordProcessorBlocker;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.SharedSessionState;
-import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterEach;
@@ -40,7 +37,6 @@ import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.time.format.DateTimeFormatter;
-import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -77,7 +73,7 @@ public class TestAbstractKinesisRecordProcessor {
        fixture = new AbstractKinesisRecordProcessor(processSessionFactory, runner.getLogger(), "kinesis-test",
                "endpoint-prefix", null, 10_000L, 1L, 2, DATE_TIME_FORMATTER, NOOP_RECORD_PROCESSOR_BLOCKER) {
            @Override
-            void processRecord(List<FlowFile> flowFiles, KinesisClientRecord kinesisRecord, boolean lastRecord, ProcessSession session, StopWatch stopWatch) {
+            void processRecord(KinesisClientRecord kinesisRecord, BatchProcessingContext batchProcessingContext) {
// intentionally blank
}
};
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestKinesisRecordProcessorRecord.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestKinesisRecordProcessorRecord.java
index 7c8baf57c7..fa8377b346 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestKinesisRecordProcessorRecord.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestKinesisRecordProcessorRecord.java
@@ -16,10 +16,12 @@
*/
package org.apache.nifi.processors.aws.kinesis.stream.record;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processors.aws.kinesis.property.SchemaDifferenceHandlingStrategy;
import org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream;
import org.apache.nifi.processors.aws.kinesis.stream.pause.StandardRecordProcessorBlocker;
import org.apache.nifi.processors.aws.kinesis.stream.record.converter.RecordConverterIdentity;
@@ -36,8 +38,14 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mock;
+import org.mockito.Mockito;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
@@ -56,6 +64,7 @@ import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -103,11 +112,13 @@ public class TestKinesisRecordProcessorRecord {
runner.setProperty(writer, "output-grouping", "output-oneline");
runner.enableControllerService(writer);
        runner.setProperty(ConsumeKinesisStream.RECORD_WRITER, "record-writer");
+    }
+    private KinesisRecordProcessorRecord defaultFixtureWithStrategy(final SchemaDifferenceHandlingStrategy strategy) {
        // default test fixture will try operations twice with very little wait in between
-        fixture = new KinesisRecordProcessorRecord(processSessionFactory, runner.getLogger(), "kinesis-test",
+        return new KinesisRecordProcessorRecord(processSessionFactory, runner.getLogger(), "kinesis-test",
                "endpoint-prefix", null, 10_000L, 1L, 2, DATE_TIME_FORMATTER,
-                reader, writer, new RecordConverterIdentity(), NOOP_RECORD_PROCESSOR_BLOCKER);
+                reader, writer, new RecordConverterIdentity(), NOOP_RECORD_PROCESSOR_BLOCKER, strategy);
}
@AfterEach
@@ -116,8 +127,10 @@ public class TestKinesisRecordProcessorRecord {
reset(checkpointer, kinesisRecord, processSessionFactory);
}
- @Test
- public void testProcessRecordsEmpty() {
+ @ParameterizedTest
+ @EnumSource(SchemaDifferenceHandlingStrategy.class)
+    public void testProcessRecordsEmpty(final SchemaDifferenceHandlingStrategy strategy) {
+        fixture = defaultFixtureWithStrategy(strategy);
        final ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder()
.records(Collections.emptyList())
.checkpointer(checkpointer)
@@ -137,27 +150,33 @@ public class TestKinesisRecordProcessorRecord {
session.assertNotRolledBack();
}
- @Test
- public void testProcessRecordsNoCheckpoint() {
- processMultipleRecordsAssertProvenance(false, false);
+    private static Stream<Arguments.ArgumentSet> testProcessRecordsArgs() {
+        final List<Pair<String, Boolean>> endpointOverridden = List.of(
+                Pair.of("Overridden Endpoint", true),
+                Pair.of("Default Endpoint", false)
+        );
+        final List<Pair<String, Boolean>> usingWrapper = List.of(
+                Pair.of("Using Schema Wrapper", true),
+                Pair.of("Default Schema", false)
+        );
+        final List<Pair<String, SchemaDifferenceHandlingStrategy>> schemaChangeStrategy = Arrays.stream(SchemaDifferenceHandlingStrategy.values())
+                .map(strategy -> Pair.of(strategy.name(), strategy))
+                .toList();
+        return endpointOverridden.stream()
+                .flatMap(endpoint -> schemaChangeStrategy.stream()
+                        .flatMap(strategy -> usingWrapper.stream()
+                                .map(wrapper -> Arguments.argumentSet(
+                                        String.format("%s, %s, %s", endpoint.getLeft(), wrapper.getLeft(), strategy.getLeft()),
+                                        endpoint.getRight(),
+                                        wrapper.getRight(),
+                                        strategy.getRight()
+                                ))));
}
- @Test
- public void testProcessRecordsWithEndpointOverride() {
- processMultipleRecordsAssertProvenance(true, false);
- }
-
- @Test
- public void testProcessWrappedRecordsNoCheckpoint() {
- processMultipleRecordsAssertProvenance(false, true);
- }
-
- @Test
- public void testProcessWrappedRecordsWithEndpointOverride() {
- processMultipleRecordsAssertProvenance(true, true);
- }
-
-    private void processMultipleRecordsAssertProvenance(final boolean endpointOverridden, final boolean useWrapper) {
+ @ParameterizedTest
+ @MethodSource("testProcessRecordsArgs")
+    void processMultipleRecordsAssertProvenance(final boolean endpointOverridden, final boolean useWrapper, final SchemaDifferenceHandlingStrategy strategy) {
+ fixture = defaultFixtureWithStrategy(strategy);
        final Instant referenceInstant = Instant.parse("2021-01-01T00:00:00.000Z");
        final Date firstDate = Date.from(referenceInstant.minus(1, ChronoUnit.MINUTES));
final Date secondDate = Date.from(referenceInstant);
@@ -189,7 +208,7 @@ public class TestKinesisRecordProcessorRecord {
        final String transitUriPrefix = endpointOverridden ? "https://another-endpoint.com:8443" : "http://endpoint-prefix.amazonaws.com";
        fixture = new KinesisRecordProcessorRecord(processSessionFactory, runner.getLogger(), "kinesis-test",
                "endpoint-prefix", transitUriPrefix, 10_000L, 1L, 2, DATE_TIME_FORMATTER,
-                reader, writer, useWrapper ? new RecordConverterWrapper() : new RecordConverterIdentity(), NOOP_RECORD_PROCESSOR_BLOCKER);
+                reader, writer, useWrapper ? new RecordConverterWrapper() : new RecordConverterIdentity(), NOOP_RECORD_PROCESSOR_BLOCKER, strategy);
        // skip checkpoint
        fixture.setNextCheckpointTimeInMillis(System.currentTimeMillis() + 10_000L);
@@ -233,8 +252,10 @@ public class TestKinesisRecordProcessorRecord {
"shardedSequenceNumber":"200000000000000000000","partitionKey":"partition-2","approximateArrival":1609459200000},"value":{"record":"2"}}""";
}
- @Test
-    public void testProcessPoisonPillRecordButNoRawOutputWithCheckpoint() throws ShutdownException, InvalidStateException {
+ @ParameterizedTest
+ @EnumSource(SchemaDifferenceHandlingStrategy.class)
+    public void testProcessPoisonPillRecordButNoRawOutputWithCheckpoint(final SchemaDifferenceHandlingStrategy strategy) throws ShutdownException, InvalidStateException {
+ fixture = defaultFixtureWithStrategy(strategy);
        final ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder()
.records(Arrays.asList(
KinesisClientRecord.builder().approximateArrivalTimestamp(null)
@@ -284,22 +305,70 @@ public class TestKinesisRecordProcessorRecord {
session.assertNotRolledBack();
}
- @Test
-    public void testProcessUnparsableRecordWithRawOutputWithCheckpoint() throws ShutdownException, InvalidStateException {
+ private static Stream<Arguments.ArgumentSet> unparsableRecordsLists() {
+        final KinesisClientRecord unparsableRecordMock = mock(KinesisClientRecord.class);
+        final KinesisClientRecord parsableRecord1 = KinesisClientRecord.builder().approximateArrivalTimestamp(null)
+ .partitionKey("partition-1")
+ .sequenceNumber("1")
+                .data(ByteBuffer.wrap("{\"record\":\"1\"}".getBytes(StandardCharsets.UTF_8)))
+ .build();
+        final KinesisClientRecord parsableRecord3 = KinesisClientRecord.builder().approximateArrivalTimestamp(null)
+ .partitionKey("partition-3")
+ .sequenceNumber("3")
+                .data(ByteBuffer.wrap("{\"record\":\"3\"}".getBytes(StandardCharsets.UTF_8)))
+ .build();
+ final List<List<KinesisClientRecord>> recordLists = List.of(
+ Arrays.asList(
+ unparsableRecordMock,
+ parsableRecord1,
+ parsableRecord3
+ ),
+ Arrays.asList(
+ parsableRecord1,
+ unparsableRecordMock,
+ parsableRecord3
+ ),
+ Arrays.asList(
+ parsableRecord1,
+ parsableRecord3,
+ unparsableRecordMock
+ )
+ );
+ return Arrays.stream(SchemaDifferenceHandlingStrategy.values())
+                .flatMap(strategy -> recordLists.stream().map(recordList -> Pair.of(strategy, recordList)))
+ .map(pair -> {
+                    final SchemaDifferenceHandlingStrategy strategy = pair.getLeft();
+                    final List<KinesisClientRecord> inputRecords = pair.getRight();
+ String position = "Middle";
+ if (inputRecords.getFirst() == unparsableRecordMock) {
+ position = "Beginning";
+ }
+ if (inputRecords.getLast() == unparsableRecordMock) {
+ position = "End";
+ }
+ return Arguments.argumentSet(
+                            String.format("Strategy=%s, Unparsable Record Position=%s", strategy, position),
+ inputRecords,
+ unparsableRecordMock,
+ strategy
+ );
+ })
+ .peek(it -> {
+ parsableRecord1.data().rewind();
+ parsableRecord3.data().rewind();
+ Mockito.reset(unparsableRecordMock);
+ });
+ }
+
+ @ParameterizedTest
+ @MethodSource("unparsableRecordsLists")
+ void testProcessUnparsableRecordWithRawOutputWithCheckpoint(
+ final List<KinesisClientRecord> inputRecords,
+ final KinesisClientRecord kinesisRecord,
+            final SchemaDifferenceHandlingStrategy strategy) throws ShutdownException, InvalidStateException {
+ fixture = defaultFixtureWithStrategy(strategy);
        final ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder()
- .records(Arrays.asList(
-                    KinesisClientRecord.builder().approximateArrivalTimestamp(null)
- .partitionKey("partition-1")
- .sequenceNumber("1")
-                            .data(ByteBuffer.wrap("{\"record\":\"1\"}".getBytes(StandardCharsets.UTF_8)))
- .build(),
- kinesisRecord,
-                    KinesisClientRecord.builder().approximateArrivalTimestamp(null)
- .partitionKey("partition-3")
- .sequenceNumber("3")
-                            .data(ByteBuffer.wrap("{\"record\":\"3\"}".getBytes(StandardCharsets.UTF_8)))
- .build()
- ))
+ .records(inputRecords)
.checkpointer(checkpointer)
.cacheEntryTime(Instant.now().minus(1, ChronoUnit.MINUTES))
.cacheExitTime(Instant.now().minus(1, ChronoUnit.SECONDS))
@@ -343,6 +412,100 @@ public class TestKinesisRecordProcessorRecord {
session.assertNotRolledBack();
}
+ @ParameterizedTest
+ @EnumSource(SchemaDifferenceHandlingStrategy.class)
+    void testProcessMultipleRecordInSingleKinesisRecord(final SchemaDifferenceHandlingStrategy strategy) throws ShutdownException, InvalidStateException {
+        fixture = defaultFixtureWithStrategy(strategy);
+        testFlowFileContents(List.of("[{\"record\":1},{\"record\":2},{\"record\":3}]"), List.of(EmittedFlowFile.of("{\"record\":1}\n{\"record\":2}\n{\"record\":3}", 3)));
+ }
+
+ @Nested
+ class StateHandlerStrategyTest {
+
+ private static List<String> inputJsonRecords() {
+ return List.of(
+ "{\"colA\":1}", // inferred to ["colA":int]
+                    "{\"colA\":%d}".formatted(Long.MAX_VALUE), // inferred to ["colA":long]
+ "{\"colA\":3}"
+ );
+ }
+
+ @Test
+        void testProcessIncompatibleSchemaKinesisRecordsStrategyGrouping() throws ShutdownException, InvalidStateException {
+            fixture = defaultFixtureWithStrategy(SchemaDifferenceHandlingStrategy.GROUP_RECORDS);
+ final List<String> inputJsonRecords = inputJsonRecords();
+ testFlowFileContents(
+ inputJsonRecords,
+ List.of(
+                            EmittedFlowFile.of(String.join("\n", inputJsonRecords.get(0), inputJsonRecords.get(2)), 2),
+ EmittedFlowFile.of(inputJsonRecords.get(1), 1)
+ )
+ );
+ }
+
+ @Test
+        void testProcessIncompatibleSchemaKinesisRecordsStrategyRolling() throws ShutdownException, InvalidStateException {
+            fixture = defaultFixtureWithStrategy(SchemaDifferenceHandlingStrategy.CREATE_FLOW_FILE);
+ final List<String> inputJsonRecords = inputJsonRecords();
+ testFlowFileContents(
+ inputJsonRecords,
+ List.of(
+ EmittedFlowFile.of(inputJsonRecords.get(0), 1),
+ EmittedFlowFile.of(inputJsonRecords.get(1), 1),
+ EmittedFlowFile.of(inputJsonRecords.get(2), 1)
+ )
+ );
+ }
+ }
+
+    void testFlowFileContents(final List<String> kinesisRecordJsonContents, final List<EmittedFlowFile> emittedFlowFiles) throws ShutdownException, InvalidStateException {
+        final List<KinesisClientRecord> kinesisRecords = kinesisRecordJsonContents.stream()
+ .map(json -> KinesisClientRecord.builder()
+ .approximateArrivalTimestamp(null)
+ .partitionKey("partition-1")
+ .sequenceNumber("1")
+                        .data(ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8)))
+ .build())
+ .toList();
+        final ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder()
+ .records(kinesisRecords)
+ .checkpointer(checkpointer)
+ .cacheEntryTime(Instant.now().minus(1, ChronoUnit.MINUTES))
+ .cacheExitTime(Instant.now().minus(1, ChronoUnit.SECONDS))
+ .millisBehindLatest(100L)
+ .build();
+
+        when(kinesisRecord.data()).thenReturn(ByteBuffer.wrap("invalid-json".getBytes(StandardCharsets.UTF_8)));
+ when(kinesisRecord.partitionKey()).thenReturn("unparsable-partition");
+ when(kinesisRecord.sequenceNumber()).thenReturn("unparsable-sequence");
+ when(kinesisRecord.approximateArrivalTimestamp()).thenReturn(null);
+
+ fixture.setKinesisShardId("test-shard");
+
+ when(processSessionFactory.createSession()).thenReturn(session);
+ fixture.processRecords(processRecordsInput);
+ verify(processSessionFactory, times(1)).createSession();
+
+ // check non-poison pill records are output successfully
+        session.assertTransferCount(ConsumeKinesisStream.REL_SUCCESS, emittedFlowFiles.size());
+        final List<MockFlowFile> flowFiles = session.getFlowFilesForRelationship(ConsumeKinesisStream.REL_SUCCESS);
+ for (int i = 0; i < emittedFlowFiles.size(); i++) {
+ final EmittedFlowFile emittedFlowFile = emittedFlowFiles.get(i);
+            assertFlowFile(flowFiles.get(i), null, "partition-1", "1", "test-shard", emittedFlowFile.content, emittedFlowFile.recordCount);
+ }
+
+ verify(checkpointer, times(1)).checkpoint();
+
+ session.assertCommitted();
+ session.assertNotRolledBack();
+ }
+
+ private record EmittedFlowFile(String content, int recordCount) {
+        public static EmittedFlowFile of(final String content, final int recordCount) {
+ return new EmittedFlowFile(content, recordCount);
+ }
+ }
+
    private void assertFlowFile(final MockFlowFile flowFile, final Date approxTimestamp, final String partitionKey,
                                final String sequenceNumber, final String shard, final String content, final int recordCount) {
if (approxTimestamp != null) {