This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new b96d60b244 NIFI-14839 Added Output Strategy property to ConsumeKinesis 
(#10443)
b96d60b244 is described below

commit b96d60b244cc2d2ebeea3ad31486bfd11f96f770
Author: Michał Bobowski <[email protected]>
AuthorDate: Fri Oct 24 19:06:32 2025 +0200

    NIFI-14839 Added Output Strategy property to ConsumeKinesis (#10443)
    
    Co-authored-by: Mark Payne <[email protected]>
    Signed-off-by: David Handermann <[email protected]>
---
 .../processors/aws/kinesis/ConsumeKinesis.java     |  54 +++++++-
 .../aws/kinesis/ReaderRecordProcessor.java         |   9 +-
 .../converter/InjectMetadataRecordConverter.java   |  49 ++++++++
 .../kinesis/converter/KinesisRecordConverter.java  |  25 ++++
 .../kinesis/converter/KinesisRecordMetadata.java   |  81 ++++++++++++
 .../kinesis/converter/ValueRecordConverter.java    |  28 +++++
 .../kinesis/converter/WrapperRecordConverter.java  |  53 ++++++++
 .../additionalDetails.md                           | 139 +++++++++++++++++++++
 .../aws/kinesis/ReaderRecordProcessorTest.java     |  27 ++--
 .../InjectMetadataRecordConverterTest.java         |  84 +++++++++++++
 .../converter/KinesisRecordConverterTestUtil.java  |  98 +++++++++++++++
 .../converter/WrapperRecordConverterTest.java      |  76 +++++++++++
 .../services/org.apache.nifi.processor.Processor   |   1 +
 13 files changed, 701 insertions(+), 23 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
index 8700ac343f..3b4ed83f49 100644
--- 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
@@ -41,6 +41,10 @@ import 
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredential
 import org.apache.nifi.processors.aws.kinesis.MemoryBoundRecordBuffer.Lease;
 import 
org.apache.nifi.processors.aws.kinesis.ReaderRecordProcessor.ProcessingResult;
 import org.apache.nifi.processors.aws.kinesis.RecordBuffer.ShardBufferId;
+import 
org.apache.nifi.processors.aws.kinesis.converter.InjectMetadataRecordConverter;
+import org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverter;
+import org.apache.nifi.processors.aws.kinesis.converter.ValueRecordConverter;
+import org.apache.nifi.processors.aws.kinesis.converter.WrapperRecordConverter;
 import org.apache.nifi.processors.aws.region.RegionUtilV2;
 import org.apache.nifi.proxy.ProxyConfiguration;
 import org.apache.nifi.proxy.ProxyConfigurationService;
@@ -219,6 +223,15 @@ public class ConsumeKinesis extends AbstractProcessor {
             .identifiesControllerService(RecordSetWriterFactory.class)
             .build();
 
+    static final PropertyDescriptor OUTPUT_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("Output Strategy")
+            .description("The format used to output a Kinesis Record into a 
FlowFile Record.")
+            .required(true)
+            .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.RECORD)
+            .defaultValue(OutputStrategy.USE_VALUE)
+            .allowableValues(OutputStrategy.class)
+            .build();
+
     static final PropertyDescriptor INITIAL_STREAM_POSITION = new 
PropertyDescriptor.Builder()
             .name("Initial Stream Position")
             .description("The position in the stream where the processor 
should start reading.")
@@ -279,6 +292,7 @@ public class ConsumeKinesis extends AbstractProcessor {
             PROCESSING_STRATEGY,
             RECORD_READER,
             RECORD_WRITER,
+            OUTPUT_STRATEGY,
             INITIAL_STREAM_POSITION,
             STREAM_POSITION_TIMESTAMP,
             MAX_BYTES_TO_BUFFER,
@@ -443,7 +457,15 @@ public class ConsumeKinesis extends AbstractProcessor {
     private ReaderRecordProcessor createReaderRecordProcessor(final 
ProcessContext context) {
         final RecordReaderFactory recordReaderFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         final RecordSetWriterFactory recordWriterFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-        return new ReaderRecordProcessor(recordReaderFactory, 
recordWriterFactory, getLogger());
+
+        final OutputStrategy outputStrategy = 
context.getProperty(OUTPUT_STRATEGY).asAllowableValue(OutputStrategy.class);
+        final KinesisRecordConverter converter = switch (outputStrategy) {
+            case USE_VALUE -> new ValueRecordConverter();
+            case USE_WRAPPER -> new WrapperRecordConverter();
+            case INJECT_METADATA -> new InjectMetadataRecordConverter();
+        };
+
+        return new ReaderRecordProcessor(recordReaderFactory, converter, 
recordWriterFactory, getLogger());
     }
 
     private static InitialPositionInStreamExtended getInitialPosition(final 
ProcessContext context) {
@@ -732,6 +754,36 @@ public class ConsumeKinesis extends AbstractProcessor {
         }
     }
 
+    enum OutputStrategy implements DescribedValue {
+        USE_VALUE("Use Content as Value", "Write only the Kinesis Record value 
to the FlowFile record."),
+        USE_WRAPPER("Use Wrapper", "Write the Kinesis Record value and 
metadata into the FlowFile record. (See additional details for more 
information.)"),
+        INJECT_METADATA("Inject Metadata",
+                "Write the Kinesis Record value to the FlowFile record and add 
a sub-record to it with metadata. (See additional details for more 
information.)");
+
+        private final String displayName;
+        private final String description;
+
+        OutputStrategy(final String displayName, final String description) {
+            this.displayName = displayName;
+            this.description = description;
+        }
+
+        @Override
+        public String getValue() {
+            return name();
+        }
+
+        @Override
+        public String getDisplayName() {
+            return displayName;
+        }
+
+        @Override
+        public String getDescription() {
+            return description;
+        }
+    }
+
     // Visible for tests only.
     @Nullable URI getKinesisEndpointOverride() {
         return null;
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ReaderRecordProcessor.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ReaderRecordProcessor.java
index 3e36475804..4596278fda 100644
--- 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ReaderRecordProcessor.java
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ReaderRecordProcessor.java
@@ -20,6 +20,7 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverter;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
@@ -49,14 +50,17 @@ import static 
org.apache.nifi.processors.aws.kinesis.ConsumeKinesisAttributes.RE
 final class ReaderRecordProcessor {
 
     private final RecordReaderFactory recordReaderFactory;
+    private final KinesisRecordConverter recordConverter;
     private final RecordSetWriterFactory recordWriterFactory;
     private final ComponentLog logger;
 
     ReaderRecordProcessor(
             final RecordReaderFactory recordReaderFactory,
+            final KinesisRecordConverter recordConverter,
             final RecordSetWriterFactory recordWriterFactory,
             final ComponentLog logger) {
         this.recordReaderFactory = recordReaderFactory;
+        this.recordConverter = recordConverter;
         this.recordWriterFactory = recordWriterFactory;
         this.logger = logger;
     }
@@ -81,7 +85,8 @@ final class ReaderRecordProcessor {
 
                 Record record;
                 while ((record = reader.nextRecord()) != null) {
-                    final RecordSchema writeSchema = 
recordWriterFactory.getSchema(emptyMap(), record.getSchema());
+                    final Record convertedRecord = 
recordConverter.convert(record, kinesisRecord, streamName, shardId);
+                    final RecordSchema writeSchema = 
recordWriterFactory.getSchema(emptyMap(), convertedRecord.getSchema());
 
                     if (activeFlowFile == null) {
                         activeFlowFile = ActiveFlowFile.startNewFile(logger, 
session, recordWriterFactory, writeSchema, streamName, shardId);
@@ -93,7 +98,7 @@ final class ReaderRecordProcessor {
                         activeFlowFile = ActiveFlowFile.startNewFile(logger, 
session, recordWriterFactory, writeSchema, streamName, shardId);
                     }
 
-                    activeFlowFile.writeRecord(record, kinesisRecord);
+                    activeFlowFile.writeRecord(convertedRecord, kinesisRecord);
                 }
             } catch (final IOException | MalformedRecordException | 
SchemaNotFoundException e) {
                 logger.error("Reader or Writer failed to process Kinesis 
Record with Stream Name [{}] Shard Id [{}] Sequence Number [{}] SubSequence 
Number [{}]",
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/InjectMetadataRecordConverter.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/InjectMetadataRecordConverter.java
new file mode 100644
index 0000000000..d454577d42
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/InjectMetadataRecordConverter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.converter;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordMetadata.FIELD_METADATA;
+import static 
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordMetadata.METADATA;
+import static 
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordMetadata.composeMetadataObject;
+
+public final class InjectMetadataRecordConverter implements 
KinesisRecordConverter {
+
+    @Override
+    public Record convert(final Record record, final KinesisClientRecord 
kinesisRecord, final String streamName, final String shardId) {
+        final List<RecordField> schemaFields = new 
ArrayList<>(record.getSchema().getFields());
+        schemaFields.add(FIELD_METADATA);
+        final RecordSchema schema = new SimpleRecordSchema(schemaFields);
+
+        final Record metadata = composeMetadataObject(kinesisRecord, 
streamName, shardId);
+        final Map<String, Object> recordValues = new HashMap<>(record.toMap());
+        recordValues.put(METADATA, metadata);
+
+        return new MapRecord(schema, recordValues);
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/KinesisRecordConverter.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/KinesisRecordConverter.java
new file mode 100644
index 0000000000..83a65fd81f
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/KinesisRecordConverter.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.converter;
+
+import org.apache.nifi.serialization.record.Record;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+public interface KinesisRecordConverter {
+
+    Record convert(Record record, KinesisClientRecord kinesisRecord, String 
streamName, String shardId);
+}
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/KinesisRecordMetadata.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/KinesisRecordMetadata.java
new file mode 100644
index 0000000000..9fdc480727
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/KinesisRecordMetadata.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.converter;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+final class KinesisRecordMetadata {
+
+    static final String METADATA = "kinesisMetadata";
+    static final String APPROX_ARRIVAL_TIMESTAMP = "approximateArrival";
+
+    private static final String STREAM = "stream";
+    private static final String SHARD_ID = "shardId";
+    private static final String SEQUENCE_NUMBER = "sequenceNumber";
+    private static final String SUB_SEQUENCE_NUMBER = "subSequenceNumber";
+    private static final String SHARDED_SEQUENCE_NUMBER = 
"shardedSequenceNumber";
+    private static final String PARTITION_KEY = "partitionKey";
+
+    private static final RecordField FIELD_STREAM = new RecordField(STREAM, 
RecordFieldType.STRING.getDataType());
+    private static final RecordField FIELD_SHARD_ID = new 
RecordField(SHARD_ID, RecordFieldType.STRING.getDataType());
+    private static final RecordField FIELD_SEQUENCE_NUMBER = new 
RecordField(SEQUENCE_NUMBER, RecordFieldType.STRING.getDataType());
+    private static final RecordField FIELD_SUB_SEQUENCE_NUMBER = new 
RecordField(SUB_SEQUENCE_NUMBER, RecordFieldType.LONG.getDataType());
+    private static final RecordField FIELD_SHARDED_SEQUENCE_NUMBER = new 
RecordField(SHARDED_SEQUENCE_NUMBER, RecordFieldType.STRING.getDataType());
+    private static final RecordField FIELD_PARTITION_KEY = new 
RecordField(PARTITION_KEY, RecordFieldType.STRING.getDataType());
+    private static final RecordField FIELD_APPROX_ARRIVAL_TIMESTAMP = new 
RecordField(APPROX_ARRIVAL_TIMESTAMP, RecordFieldType.TIMESTAMP.getDataType());
+
+    private static final RecordSchema SCHEMA_METADATA = new 
SimpleRecordSchema(List.of(
+            FIELD_STREAM,
+            FIELD_SHARD_ID,
+            FIELD_SEQUENCE_NUMBER,
+            FIELD_SUB_SEQUENCE_NUMBER,
+            FIELD_SHARDED_SEQUENCE_NUMBER,
+            FIELD_PARTITION_KEY,
+            FIELD_APPROX_ARRIVAL_TIMESTAMP));
+
+    static final RecordField FIELD_METADATA = new RecordField(METADATA, 
RecordFieldType.RECORD.getRecordDataType(SCHEMA_METADATA));
+
+    static Record composeMetadataObject(final KinesisClientRecord 
kinesisRecord, final String streamName, final String shardId) {
+        final Map<String, Object> metadata = new HashMap<>(7, 1.0f);
+
+        metadata.put(STREAM, streamName);
+        metadata.put(SHARD_ID, shardId);
+        metadata.put(SEQUENCE_NUMBER, kinesisRecord.sequenceNumber());
+        metadata.put(SUB_SEQUENCE_NUMBER, kinesisRecord.subSequenceNumber());
+        metadata.put(SHARDED_SEQUENCE_NUMBER, 
"%s%020d".formatted(kinesisRecord.sequenceNumber(), 
kinesisRecord.subSequenceNumber()));
+        metadata.put(PARTITION_KEY, kinesisRecord.partitionKey());
+
+        if (kinesisRecord.approximateArrivalTimestamp() != null) {
+            metadata.put(APPROX_ARRIVAL_TIMESTAMP, 
kinesisRecord.approximateArrivalTimestamp().toEpochMilli());
+        }
+
+        return new MapRecord(SCHEMA_METADATA, metadata);
+    }
+
+    private KinesisRecordMetadata() {
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/ValueRecordConverter.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/ValueRecordConverter.java
new file mode 100644
index 0000000000..b67ae22072
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/ValueRecordConverter.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.converter;
+
+import org.apache.nifi.serialization.record.Record;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+public final class ValueRecordConverter implements KinesisRecordConverter {
+
+    @Override
+    public Record convert(final Record record, final KinesisClientRecord 
kinesisRecord, final String streamName, final String shardId) {
+        return record;
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/WrapperRecordConverter.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/WrapperRecordConverter.java
new file mode 100644
index 0000000000..693077f443
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/converter/WrapperRecordConverter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.converter;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordMetadata.FIELD_METADATA;
+import static 
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordMetadata.METADATA;
+import static 
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordMetadata.composeMetadataObject;
+
+public final class WrapperRecordConverter implements KinesisRecordConverter {
+
+    private static final String VALUE = "value";
+
+    @Override
+    public Record convert(final Record record, final KinesisClientRecord 
kinesisRecord, final String streamName, final String shardId) {
+        final Record metadata = composeMetadataObject(kinesisRecord, 
streamName, shardId);
+
+        final RecordSchema convertedSchema = new SimpleRecordSchema(List.of(
+                FIELD_METADATA,
+                new RecordField(VALUE, 
RecordFieldType.RECORD.getRecordDataType(record.getSchema())))
+        );
+        final Map<String, Object> convertedRecord = Map.of(
+                METADATA, metadata,
+                VALUE, record
+        );
+
+        return new MapRecord(convertedSchema, convertedRecord);
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/resources/docs/org.apache.nifi.processors.aws.kinesis.ConsumeKinesis/additionalDetails.md
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/resources/docs/org.apache.nifi.processors.aws.kinesis.ConsumeKinesis/additionalDetails.md
index 5fb223ab89..fee288941b 100644
--- 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/resources/docs/org.apache.nifi.processors.aws.kinesis.ConsumeKinesis/additionalDetails.md
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/resources/docs/org.apache.nifi.processors.aws.kinesis.ConsumeKinesis/additionalDetails.md
@@ -134,3 +134,142 @@ preserves record ordering for a single Shard, even with 
record schema changes.
 **Please note**, the processor relies on _Record Reader_ to provide correct 
schema for each record.
 Using a Reader with schema inference may produce a lot of different schemas, 
which may lead to excessive FlowFile creation.
 If performance is a concern, it is recommended to use a Reader with a 
predefined schema or schema registry.
+
+### Output Strategies
+
+This processor offers multiple strategies, configured via the _Output Strategy_ 
property, for converting Kinesis Records into FlowFile records.
+
+- _Use Content as Value_ (the default) writes only the Kinesis Record value to 
the FlowFile record.
+- _Use Wrapper_ writes a Kinesis Record value as well as metadata into 
separate fields of a FlowFile record.
+- _Inject Metadata_ writes a Kinesis Record value to a FlowFile record and 
adds a sub-record to it with metadata.
+
+The written metadata includes the following fields:
+- _stream_: The name of the Kinesis stream the record was received from.
+- _shardId_: The identifier of the shard the record was received from.
+- _sequenceNumber_: The sequence number of the record.
+- _subSequenceNumber_: The subsequence number of the record, used when 
multiple smaller records are aggregated into a single Kinesis record. If a 
record was not part of a batch, this value will be 0.
+- _shardedSequenceNumber_: The sequence number followed by the subsequence 
number left-padded with zeros to 20 digits. This can be used to uniquely 
identify a record within a shard.
+- _partitionKey_: The partition key of the record.
+- _approximateArrival_: The approximate arrival timestamp of the record (in 
milliseconds since epoch).
+
+The record schema that is used when _Use Wrapper_ is selected is as follows 
(in Avro format):
+
+```json
+{
+  "type": "record",
+  "name": "nifiRecord",
+  "namespace": "org.apache.nifi",
+  "fields": [
+    {
+      "name": "value",
+      "type": [
+        {
+          < Fields as determined by the Record Reader for a Kinesis message >
+        },
+        "null"
+      ]
+    },
+    {
+      "name": "kinesisMetadata",
+      "type": [
+        {
+          "type": "record",
+          "name": "metadataType",
+          "fields": [
+            { "name": "stream", "type": ["string", "null"] },
+            { "name": "shardId", "type": ["string", "null"] },
+            { "name": "sequenceNumber", "type": ["string", "null"] },
+            { "name": "subSequenceNumber", "type": ["long", "null"] },
+            { "name": "shardedSequenceNumber", "type": ["string", "null"] },
+            { "name": "partitionKey", "type": ["string", "null"] },
+            { "name": "approximateArrival", "type": [ { "type": "long", 
"logicalType": "timestamp-millis" }, "null" ] }
+          ]
+        },
+        "null"
+      ]
+    }
+  ]
+}
+```
+
+The record schema that is used when _Inject Metadata_ is selected is as 
follows (in Avro format):
+
+```json
+{
+  "type": "record",
+  "name": "nifiRecord",
+  "namespace": "org.apache.nifi",
+  "fields": [
+    < Fields as determined by the Record Reader for a Kinesis message >,
+    {
+      "name": "kinesisMetadata",
+      "type": [
+        {
+          "type": "record",
+          "name": "metadataType",
+          "fields": [
+            { "name": "stream", "type": ["string", "null"] },
+            { "name": "shardId", "type": ["string", "null"] },
+            { "name": "sequenceNumber", "type": ["string", "null"] },
+            { "name": "subSequenceNumber", "type": ["long", "null"] },
+            { "name": "shardedSequenceNumber", "type": ["string", "null"] },
+            { "name": "partitionKey", "type": ["string", "null"] },
+            { "name": "approximateArrival", "type": [ { "type": "long", 
"logicalType": "timestamp-millis" }, "null" ] }
+          ]
+        },
+        "null"
+      ]
+    }
+  ]
+}
+```
+
+Here is an example of FlowFile content that is emitted by JsonRecordSetWriter 
when strategy _Use Wrapper_ is selected:
+
+```json
+[
+  {
+    "value": {
+      "address": "1234 First Street",
+      "zip": "12345",
+      "account": {
+        "name": "Acme",
+        "number": "AC1234"
+      }
+    },
+    "kinesisMetadata" : {
+      "stream" : "stream-name",
+      "shardId" : "shardId-000000000000",
+      "sequenceNumber" : "123456789",
+      "subSequenceNumber" : 3,
+      "shardedSequenceNumber" : "12345678900000000000000000003",
+      "partitionKey" : "123",
+      "approximateArrival" : 1756459596788
+    }
+  }
+]
+```
+
+Here is an example of FlowFile content that is emitted by JsonRecordSetWriter 
when strategy _Inject Metadata_ is selected:
+
+```json
+[
+  {
+    "address": "1234 First Street",
+    "zip": "12345",
+    "account": {
+      "name": "Acme",
+      "number": "AC1234"
+    },
+    "kinesisMetadata" : {
+      "stream" : "stream-name",
+      "shardId" : "shardId-000000000000",
+      "sequenceNumber" : "123456789",
+      "subSequenceNumber" : 3,
+      "shardedSequenceNumber" : "12345678900000000000000000003",
+      "partitionKey" : "123",
+      "approximateArrival" : 1756459596788
+    }
+  }
+]
+```
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ReaderRecordProcessorTest.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ReaderRecordProcessorTest.java
index 48a4abd687..f685585788 100644
--- 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ReaderRecordProcessorTest.java
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ReaderRecordProcessorTest.java
@@ -21,6 +21,7 @@ import org.apache.nifi.json.JsonRecordSetWriter;
 import org.apache.nifi.json.JsonTreeReader;
 import org.apache.nifi.logging.ComponentLog;
 import 
org.apache.nifi.processors.aws.kinesis.ReaderRecordProcessor.ProcessingResult;
+import org.apache.nifi.processors.aws.kinesis.converter.ValueRecordConverter;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.schema.inference.SchemaInferenceUtil;
@@ -83,8 +84,8 @@ class ReaderRecordProcessorTest {
     private MockProcessSession session;
     private ComponentLog logger;
 
-    private JsonTreeReader jsonReader;
     private JsonRecordSetWriter jsonWriter;
+    private ReaderRecordProcessor processor;
 
     @BeforeEach
     void setUp() throws InitializationException {
@@ -93,7 +94,7 @@ class ReaderRecordProcessorTest {
         session = new MockProcessSession(sharedState, runner.getProcessor());
         logger = runner.getLogger();
 
-        jsonReader = new JsonTreeReader();
+        final JsonTreeReader jsonReader = new JsonTreeReader();
         runner.addControllerService("json-reader", jsonReader);
         runner.setProperty(jsonReader, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaInferenceUtil.INFER_SCHEMA.getValue());
         runner.enableControllerService(jsonReader);
@@ -102,12 +103,12 @@ class ReaderRecordProcessorTest {
         runner.addControllerService("json-writer", jsonWriter);
         runner.setProperty(jsonWriter, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.INHERIT_RECORD_SCHEMA.getValue());
         runner.enableControllerService(jsonWriter);
+
+        processor = new ReaderRecordProcessor(jsonReader, new 
ValueRecordConverter(), jsonWriter, logger);
     }
 
     @Test
     void testProcessSingleRecord() {
-        final ReaderRecordProcessor processor = new 
ReaderRecordProcessor(jsonReader, jsonWriter, logger);
-
         final KinesisClientRecord record = KinesisClientRecord.builder()
                 .data(ByteBuffer.wrap(USER_JSON_1.getBytes(UTF_8)))
                 .sequenceNumber("1")
@@ -143,8 +144,6 @@ class ReaderRecordProcessorTest {
 
     @Test
     void testProcessMultipleRecordsWithSameSchema() {
-        final ReaderRecordProcessor processor = new 
ReaderRecordProcessor(jsonReader, jsonWriter, logger);
-
         final List<KinesisClientRecord> records = List.of(
                 createKinesisRecord(USER_JSON_1, "1"),
                 createKinesisRecord(USER_JSON_2, "2"),
@@ -171,8 +170,6 @@ class ReaderRecordProcessorTest {
 
     @Test
     void testEmptyRecordsList() {
-        final ReaderRecordProcessor processor = new 
ReaderRecordProcessor(jsonReader, jsonWriter, logger);
-
         final ProcessingResult result = processor.processRecords(session, 
TEST_STREAM_NAME, TEST_SHARD_ID, Collections.emptyList());
 
         assertEquals(0, result.successFlowFiles().size());
@@ -181,8 +178,6 @@ class ReaderRecordProcessorTest {
 
     @Test
     void testSchemaChangeCreatesNewFlowFile() {
-        final ReaderRecordProcessor processor = new 
ReaderRecordProcessor(jsonReader, jsonWriter, logger);
-
         final List<KinesisClientRecord> records = List.of(
                 createKinesisRecord(USER_JSON_1, "1"),
                 createKinesisRecord(CITY_JSON_1, "2")
@@ -204,8 +199,6 @@ class ReaderRecordProcessorTest {
 
     @Test
     void testSchemaChangeWithMultipleRecordsInBetween() {
-        final ReaderRecordProcessor processor = new 
ReaderRecordProcessor(jsonReader, jsonWriter, logger);
-
         final List<KinesisClientRecord> records = List.of(
                 createKinesisRecord(USER_JSON_1, "1"),
                 createKinesisRecord(USER_JSON_2, "2"),
@@ -229,8 +222,6 @@ class ReaderRecordProcessorTest {
 
     @Test
     void testSingleMalformedRecord() {
-        final ReaderRecordProcessor processor = new 
ReaderRecordProcessor(jsonReader, jsonWriter, logger);
-
         final List<KinesisClientRecord> records = List.of(
                 createKinesisRecord(INVALID_JSON, "1")
         );
@@ -251,8 +242,6 @@ class ReaderRecordProcessorTest {
 
     @Test
     void testMalformedRecordBetweenValid() {
-        final ReaderRecordProcessor processor = new 
ReaderRecordProcessor(jsonReader, jsonWriter, logger);
-
         final List<KinesisClientRecord> records = List.of(
                 createKinesisRecord(USER_JSON_1, "1"),
                 createKinesisRecord(INVALID_JSON, "2"),
@@ -290,7 +279,7 @@ class ReaderRecordProcessorTest {
             }
         };
 
-        final ReaderRecordProcessor processor = new 
ReaderRecordProcessor(failingReaderFactory, jsonWriter, logger);
+        final ReaderRecordProcessor processor = new 
ReaderRecordProcessor(failingReaderFactory, new ValueRecordConverter(), 
jsonWriter, logger);
 
         final KinesisClientRecord record = createKinesisRecord(USER_JSON_1, 
"1");
         final List<KinesisClientRecord> records = List.of(record);
@@ -307,7 +296,7 @@ class ReaderRecordProcessorTest {
 
     @Test
     void testMalformedRecordExceptionDuringReading() {
-        final ReaderRecordProcessor processor = new 
ReaderRecordProcessor(getMalformedRecordExceptionReader(), jsonWriter, logger);
+        final ReaderRecordProcessor processor = new 
ReaderRecordProcessor(getMalformedRecordExceptionReader(), new 
ValueRecordConverter(), jsonWriter, logger);
 
         final KinesisClientRecord record = createKinesisRecord(USER_JSON_1, 
"1");
         final List<KinesisClientRecord> records = 
Collections.singletonList(record);
@@ -324,8 +313,6 @@ class ReaderRecordProcessorTest {
 
     @Test
     void testInvalidRecordsWithSchemaEvolution() {
-        final ReaderRecordProcessor processor = new 
ReaderRecordProcessor(jsonReader, jsonWriter, logger);
-
         final List<KinesisClientRecord> records = List.of(
                 createKinesisRecord(USER_JSON_1, "1"), // Schema A
                 createKinesisRecord(USER_JSON_2, "2"), // Schema A
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/converter/InjectMetadataRecordConverterTest.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/converter/InjectMetadataRecordConverterTest.java
new file mode 100644
index 0000000000..c4261d3087
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/converter/InjectMetadataRecordConverterTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.converter;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.Test;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.INPUT_RECORD;
+import static 
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.KINESIS_METADATA;
+import static 
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.SCHEMA_METADATA;
+import static 
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.TEST_ARRIVAL_TIMESTAMP;
+import static 
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.TEST_SHARD_ID;
+import static 
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.TEST_STREAM_NAME;
+import static 
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.createTestKinesisRecord;
+import static 
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.verifyMetadata;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class InjectMetadataRecordConverterTest {
+
+    private static final RecordSchema EXPECTED_SCHEMA = new 
SimpleRecordSchema(List.of(
+            new RecordField("name", RecordFieldType.STRING.getDataType()),
+            new RecordField("age", RecordFieldType.INT.getDataType()),
+            new RecordField(KINESIS_METADATA, 
RecordFieldType.RECORD.getRecordDataType(SCHEMA_METADATA))
+    ));
+
+    private static final InjectMetadataRecordConverter CONVERTER = new 
InjectMetadataRecordConverter();
+
+    @Test
+    void testConvertWithApproximateArrivalTimestamp() {
+        final KinesisClientRecord kinesisRecord = 
createTestKinesisRecord(TEST_ARRIVAL_TIMESTAMP);
+
+        final Record record = CONVERTER.convert(INPUT_RECORD, kinesisRecord, 
TEST_STREAM_NAME, TEST_SHARD_ID);
+
+        assertEquals(EXPECTED_SCHEMA, record.getSchema());
+
+        final Map<String, Object> recordValues = new HashMap<>(record.toMap());
+        recordValues.remove(KINESIS_METADATA);
+        assertEquals(INPUT_RECORD.toMap(), recordValues);
+
+        final Record metadata = record.getAsRecord(KINESIS_METADATA, 
SCHEMA_METADATA);
+        final boolean expectTimestamp = true;
+        verifyMetadata(metadata, expectTimestamp);
+    }
+
+    @Test
+    void testConvertWithoutApproximateArrivalTimestamp() {
+        final KinesisClientRecord kinesisRecord = 
createTestKinesisRecord(null);
+
+        final Record record = CONVERTER.convert(INPUT_RECORD, kinesisRecord, 
TEST_STREAM_NAME, TEST_SHARD_ID);
+
+        assertEquals(EXPECTED_SCHEMA, record.getSchema());
+
+        final Map<String, Object> recordValues = new HashMap<>(record.toMap());
+        recordValues.remove(KINESIS_METADATA);
+        assertEquals(INPUT_RECORD.toMap(), recordValues);
+
+        final Record metadata = record.getAsRecord(KINESIS_METADATA, 
SCHEMA_METADATA);
+        final boolean expectTimestamp = false;
+        verifyMetadata(metadata, expectTimestamp);
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/converter/KinesisRecordConverterTestUtil.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/converter/KinesisRecordConverterTestUtil.java
new file mode 100644
index 0000000000..12f736656a
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/converter/KinesisRecordConverterTestUtil.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.converter;
+
+import jakarta.annotation.Nullable;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordMetadata.APPROX_ARRIVAL_TIMESTAMP;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class KinesisRecordConverterTestUtil {
+
+    static final String KINESIS_METADATA = "kinesisMetadata";
+
+    static final String TEST_STREAM_NAME = "test-stream";
+    static final String TEST_SHARD_ID = "shardId-000000000001";
+    static final String TEST_SEQUENCE_NUMBER = 
"49590338271490256608559692538361571095921575989136588801";
+    static final long TEST_SUB_SEQUENCE_NUMBER = 2;
+    static final String TEST_PARTITION_KEY = "test-partition-key";
+    static final Instant TEST_ARRIVAL_TIMESTAMP = 
Instant.ofEpochMilli(1640995200000L);
+
+    static final String EXPECTED_SHARDED_SEQUENCE_NUMBER = 
"4959033827149025660855969253836157109592157598913658880100000000000000000002";
+
+    static final RecordSchema INPUT_SCHEMA = new SimpleRecordSchema(List.of(
+            new RecordField("name", RecordFieldType.STRING.getDataType()),
+            new RecordField("age", RecordFieldType.INT.getDataType())
+    ));
+
+    static final Record INPUT_RECORD = new MapRecord(INPUT_SCHEMA, Map.of(
+            "name", "John Doe",
+            "age", 30
+    ));
+
+    static final RecordSchema SCHEMA_METADATA = new SimpleRecordSchema(List.of(
+            new RecordField("stream", RecordFieldType.STRING.getDataType()),
+            new RecordField("shardId", RecordFieldType.STRING.getDataType()),
+            new RecordField("sequenceNumber", 
RecordFieldType.STRING.getDataType()),
+            new RecordField("subSequenceNumber", 
RecordFieldType.LONG.getDataType()),
+            new RecordField("shardedSequenceNumber", 
RecordFieldType.STRING.getDataType()),
+            new RecordField("partitionKey", 
RecordFieldType.STRING.getDataType()),
+            new RecordField(APPROX_ARRIVAL_TIMESTAMP, 
RecordFieldType.TIMESTAMP.getDataType())
+    ));
+
+    private KinesisRecordConverterTestUtil() {
+        // Utility class
+    }
+
+    static KinesisClientRecord createTestKinesisRecord(final @Nullable Instant 
arrivalTimestamp) {
+        return KinesisClientRecord.builder()
+                .data(ByteBuffer.allocate(0))
+                .sequenceNumber(TEST_SEQUENCE_NUMBER)
+                .subSequenceNumber(TEST_SUB_SEQUENCE_NUMBER)
+                .partitionKey(TEST_PARTITION_KEY)
+                .approximateArrivalTimestamp(arrivalTimestamp)
+                .build();
+    }
+
+    static void verifyMetadata(final Record metadata, final boolean 
expectTimestamp) {
+        assertEquals(TEST_STREAM_NAME, metadata.getValue("stream"));
+        assertEquals(TEST_SHARD_ID, metadata.getValue("shardId"));
+        assertEquals(TEST_SEQUENCE_NUMBER, 
metadata.getValue("sequenceNumber"));
+        assertEquals(TEST_SUB_SEQUENCE_NUMBER, 
metadata.getValue("subSequenceNumber"));
+        assertEquals(EXPECTED_SHARDED_SEQUENCE_NUMBER, 
metadata.getValue("shardedSequenceNumber"));
+        assertEquals(TEST_PARTITION_KEY, metadata.getValue("partitionKey"));
+
+        if (expectTimestamp) {
+            assertEquals(TEST_ARRIVAL_TIMESTAMP.toEpochMilli(), 
metadata.getValue(APPROX_ARRIVAL_TIMESTAMP));
+        } else {
+            assertNull(metadata.getValue(APPROX_ARRIVAL_TIMESTAMP));
+        }
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/converter/WrapperRecordConverterTest.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/converter/WrapperRecordConverterTest.java
new file mode 100644
index 0000000000..b15e1efc5c
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/converter/WrapperRecordConverterTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.converter;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.Test;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+import java.util.List;
+
+import static 
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.INPUT_RECORD;
+import static 
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.INPUT_SCHEMA;
+import static 
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.KINESIS_METADATA;
+import static 
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.SCHEMA_METADATA;
+import static 
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.TEST_ARRIVAL_TIMESTAMP;
+import static 
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.TEST_SHARD_ID;
+import static 
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.TEST_STREAM_NAME;
+import static 
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.createTestKinesisRecord;
+import static 
org.apache.nifi.processors.aws.kinesis.converter.KinesisRecordConverterTestUtil.verifyMetadata;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class WrapperRecordConverterTest {
+
+    private static final RecordSchema EXPECTED_SCHEMA = new 
SimpleRecordSchema(List.of(
+            new RecordField(KINESIS_METADATA, 
RecordFieldType.RECORD.getRecordDataType(SCHEMA_METADATA)),
+            new RecordField("value", 
RecordFieldType.RECORD.getRecordDataType(INPUT_SCHEMA))
+    ));
+
+    private static final WrapperRecordConverter CONVERTER = new 
WrapperRecordConverter();
+
+    @Test
+    void testConvertWithApproximateArrivalTimestamp() {
+        final KinesisClientRecord kinesisRecord = 
createTestKinesisRecord(TEST_ARRIVAL_TIMESTAMP);
+
+        final Record record = CONVERTER.convert(INPUT_RECORD, kinesisRecord, 
TEST_STREAM_NAME, TEST_SHARD_ID);
+
+        assertEquals(EXPECTED_SCHEMA, record.getSchema());
+        assertEquals(INPUT_RECORD, record.getValue("value"));
+
+        final Record metadata = record.getAsRecord(KINESIS_METADATA, 
SCHEMA_METADATA);
+        final boolean expectTimestamp = true;
+        verifyMetadata(metadata, expectTimestamp);
+    }
+
+    @Test
+    void testConvertWithoutApproximateArrivalTimestamp() {
+        final KinesisClientRecord kinesisRecord = 
createTestKinesisRecord(null);
+
+        final Record record = CONVERTER.convert(INPUT_RECORD, kinesisRecord, 
TEST_STREAM_NAME, TEST_SHARD_ID);
+
+        assertEquals(EXPECTED_SCHEMA, record.getSchema());
+        assertEquals(INPUT_RECORD, record.getValue("value"));
+
+        final Record metadata = record.getAsRecord(KINESIS_METADATA, 
SCHEMA_METADATA);
+        final boolean expectTimestamp = false;
+        verifyMetadata(metadata, expectTimestamp);
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 263ca63e09..b8885827c0 100644
--- 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 org.apache.nifi.processors.aws.s3.CopyS3Object
 org.apache.nifi.processors.aws.s3.GetS3ObjectMetadata
 org.apache.nifi.processors.aws.s3.GetS3ObjectTags

Reply via email to