This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 86bd16364c NIFI-15586 Add demarcator processing strategy to 
ConsumeKinesis
86bd16364c is described below

commit 86bd16364cc1ae861b790e00c952c823c6f62680
Author: Alaksiej Ščarbaty <[email protected]>
AuthorDate: Thu Feb 12 16:58:02 2026 +0100

    NIFI-15586 Add demarcator processing strategy to ConsumeKinesis
    
    This closes #10904.
    
    Signed-off-by: Pierre Villard <[email protected]>
---
 .../processors/aws/kinesis/ConsumeKinesis.java     | 79 +++++++++++++++++++---
 .../processors/aws/kinesis/ConsumeKinesisIT.java   | 39 +++++++++++
 .../processors/aws/kinesis/ConsumeKinesisTest.java | 10 +++
 3 files changed, 119 insertions(+), 9 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
index 4b9b857d8f..52044ee0f4 100644
--- 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
@@ -29,6 +29,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.DescribedValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
 import org.apache.nifi.controller.NodeTypeProvider;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
@@ -93,6 +94,7 @@ import java.time.Duration;
 import java.time.Instant;
 import java.util.Date;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
@@ -102,6 +104,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.nifi.processors.aws.kinesis.ConsumeKinesisAttributes.APPROXIMATE_ARRIVAL_TIMESTAMP;
@@ -246,6 +249,19 @@ public class ConsumeKinesis extends AbstractProcessor {
             .allowableValues(OutputStrategy.class)
             .build();
 
+    static final PropertyDescriptor MESSAGE_DEMARCATOR = new 
PropertyDescriptor.Builder()
+            .name("Message Demarcator")
+            .description("""
+                    Specifies the string (interpreted as UTF-8) to use for 
demarcating multiple Kinesis messages
+                    within a single FlowFile. If not specified, the content of 
the messages will be concatenated
+                    without any delimiter.
+                    To enter special characters such as 'new line', use 
CTRL+Enter or Shift+Enter depending on the OS.
+                    """)
+            .required(false)
+            .addValidator(Validator.VALID)
+            .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.DEMARCATOR)
+            .build();
+
     static final PropertyDescriptor INITIAL_STREAM_POSITION = new 
