This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 504ba51096 NIFI-15270 Added Consumer Type property to ConsumeKinesis (#10578)
504ba51096 is described below
commit 504ba5109683c519f70001b1bc7cc04b8fd61648
Author: Alaksiej Ščarbaty <[email protected]>
AuthorDate: Tue Dec 2 17:58:58 2025 +0100
NIFI-15270 Added Consumer Type property to ConsumeKinesis (#10578)
- Support Shared Throughput or Enhanced Fan-Out options
Signed-off-by: David Handermann <[email protected]>
---
.../processors/aws/kinesis/ConsumeKinesis.java | 63 +++++++++++++++++--
.../additionalDetails.md | 10 +++
.../processors/aws/kinesis/ConsumeKinesisIT.java | 73 +++++++++++++++++++---
3 files changed, 132 insertions(+), 14 deletions(-)
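The new property can be exercised the same way the integration test below does. The following is a minimal sketch and not part of the commit; it assumes the nifi-mock TestRunner, and the class must sit in the org.apache.nifi.processors.aws.kinesis package because the CONSUMER_TYPE descriptor and ConsumerType enum added here are package-private.

    package org.apache.nifi.processors.aws.kinesis;

    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    class ConsumerTypeSketch {
        void configure() {
            // The default Consumer Type is Enhanced Fan-Out; switch to the polling-based consumer.
            final TestRunner runner = TestRunners.newTestRunner(ConsumeKinesis.class);
            runner.setProperty(ConsumeKinesis.CONSUMER_TYPE, ConsumeKinesis.ConsumerType.SHARED_THROUGHPUT);
        }
    }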
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
index b173cfab46..91d1982fe8 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
@@ -80,6 +80,9 @@ import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.SingleStreamTracker;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
+import software.amazon.kinesis.retrieval.RetrievalSpecificConfig;
+import software.amazon.kinesis.retrieval.fanout.FanOutConfig;
+import software.amazon.kinesis.retrieval.polling.PollingConfig;
import java.net.URI;
import java.nio.channels.Channels;
@@ -189,12 +192,19 @@ public class ConsumeKinesis extends AbstractProcessor {
.description("""
The Controller Service that is used to obtain AWS credentials provider.
Ensure that the credentials provided have access to Kinesis, DynamoDB and (optional) CloudWatch.
- (See processor's additional details for more information.)
""")
.required(true)
.identifiesControllerService(AwsCredentialsProviderService.class)
.build();
+ static final PropertyDescriptor CONSUMER_TYPE = new PropertyDescriptor.Builder()
+ .name("Consumer Type")
+ .description("Strategy for reading records from Amazon Kinesis streams.")
+ .required(true)
+ .allowableValues(ConsumerType.class)
+ .defaultValue(ConsumerType.ENHANCED_FAN_OUT)
+ .build();
+
static final PropertyDescriptor PROCESSING_STRATEGY = new PropertyDescriptor.Builder()
.name("Processing Strategy")
.description("Strategy for processing Kinesis Records and writing serialized output to FlowFiles.")
@@ -209,7 +219,7 @@ public class ConsumeKinesis extends AbstractProcessor {
The Record Reader to use for parsing the data received from Kinesis.
The Record Reader is responsible for providing schemas for the records. If the schemas change frequently,
- it might hinder performance of the processor. (See processor's additional details for more information.)
+ it might hinder performance of the processor.
""")
.required(true)
.dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.RECORD)
@@ -291,6 +301,7 @@ public class ConsumeKinesis extends AbstractProcessor {
AWS_CREDENTIALS_PROVIDER_SERVICE,
REGION,
CUSTOM_REGION,
+ CONSUMER_TYPE,
PROCESSING_STRATEGY,
RECORD_READER,
RECORD_WRITER,
@@ -397,6 +408,7 @@ public class ConsumeKinesis extends AbstractProcessor {
final ConfigsBuilder configsBuilder = new ConfigsBuilder(streamTracker, applicationName, kinesisClient, dynamoDbClient, cloudWatchClient, workerId, recordProcessorFactory);
final MetricsFactory metricsFactory = configureMetricsFactory(context);
+ final RetrievalSpecificConfig retrievalSpecificConfig = configureRetrievalSpecificConfig(context, kinesisClient, streamName, applicationName);
final InitializationStateChangeListener initializationListener = new InitializationStateChangeListener(getLogger());
@@ -407,7 +419,7 @@ public class ConsumeKinesis extends AbstractProcessor {
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig().metricsFactory(metricsFactory),
configsBuilder.processorConfig(),
- configsBuilder.retrievalConfig()
+ configsBuilder.retrievalConfig().retrievalSpecificConfig(retrievalSpecificConfig)
);
final String schedulerThreadName = "%s-Scheduler-%s".formatted(getClass().getSimpleName(), getIdentifier());
@@ -541,6 +553,18 @@ public class ConsumeKinesis extends AbstractProcessor {
};
}
+ private static RetrievalSpecificConfig configureRetrievalSpecificConfig(
+ final ProcessContext context,
+ final KinesisAsyncClient kinesisClient,
+ final String streamName,
+ final String applicationName) {
+ final ConsumerType consumerType = context.getProperty(CONSUMER_TYPE).asAllowableValue(ConsumerType.class);
+ return switch (consumerType) {
+ case SHARED_THROUGHPUT -> new PollingConfig(kinesisClient).streamName(streamName);
+ case ENHANCED_FAN_OUT -> new FanOutConfig(kinesisClient).streamName(streamName).applicationName(applicationName);
+ };
+ }
+
@OnStopped
public void onStopped() {
cleanUpState();
@@ -769,6 +793,34 @@ public class ConsumeKinesis extends AbstractProcessor {
}
}
+ enum ConsumerType implements DescribedValue {
+ SHARED_THROUGHPUT("Shared Throughput", "A consumer shares the read
throughput limits with other consumers"),
+ ENHANCED_FAN_OUT("Enhanced Fan-Out", "A consumer is granted a
dedicated read throughput with a lower latency");
+
+ private final String displayName;
+ private final String description;
+
+ ConsumerType(final String displayName, final String description) {
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return name();
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+ }
+
enum ProcessingStrategy implements DescribedValue {
FLOW_FILE("Write one FlowFile for each consumed Kinesis Record"),
RECORD("Write one FlowFile containing multiple consumed Kinesis
Records processed with Record Reader and Record Writer");
@@ -860,9 +912,8 @@ public class ConsumeKinesis extends AbstractProcessor {
enum OutputStrategy implements DescribedValue {
USE_VALUE("Use Content as Value", "Write only the Kinesis Record value
to the FlowFile record."),
- USE_WRAPPER("Use Wrapper", "Write the Kinesis Record value and
metadata into the FlowFile record. (See additional details for more
information.)"),
- INJECT_METADATA("Inject Metadata",
- "Write the Kinesis Record value to the FlowFile record and add
a sub-record to it with metadata. (See additional details for more
information.)");
+ USE_WRAPPER("Use Wrapper", "Write the Kinesis Record value and
metadata into the FlowFile record."),
+ INJECT_METADATA("Inject Metadata", "Write the Kinesis Record value to
the FlowFile record and add a sub-record to it with metadata.");
private final String displayName;
private final String description;
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/resources/docs/org.apache.nifi.processors.aws.kinesis.ConsumeKinesis/additionalDetails.md b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/resources/docs/org.apache.nifi.processors.aws.kinesis.ConsumeKinesis/additionalDetails.md
index fee288941b..6056c9b58e 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/resources/docs/org.apache.nifi.processors.aws.kinesis.ConsumeKinesis/additionalDetails.md
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/resources/docs/org.apache.nifi.processors.aws.kinesis.ConsumeKinesis/additionalDetails.md
@@ -116,6 +116,16 @@ The following is an example policy document for `ConsumeKinesis`.
**Note:** Replace "{Region}", "{Account}", "{Stream Name}", and "{Application Name}" in the ARNs with your own AWS region,
AWS account number, Kinesis data stream name, and `ConsumeKinesis` _Application Name_ property respectively.
+## Consumer Type
+
+Comparison of different Consumer Types from [Amazon Kinesis Streams documentation](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html):
+
+| Characteristics           | Shared throughput consumers without enhanced fan-out | Enhanced fan-out consumers |
+|---------------------------|-------------------------------------------------------|----------------------------|
+| Read throughput           | Fixed at a total of 2 MB/sec per shard. If there are multiple consumers reading from the same shard, they all share this throughput. The sum of the throughputs they receive from the shard doesn't exceed 2 MB/sec. | Scales as consumers register to use enhanced fan-out. Each consumer registered to use enhanced fan-out receives its own read throughput per shard, up to 2 MB/sec, independently of other consumers. |
+| Message propagation delay | An average of around 200 ms if you have one consumer reading from the stream. This average goes up to around 1000 ms if you have five consumers. | Typically an average of 70 ms whether you have one consumer or five consumers. |
+| Cost                      | Not applicable | There is a data retrieval cost and a consumer-shard hour cost. For more information, see [Amazon Kinesis Data Streams Pricing](https://aws.amazon.com/kinesis/data-streams/pricing/?nc=sn&loc=3). |
+
## Record processing
When _Processing Strategy_ property is set to _RECORD_, _ConsumeKinesis_ operates in Record mode.
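As a rough illustration of the difference documented above (a sketch, not part of the commit): with Enhanced Fan-Out the processor registers a dedicated consumer on the stream, which can be observed with the same AWS SDK v2 call the integration test below uses, while Shared Throughput registers none. The stream ARN parameter is a placeholder.

    import java.util.List;

    import software.amazon.awssdk.services.kinesis.KinesisClient;
    import software.amazon.awssdk.services.kinesis.model.Consumer;

    class FanOutConsumerCheck {
        // Lists the enhanced fan-out consumers registered against a stream; empty when only Shared Throughput consumers read from it.
        static List<String> consumerNames(final KinesisClient kinesisClient, final String streamArn) {
            return kinesisClient.listStreamConsumers(req -> req.streamARN(streamArn))
                    .consumers()
                    .stream()
                    .map(Consumer::consumerName)
                    .toList();
        }
    }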
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisIT.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisIT.java
index 66d13d4a91..daef96b94b 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisIT.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisIT.java
@@ -55,9 +55,12 @@ import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.Consumer;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
+import software.amazon.awssdk.services.kinesis.model.ListStreamConsumersResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.ScalingType;
+import software.amazon.awssdk.services.kinesis.model.StreamDescription;
import software.amazon.awssdk.services.kinesis.model.StreamStatus;
import java.net.URI;
@@ -67,6 +70,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
@@ -195,6 +199,40 @@ class ConsumeKinesisIT {
assertNotNull(flowFile.getAttribute("aws.kinesis.shard.id"));
assertReceiveProvenanceEvents(runner.getProvenanceEvents(), flowFile);
+
+ // Creates an enhanced fan-out consumer by default.
+ assertEquals(
+ List.of(applicationName),
+ streamClient.getEnhancedFanOutConsumerNames(),
+ "Expected a single enhanced fan-out consumer with an
application name");
+ }
+
+ @Test
+ void testConsumeSingleMessageFromSingleShard_withoutEnhancedFanOut() {
+ runner.setProperty(ConsumeKinesis.CONSUMER_TYPE, ConsumeKinesis.ConsumerType.SHARED_THROUGHPUT);
+
+ streamClient.createStream(1);
+
+ final String testMessage = "Hello, Kinesis!";
+ streamClient.putRecord("test-partition-key", testMessage);
+
+ runProcessorWithInitAndWaitForFiles(runner, 1);
+
+ runner.assertTransferCount(REL_SUCCESS, 1);
+ final List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(REL_SUCCESS);
+ final MockFlowFile flowFile = flowFiles.getFirst();
+
+ flowFile.assertContentEquals(testMessage);
+ flowFile.assertAttributeEquals("aws.kinesis.partition.key",
"test-partition-key");
+
assertNotNull(flowFile.getAttribute("aws.kinesis.first.sequence.number"));
+
assertNotNull(flowFile.getAttribute("aws.kinesis.last.sequence.number"));
+ assertNotNull(flowFile.getAttribute("aws.kinesis.shard.id"));
+
+ assertReceiveProvenanceEvents(runner.getProvenanceEvents(), flowFile);
+
+ assertTrue(
+ streamClient.getEnhancedFanOutConsumerNames().isEmpty(),
+ "No enhanced fan-out consumers should be created for Shared
Throughput consumer type");
}
@Test
@@ -569,6 +607,28 @@ class ConsumeKinesisIT {
logger.info("Stream {} is now active", streamName);
}
+ List<String> getEnhancedFanOutConsumerNames() {
+ final String arn = describeStream().streamARN();
+
+ final ListStreamConsumersResponse response = executeWithRetry(
+ "listStreamConsumers",
+ () -> kinesisClient.listStreamConsumers(req ->
req.streamARN(arn))
+ );
+
+ return response.consumers().stream()
+ .map(Consumer::consumerName)
+ .toList();
+ }
+
+ private StreamDescription describeStream() {
+ final DescribeStreamResponse response = executeWithRetry(
+ "describeStream",
+ () -> kinesisClient.describeStream(req ->
req.streamName(streamName))
+ );
+
+ return response.streamDescription();
+ }
+
void deleteStream() {
logger.info("Deleting stream: {}", streamName);
@@ -627,9 +687,7 @@ class ConsumeKinesisIT {
while (System.currentTimeMillis() < timeoutMillis) {
try {
- final DescribeStreamResponse response = kinesisClient.describeStream(req -> req.streamName(streamName));
-
- final StreamStatus status = response.streamDescription().streamStatus();
+ final StreamStatus status = describeStream().streamStatus();
if (status == StreamStatus.ACTIVE) {
return;
}
@@ -653,14 +711,13 @@ class ConsumeKinesisIT {
throw new IllegalStateException("Stream " + streamName + " did not
become active within timeout");
}
- private void executeWithRetry(final String operation, final Runnable op) {
- RuntimeException lastException = null;
+ private <T> T executeWithRetry(final String operation, final Callable<T> op) {
+ Exception lastException = null;
for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
try {
- op.run();
- return;
- } catch (final RuntimeException e) {
+ return op.call();
+ } catch (final Exception e) {
lastException = e;
logger.warn("Attempt {} of {} failed for operation {}: {}",
attempt, MAX_RETRIES, operation, e.getMessage());