This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new b96d60b244 NIFI-14839 Added Output Strategy property to ConsumeKinesis
(#10443)
b96d60b244 is described below
commit b96d60b244cc2d2ebeea3ad31486bfd11f96f770
Author: Michał Bobowski <[email protected]>
AuthorDate: Fri Oct 24 19:06:32 2025 +0200
NIFI-14839 Added Output Strategy property to ConsumeKinesis (#10443)
Co-authored-by: Mark Payne <[email protected]>
Signed-off-by: David Handermann <[email protected]>
---
.../processors/aws/kinesis/ConsumeKinesis.java | 54 +++++++-
.../aws/kinesis/ReaderRecordProcessor.java | 9 +-
.../converter/InjectMetadataRecordConverter.java | 49 ++++++++
.../kinesis/converter/KinesisRecordConverter.java | 25 ++++
.../kinesis/converter/KinesisRecordMetadata.java | 81 ++++++++++++
.../kinesis/converter/ValueRecordConverter.java | 28 +++++
.../kinesis/converter/WrapperRecordConverter.java | 53 ++++++++
.../additionalDetails.md | 139 +++++++++++++++++++++
.../aws/kinesis/ReaderRecordProcessorTest.java | 27 ++--
.../InjectMetadataRecordConverterTest.java | 84 +++++++++++++
.../converter/KinesisRecordConverterTestUtil.java | 98 +++++++++++++++
.../converter/WrapperRecordConverterTest.java | 76 +++++++++++
.../services/org.apache.nifi.processor.Processor | 1 +
13 files changed, 701 insertions(+), 23 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
index 8700ac343f..3b4ed83f49 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
@@ -41,6 +41,10 @@ import
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredential
import org.apache.nifi.processors.aws.kinesis.MemoryBoundRecordBuffer.Lease;
import
org.apache.nifi.processors.aws.kinesis.ReaderRecordProcessor.ProcessingResult;
import org.apache.nifi.processors.aws.kinesis.RecordBuffer.ShardBufferId;
+import
org.apache.nifi.processors.aws.kinesis.converter.InjectMetadataRecordConverter;
+import org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverter;
+import org.apache.nifi.processors.aws.kinesis.converter.ValueRecordConverter;
+import org.apache.nifi.processors.aws.kinesis.converter.WrapperRecordConverter;
import org.apache.nifi.processors.aws.region.RegionUtilV2;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxyConfigurationService;
@@ -219,6 +223,15 @@ public class ConsumeKinesis extends AbstractProcessor {
.identifiesControllerService(RecordSetWriterFactory.class)
.build();
+ static final PropertyDescriptor OUTPUT_STRATEGY = new
PropertyDescriptor.Builder()
+ .name("Output Strategy")
+ .description("The format used to output a Kinesis Record into a
FlowFile Record.")
+ .required(true)
+ .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.RECORD)
+ .defaultValue(OutputStrategy.USE_VALUE)
+ .allowableValues(OutputStrategy.class)
+ .build();
+
static final PropertyDescriptor INITIAL_STREAM_POSITION = new
PropertyDescriptor.Builder()
.name("Initial Stream Position")
.description("The position in the stream where the processor
should start reading.")
@@ -279,6 +292,7 @@ public class ConsumeKinesis extends AbstractProcessor {
PROCESSING_STRATEGY,
RECORD_READER,
RECORD_WRITER,
+ OUTPUT_STRATEGY,
INITIAL_STREAM_POSITION,
STREAM_POSITION_TIMESTAMP,
MAX_BYTES_TO_BUFFER,
@@ -443,7 +457,15 @@ public class ConsumeKinesis extends AbstractProcessor {
private ReaderRecordProcessor createReaderRecordProcessor(final
ProcessContext context) {
final RecordReaderFactory recordReaderFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory recordWriterFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
- return new ReaderRecordProcessor(recordReaderFactory,
recordWriterFactory, getLogger());
+
+ final OutputStrategy outputStrategy =
context.getProperty(OUTPUT_STRATEGY).asAllowableValue(OutputStrategy.class);
+ final KinesisRecordConverter converter = switch (outputStrategy) {
+ case USE_VALUE -> new ValueRecordConverter();
+ case USE_WRAPPER -> new WrapperRecordConverter();
+ case INJECT_METADATA -> new InjectMetadataRecordConverter();
+ };
+
+ return new ReaderRecordProcessor(recordReaderFactory, converter,
recordWriterFactory, getLogger());
}
private static InitialPositionInStreamExtended getInitialPosition(final
ProcessContext context) {
@@ -732,6 +754,36 @@ public class ConsumeKinesis extends AbstractProcessor {
}
}
+ enum OutputStrategy implements DescribedValue {
+ USE_VALUE("Use Content as Value", "Write only the Kinesis Record value
to the FlowFile record."),
+ USE_WRAPPER("Use Wrapper", "Write the Kinesis Record value and
metadata into the FlowFile record. (See additional details for more
information.)"),
+ INJECT_METADATA("Inject Metadata",
+ "Write the Kinesis Record value to the FlowFile record and add
a sub-record to it with metadata. (See additional details for more
information.)");
+
+ private final String displayName;
+ private final String description;
+
+ OutputStrategy(final String displayName, final String description) {
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return name();
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+ }
+
// Visible for tests only.
@Nullable URI getKinesisEndpointOverride() {
return null;
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ReaderRecordProcessor.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ReaderRecordProcessor.java
index 3e36475804..4596278fda 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ReaderRecordProcessor.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ReaderRecordProcessor.java
@@ -20,6 +20,7 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverter;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
@@ -49,14 +50,17 @@ import static
org.apache.nifi.processors.aws.kinesis.ConsumeKinesisAttributes.RE
final class ReaderRecordProcessor {
private final RecordReaderFactory recordReaderFactory;
+ private final KinesisRecordConverter recordConverter;
private final RecordSetWriterFactory recordWriterFactory;
private final ComponentLog logger;
ReaderRecordProcessor(
final RecordReaderFactory recordReaderFactory,
+ final KinesisRecordConverter recordConverter,
final RecordSetWriterFactory recordWriterFactory,
final ComponentLog logger) {
this.recordReaderFactory = recordReaderFactory;
+ this.recordConverter = recordConverter;
this.recordWriterFactory = recordWriterFactory;
this.logger = logger;
}
@@ -81,7 +85,8 @@ final class ReaderRecordProcessor {
Record record;
while ((record = reader.nextRecord()) != null) {
- final RecordSchema writeSchema =
recordWriterFactory.getSchema(emptyMap(), record.getSchema());
+ final Record convertedRecord =
recordConverter.convert(record, kinesisRecord, streamName, shardId);
+ final RecordSchema writeSchema =
recordWriterFactory.getSchema(emptyMap(), convertedRecord.getSchema());
if (activeFlowFile == null) {
activeFlowFile = ActiveFlowFile.startNewFile(logger,
session, recordWriterFactory, writeSchema, streamName, shardId);
@@ -93,7 +98,7 @@ final class ReaderRecordProcessor {
activeFlowFile = ActiveFlowFile.startNewFile(logger,
session, recordWriterFactory, writeSchema, streamName, shardId);
}
- activeFlowFile.writeRecord(record, kinesisRecord);
+ activeFlowFile.writeRecord(convertedRecord, kinesisRecord);
}
} catch (final IOException | MalformedRecordException |
SchemaNotFoundException e) {
logger.error("Reader or Writer failed to process Kinesis
Record with Stream Name [{}] Shard Id [{}] Sequence Number [{}] SubSequence
Number [{}]",
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/InjectMetadataRecordConverter.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/InjectMetadataRecordConverter.java
new file mode 100644
index 0000000000..d454577d42
--- /dev/null
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/InjectMetadataRecordConverter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.converter;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordMetadata.FIELD_METADATA;
+import static
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordMetadata.METADATA;
+import static
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordMetadata.composeMetadataObject;
+
+public final class InjectMetadataRecordConverter implements
KinesisRecordConverter {
+
+ @Override
+ public Record convert(final Record record, final KinesisClientRecord
kinesisRecord, final String streamName, final String shardId) {
+ final List<RecordField> schemaFields = new
ArrayList<>(record.getSchema().getFields());
+ schemaFields.add(FIELD_METADATA);
+ final RecordSchema schema = new SimpleRecordSchema(schemaFields);
+
+ final Record metadata = composeMetadataObject(kinesisRecord,
streamName, shardId);
+ final Map<String, Object> recordValues = new HashMap<>(record.toMap());
+ recordValues.put(METADATA, metadata);
+
+ return new MapRecord(schema, recordValues);
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/KinesisRecordConverter.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/KinesisRecordConverter.java
new file mode 100644
index 0000000000..83a65fd81f
--- /dev/null
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/KinesisRecordConverter.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.converter;
+
+import org.apache.nifi.serialization.record.Record;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+public interface KinesisRecordConverter {
+
+ Record convert(Record record, KinesisClientRecord kinesisRecord, String
streamName, String shardId);
+}
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/KinesisRecordMetadata.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/KinesisRecordMetadata.java
new file mode 100644
index 0000000000..9fdc480727
--- /dev/null
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/KinesisRecordMetadata.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.converter;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+final class KinesisRecordMetadata {
+
+ static final String METADATA = "kinesisMetadata";
+ static final String APPROX_ARRIVAL_TIMESTAMP = "approximateArrival";
+
+ private static final String STREAM = "stream";
+ private static final String SHARD_ID = "shardId";
+ private static final String SEQUENCE_NUMBER = "sequenceNumber";
+ private static final String SUB_SEQUENCE_NUMBER = "subSequenceNumber";
+ private static final String SHARDED_SEQUENCE_NUMBER =
"shardedSequenceNumber";
+ private static final String PARTITION_KEY = "partitionKey";
+
+ private static final RecordField FIELD_STREAM = new RecordField(STREAM,
RecordFieldType.STRING.getDataType());
+ private static final RecordField FIELD_SHARD_ID = new
RecordField(SHARD_ID, RecordFieldType.STRING.getDataType());
+ private static final RecordField FIELD_SEQUENCE_NUMBER = new
RecordField(SEQUENCE_NUMBER, RecordFieldType.STRING.getDataType());
+ private static final RecordField FIELD_SUB_SEQUENCE_NUMBER = new
RecordField(SUB_SEQUENCE_NUMBER, RecordFieldType.LONG.getDataType());
+ private static final RecordField FIELD_SHARDED_SEQUENCE_NUMBER = new
RecordField(SHARDED_SEQUENCE_NUMBER, RecordFieldType.STRING.getDataType());
+ private static final RecordField FIELD_PARTITION_KEY = new
RecordField(PARTITION_KEY, RecordFieldType.STRING.getDataType());
+ private static final RecordField FIELD_APPROX_ARRIVAL_TIMESTAMP = new
RecordField(APPROX_ARRIVAL_TIMESTAMP, RecordFieldType.TIMESTAMP.getDataType());
+
+ private static final RecordSchema SCHEMA_METADATA = new
SimpleRecordSchema(List.of(
+ FIELD_STREAM,
+ FIELD_SHARD_ID,
+ FIELD_SEQUENCE_NUMBER,
+ FIELD_SUB_SEQUENCE_NUMBER,
+ FIELD_SHARDED_SEQUENCE_NUMBER,
+ FIELD_PARTITION_KEY,
+ FIELD_APPROX_ARRIVAL_TIMESTAMP));
+
+ static final RecordField FIELD_METADATA = new RecordField(METADATA,
RecordFieldType.RECORD.getRecordDataType(SCHEMA_METADATA));
+
+ static Record composeMetadataObject(final KinesisClientRecord
kinesisRecord, final String streamName, final String shardId) {
+ final Map<String, Object> metadata = new HashMap<>(7, 1.0f);
+
+ metadata.put(STREAM, streamName);
+ metadata.put(SHARD_ID, shardId);
+ metadata.put(SEQUENCE_NUMBER, kinesisRecord.sequenceNumber());
+ metadata.put(SUB_SEQUENCE_NUMBER, kinesisRecord.subSequenceNumber());
+ metadata.put(SHARDED_SEQUENCE_NUMBER,
"%s%020d".formatted(kinesisRecord.sequenceNumber(),
kinesisRecord.subSequenceNumber()));
+ metadata.put(PARTITION_KEY, kinesisRecord.partitionKey());
+
+ if (kinesisRecord.approximateArrivalTimestamp() != null) {
+ metadata.put(APPROX_ARRIVAL_TIMESTAMP,
kinesisRecord.approximateArrivalTimestamp().toEpochMilli());
+ }
+
+ return new MapRecord(SCHEMA_METADATA, metadata);
+ }
+
+ private KinesisRecordMetadata() {
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/ValueRecordConverter.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/ValueRecordConverter.java
new file mode 100644
index 0000000000..b67ae22072
--- /dev/null
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/ValueRecordConverter.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.converter;
+
+import org.apache.nifi.serialization.record.Record;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+public final class ValueRecordConverter implements KinesisRecordConverter {
+
+ @Override
+ public Record convert(final Record record, final KinesisClientRecord
kinesisRecord, final String streamName, final String shardId) {
+ return record;
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/WrapperRecordConverter.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/WrapperRecordConverter.java
new file mode 100644
index 0000000000..693077f443
--- /dev/null
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/WrapperRecordConverter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.converter;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordMetadata.FIELD_METADATA;
+import static
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordMetadata.METADATA;
+import static
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordMetadata.composeMetadataObject;
+
+public final class WrapperRecordConverter implements KinesisRecordConverter {
+
+ private static final String VALUE = "value";
+
+ @Override
+ public Record convert(final Record record, final KinesisClientRecord
kinesisRecord, final String streamName, final String shardId) {
+ final Record metadata = composeMetadataObject(kinesisRecord,
streamName, shardId);
+
+ final RecordSchema convertedSchema = new SimpleRecordSchema(List.of(
+ FIELD_METADATA,
+ new RecordField(VALUE,
RecordFieldType.RECORD.getRecordDataType(record.getSchema())))
+ );
+ final Map<String, Object> convertedRecord = Map.of(
+ METADATA, metadata,
+ VALUE, record
+ );
+
+ return new MapRecord(convertedSchema, convertedRecord);
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/resources/docs/org.apache.nifi.processors.aws.kinesis.ConsumeKinesis/additionalDetails.md
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/resources/docs/org.apache.nifi.processors.aws.kinesis.ConsumeKinesis/additionalDetails.md
index 5fb223ab89..fee288941b 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/resources/docs/org.apache.nifi.processors.aws.kinesis.ConsumeKinesis/additionalDetails.md
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/resources/docs/org.apache.nifi.processors.aws.kinesis.ConsumeKinesis/additionalDetails.md
@@ -134,3 +134,142 @@ preserves record ordering for a single Shard, even with
record schema changes.
**Please note**, the processor relies on _Record Reader_ to provide correct
schema for each record.
Using a Reader with schema inference may produce a lot of different schemas,
which may lead to excessive FlowFile creation.
If performance is a concern, it is recommended to use a Reader with a
predefined schema or schema registry.
+
+### Output Strategies
+
+This processor offers multiple strategies, configured via the processor property
_Output Strategy_, for converting Kinesis records into FlowFiles.
+
+- _Use Content as Value_ (the default) writes only the Kinesis Record value to a
FlowFile record.
+- _Use Wrapper_ writes a Kinesis Record value as well as metadata into
separate fields of a FlowFile record.
+- _Inject Metadata_ writes a Kinesis Record value to a FlowFile record and
adds a sub-record to it with metadata.
+
+The written metadata includes the following fields:
+- _stream_: The name of the Kinesis stream the record was received from.
+- _shardId_: The identifier of the shard the record was received from.
+- _sequenceNumber_: The sequence number of the record.
+- _subSequenceNumber_: The subsequence number of the record, used when
multiple smaller records are aggregated into a single Kinesis record. If a
record was not part of an aggregated record, this value will be 0.
+- _shardedSequenceNumber_: A combination of the sequence number and
subsequence number. This can be used to uniquely identify a record within a
shard.
+- _partitionKey_: The partition key of the record.
+- _approximateArrival_: The approximate arrival timestamp of the record (in
milliseconds since epoch).
+
+The record schema that is used when _Use Wrapper_ is selected is as follows
(in Avro format):
+
+```json
+{
+ "type": "record",
+ "name": "nifiRecord",
+ "namespace": "org.apache.nifi",
+ "fields": [
+ {
+ "name": "value",
+ "type": [
+ {
+ < Fields as determined by the Record Reader for a Kinesis message >
+ },
+ "null"
+ ]
+ },
+ {
+ "name": "kinesisMetadata",
+ "type": [
+ {
+ "type": "record",
+ "name": "metadataType",
+ "fields": [
+ { "name": "stream", "type": ["string", "null"] },
+ { "name": "shardId", "type": ["string", "null"] },
+ { "name": "sequenceNumber", "type": ["string", "null"] },
+ { "name": "subSequenceNumber", "type": ["long", "null"] },
+ { "name": "shardedSequenceNumber", "type": ["string", "null"] },
+ { "name": "partitionKey", "type": ["string", "null"] },
+ { "name": "approximateArrival", "type": [ { "type": "long",
"logicalType": "timestamp-millis" }, "null" ] }
+ ]
+ },
+ "null"
+ ]
+ }
+ ]
+}
+```
+
+The record schema that is used when _Inject Metadata_ is selected is as
follows (in Avro format):
+
+```json
+{
+ "type": "record",
+ "name": "nifiRecord",
+ "namespace": "org.apache.nifi",
+ "fields": [
+ < Fields as determined by the Record Reader for a Kinesis message >,
+ {
+ "name": "kinesisMetadata",
+ "type": [
+ {
+ "type": "record",
+ "name": "metadataType",
+ "fields": [
+ { "name": "stream", "type": ["string", "null"] },
+ { "name": "shardId", "type": ["string", "null"] },
+ { "name": "sequenceNumber", "type": ["string", "null"] },
+ { "name": "subSequenceNumber", "type": ["long", "null"] },
+ { "name": "shardedSequenceNumber", "type": ["string", "null"] },
+ { "name": "partitionKey", "type": ["string", "null"] },
+ { "name": "approximateArrival", "type": [ { "type": "long",
"logicalType": "timestamp-millis" }, "null" ] }
+ ]
+ },
+ "null"
+ ]
+ }
+ ]
+}
+```
+
+Here is an example of FlowFile content that is emitted by JsonRecordSetWriter
when strategy _Use Wrapper_ is selected:
+
+```json
+[
+ {
+ "value": {
+ "address": "1234 First Street",
+ "zip": "12345",
+ "account": {
+ "name": "Acme",
+ "number": "AC1234"
+ }
+ },
+ "kinesisMetadata" : {
+ "stream" : "stream-name",
+ "shardId" : "shardId-000000000000",
+ "sequenceNumber" : "123456789",
+ "subSequenceNumber" : 3,
+ "shardedSequenceNumber" : "12345678900000000000000000003",
+ "partitionKey" : "123",
+ "approximateArrival" : 1756459596788
+ }
+ }
+]
+```
+
+Here is an example of FlowFile content that is emitted by JsonRecordSetWriter
when strategy _Inject Metadata_ is selected:
+
+```json
+[
+ {
+ "address": "1234 First Street",
+ "zip": "12345",
+ "account": {
+ "name": "Acme",
+ "number": "AC1234"
+ },
+ "kinesisMetadata" : {
+ "stream" : "stream-name",
+ "shardId" : "shardId-000000000000",
+ "sequenceNumber" : "123456789",
+ "subSequenceNumber" : 3,
+ "shardedSequenceNumber" : "12345678900000000000000000003",
+ "partitionKey" : "123",
+ "approximateArrival" : 1756459596788
+ }
+ }
+]
+```
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ReaderRecordProcessorTest.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ReaderRecordProcessorTest.java
index 48a4abd687..f685585788 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ReaderRecordProcessorTest.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ReaderRecordProcessorTest.java
@@ -21,6 +21,7 @@ import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.logging.ComponentLog;
import
org.apache.nifi.processors.aws.kinesis.ReaderRecordProcessor.ProcessingResult;
+import org.apache.nifi.processors.aws.kinesis.converter.ValueRecordConverter;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.inference.SchemaInferenceUtil;
@@ -83,8 +84,8 @@ class ReaderRecordProcessorTest {
private MockProcessSession session;
private ComponentLog logger;
- private JsonTreeReader jsonReader;
private JsonRecordSetWriter jsonWriter;
+ private ReaderRecordProcessor processor;
@BeforeEach
void setUp() throws InitializationException {
@@ -93,7 +94,7 @@ class ReaderRecordProcessorTest {
session = new MockProcessSession(sharedState, runner.getProcessor());
logger = runner.getLogger();
- jsonReader = new JsonTreeReader();
+ final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("json-reader", jsonReader);
runner.setProperty(jsonReader,
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY,
SchemaInferenceUtil.INFER_SCHEMA.getValue());
runner.enableControllerService(jsonReader);
@@ -102,12 +103,12 @@ class ReaderRecordProcessorTest {
runner.addControllerService("json-writer", jsonWriter);
runner.setProperty(jsonWriter,
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY,
SchemaAccessUtils.INHERIT_RECORD_SCHEMA.getValue());
runner.enableControllerService(jsonWriter);
+
+ processor = new ReaderRecordProcessor(jsonReader, new
ValueRecordConverter(), jsonWriter, logger);
}
@Test
void testProcessSingleRecord() {
- final ReaderRecordProcessor processor = new
ReaderRecordProcessor(jsonReader, jsonWriter, logger);
-
final KinesisClientRecord record = KinesisClientRecord.builder()
.data(ByteBuffer.wrap(USER_JSON_1.getBytes(UTF_8)))
.sequenceNumber("1")
@@ -143,8 +144,6 @@ class ReaderRecordProcessorTest {
@Test
void testProcessMultipleRecordsWithSameSchema() {
- final ReaderRecordProcessor processor = new
ReaderRecordProcessor(jsonReader, jsonWriter, logger);
-
final List<KinesisClientRecord> records = List.of(
createKinesisRecord(USER_JSON_1, "1"),
createKinesisRecord(USER_JSON_2, "2"),
@@ -171,8 +170,6 @@ class ReaderRecordProcessorTest {
@Test
void testEmptyRecordsList() {
- final ReaderRecordProcessor processor = new
ReaderRecordProcessor(jsonReader, jsonWriter, logger);
-
final ProcessingResult result = processor.processRecords(session,
TEST_STREAM_NAME, TEST_SHARD_ID, Collections.emptyList());
assertEquals(0, result.successFlowFiles().size());
@@ -181,8 +178,6 @@ class ReaderRecordProcessorTest {
@Test
void testSchemaChangeCreatesNewFlowFile() {
- final ReaderRecordProcessor processor = new
ReaderRecordProcessor(jsonReader, jsonWriter, logger);
-
final List<KinesisClientRecord> records = List.of(
createKinesisRecord(USER_JSON_1, "1"),
createKinesisRecord(CITY_JSON_1, "2")
@@ -204,8 +199,6 @@ class ReaderRecordProcessorTest {
@Test
void testSchemaChangeWithMultipleRecordsInBetween() {
- final ReaderRecordProcessor processor = new
ReaderRecordProcessor(jsonReader, jsonWriter, logger);
-
final List<KinesisClientRecord> records = List.of(
createKinesisRecord(USER_JSON_1, "1"),
createKinesisRecord(USER_JSON_2, "2"),
@@ -229,8 +222,6 @@ class ReaderRecordProcessorTest {
@Test
void testSingleMalformedRecord() {
- final ReaderRecordProcessor processor = new
ReaderRecordProcessor(jsonReader, jsonWriter, logger);
-
final List<KinesisClientRecord> records = List.of(
createKinesisRecord(INVALID_JSON, "1")
);
@@ -251,8 +242,6 @@ class ReaderRecordProcessorTest {
@Test
void testMalformedRecordBetweenValid() {
- final ReaderRecordProcessor processor = new
ReaderRecordProcessor(jsonReader, jsonWriter, logger);
-
final List<KinesisClientRecord> records = List.of(
createKinesisRecord(USER_JSON_1, "1"),
createKinesisRecord(INVALID_JSON, "2"),
@@ -290,7 +279,7 @@ class ReaderRecordProcessorTest {
}
};
- final ReaderRecordProcessor processor = new
ReaderRecordProcessor(failingReaderFactory, jsonWriter, logger);
+ final ReaderRecordProcessor processor = new
ReaderRecordProcessor(failingReaderFactory, new ValueRecordConverter(),
jsonWriter, logger);
final KinesisClientRecord record = createKinesisRecord(USER_JSON_1,
"1");
final List<KinesisClientRecord> records = List.of(record);
@@ -307,7 +296,7 @@ class ReaderRecordProcessorTest {
@Test
void testMalformedRecordExceptionDuringReading() {
- final ReaderRecordProcessor processor = new
ReaderRecordProcessor(getMalformedRecordExceptionReader(), jsonWriter, logger);
+ final ReaderRecordProcessor processor = new
ReaderRecordProcessor(getMalformedRecordExceptionReader(), new
ValueRecordConverter(), jsonWriter, logger);
final KinesisClientRecord record = createKinesisRecord(USER_JSON_1,
"1");
final List<KinesisClientRecord> records =
Collections.singletonList(record);
@@ -324,8 +313,6 @@ class ReaderRecordProcessorTest {
@Test
void testInvalidRecordsWithSchemaEvolution() {
- final ReaderRecordProcessor processor = new
ReaderRecordProcessor(jsonReader, jsonWriter, logger);
-
final List<KinesisClientRecord> records = List.of(
createKinesisRecord(USER_JSON_1, "1"), // Schema A
createKinesisRecord(USER_JSON_2, "2"), // Schema A
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/converter/InjectMetadataRecordConverterTest.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/converter/InjectMetadataRecordConverterTest.java
new file mode 100644
index 0000000000..c4261d3087
--- /dev/null
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/converter/InjectMetadataRecordConverterTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.converter;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.Test;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.INPUT_RECORD;
+import static
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.KINESIS_METADATA;
+import static
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.SCHEMA_METADATA;
+import static
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.TEST_ARRIVAL_TIMESTAMP;
+import static
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.TEST_SHARD_ID;
+import static
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.TEST_STREAM_NAME;
+import static
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.createTestKinesisRecord;
+import static
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.verifyMetadata;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class InjectMetadataRecordConverterTest { // Exercises InjectMetadataRecordConverter.convert with and without an arrival timestamp
+
+ private static final RecordSchema EXPECTED_SCHEMA = new
SimpleRecordSchema(List.of(
+ new RecordField("name", RecordFieldType.STRING.getDataType()),
+ new RecordField("age", RecordFieldType.INT.getDataType()),
+ new RecordField(KINESIS_METADATA,
RecordFieldType.RECORD.getRecordDataType(SCHEMA_METADATA))
+ )); // expected output schema: the two input fields plus one nested metadata record field
+
+ private static final InjectMetadataRecordConverter CONVERTER = new
InjectMetadataRecordConverter();
+
+ @Test
+ void testConvertWithApproximateArrivalTimestamp() {
+ final KinesisClientRecord kinesisRecord =
createTestKinesisRecord(TEST_ARRIVAL_TIMESTAMP);
+
+ final Record record = CONVERTER.convert(INPUT_RECORD, kinesisRecord,
TEST_STREAM_NAME, TEST_SHARD_ID);
+
+ assertEquals(EXPECTED_SCHEMA, record.getSchema());
+
+ final Map<String, Object> recordValues = new HashMap<>(record.toMap()); // local copy so the removal below works on this map only
+ recordValues.remove(KINESIS_METADATA); // drop the metadata entry; what remains must equal the input values
+ assertEquals(INPUT_RECORD.toMap(), recordValues);
+
+ final Record metadata = record.getAsRecord(KINESIS_METADATA,
SCHEMA_METADATA);
+ final boolean expectTimestamp = true;
+ verifyMetadata(metadata, expectTimestamp); // true: timestamp field must equal TEST_ARRIVAL_TIMESTAMP in epoch millis
+ }
+
+ @Test
+ void testConvertWithoutApproximateArrivalTimestamp() {
+ final KinesisClientRecord kinesisRecord =
createTestKinesisRecord(null); // null: the Kinesis record is built without an approximate arrival timestamp
+
+ final Record record = CONVERTER.convert(INPUT_RECORD, kinesisRecord,
TEST_STREAM_NAME, TEST_SHARD_ID);
+
+ assertEquals(EXPECTED_SCHEMA, record.getSchema()); // same expected schema as in the timestamp case
+
+ final Map<String, Object> recordValues = new HashMap<>(record.toMap()); // local copy so the removal below works on this map only
+ recordValues.remove(KINESIS_METADATA); // drop the metadata entry; what remains must equal the input values
+ assertEquals(INPUT_RECORD.toMap(), recordValues);
+
+ final Record metadata = record.getAsRecord(KINESIS_METADATA,
SCHEMA_METADATA);
+ final boolean expectTimestamp = false;
+ verifyMetadata(metadata, expectTimestamp); // false: timestamp field is asserted to be null
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/converter/KinesisRecordConverterTestUtil.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/converter/KinesisRecordConverterTestUtil.java
new file mode 100644
index 0000000000..12f736656a
--- /dev/null
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/converter/KinesisRecordConverterTestUtil.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.converter;
+
+import jakarta.annotation.Nullable;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordMetadata.APPROX_ARRIVAL_TIMESTAMP;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class KinesisRecordConverterTestUtil { // Shared constants, fixtures and assertions statically imported by the converter tests
+
+ static final String KINESIS_METADATA = "kinesisMetadata"; // field name under which the tests look up the metadata record
+
+ static final String TEST_STREAM_NAME = "test-stream";
+ static final String TEST_SHARD_ID = "shardId-000000000001";
+ static final String TEST_SEQUENCE_NUMBER =
"49590338271490256608559692538361571095921575989136588801";
+ static final long TEST_SUB_SEQUENCE_NUMBER = 2;
+ static final String TEST_PARTITION_KEY = "test-partition-key";
+ static final Instant TEST_ARRIVAL_TIMESTAMP =
Instant.ofEpochMilli(1640995200000L); // 2022-01-01T00:00:00Z
+
+ static final String EXPECTED_SHARDED_SEQUENCE_NUMBER =
"4959033827149025660855969253836157109592157598913658880100000000000000000002"; // TEST_SEQUENCE_NUMBER followed by TEST_SUB_SEQUENCE_NUMBER zero-padded to 20 digits
+
+ static final RecordSchema INPUT_SCHEMA = new SimpleRecordSchema(List.of(
+ new RecordField("name", RecordFieldType.STRING.getDataType()),
+ new RecordField("age", RecordFieldType.INT.getDataType())
+ ));
+
+ static final Record INPUT_RECORD = new MapRecord(INPUT_SCHEMA, Map.of(
+ "name", "John Doe",
+ "age", 30
+ )); // the payload record handed to the converters in every test
+
+ static final RecordSchema SCHEMA_METADATA = new SimpleRecordSchema(List.of(
+ new RecordField("stream", RecordFieldType.STRING.getDataType()),
+ new RecordField("shardId", RecordFieldType.STRING.getDataType()),
+ new RecordField("sequenceNumber",
RecordFieldType.STRING.getDataType()),
+ new RecordField("subSequenceNumber",
RecordFieldType.LONG.getDataType()),
+ new RecordField("shardedSequenceNumber",
RecordFieldType.STRING.getDataType()),
+ new RecordField("partitionKey",
RecordFieldType.STRING.getDataType()),
+ new RecordField(APPROX_ARRIVAL_TIMESTAMP,
RecordFieldType.TIMESTAMP.getDataType())
+ )); // field layout the tests expect for the nested metadata record
+
+ private KinesisRecordConverterTestUtil() {
+ // Utility class: only static members, so construction is blocked
+ }
+
+ static KinesisClientRecord createTestKinesisRecord(final @Nullable Instant
arrivalTimestamp) { // builds a record from the TEST_* constants; arrivalTimestamp may be null
+ return KinesisClientRecord.builder()
+ .data(ByteBuffer.allocate(0)) // empty payload; the tests pass INPUT_RECORD to convert(...) separately
+ .sequenceNumber(TEST_SEQUENCE_NUMBER)
+ .subSequenceNumber(TEST_SUB_SEQUENCE_NUMBER)
+ .partitionKey(TEST_PARTITION_KEY)
+ .approximateArrivalTimestamp(arrivalTimestamp)
+ .build();
+ }
+
+ static void verifyMetadata(final Record metadata, final boolean
expectTimestamp) { // asserts each metadata field against the TEST_* / EXPECTED_* constants above
+ assertEquals(TEST_STREAM_NAME, metadata.getValue("stream"));
+ assertEquals(TEST_SHARD_ID, metadata.getValue("shardId"));
+ assertEquals(TEST_SEQUENCE_NUMBER,
metadata.getValue("sequenceNumber"));
+ assertEquals(TEST_SUB_SEQUENCE_NUMBER,
metadata.getValue("subSequenceNumber"));
+ assertEquals(EXPECTED_SHARDED_SEQUENCE_NUMBER,
metadata.getValue("shardedSequenceNumber"));
+ assertEquals(TEST_PARTITION_KEY, metadata.getValue("partitionKey"));
+
+ if (expectTimestamp) {
+ assertEquals(TEST_ARRIVAL_TIMESTAMP.toEpochMilli(),
metadata.getValue(APPROX_ARRIVAL_TIMESTAMP)); // the stored value is compared as epoch milliseconds
+ } else {
+ assertNull(metadata.getValue(APPROX_ARRIVAL_TIMESTAMP)); // no arrival timestamp supplied, so the field must be null
+ }
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/converter/WrapperRecordConverterTest.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/converter/WrapperRecordConverterTest.java
new file mode 100644
index 0000000000..b15e1efc5c
--- /dev/null
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/converter/WrapperRecordConverterTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.converter;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.Test;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+import java.util.List;
+
+import static
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.INPUT_RECORD;
+import static
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.INPUT_SCHEMA;
+import static
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.KINESIS_METADATA;
+import static
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.SCHEMA_METADATA;
+import static
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.TEST_ARRIVAL_TIMESTAMP;
+import static
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.TEST_SHARD_ID;
+import static
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.TEST_STREAM_NAME;
+import static
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.createTestKinesisRecord;
+import static
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.verifyMetadata;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class WrapperRecordConverterTest { // Exercises WrapperRecordConverter.convert with and without an arrival timestamp
+
+ private static final RecordSchema EXPECTED_SCHEMA = new
SimpleRecordSchema(List.of(
+ new RecordField(KINESIS_METADATA,
RecordFieldType.RECORD.getRecordDataType(SCHEMA_METADATA)),
+ new RecordField("value",
RecordFieldType.RECORD.getRecordDataType(INPUT_SCHEMA))
+ )); // expected output schema: a metadata record field plus the input schema nested under "value"
+
+ private static final WrapperRecordConverter CONVERTER = new
WrapperRecordConverter();
+
+ @Test
+ void testConvertWithApproximateArrivalTimestamp() {
+ final KinesisClientRecord kinesisRecord =
createTestKinesisRecord(TEST_ARRIVAL_TIMESTAMP);
+
+ final Record record = CONVERTER.convert(INPUT_RECORD, kinesisRecord,
TEST_STREAM_NAME, TEST_SHARD_ID);
+
+ assertEquals(EXPECTED_SCHEMA, record.getSchema());
+ assertEquals(INPUT_RECORD, record.getValue("value")); // the input record must come back equal under "value"
+
+ final Record metadata = record.getAsRecord(KINESIS_METADATA,
SCHEMA_METADATA);
+ final boolean expectTimestamp = true;
+ verifyMetadata(metadata, expectTimestamp); // true: timestamp field must equal TEST_ARRIVAL_TIMESTAMP in epoch millis
+ }
+
+ @Test
+ void testConvertWithoutApproximateArrivalTimestamp() {
+ final KinesisClientRecord kinesisRecord =
createTestKinesisRecord(null); // null: the Kinesis record is built without an approximate arrival timestamp
+
+ final Record record = CONVERTER.convert(INPUT_RECORD, kinesisRecord,
TEST_STREAM_NAME, TEST_SHARD_ID);
+
+ assertEquals(EXPECTED_SCHEMA, record.getSchema()); // same expected schema as in the timestamp case
+ assertEquals(INPUT_RECORD, record.getValue("value")); // the input record must come back equal under "value"
+
+ final Record metadata = record.getAsRecord(KINESIS_METADATA,
SCHEMA_METADATA);
+ final boolean expectTimestamp = false;
+ verifyMetadata(metadata, expectTimestamp); // false: timestamp field is asserted to be null
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 263ca63e09..b8885827c0 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
org.apache.nifi.processors.aws.s3.CopyS3Object
org.apache.nifi.processors.aws.s3.GetS3ObjectMetadata
org.apache.nifi.processors.aws.s3.GetS3ObjectTags