This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 6d7805f203 NIFI-14285 Added Output Strategy with Wrapper to
ConsumeKinesisStream (#9738)
6d7805f203 is described below
commit 6d7805f2035f9d14683795cebfd2bd3b98634b67
Author: Dariusz Seweryn <[email protected]>
AuthorDate: Tue Mar 4 22:11:19 2025 +0100
NIFI-14285 Added Output Strategy with Wrapper to ConsumeKinesisStream
(#9738)
Signed-off-by: David Handermann <[email protected]>
---
.../aws/kinesis/property/OutputStrategy.java | 50 +++++++++++++++
.../aws/kinesis/stream/ConsumeKinesisStream.java | 24 +++++--
.../record/AbstractKinesisRecordProcessor.java | 4 ++
.../record/KinesisRecordProcessorRecord.java | 15 +++--
.../stream/record/converter/RecordConverter.java | 25 ++++++++
.../record/converter/RecordConverterIdentity.java | 27 ++++++++
.../record/converter/RecordConverterWrapper.java | 73 ++++++++++++++++++++++
.../additionalDetails.md | 7 ++-
.../record/TestKinesisRecordProcessorRecord.java | 67 ++++++++++++++------
9 files changed, 263 insertions(+), 29 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/property/OutputStrategy.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/property/OutputStrategy.java
new file mode 100644
index 0000000000..885acbdd69
--- /dev/null
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/property/OutputStrategy.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.property;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Enumeration of supported Kinesis Output Strategies
+ */
+public enum OutputStrategy implements DescribedValue {
+ USE_VALUE("Use Content as Value", "Write only the Kinesis Record value to
the FlowFile record."),
+ USE_WRAPPER("Use Wrapper", "Write the Kinesis Record value and metadata
into the FlowFile record.");
+
+ private final String displayName;
+ private final String description;
+
+ OutputStrategy(final String displayName, final String description) {
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return name();
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/ConsumeKinesisStream.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/ConsumeKinesisStream.java
index 3771e3835a..75ba3496dd 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/ConsumeKinesisStream.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/ConsumeKinesisStream.java
@@ -48,9 +48,13 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.kinesis.property.OutputStrategy;
import
org.apache.nifi.processors.aws.kinesis.stream.record.AbstractKinesisRecordProcessor;
import
org.apache.nifi.processors.aws.kinesis.stream.record.KinesisRecordProcessorRaw;
import
org.apache.nifi.processors.aws.kinesis.stream.record.KinesisRecordProcessorRecord;
+import
org.apache.nifi.processors.aws.kinesis.stream.record.converter.RecordConverter;
+import
org.apache.nifi.processors.aws.kinesis.stream.record.converter.RecordConverterIdentity;
+import
org.apache.nifi.processors.aws.kinesis.stream.record.converter.RecordConverterWrapper;
import org.apache.nifi.processors.aws.v2.AbstractAwsAsyncProcessor;
import org.apache.nifi.processors.aws.v2.AbstractAwsProcessor;
import org.apache.nifi.serialization.RecordReaderFactory;
@@ -107,7 +111,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@TriggerSerially
@@ -304,6 +307,15 @@ public class ConsumeKinesisStream extends
AbstractAwsAsyncProcessor<KinesisAsync
.required(true)
.build();
+ public static final PropertyDescriptor OUTPUT_STRATEGY = new
PropertyDescriptor.Builder()
+ .name("Output Strategy")
+ .description("The format used to output the Kinesis Record into a
FlowFile Record.")
+ .required(true)
+ .defaultValue(OutputStrategy.USE_VALUE)
+ .allowableValues(OutputStrategy.class)
+ .dependsOn(RECORD_WRITER)
+ .build();
+
public static final Relationship REL_PARSE_FAILURE = new
Relationship.Builder()
.name("parse.failure")
.description("If a message from Kinesis cannot be parsed using the
configured Record Reader" +
@@ -317,6 +329,7 @@ public class ConsumeKinesisStream extends
AbstractAwsAsyncProcessor<KinesisAsync
APPLICATION_NAME,
RECORD_READER,
RECORD_WRITER,
+ OUTPUT_STRATEGY,
REGION,
ENDPOINT_OVERRIDE,
DYNAMODB_ENDPOINT_OVERRIDE,
@@ -645,8 +658,7 @@ public class ConsumeKinesisStream extends
AbstractAwsAsyncProcessor<KinesisAsync
.keySet()
.stream()
.filter(PropertyDescriptor::isDynamic)
- .collect(Collectors.toList());
-
+ .toList();
final RetrievalConfig retrievalConfig =
configsBuilder.retrievalConfig()
.retrievalSpecificConfig(new PollingConfig(streamName,
kinesisClient));
@@ -702,11 +714,15 @@ public class ConsumeKinesisStream extends
AbstractAwsAsyncProcessor<KinesisAsync
private ShardRecordProcessorFactory prepareRecordProcessorFactory(final
ProcessContext context, final ProcessSessionFactory sessionFactory) {
return () -> {
if (isRecordReaderSet && isRecordWriterSet) {
+ final OutputStrategy outputStrategy =
context.getProperty(OUTPUT_STRATEGY).asAllowableValue(OutputStrategy.class);
+ final RecordConverter recordConverter =
OutputStrategy.USE_WRAPPER == outputStrategy
+ ? new RecordConverterWrapper()
+ : new RecordConverterIdentity();
return new KinesisRecordProcessorRecord(
sessionFactory, getLogger(), getStreamName(context),
getEndpointPrefix(context),
getKinesisEndpoint(context).orElse(null),
getCheckpointIntervalMillis(context),
getRetryWaitMillis(context), getNumRetries(context),
getDateTimeFormatter(context),
- getReaderFactory(context), getWriterFactory(context)
+ getReaderFactory(context), getWriterFactory(context),
recordConverter
);
} else {
return new KinesisRecordProcessorRaw(
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/AbstractKinesisRecordProcessor.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/AbstractKinesisRecordProcessor.java
index ace24b71ae..d219067309 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/AbstractKinesisRecordProcessor.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/AbstractKinesisRecordProcessor.java
@@ -313,6 +313,10 @@ public abstract class AbstractKinesisRecordProcessor
implements ShardRecordProce
return log;
}
+ String getStreamName() {
+ return streamName;
+ }
+
String getKinesisShardId() {
return kinesisShardId;
}
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java
index 03ba1ca164..d39d791e34 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java
@@ -22,6 +22,7 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream;
+import
org.apache.nifi.processors.aws.kinesis.stream.record.converter.RecordConverter;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
@@ -30,6 +31,7 @@ import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.StopWatch;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
@@ -52,18 +54,21 @@ public class KinesisRecordProcessorRecord extends
AbstractKinesisRecordProcessor
private RecordSetWriter writer;
private OutputStream outputStream;
+ private final RecordConverter recordConverter;
public KinesisRecordProcessorRecord(final ProcessSessionFactory
sessionFactory, final ComponentLog log, final String streamName,
final String endpointPrefix, final
String kinesisEndpoint,
final long checkpointIntervalMillis,
final long retryWaitMillis,
final int numRetries, final
DateTimeFormatter dateTimeFormatter,
- final RecordReaderFactory
readerFactory, final RecordSetWriterFactory writerFactory) {
+ final RecordReaderFactory
readerFactory, final RecordSetWriterFactory writerFactory,
+ final RecordConverter recordConverter)
{
super(sessionFactory, log, streamName, endpointPrefix,
kinesisEndpoint, checkpointIntervalMillis, retryWaitMillis,
numRetries, dateTimeFormatter);
this.readerFactory = readerFactory;
this.writerFactory = writerFactory;
schemaRetrievalVariables =
Collections.singletonMap(KINESIS_RECORD_SCHEMA_KEY, streamName);
+ this.recordConverter = recordConverter;
}
@Override
@@ -88,9 +93,10 @@ public class KinesisRecordProcessorRecord extends
AbstractKinesisRecordProcessor
try (final InputStream in = new ByteArrayInputStream(data);
final RecordReader reader =
readerFactory.createRecordReader(schemaRetrievalVariables, in, data.length,
getLogger())
) {
- org.apache.nifi.serialization.record.Record outputRecord;
+ Record intermediateRecord;
final PushBackRecordSet recordSet = new
PushBackRecordSet(reader.createRecordSet());
- while ((outputRecord = recordSet.next()) != null) {
+ while ((intermediateRecord = recordSet.next()) != null) {
+ Record outputRecord =
recordConverter.convert(intermediateRecord, kinesisRecord, getStreamName(),
getKinesisShardId());
if (flowFiles.isEmpty()) {
flowFile = session.create();
flowFiles.add(flowFile);
@@ -121,8 +127,7 @@ public class KinesisRecordProcessorRecord extends
AbstractKinesisRecordProcessor
}
}
- private void createWriter(final FlowFile flowFile, final ProcessSession
session,
- final
org.apache.nifi.serialization.record.Record outputRecord)
+ private void createWriter(final FlowFile flowFile, final ProcessSession
session, final Record outputRecord)
throws IOException, SchemaNotFoundException {
final RecordSchema readerSchema = outputRecord.getSchema();
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/converter/RecordConverter.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/converter/RecordConverter.java
new file mode 100644
index 0000000000..886c2e5cbd
--- /dev/null
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/converter/RecordConverter.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.stream.record.converter;
+
+import org.apache.nifi.serialization.record.Record;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+public interface RecordConverter {
+
+ Record convert(final Record record, final KinesisClientRecord
kinesisRecord, final String streamName, final String shardId);
+}
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/converter/RecordConverterIdentity.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/converter/RecordConverterIdentity.java
new file mode 100644
index 0000000000..b2ac42113e
--- /dev/null
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/converter/RecordConverterIdentity.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.stream.record.converter;
+
+import org.apache.nifi.serialization.record.Record;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+public class RecordConverterIdentity implements RecordConverter {
+ @Override
+ public Record convert(final Record record, final KinesisClientRecord
kinesisRecord, final String streamName, final String shardId) {
+ return record;
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/converter/RecordConverterWrapper.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/converter/RecordConverterWrapper.java
new file mode 100644
index 0000000000..208eb5dcf4
--- /dev/null
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/converter/RecordConverterWrapper.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.stream.record.converter;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RecordConverterWrapper implements RecordConverter {
+
+ private static final String VALUE = "value";
+ private static final String METADATA = "metadata";
+
+ private static final String STREAM = "stream";
+ private static final String SHARD_ID = "shardId";
+ private static final String SEQUENCE_NUMBER = "sequenceNumber";
+ private static final String PARTITION_KEY = "partitionKey";
+ private static final String APPROX_ARRIVAL_TIMESTAMP =
"approximateArrival";
+
+ private static final RecordField FIELD_STREAM = new RecordField(STREAM,
RecordFieldType.STRING.getDataType());
+ private static final RecordField FIELD_SHARD_ID = new
RecordField(SHARD_ID, RecordFieldType.STRING.getDataType());
+ private static final RecordField FIELD_SEQUENCE_NUMBER = new
RecordField(SEQUENCE_NUMBER, RecordFieldType.STRING.getDataType());
+ private static final RecordField FIELD_PARTITION_KEY = new
RecordField(PARTITION_KEY, RecordFieldType.STRING.getDataType());
+ private static final RecordField FIELD_APPROX_ARRIVAL_TIMESTAMP = new
RecordField(APPROX_ARRIVAL_TIMESTAMP, RecordFieldType.TIMESTAMP.getDataType());
+ private static final RecordSchema SCHEMA_METADATA = new
SimpleRecordSchema(Arrays.asList(
+ FIELD_STREAM, FIELD_SHARD_ID, FIELD_SEQUENCE_NUMBER,
FIELD_PARTITION_KEY, FIELD_APPROX_ARRIVAL_TIMESTAMP));
+
+ public static final RecordField FIELD_METADATA = new RecordField(METADATA,
RecordFieldType.RECORD.getRecordDataType(SCHEMA_METADATA));
+
+
+ @Override
+ public Record convert(final Record valueRecord, final KinesisClientRecord
kinesisRecord, final String streamName, final String shardId) {
+ final Map<String, Object> metadata = new LinkedHashMap<>();
+ metadata.put(STREAM, streamName);
+ metadata.put(SHARD_ID, shardId);
+ metadata.put(SEQUENCE_NUMBER, kinesisRecord.sequenceNumber());
+ metadata.put(PARTITION_KEY, kinesisRecord.partitionKey());
+ final Instant approxArrivalTimestamp =
kinesisRecord.approximateArrivalTimestamp();
+ metadata.put(APPROX_ARRIVAL_TIMESTAMP, approxArrivalTimestamp == null
? null : approxArrivalTimestamp.toEpochMilli());
+ final Record metadataRecord = new MapRecord(SCHEMA_METADATA, metadata);
+
+ return new MapRecord(convertToWriteSchema(valueRecord.getSchema()),
Map.of(METADATA, metadataRecord, VALUE, valueRecord));
+ }
+
+ private RecordSchema convertToWriteSchema(final RecordSchema readerSchema)
{
+ final RecordField recordField = new RecordField(VALUE,
RecordFieldType.RECORD.getRecordDataType(readerSchema));
+ return new SimpleRecordSchema(List.of(FIELD_METADATA, recordField));
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream/additionalDetails.md
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream/additionalDetails.md
index d7363cf9fb..24bee7cc8e 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream/additionalDetails.md
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream/additionalDetails.md
@@ -36,4 +36,9 @@ within the batch of Kinesis Records (messages), instead of a
separate FlowFile p
The FlowFiles emitted in this mode will include the standard `record.*`
attributes along with the same Kinesis Shard ID,
Sequence Number and Approximate Arrival Timestamp; but the values will relate
to the **last** Kinesis Record that was
-processed in the batch of messages constituting the content of the FlowFile.
\ No newline at end of file
+processed in the batch of messages constituting the content of the FlowFile.
+
+Once a Record Writer is set, the Output Strategy can be set to `Use Wrapper` or
`Use Content as Value`. When `Use Wrapper` is
+picked, the original content of the Kinesis Record will be wrapped under the
`value` key and an additional `metadata`
+key will be populated with the Stream Name, Shard ID, Partition Key, Sequence
Number and Approximate Arrival Timestamp.
+When `Use Content as Value` is picked, the original content of the Kinesis
Record will be used as-is.
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestKinesisRecordProcessorRecord.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestKinesisRecordProcessorRecord.java
index 81296efa37..f877362c65 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestKinesisRecordProcessorRecord.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestKinesisRecordProcessorRecord.java
@@ -21,6 +21,8 @@ import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream;
+import
org.apache.nifi.processors.aws.kinesis.stream.record.converter.RecordConverterIdentity;
+import
org.apache.nifi.processors.aws.kinesis.stream.record.converter.RecordConverterWrapper;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.inference.SchemaInferenceUtil;
@@ -101,7 +103,7 @@ public class TestKinesisRecordProcessorRecord {
// default test fixture will try operations twice with very little
wait in between
fixture = new KinesisRecordProcessorRecord(processSessionFactory,
runner.getLogger(), "kinesis-test",
"endpoint-prefix", null, 10_000L, 1L, 2, DATE_TIME_FORMATTER,
- reader, writer);
+ reader, writer, new RecordConverterIdentity());
}
@AfterEach
@@ -133,17 +135,28 @@ public class TestKinesisRecordProcessorRecord {
@Test
public void testProcessRecordsNoCheckpoint() {
- processMultipleRecordsAssertProvenance(false);
+ processMultipleRecordsAssertProvenance(false, false);
}
@Test
public void testProcessRecordsWithEndpointOverride() {
- processMultipleRecordsAssertProvenance(true);
+ processMultipleRecordsAssertProvenance(true, false);
}
- private void processMultipleRecordsAssertProvenance(final boolean
endpointOverridden) {
- final Date firstDate = Date.from(Instant.now().minus(1,
ChronoUnit.MINUTES));
- final Date secondDate = new Date();
+ @Test
+ public void testProcessWrappedRecordsNoCheckpoint() {
+ processMultipleRecordsAssertProvenance(false, true);
+ }
+
+ @Test
+ public void testProcessWrappedRecordsWithEndpointOverride() {
+ processMultipleRecordsAssertProvenance(true, true);
+ }
+
+ private void processMultipleRecordsAssertProvenance(final boolean
endpointOverridden, final boolean useWrapper) {
+ final Instant referenceInstant =
Instant.parse("2021-01-01T00:00:00.000Z");
+ final Date firstDate = Date.from(referenceInstant.minus(1,
ChronoUnit.MINUTES));
+ final Date secondDate = Date.from(referenceInstant);
final ProcessRecordsInput processRecordsInput =
ProcessRecordsInput.builder()
.records(Arrays.asList(
@@ -170,11 +183,9 @@ public class TestKinesisRecordProcessorRecord {
.build();
final String transitUriPrefix = endpointOverridden ?
"https://another-endpoint.com:8443" : "http://endpoint-prefix.amazonaws.com";
- if (endpointOverridden) {
- fixture = new KinesisRecordProcessorRecord(processSessionFactory,
runner.getLogger(), "kinesis-test",
- "endpoint-prefix", "https://another-endpoint.com:8443",
10_000L, 1L, 2, DATE_TIME_FORMATTER,
- reader, writer);
- }
+ fixture = new KinesisRecordProcessorRecord(processSessionFactory,
runner.getLogger(), "kinesis-test",
+ "endpoint-prefix", transitUriPrefix, 10_000L, 1L, 2,
DATE_TIME_FORMATTER,
+ reader, writer, useWrapper ? new RecordConverterWrapper() :
new RecordConverterIdentity());
// skip checkpoint
fixture.setNextCheckpointTimeInMillis(System.currentTimeMillis() +
10_000L);
@@ -190,16 +201,34 @@ public class TestKinesisRecordProcessorRecord {
final List<MockFlowFile> flowFiles =
session.getFlowFilesForRelationship(ConsumeKinesisStream.REL_SUCCESS);
// 4 records in single output file, attributes equating to that of the
last record
- assertFlowFile(flowFiles.get(0), secondDate, "partition-2", "2",
"another-shard", "{\"record\":\"1\"}\n" +
- "{\"record\":\"1b\"}\n" +
- "{\"record\":\"no-date\"}\n" +
- "{\"record\":\"2\"}", 4);
+ final String expectedContent = useWrapper ?
expectRecordContentWrapper() : expectRecordContentIdentity();
+ assertFlowFile(flowFiles.getFirst(), secondDate, "partition-2", "2",
"another-shard", expectedContent, 4);
session.assertTransferCount(ConsumeKinesisStream.REL_PARSE_FAILURE, 0);
session.assertCommitted();
session.assertNotRolledBack();
}
+ private String expectRecordContentIdentity() {
+ return """
+ {"record":"1"}
+ {"record":"1b"}
+ {"record":"no-date"}
+ {"record":"2"}""";
+ }
+
+ private String expectRecordContentWrapper() {
+ return """
+
{"metadata":{"stream":"kinesis-test","shardId":"another-shard","sequenceNumber":"1","partitionKey":"partition-1",\
+ "approximateArrival":1609459140000},"value":{"record":"1"}}
+
{"metadata":{"stream":"kinesis-test","shardId":"another-shard","sequenceNumber":"1","partitionKey":"partition-1",\
+ "approximateArrival":1609459140000},"value":{"record":"1b"}}
+
{"metadata":{"stream":"kinesis-test","shardId":"another-shard","sequenceNumber":"no-date","partitionKey":"partition-no-date",\
+ "approximateArrival":null},"value":{"record":"no-date"}}
+
{"metadata":{"stream":"kinesis-test","shardId":"another-shard","sequenceNumber":"2","partitionKey":"partition-2",\
+ "approximateArrival":1609459200000},"value":{"record":"2"}}""";
+ }
+
@Test
public void testProcessPoisonPillRecordButNoRawOutputWithCheckpoint()
throws ShutdownException, InvalidStateException {
final ProcessRecordsInput processRecordsInput =
ProcessRecordsInput.builder()
@@ -235,7 +264,7 @@ public class TestKinesisRecordProcessorRecord {
session.assertTransferCount(ConsumeKinesisStream.REL_SUCCESS, 1);
final List<MockFlowFile> flowFiles =
session.getFlowFilesForRelationship(ConsumeKinesisStream.REL_SUCCESS);
// 2 successful records in single output file, attributes equating to
that of the last successful record
- assertFlowFile(flowFiles.get(0), null, "partition-3", "3",
"test-shard", "{\"record\":\"1\"}\n" +
+ assertFlowFile(flowFiles.getFirst(), null, "partition-3", "3",
"test-shard", "{\"record\":\"1\"}\n" +
"{\"record\":\"3\"}", 2);
// check no poison-pill output (as the raw data could not be retrieved)
@@ -288,14 +317,14 @@ public class TestKinesisRecordProcessorRecord {
session.assertTransferCount(ConsumeKinesisStream.REL_SUCCESS, 1);
final List<MockFlowFile> flowFiles =
session.getFlowFilesForRelationship(ConsumeKinesisStream.REL_SUCCESS);
// 2 successful records in single output file, attributes equating to
that of the last successful record
- assertFlowFile(flowFiles.get(0), null, "partition-3", "3",
"test-shard", "{\"record\":\"1\"}\n" +
+ assertFlowFile(flowFiles.getFirst(), null, "partition-3", "3",
"test-shard", "{\"record\":\"1\"}\n" +
"{\"record\":\"3\"}", 2);
// check poison-pill output (as the raw data could not be retrieved)
session.assertTransferCount(ConsumeKinesisStream.REL_PARSE_FAILURE, 1);
final List<MockFlowFile> failureFlowFiles =
session.getFlowFilesForRelationship(ConsumeKinesisStream.REL_PARSE_FAILURE);
- assertFlowFile(failureFlowFiles.get(0), null, "unparsable-partition",
"unparsable-sequence", "test-shard", "invalid-json", 0);
- failureFlowFiles.get(0).assertAttributeExists("record.error.message");
+ assertFlowFile(failureFlowFiles.getFirst(), null,
"unparsable-partition", "unparsable-sequence", "test-shard", "invalid-json", 0);
+
failureFlowFiles.getFirst().assertAttributeExists("record.error.message");
// check the invalid json record was *not* retried a 2nd time
assertNull(verify(kinesisRecord, times(1)).partitionKey());