This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 86bd16364c NIFI-15586 Add demarcator processing strategy to ConsumeKinesis
86bd16364c is described below
commit 86bd16364cc1ae861b790e00c952c823c6f62680
Author: Alaksiej Ščarbaty <[email protected]>
AuthorDate: Thu Feb 12 16:58:02 2026 +0100
NIFI-15586 Add demarcator processing strategy to ConsumeKinesis
This closes #10904.
Signed-off-by: Pierre Villard <[email protected]>
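For readers who want to try the new strategy locally, the patch's own integration test configures it roughly as shown below. This is a minimal sketch, not part of the commit: it assumes the nifi-mock TestRunner API used in the test code of this patch and omits the stream name, application name and credential properties that the integration test also sets before calling assertValid().

    // Sketch only: configure ConsumeKinesis for demarcated output. It lives in the same package as the
    // processor, since PROCESSING_STRATEGY and MESSAGE_DEMARCATOR are package-private descriptors.
    package org.apache.nifi.processors.aws.kinesis;

    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    class DemarcatorConfigSketch {

        static TestRunner newDemarcatedRunner() {
            final TestRunner runner = TestRunners.newTestRunner(ConsumeKinesis.class);

            // Write one FlowFile containing multiple consumed records instead of one FlowFile per record.
            runner.setProperty(ConsumeKinesis.PROCESSING_STRATEGY, ConsumeKinesis.ProcessingStrategy.DEMARCATOR);

            // Records in the FlowFile are separated by this UTF-8 string; if left unset, they are concatenated.
            runner.setProperty(ConsumeKinesis.MESSAGE_DEMARCATOR, System.lineSeparator());

            return runner;
        }
    }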
---
.../processors/aws/kinesis/ConsumeKinesis.java | 79 +++++++++++++++++++---
.../processors/aws/kinesis/ConsumeKinesisIT.java | 39 +++++++++++
.../processors/aws/kinesis/ConsumeKinesisTest.java | 10 +++
3 files changed, 119 insertions(+), 9 deletions(-)
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
index 4b9b857d8f..52044ee0f4 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
@@ -29,6 +29,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
@@ -93,6 +94,7 @@ import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
@@ -102,6 +104,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesisAttributes.APPROXIMATE_ARRIVAL_TIMESTAMP;
@@ -246,6 +249,19 @@ public class ConsumeKinesis extends AbstractProcessor {
.allowableValues(OutputStrategy.class)
.build();
+ static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
+ .name("Message Demarcator")
+ .description("""
+ Specifies the string (interpreted as UTF-8) to use for demarcating multiple Kinesis messages
+ within a single FlowFile. If not specified, the content of the messages will be concatenated
+ without any delimiter.
+ To enter special characters such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS.
+ """)
+ .required(false)
+ .addValidator(Validator.VALID)
+ .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.DEMARCATOR)
+ .build();
+
static final PropertyDescriptor INITIAL_STREAM_POSITION = new PropertyDescriptor.Builder()
.name("Initial Stream Position")
.description("The position in the stream where the processor
should start reading.")
@@ -309,6 +325,7 @@ public class ConsumeKinesis extends AbstractProcessor {
RECORD_READER,
RECORD_WRITER,
OUTPUT_STRATEGY,
+ MESSAGE_DEMARCATOR,
INITIAL_STREAM_POSITION,
STREAM_POSITION_TIMESTAMP,
MAX_BYTES_TO_BUFFER,
@@ -339,6 +356,7 @@ public class ConsumeKinesis extends AbstractProcessor {
private volatile RecordBuffer.ForProcessor<Lease> recordBuffer;
private volatile @Nullable ReaderRecordProcessor readerRecordProcessor;
+ private volatile @Nullable byte[] demarcatorValue;
private volatile Future<InitializationResult> initializationResultFuture;
private final AtomicBoolean initialized = new AtomicBoolean();
@@ -360,7 +378,7 @@ public class ConsumeKinesis extends AbstractProcessor {
@Override
public Set<Relationship> getRelationships() {
return switch (processingStrategy) {
- case FLOW_FILE -> RAW_FILE_RELATIONSHIPS;
+ case FLOW_FILE, DEMARCATOR -> RAW_FILE_RELATIONSHIPS;
case RECORD -> RECORD_FILE_RELATIONSHIPS;
};
}
@@ -375,9 +393,16 @@ public class ConsumeKinesis extends AbstractProcessor {
@OnScheduled
public void setup(final ProcessContext context) {
readerRecordProcessor = switch (processingStrategy) {
- case FLOW_FILE -> null;
+ case FLOW_FILE, DEMARCATOR -> null;
case RECORD -> createReaderRecordProcessor(context);
};
+ demarcatorValue = switch (processingStrategy) {
+ case FLOW_FILE, RECORD -> null;
+ case DEMARCATOR -> {
+ final String demarcatorValue = context.getProperty(MESSAGE_DEMARCATOR).getValue();
+ yield demarcatorValue != null ? demarcatorValue.getBytes(UTF_8) : new byte[0];
+ }
+ };
final Region region = RegionUtil.getRegion(context);
final AwsCredentialsProvider credentialsProvider = context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE)
@@ -593,6 +618,7 @@ public class ConsumeKinesis extends AbstractProcessor {
recordBuffer = null;
readerRecordProcessor = null;
+ demarcatorValue = null;
}
private void shutdownScheduler() {
@@ -680,11 +706,10 @@ public class ConsumeKinesis extends AbstractProcessor {
}
final String shardId = lease.shardId();
- final ReaderRecordProcessor processor = readerRecordProcessor;
- if (processor != null) {
- processRecordsWithReader(processor, session, shardId, records);
- } else {
- processRecordsAsRaw(session, shardId, records);
+ switch (processingStrategy) {
+ case FLOW_FILE -> processRecordsAsRaw(session, shardId, records);
+ case RECORD -> processRecordsWithReader(session, shardId, records);
+ case DEMARCATOR -> processRecordsAsDemarcated(session, shardId, records);
}
session.adjustCounter("Records Processed", records.size(), false);
@@ -732,13 +757,48 @@ public class ConsumeKinesis extends AbstractProcessor {
}
}
- private void processRecordsWithReader(final ReaderRecordProcessor recordProcessor, final ProcessSession session, final String shardId, final List<KinesisClientRecord> records) {
+ private void processRecordsWithReader(final ProcessSession session, final String shardId, final List<KinesisClientRecord> records) {
+ final ReaderRecordProcessor recordProcessor = readerRecordProcessor;
+ if (recordProcessor == null) {
+ throw new IllegalStateException("RecordProcessor has not been
initialized");
+ }
+
final ProcessingResult result = recordProcessor.processRecords(session, streamName, shardId, records);
session.transfer(result.successFlowFiles(), REL_SUCCESS);
session.transfer(result.parseFailureFlowFiles(), REL_PARSE_FAILURE);
}
+ private void processRecordsAsDemarcated(final ProcessSession session, final String shardId, final List<KinesisClientRecord> records) {
+ final byte[] demarcator = demarcatorValue;
+ if (demarcator == null) {
+ throw new IllegalStateException("Demarcator has not been
initialized");
+ }
+
+ FlowFile flowFile = session.create();
+
+ final Map<String, String> attributes = ConsumeKinesisAttributes.fromKinesisRecords(streamName, shardId, records.getFirst(), records.getLast());
+ attributes.put(RECORD_COUNT, String.valueOf(records.size()));
+ flowFile = session.putAllAttributes(flowFile, attributes);
+
+ flowFile = session.write(flowFile, out -> {
+ try (final WritableByteChannel channel = Channels.newChannel(out)) {
+ boolean writtenData = false;
+ for (final KinesisClientRecord record : records) {
+ if (writtenData) {
+ out.write(demarcator);
+ }
+ channel.write(record.data());
+ writtenData = true;
+ }
+ }
+ });
+
+ session.getProvenanceReporter().receive(flowFile, ProvenanceTransitUriFormat.toTransitUri(streamName, shardId));
+
+ session.transfer(flowFile, REL_SUCCESS);
+ }
+
/**
* An adapter between Kinesis Consumer Library and {@link RecordBuffer}.
*/
@@ -859,7 +919,8 @@ public class ConsumeKinesis extends AbstractProcessor {
enum ProcessingStrategy implements DescribedValue {
FLOW_FILE("Write one FlowFile for each consumed Kinesis Record"),
- RECORD("Write one FlowFile containing multiple consumed Kinesis
Records processed with Record Reader and Record Writer");
+ RECORD("Write one FlowFile containing multiple consumed Kinesis
Records processed with Record Reader and Record Writer"),
+ DEMARCATOR("Write one FlowFile containing multiple consumed Kinesis
Records separated by a configurable demarcator");
private final String description;
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisIT.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisIT.java
index ac353124e6..184f2edc56 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisIT.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisIT.java
@@ -442,6 +442,35 @@ class ConsumeKinesisIT {
assertReceiveProvenanceEvents(recordRunner.getProvenanceEvents(), firstFlowFile, secondFlowFile, parseFailureFlowFile);
}
+ @Test
+ void testRecordProcessingWithDemarcator() throws InitializationException {
+ streamClient.createStream(1);
+
+ final TestRunner demarcatorTestRunner = createDemarcatorTestRunner(streamName, applicationName, System.lineSeparator());
+
+ final List<String> testRecords = List.of(
+ "{\"name\":\"John\",\"age\":30}", // Schema A
+ "{\"name\":\"Jane\",\"age\":25}", // Schema A
+ "{invalid json}",
+ "{\"id\":\"123\",\"value\":\"test\"}" // Schema B
+ );
+
+ testRecords.forEach(record -> streamClient.putRecord("key", record));
+
+ runProcessorWithInitAndWaitForFiles(demarcatorTestRunner, 1);
+
+ // All records from the same shard are put as is into the same FlowFile.
+ demarcatorTestRunner.assertTransferCount(REL_SUCCESS, 1);
+ final List<MockFlowFile> successFlowFiles = demarcatorTestRunner.getFlowFilesForRelationship(REL_SUCCESS);
+
+ final MockFlowFile flowFile = successFlowFiles.getFirst();
+ assertEquals("4", flowFile.getAttribute(RECORD_COUNT));
+ flowFile.assertContentEquals(String.join(System.lineSeparator(), testRecords));
+
+ // Verify provenance events.
+ assertReceiveProvenanceEvents(demarcatorTestRunner.getProvenanceEvents(), flowFile);
+ }
+
private static void assertReceiveProvenanceEvents(final List<ProvenanceEventRecord> actualEvents, final FlowFile... expectedFlowFiles) {
assertReceiveProvenanceEvents(actualEvents, List.of(expectedFlowFiles));
}
@@ -511,6 +540,16 @@ class ConsumeKinesisIT {
return runner;
}
+ private TestRunner createDemarcatorTestRunner(final String streamName, final String applicationName, final String demarcator) throws InitializationException {
+ final TestRunner runner = createTestRunner(streamName, applicationName);
+
+ runner.setProperty(ConsumeKinesis.PROCESSING_STRATEGY, ConsumeKinesis.ProcessingStrategy.DEMARCATOR);
+ runner.setProperty(ConsumeKinesis.MESSAGE_DEMARCATOR, demarcator);
+
+ runner.assertValid();
+ return runner;
+ }
+
private void runProcessorWithInitAndWaitForFiles(final TestRunner runner, final int expectedFlowFileCount) {
runProcessorAndWaitForFiles(runner, expectedFlowFileCount, true);
}
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java
index 3d105cb0bf..1024896b2c 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.api.Test;
import java.util.Set;
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.PROCESSING_STRATEGY;
+import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.ProcessingStrategy.DEMARCATOR;
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.ProcessingStrategy.FLOW_FILE;
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.ProcessingStrategy.RECORD;
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.REL_PARSE_FAILURE;
@@ -61,6 +62,15 @@ class ConsumeKinesisTest {
assertEquals(Set.of(REL_SUCCESS, REL_PARSE_FAILURE), relationships);
}
+ @Test
+ void getRelationshipsForDemarcatorProcessingStrategy() {
+ testRunner.setProperty(PROCESSING_STRATEGY, DEMARCATOR);
+
+ final Set<Relationship> relationships = testRunner.getProcessor().getRelationships();
+
+ assertEquals(Set.of(REL_SUCCESS), relationships);
+ }
+
private static TestRunner createTestRunner() {
final TestRunner runner = TestRunners.newTestRunner(ConsumeKinesis.class);