PropertyDescriptor.Builder()
             .name("Initial Stream Position")
             .description("The position in the stream where the processor 
should start reading.")
@@ -309,6 +325,7 @@ public class ConsumeKinesis extends AbstractProcessor {
             RECORD_READER,
             RECORD_WRITER,
             OUTPUT_STRATEGY,
+            MESSAGE_DEMARCATOR,
             INITIAL_STREAM_POSITION,
             STREAM_POSITION_TIMESTAMP,
             MAX_BYTES_TO_BUFFER,
@@ -339,6 +356,7 @@ public class ConsumeKinesis extends AbstractProcessor {
     private volatile RecordBuffer.ForProcessor<Lease> recordBuffer;
 
     private volatile @Nullable ReaderRecordProcessor readerRecordProcessor;
+    private volatile @Nullable byte[] demarcatorValue;
 
     private volatile Future<InitializationResult> initializationResultFuture;
     private final AtomicBoolean initialized = new AtomicBoolean();
@@ -360,7 +378,7 @@ public class ConsumeKinesis extends AbstractProcessor {
     @Override
     public Set<Relationship> getRelationships() {
         return switch (processingStrategy) {
-            case FLOW_FILE -> RAW_FILE_RELATIONSHIPS;
+            case FLOW_FILE, DEMARCATOR -> RAW_FILE_RELATIONSHIPS;
             case RECORD -> RECORD_FILE_RELATIONSHIPS;
         };
     }
@@ -375,9 +393,16 @@ public class ConsumeKinesis extends AbstractProcessor {
     @OnScheduled
     public void setup(final ProcessContext context) {
         readerRecordProcessor = switch (processingStrategy) {
-            case FLOW_FILE -> null;
+            case FLOW_FILE, DEMARCATOR -> null;
             case RECORD -> createReaderRecordProcessor(context);
         };
+        demarcatorValue = switch (processingStrategy) {
+            case FLOW_FILE, RECORD -> null;
+            case DEMARCATOR -> {
+                final String demarcatorValue = 
context.getProperty(MESSAGE_DEMARCATOR).getValue();
+                yield demarcatorValue != null ? 
demarcatorValue.getBytes(UTF_8) : new byte[0];
+            }
+        };
 
         final Region region = RegionUtil.getRegion(context);
         final AwsCredentialsProvider credentialsProvider = 
context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE)
@@ -593,6 +618,7 @@ public class ConsumeKinesis extends AbstractProcessor {
 
         recordBuffer = null;
         readerRecordProcessor = null;
+        demarcatorValue = null;
     }
 
     private void shutdownScheduler() {
@@ -680,11 +706,10 @@ public class ConsumeKinesis extends AbstractProcessor {
             }
 
             final String shardId = lease.shardId();
-            final ReaderRecordProcessor processor = readerRecordProcessor;
-            if (processor != null) {
-                processRecordsWithReader(processor, session, shardId, records);
-            } else {
-                processRecordsAsRaw(session, shardId, records);
+            switch (processingStrategy) {
+                case FLOW_FILE -> processRecordsAsRaw(session, shardId, 
records);
+                case RECORD -> processRecordsWithReader(session, shardId, 
records);
+                case DEMARCATOR -> processRecordsAsDemarcated(session, 
shardId, records);
             }
 
             session.adjustCounter("Records Processed", records.size(), false);
@@ -732,13 +757,48 @@ public class ConsumeKinesis extends AbstractProcessor {
         }
     }
 
-    private void processRecordsWithReader(final ReaderRecordProcessor 
recordProcessor, final ProcessSession session, final String shardId, final 
List<KinesisClientRecord> records) {
+    private void processRecordsWithReader(final ProcessSession session, final 
String shardId, final List<KinesisClientRecord> records) {
+        final ReaderRecordProcessor recordProcessor = readerRecordProcessor;
+        if (recordProcessor == null) {
+            throw new IllegalStateException("RecordProcessor has not been 
initialized");
+        }
+
         final ProcessingResult result = 
recordProcessor.processRecords(session, streamName, shardId, records);
 
         session.transfer(result.successFlowFiles(), REL_SUCCESS);
         session.transfer(result.parseFailureFlowFiles(), REL_PARSE_FAILURE);
     }
 
+    private void processRecordsAsDemarcated(final ProcessSession session, 
final String shardId, final List<KinesisClientRecord> records) {
+        final byte[] demarcator = demarcatorValue;
+        if (demarcator == null) {
+            throw new IllegalStateException("Demarcator has not been 
initialized");
+        }
+
+        FlowFile flowFile = session.create();
+
+        final Map<String, String> attributes = 
ConsumeKinesisAttributes.fromKinesisRecords(streamName, shardId, 
records.getFirst(), records.getLast());
+        attributes.put(RECORD_COUNT, String.valueOf(records.size()));
+        flowFile = session.putAllAttributes(flowFile, attributes);
+
+        flowFile = session.write(flowFile, out -> {
+            try (final WritableByteChannel channel = Channels.newChannel(out)) 
{
+                boolean writtenData = false;
+                for (final KinesisClientRecord record : records) {
+                    if (writtenData) {
+                        out.write(demarcator);
+                    }
+                    channel.write(record.data());
+                    writtenData = true;
+                }
+            }
+        });
+
+        session.getProvenanceReporter().receive(flowFile, 
ProvenanceTransitUriFormat.toTransitUri(streamName, shardId));
+
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+
     /**
      * An adapter between Kinesis Consumer Library and {@link RecordBuffer}.
      */
@@ -859,7 +919,8 @@ public class ConsumeKinesis extends AbstractProcessor {
 
     enum ProcessingStrategy implements DescribedValue {
         FLOW_FILE("Write one FlowFile for each consumed Kinesis Record"),
-        RECORD("Write one FlowFile containing multiple consumed Kinesis 
Records processed with Record Reader and Record Writer");
+        RECORD("Write one FlowFile containing multiple consumed Kinesis 
Records processed with Record Reader and Record Writer"),
+        DEMARCATOR("Write one FlowFile containing multiple consumed Kinesis 
Records separated by a configurable demarcator");
 
         private final String description;
 
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisIT.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisIT.java
index ac353124e6..184f2edc56 100644
--- 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisIT.java
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisIT.java
@@ -442,6 +442,35 @@ class ConsumeKinesisIT {
         assertReceiveProvenanceEvents(recordRunner.getProvenanceEvents(), 
firstFlowFile, secondFlowFile, parseFailureFlowFile);
     }
 
+    @Test
+    void testRecordProcessingWithDemarcator() throws InitializationException {
+        streamClient.createStream(1);
+
+        final TestRunner demarcatorTestRunner = 
createDemarcatorTestRunner(streamName, applicationName, System.lineSeparator());
+
+        final List<String> testRecords = List.of(
+                "{\"name\":\"John\",\"age\":30}", // Schema A
+                "{\"name\":\"Jane\",\"age\":25}", // Schema A
+                "{invalid json}",
+                "{\"id\":\"123\",\"value\":\"test\"}" // Schema B
+        );
+
+        testRecords.forEach(record -> streamClient.putRecord("key", record));
+
+        runProcessorWithInitAndWaitForFiles(demarcatorTestRunner, 1);
+
+        // All records from the same shard are written as-is into the same 
FlowFile.
+        demarcatorTestRunner.assertTransferCount(REL_SUCCESS, 1);
+        final List<MockFlowFile> successFlowFiles = 
demarcatorTestRunner.getFlowFilesForRelationship(REL_SUCCESS);
+
+        final MockFlowFile flowFile = successFlowFiles.getFirst();
+        assertEquals("4", flowFile.getAttribute(RECORD_COUNT));
+        flowFile.assertContentEquals(String.join(System.lineSeparator(), 
testRecords));
+
+        // Verify provenance events.
+        
assertReceiveProvenanceEvents(demarcatorTestRunner.getProvenanceEvents(), 
flowFile);
+    }
+
     private static void assertReceiveProvenanceEvents(final 
List<ProvenanceEventRecord> actualEvents, final FlowFile... expectedFlowFiles) {
         assertReceiveProvenanceEvents(actualEvents, 
List.of(expectedFlowFiles));
     }
@@ -511,6 +540,16 @@ class ConsumeKinesisIT {
         return runner;
     }
 
+    private TestRunner createDemarcatorTestRunner(final String streamName, 
final String applicationName, final String demarcator) throws 
InitializationException {
+        final TestRunner runner = createTestRunner(streamName, 
applicationName);
+
+        runner.setProperty(ConsumeKinesis.PROCESSING_STRATEGY, 
ConsumeKinesis.ProcessingStrategy.DEMARCATOR);
+        runner.setProperty(ConsumeKinesis.MESSAGE_DEMARCATOR, demarcator);
+
+        runner.assertValid();
+        return runner;
+    }
+
     private void runProcessorWithInitAndWaitForFiles(final TestRunner runner, 
final int expectedFlowFileCount) {
         runProcessorAndWaitForFiles(runner, expectedFlowFileCount, true);
     }
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java
index 3d105cb0bf..1024896b2c 100644
--- 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.api.Test;
 import java.util.Set;
 
 import static 
org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.PROCESSING_STRATEGY;
+import static 
org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.ProcessingStrategy.DEMARCATOR;
 import static 
org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.ProcessingStrategy.FLOW_FILE;
 import static 
org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.ProcessingStrategy.RECORD;
 import static 
org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.REL_PARSE_FAILURE;
@@ -61,6 +62,15 @@ class ConsumeKinesisTest {
         assertEquals(Set.of(REL_SUCCESS, REL_PARSE_FAILURE), relationships);
     }
 
+    @Test
+    void getRelationshipsForDemarcatorProcessingStrategy() {
+        testRunner.setProperty(PROCESSING_STRATEGY, DEMARCATOR);
+
+        final Set<Relationship> relationships = 
testRunner.getProcessor().getRelationships();
+
+        assertEquals(Set.of(REL_SUCCESS), relationships);
+    }
+
     private static TestRunner createTestRunner() {
         final TestRunner runner = 
TestRunners.newTestRunner(ConsumeKinesis.class);
 

Reply via email to