This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d7805f203 NIFI-14285 Added Output Strategy with Wrapper to 
ConsumeKinesisStream (#9738)
6d7805f203 is described below

commit 6d7805f2035f9d14683795cebfd2bd3b98634b67
Author: Dariusz Seweryn <[email protected]>
AuthorDate: Tue Mar 4 22:11:19 2025 +0100

    NIFI-14285 Added Output Strategy with Wrapper to ConsumeKinesisStream 
(#9738)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../aws/kinesis/property/OutputStrategy.java       | 50 +++++++++++++++
 .../aws/kinesis/stream/ConsumeKinesisStream.java   | 24 +++++--
 .../record/AbstractKinesisRecordProcessor.java     |  4 ++
 .../record/KinesisRecordProcessorRecord.java       | 15 +++--
 .../stream/record/converter/RecordConverter.java   | 25 ++++++++
 .../record/converter/RecordConverterIdentity.java  | 27 ++++++++
 .../record/converter/RecordConverterWrapper.java   | 73 ++++++++++++++++++++++
 .../additionalDetails.md                           |  7 ++-
 .../record/TestKinesisRecordProcessorRecord.java   | 67 ++++++++++++++------
 9 files changed, 263 insertions(+), 29 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/property/OutputStrategy.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/property/OutputStrategy.java
new file mode 100644
index 0000000000..885acbdd69
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/property/OutputStrategy.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.property;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Enumeration of supported Kinesis Output Strategies
+ */
+public enum OutputStrategy implements DescribedValue {
+    USE_VALUE("Use Content as Value", "Write only the Kinesis Record value to 
the FlowFile record."),
+    USE_WRAPPER("Use Wrapper", "Write the Kinesis Record value and metadata 
into the FlowFile record.");
+
+    private final String displayName;
+    private final String description;
+
+    OutputStrategy(final String displayName, final String description) {
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return name();
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/ConsumeKinesisStream.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/ConsumeKinesisStream.java
index 3771e3835a..75ba3496dd 100644
--- 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/ConsumeKinesisStream.java
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/ConsumeKinesisStream.java
@@ -48,9 +48,13 @@ import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.kinesis.property.OutputStrategy;
 import 
org.apache.nifi.processors.aws.kinesis.stream.record.AbstractKinesisRecordProcessor;
 import 
org.apache.nifi.processors.aws.kinesis.stream.record.KinesisRecordProcessorRaw;
 import 
org.apache.nifi.processors.aws.kinesis.stream.record.KinesisRecordProcessorRecord;
+import 
org.apache.nifi.processors.aws.kinesis.stream.record.converter.RecordConverter;
+import 
org.apache.nifi.processors.aws.kinesis.stream.record.converter.RecordConverterIdentity;
+import 
org.apache.nifi.processors.aws.kinesis.stream.record.converter.RecordConverterWrapper;
 import org.apache.nifi.processors.aws.v2.AbstractAwsAsyncProcessor;
 import org.apache.nifi.processors.aws.v2.AbstractAwsProcessor;
 import org.apache.nifi.serialization.RecordReaderFactory;
@@ -107,7 +111,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
 
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
 @TriggerSerially
@@ -304,6 +307,15 @@ public class ConsumeKinesisStream extends 
AbstractAwsAsyncProcessor<KinesisAsync
             .required(true)
             .build();
 
+    public static final PropertyDescriptor OUTPUT_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("Output Strategy")
+            .description("The format used to output the Kinesis Record into a 
FlowFile Record.")
+            .required(true)
+            .defaultValue(OutputStrategy.USE_VALUE)
+            .allowableValues(OutputStrategy.class)
+            .dependsOn(RECORD_WRITER)
+            .build();
+
     public static final Relationship REL_PARSE_FAILURE = new 
Relationship.Builder()
             .name("parse.failure")
             .description("If a message from Kinesis cannot be parsed using the 
configured Record Reader" +
@@ -317,6 +329,7 @@ public class ConsumeKinesisStream extends 
AbstractAwsAsyncProcessor<KinesisAsync
             APPLICATION_NAME,
             RECORD_READER,
             RECORD_WRITER,
+            OUTPUT_STRATEGY,
             REGION,
             ENDPOINT_OVERRIDE,
             DYNAMODB_ENDPOINT_OVERRIDE,
@@ -645,8 +658,7 @@ public class ConsumeKinesisStream extends 
AbstractAwsAsyncProcessor<KinesisAsync
                 .keySet()
                 .stream()
                 .filter(PropertyDescriptor::isDynamic)
-                .collect(Collectors.toList());
-
+                .toList();
 
         final RetrievalConfig retrievalConfig = 
configsBuilder.retrievalConfig()
                 .retrievalSpecificConfig(new PollingConfig(streamName, 
kinesisClient));
@@ -702,11 +714,15 @@ public class ConsumeKinesisStream extends 
AbstractAwsAsyncProcessor<KinesisAsync
     private ShardRecordProcessorFactory prepareRecordProcessorFactory(final 
ProcessContext context, final ProcessSessionFactory sessionFactory) {
         return () -> {
             if (isRecordReaderSet && isRecordWriterSet) {
+                final OutputStrategy outputStrategy = 
context.getProperty(OUTPUT_STRATEGY).asAllowableValue(OutputStrategy.class);
+                final RecordConverter recordConverter = 
OutputStrategy.USE_WRAPPER == outputStrategy
+                        ? new RecordConverterWrapper()
+                        : new RecordConverterIdentity();
                 return new KinesisRecordProcessorRecord(
                         sessionFactory, getLogger(), getStreamName(context), 
getEndpointPrefix(context),
                         getKinesisEndpoint(context).orElse(null), 
getCheckpointIntervalMillis(context),
                         getRetryWaitMillis(context), getNumRetries(context), 
getDateTimeFormatter(context),
-                        getReaderFactory(context), getWriterFactory(context)
+                        getReaderFactory(context), getWriterFactory(context), 
recordConverter
                 );
             } else {
                 return new KinesisRecordProcessorRaw(
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/AbstractKinesisRecordProcessor.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/AbstractKinesisRecordProcessor.java
index ace24b71ae..d219067309 100644
--- 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/AbstractKinesisRecordProcessor.java
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/AbstractKinesisRecordProcessor.java
@@ -313,6 +313,10 @@ public abstract class AbstractKinesisRecordProcessor 
implements ShardRecordProce
         return log;
     }
 
+    String getStreamName() {
+        return streamName;
+    }
+
     String getKinesisShardId() {
         return kinesisShardId;
     }
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java
index 03ba1ca164..d39d791e34 100644
--- 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java
@@ -22,6 +22,7 @@ import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream;
+import 
org.apache.nifi.processors.aws.kinesis.stream.record.converter.RecordConverter;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
@@ -30,6 +31,7 @@ import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.StopWatch;
 import software.amazon.kinesis.retrieval.KinesisClientRecord;
@@ -52,18 +54,21 @@ public class KinesisRecordProcessorRecord extends 
AbstractKinesisRecordProcessor
 
     private RecordSetWriter writer;
     private OutputStream outputStream;
+    private final RecordConverter recordConverter;
 
     public KinesisRecordProcessorRecord(final ProcessSessionFactory 
sessionFactory, final ComponentLog log, final String streamName,
                                         final String endpointPrefix, final 
String kinesisEndpoint,
                                         final long checkpointIntervalMillis, 
final long retryWaitMillis,
                                         final int numRetries, final 
DateTimeFormatter dateTimeFormatter,
-                                        final RecordReaderFactory 
readerFactory, final RecordSetWriterFactory writerFactory) {
+                                        final RecordReaderFactory 
readerFactory, final RecordSetWriterFactory writerFactory,
+                                        final RecordConverter recordConverter) 
{
         super(sessionFactory, log, streamName, endpointPrefix, 
kinesisEndpoint, checkpointIntervalMillis, retryWaitMillis,
                 numRetries, dateTimeFormatter);
         this.readerFactory = readerFactory;
         this.writerFactory = writerFactory;
 
         schemaRetrievalVariables = 
Collections.singletonMap(KINESIS_RECORD_SCHEMA_KEY, streamName);
+        this.recordConverter = recordConverter;
     }
 
     @Override
@@ -88,9 +93,10 @@ public class KinesisRecordProcessorRecord extends 
AbstractKinesisRecordProcessor
         try (final InputStream in = new ByteArrayInputStream(data);
              final RecordReader reader = 
readerFactory.createRecordReader(schemaRetrievalVariables, in, data.length, 
getLogger())
         ) {
-            org.apache.nifi.serialization.record.Record outputRecord;
+            Record intermediateRecord;
             final PushBackRecordSet recordSet = new 
PushBackRecordSet(reader.createRecordSet());
-            while ((outputRecord = recordSet.next()) != null) {
+            while ((intermediateRecord = recordSet.next()) != null) {
+                Record outputRecord = 
recordConverter.convert(intermediateRecord, kinesisRecord, getStreamName(), 
getKinesisShardId());
                 if (flowFiles.isEmpty()) {
                     flowFile = session.create();
                     flowFiles.add(flowFile);
@@ -121,8 +127,7 @@ public class KinesisRecordProcessorRecord extends 
AbstractKinesisRecordProcessor
         }
     }
 
-    private void createWriter(final FlowFile flowFile, final ProcessSession 
session,
-                              final 
org.apache.nifi.serialization.record.Record outputRecord)
+    private void createWriter(final FlowFile flowFile, final ProcessSession 
session, final Record outputRecord)
             throws IOException, SchemaNotFoundException {
 
         final RecordSchema readerSchema = outputRecord.getSchema();
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/converter/RecordConverter.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/converter/RecordConverter.java
new file mode 100644
index 0000000000..886c2e5cbd
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/converter/RecordConverter.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.stream.record.converter;
+
+import org.apache.nifi.serialization.record.Record;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+public interface RecordConverter {
+
+    Record convert(final Record record, final KinesisClientRecord 
kinesisRecord, final String streamName, final String shardId);
+}
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/converter/RecordConverterIdentity.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/converter/RecordConverterIdentity.java
new file mode 100644
index 0000000000..b2ac42113e
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/converter/RecordConverterIdentity.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.stream.record.converter;
+
+import org.apache.nifi.serialization.record.Record;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+public class RecordConverterIdentity implements RecordConverter {
+    @Override
+    public Record convert(final Record record, final KinesisClientRecord 
kinesisRecord, final String streamName, final String shardId) {
+        return record;
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/converter/RecordConverterWrapper.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/converter/RecordConverterWrapper.java
new file mode 100644
index 0000000000..208eb5dcf4
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/converter/RecordConverterWrapper.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.stream.record.converter;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RecordConverterWrapper implements RecordConverter {
+
+    private static final String VALUE = "value";
+    private static final String METADATA = "metadata";
+
+    private static final String STREAM = "stream";
+    private static final String SHARD_ID = "shardId";
+    private static final String SEQUENCE_NUMBER = "sequenceNumber";
+    private static final String PARTITION_KEY = "partitionKey";
+    private static final String APPROX_ARRIVAL_TIMESTAMP = 
"approximateArrival";
+
+    private static final RecordField FIELD_STREAM = new RecordField(STREAM, 
RecordFieldType.STRING.getDataType());
+    private static final RecordField FIELD_SHARD_ID = new 
RecordField(SHARD_ID, RecordFieldType.STRING.getDataType());
+    private static final RecordField FIELD_SEQUENCE_NUMBER = new 
RecordField(SEQUENCE_NUMBER, RecordFieldType.STRING.getDataType());
+    private static final RecordField FIELD_PARTITION_KEY = new 
RecordField(PARTITION_KEY, RecordFieldType.STRING.getDataType());
+    private static final RecordField FIELD_APPROX_ARRIVAL_TIMESTAMP = new 
RecordField(APPROX_ARRIVAL_TIMESTAMP, RecordFieldType.TIMESTAMP.getDataType());
+    private static final RecordSchema SCHEMA_METADATA = new 
SimpleRecordSchema(Arrays.asList(
+            FIELD_STREAM, FIELD_SHARD_ID, FIELD_SEQUENCE_NUMBER, 
FIELD_PARTITION_KEY, FIELD_APPROX_ARRIVAL_TIMESTAMP));
+
+    public static final RecordField FIELD_METADATA = new RecordField(METADATA, 
RecordFieldType.RECORD.getRecordDataType(SCHEMA_METADATA));
+
+
+    @Override
+    public Record convert(final Record valueRecord, final KinesisClientRecord 
kinesisRecord, final String streamName, final String shardId) {
+        final Map<String, Object> metadata = new LinkedHashMap<>();
+        metadata.put(STREAM, streamName);
+        metadata.put(SHARD_ID, shardId);
+        metadata.put(SEQUENCE_NUMBER, kinesisRecord.sequenceNumber());
+        metadata.put(PARTITION_KEY, kinesisRecord.partitionKey());
+        final Instant approxArrivalTimestamp = 
kinesisRecord.approximateArrivalTimestamp();
+        metadata.put(APPROX_ARRIVAL_TIMESTAMP, approxArrivalTimestamp == null 
? null : approxArrivalTimestamp.toEpochMilli());
+        final Record metadataRecord = new MapRecord(SCHEMA_METADATA, metadata);
+
+        return new MapRecord(convertToWriteSchema(valueRecord.getSchema()), 
Map.of(METADATA, metadataRecord, VALUE, valueRecord));
+    }
+
+    private RecordSchema convertToWriteSchema(final RecordSchema readerSchema) 
{
+        final RecordField recordField = new RecordField(VALUE, 
RecordFieldType.RECORD.getRecordDataType(readerSchema));
+        return new SimpleRecordSchema(List.of(FIELD_METADATA, recordField));
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream/additionalDetails.md
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream/additionalDetails.md
index d7363cf9fb..24bee7cc8e 100644
--- 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream/additionalDetails.md
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream/additionalDetails.md
@@ -36,4 +36,9 @@ within the batch of Kinesis Records (messages), instead of a 
separate FlowFile p
 
 The FlowFiles emitted in this mode will include the standard `record.*` 
attributes along with the same Kinesis Shard ID,
 Sequence Number and Approximate Arrival Timestamp; but the values will relate 
to the **last** Kinesis Record that was
-processed in the batch of messages constituting the content of the FlowFile.
\ No newline at end of file
+processed in the batch of messages constituting the content of the FlowFile.
+
+Once a Record Writer is set, the Output Strategy can be set to `Use Wrapper` or 
`Use Content as Value`. When `Use Wrapper` is 
+picked the original content of the Kinesis Record will be wrapped under the 
`value` key and an additional `metadata`
+key will be populated with Stream Name, Shard ID, Partition Key, Sequence 
Number and Approximate Arrival Timestamp. 
+When `Use Content as Value` is picked the original content of the Kinesis Record will 
be used as is.
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestKinesisRecordProcessorRecord.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestKinesisRecordProcessorRecord.java
index 81296efa37..f877362c65 100644
--- 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestKinesisRecordProcessorRecord.java
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestKinesisRecordProcessorRecord.java
@@ -21,6 +21,8 @@ import org.apache.nifi.json.JsonRecordSetWriter;
 import org.apache.nifi.json.JsonTreeReader;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream;
+import 
org.apache.nifi.processors.aws.kinesis.stream.record.converter.RecordConverterIdentity;
+import 
org.apache.nifi.processors.aws.kinesis.stream.record.converter.RecordConverterWrapper;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.schema.inference.SchemaInferenceUtil;
@@ -101,7 +103,7 @@ public class TestKinesisRecordProcessorRecord {
         // default test fixture will try operations twice with very little 
wait in between
         fixture = new KinesisRecordProcessorRecord(processSessionFactory, 
runner.getLogger(), "kinesis-test",
                 "endpoint-prefix", null, 10_000L, 1L, 2, DATE_TIME_FORMATTER,
-                reader, writer);
+                reader, writer, new RecordConverterIdentity());
     }
 
     @AfterEach
@@ -133,17 +135,28 @@ public class TestKinesisRecordProcessorRecord {
 
     @Test
     public void testProcessRecordsNoCheckpoint() {
-        processMultipleRecordsAssertProvenance(false);
+        processMultipleRecordsAssertProvenance(false, false);
     }
 
     @Test
     public void testProcessRecordsWithEndpointOverride() {
-        processMultipleRecordsAssertProvenance(true);
+        processMultipleRecordsAssertProvenance(true, false);
     }
 
-    private void processMultipleRecordsAssertProvenance(final boolean 
endpointOverridden) {
-        final Date firstDate = Date.from(Instant.now().minus(1, 
ChronoUnit.MINUTES));
-        final Date secondDate = new Date();
+    @Test
+    public void testProcessWrappedRecordsNoCheckpoint() {
+        processMultipleRecordsAssertProvenance(false, true);
+    }
+
+    @Test
+    public void testProcessWrappedRecordsWithEndpointOverride() {
+        processMultipleRecordsAssertProvenance(true, true);
+    }
+
+    private void processMultipleRecordsAssertProvenance(final boolean 
endpointOverridden, final boolean useWrapper) {
+        final Instant referenceInstant = 
Instant.parse("2021-01-01T00:00:00.000Z");
+        final Date firstDate = Date.from(referenceInstant.minus(1, 
ChronoUnit.MINUTES));
+        final Date secondDate = Date.from(referenceInstant);
 
         final ProcessRecordsInput processRecordsInput = 
ProcessRecordsInput.builder()
                 .records(Arrays.asList(
@@ -170,11 +183,9 @@ public class TestKinesisRecordProcessorRecord {
                 .build();
 
        final String transitUriPrefix = endpointOverridden ? 
"https://another-endpoint.com:8443" : "http://endpoint-prefix.amazonaws.com";
-        if (endpointOverridden) {
-            fixture = new KinesisRecordProcessorRecord(processSessionFactory, 
runner.getLogger(), "kinesis-test",
-                    "endpoint-prefix", "https://another-endpoint.com:8443";, 
10_000L, 1L, 2, DATE_TIME_FORMATTER,
-                    reader, writer);
-        }
+        fixture = new KinesisRecordProcessorRecord(processSessionFactory, 
runner.getLogger(), "kinesis-test",
+                "endpoint-prefix", transitUriPrefix, 10_000L, 1L, 2, 
DATE_TIME_FORMATTER,
+                reader, writer, useWrapper ? new RecordConverterWrapper() : 
new RecordConverterIdentity());
 
         // skip checkpoint
         fixture.setNextCheckpointTimeInMillis(System.currentTimeMillis() + 
10_000L);
@@ -190,16 +201,34 @@ public class TestKinesisRecordProcessorRecord {
 
         final List<MockFlowFile> flowFiles = 
session.getFlowFilesForRelationship(ConsumeKinesisStream.REL_SUCCESS);
         // 4 records in single output file, attributes equating to that of the 
last record
-        assertFlowFile(flowFiles.get(0), secondDate, "partition-2", "2", 
"another-shard", "{\"record\":\"1\"}\n" +
-                "{\"record\":\"1b\"}\n" +
-                "{\"record\":\"no-date\"}\n" +
-                "{\"record\":\"2\"}", 4);
+        final String expectedContent = useWrapper ? 
expectRecordContentWrapper() : expectRecordContentIdentity();
+        assertFlowFile(flowFiles.getFirst(), secondDate, "partition-2", "2", 
"another-shard", expectedContent, 4);
         session.assertTransferCount(ConsumeKinesisStream.REL_PARSE_FAILURE, 0);
 
         session.assertCommitted();
         session.assertNotRolledBack();
     }
 
+    private String expectRecordContentIdentity() {
+        return """
+                {"record":"1"}
+                {"record":"1b"}
+                {"record":"no-date"}
+                {"record":"2"}""";
+    }
+
+    private String expectRecordContentWrapper() {
+        return """
+                
{"metadata":{"stream":"kinesis-test","shardId":"another-shard","sequenceNumber":"1","partitionKey":"partition-1",\
+                "approximateArrival":1609459140000},"value":{"record":"1"}}
+                
{"metadata":{"stream":"kinesis-test","shardId":"another-shard","sequenceNumber":"1","partitionKey":"partition-1",\
+                "approximateArrival":1609459140000},"value":{"record":"1b"}}
+                
{"metadata":{"stream":"kinesis-test","shardId":"another-shard","sequenceNumber":"no-date","partitionKey":"partition-no-date",\
+                "approximateArrival":null},"value":{"record":"no-date"}}
+                
{"metadata":{"stream":"kinesis-test","shardId":"another-shard","sequenceNumber":"2","partitionKey":"partition-2",\
+                "approximateArrival":1609459200000},"value":{"record":"2"}}""";
+    }
+
     @Test
     public void testProcessPoisonPillRecordButNoRawOutputWithCheckpoint() 
throws ShutdownException, InvalidStateException {
         final ProcessRecordsInput processRecordsInput = 
ProcessRecordsInput.builder()
@@ -235,7 +264,7 @@ public class TestKinesisRecordProcessorRecord {
         session.assertTransferCount(ConsumeKinesisStream.REL_SUCCESS, 1);
         final List<MockFlowFile> flowFiles = 
session.getFlowFilesForRelationship(ConsumeKinesisStream.REL_SUCCESS);
         // 2 successful records in single output file, attributes equating to 
that of the last successful record
-        assertFlowFile(flowFiles.get(0), null, "partition-3", "3", 
"test-shard", "{\"record\":\"1\"}\n" +
+        assertFlowFile(flowFiles.getFirst(), null, "partition-3", "3", 
"test-shard", "{\"record\":\"1\"}\n" +
                 "{\"record\":\"3\"}", 2);
 
         // check no poison-pill output (as the raw data could not be retrieved)
@@ -288,14 +317,14 @@ public class TestKinesisRecordProcessorRecord {
         session.assertTransferCount(ConsumeKinesisStream.REL_SUCCESS, 1);
         final List<MockFlowFile> flowFiles = 
session.getFlowFilesForRelationship(ConsumeKinesisStream.REL_SUCCESS);
         // 2 successful records in single output file, attributes equating to 
that of the last successful record
-        assertFlowFile(flowFiles.get(0), null, "partition-3", "3", 
"test-shard", "{\"record\":\"1\"}\n" +
+        assertFlowFile(flowFiles.getFirst(), null, "partition-3", "3", 
"test-shard", "{\"record\":\"1\"}\n" +
                 "{\"record\":\"3\"}", 2);
 
         // check poison-pill output (as the raw data could not be retrieved)
         session.assertTransferCount(ConsumeKinesisStream.REL_PARSE_FAILURE, 1);
         final List<MockFlowFile> failureFlowFiles = 
session.getFlowFilesForRelationship(ConsumeKinesisStream.REL_PARSE_FAILURE);
-        assertFlowFile(failureFlowFiles.get(0), null, "unparsable-partition", 
"unparsable-sequence", "test-shard", "invalid-json", 0);
-        failureFlowFiles.get(0).assertAttributeExists("record.error.message");
+        assertFlowFile(failureFlowFiles.getFirst(), null, 
"unparsable-partition", "unparsable-sequence", "test-shard", "invalid-json", 0);
+        
failureFlowFiles.getFirst().assertAttributeExists("record.error.message");
 
         // check the invalid json record was *not* retried a 2nd time
         assertNull(verify(kinesisRecord, times(1)).partitionKey());

Reply via email to