This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 504ba51096 NIFI-15270 Added Consumer Type property to ConsumeKinesis (#10578)
504ba51096 is described below

commit 504ba5109683c519f70001b1bc7cc04b8fd61648
Author: Alaksiej Ščarbaty <[email protected]>
AuthorDate: Tue Dec 2 17:58:58 2025 +0100

    NIFI-15270 Added Consumer Type property to ConsumeKinesis (#10578)
    
    - Support Shared Throughput or Enhanced Fan-Out options
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../processors/aws/kinesis/ConsumeKinesis.java     | 63 +++++++++++++++++--
 .../additionalDetails.md                           | 10 +++
 .../processors/aws/kinesis/ConsumeKinesisIT.java   | 73 +++++++++++++++++++---
 3 files changed, 132 insertions(+), 14 deletions(-)

diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
index b173cfab46..91d1982fe8 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
@@ -80,6 +80,9 @@ import software.amazon.kinesis.processor.ShardRecordProcessor;
 import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
 import software.amazon.kinesis.processor.SingleStreamTracker;
 import software.amazon.kinesis.retrieval.KinesisClientRecord;
+import software.amazon.kinesis.retrieval.RetrievalSpecificConfig;
+import software.amazon.kinesis.retrieval.fanout.FanOutConfig;
+import software.amazon.kinesis.retrieval.polling.PollingConfig;
 
 import java.net.URI;
 import java.nio.channels.Channels;
@@ -189,12 +192,19 @@ public class ConsumeKinesis extends AbstractProcessor {
             .description("""
                     The Controller Service that is used to obtain AWS credentials provider.
                     Ensure that the credentials provided have access to Kinesis, DynamoDB and (optional) CloudWatch.
-                    (See processor's additional details for more information.)
                     """)
             .required(true)
             .identifiesControllerService(AwsCredentialsProviderService.class)
             .build();
 
+    static final PropertyDescriptor CONSUMER_TYPE = new PropertyDescriptor.Builder()
+            .name("Consumer Type")
+            .description("Strategy for reading records from Amazon Kinesis streams.")
+            .required(true)
+            .allowableValues(ConsumerType.class)
+            .defaultValue(ConsumerType.ENHANCED_FAN_OUT)
+            .build();
+
     static final PropertyDescriptor PROCESSING_STRATEGY = new PropertyDescriptor.Builder()
             .name("Processing Strategy")
             .description("Strategy for processing Kinesis Records and writing serialized output to FlowFiles.")
@@ -209,7 +219,7 @@ public class ConsumeKinesis extends AbstractProcessor {
                     The Record Reader to use for parsing the data received from Kinesis.
 
                     The Record Reader is responsible for providing schemas for the records. If the schemas change frequently,
-                    it might hinder performance of the processor. (See processor's additional details for more information.)
+                    it might hinder performance of the processor.
                     """)
             .required(true)
             .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.RECORD)
@@ -291,6 +301,7 @@ public class ConsumeKinesis extends AbstractProcessor {
             AWS_CREDENTIALS_PROVIDER_SERVICE,
             REGION,
             CUSTOM_REGION,
+            CONSUMER_TYPE,
             PROCESSING_STRATEGY,
             RECORD_READER,
             RECORD_WRITER,
@@ -397,6 +408,7 @@ public class ConsumeKinesis extends AbstractProcessor {
         final ConfigsBuilder configsBuilder = new ConfigsBuilder(streamTracker, applicationName, kinesisClient, dynamoDbClient, cloudWatchClient, workerId, recordProcessorFactory);
 
         final MetricsFactory metricsFactory = configureMetricsFactory(context);
+        final RetrievalSpecificConfig retrievalSpecificConfig = configureRetrievalSpecificConfig(context, kinesisClient, streamName, applicationName);
 
         final InitializationStateChangeListener initializationListener = new InitializationStateChangeListener(getLogger());
 
@@ -407,7 +419,7 @@ public class ConsumeKinesis extends AbstractProcessor {
                 configsBuilder.lifecycleConfig(),
                 configsBuilder.metricsConfig().metricsFactory(metricsFactory),
                 configsBuilder.processorConfig(),
-                configsBuilder.retrievalConfig()
+                configsBuilder.retrievalConfig().retrievalSpecificConfig(retrievalSpecificConfig)
         );
 
         final String schedulerThreadName = "%s-Scheduler-%s".formatted(getClass().getSimpleName(), getIdentifier());
@@ -541,6 +553,18 @@ public class ConsumeKinesis extends AbstractProcessor {
         };
     }
 
+    private static RetrievalSpecificConfig configureRetrievalSpecificConfig(
+            final ProcessContext context,
+            final KinesisAsyncClient kinesisClient,
+            final String streamName,
+            final String applicationName) {
+        final ConsumerType consumerType = context.getProperty(CONSUMER_TYPE).asAllowableValue(ConsumerType.class);
+        return switch (consumerType) {
+            case SHARED_THROUGHPUT -> new PollingConfig(kinesisClient).streamName(streamName);
+            case ENHANCED_FAN_OUT -> new FanOutConfig(kinesisClient).streamName(streamName).applicationName(applicationName);
+        };
+    }
+
     @OnStopped
     public void onStopped() {
         cleanUpState();
@@ -769,6 +793,34 @@ public class ConsumeKinesis extends AbstractProcessor {
         }
     }
 
+    enum ConsumerType implements DescribedValue {
+        SHARED_THROUGHPUT("Shared Throughput", "A consumer shares the read throughput limits with other consumers"),
+        ENHANCED_FAN_OUT("Enhanced Fan-Out", "A consumer is granted a dedicated read throughput with a lower latency");
+
+        private final String displayName;
+        private final String description;
+
+        ConsumerType(final String displayName, final String description) {
+            this.displayName = displayName;
+            this.description = description;
+        }
+
+        @Override
+        public String getValue() {
+            return name();
+        }
+
+        @Override
+        public String getDisplayName() {
+            return displayName;
+        }
+
+        @Override
+        public String getDescription() {
+            return description;
+        }
+    }
+
     enum ProcessingStrategy implements DescribedValue {
         FLOW_FILE("Write one FlowFile for each consumed Kinesis Record"),
         RECORD("Write one FlowFile containing multiple consumed Kinesis 
Records processed with Record Reader and Record Writer");
@@ -860,9 +912,8 @@ public class ConsumeKinesis extends AbstractProcessor {
 
     enum OutputStrategy implements DescribedValue {
         USE_VALUE("Use Content as Value", "Write only the Kinesis Record value 
to the FlowFile record."),
-        USE_WRAPPER("Use Wrapper", "Write the Kinesis Record value and 
metadata into the FlowFile record. (See additional details for more 
information.)"),
-        INJECT_METADATA("Inject Metadata",
-                "Write the Kinesis Record value to the FlowFile record and add 
a sub-record to it with metadata. (See additional details for more 
information.)");
+        USE_WRAPPER("Use Wrapper", "Write the Kinesis Record value and 
metadata into the FlowFile record."),
+        INJECT_METADATA("Inject Metadata", "Write the Kinesis Record value to 
the FlowFile record and add a sub-record to it with metadata.");
 
         private final String displayName;
         private final String description;
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/resources/docs/org.apache.nifi.processors.aws.kinesis.ConsumeKinesis/additionalDetails.md b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/resources/docs/org.apache.nifi.processors.aws.kinesis.ConsumeKinesis/additionalDetails.md
index fee288941b..6056c9b58e 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/resources/docs/org.apache.nifi.processors.aws.kinesis.ConsumeKinesis/additionalDetails.md
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/resources/docs/org.apache.nifi.processors.aws.kinesis.ConsumeKinesis/additionalDetails.md
@@ -116,6 +116,16 @@ The following is an example policy document for `ConsumeKinesis`.
 **Note:** Replace "{Region}", "{Account}", "{Stream Name}", and "{Application Name}" in the ARNs with your own AWS region,
 AWS account number, Kinesis data stream name, and `ConsumeKinesis` _Application Name_ property respectively.
 
+## Consumer Type
+
+Comparison of different Consumer Types from [Amazon Kinesis Streams documentation](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html):
+
+| Characteristics           | Shared throughput consumers without enhanced fan-out | Enhanced fan-out consumers |
+|---------------------------|-------------------------------------------------------|----------------------------|
+| Read throughput           | Fixed at a total of 2 MB/sec per shard. If there are multiple consumers reading from the same shard, they all share this throughput. The sum of the throughputs they receive from the shard doesn't exceed 2 MB/sec. | Scales as consumers register to use enhanced fan-out. Each consumer registered to use enhanced fan-out receives its own read throughput per shard, up to 2 MB/sec, independently of other consumers. |
+| Message propagation delay | An average of around 200 ms if you have one consumer reading from the stream. This average goes up to around 1000 ms if you have five consumers. | Typically an average of 70 ms whether you have one consumer or five consumers. |
+| Cost                      | Not applicable | There is a data retrieval cost and a consumer-shard hour cost. For more information, see [Amazon Kinesis Data Streams Pricing](https://aws.amazon.com/kinesis/data-streams/pricing/?nc=sn&loc=3). |
+
 ## Record processing
 
 When _Processing Strategy_ property is set to _RECORD_, _ConsumeKinesis_ operates in Record mode.
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisIT.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisIT.java
index 66d13d4a91..daef96b94b 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisIT.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisIT.java
@@ -55,9 +55,12 @@ import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.Consumer;
 import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
+import software.amazon.awssdk.services.kinesis.model.ListStreamConsumersResponse;
 import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
 import software.amazon.awssdk.services.kinesis.model.ScalingType;
+import software.amazon.awssdk.services.kinesis.model.StreamDescription;
 import software.amazon.awssdk.services.kinesis.model.StreamStatus;
 
 import java.net.URI;
@@ -67,6 +70,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.IntStream;
 
@@ -195,6 +199,40 @@ class ConsumeKinesisIT {
         assertNotNull(flowFile.getAttribute("aws.kinesis.shard.id"));
 
         assertReceiveProvenanceEvents(runner.getProvenanceEvents(), flowFile);
+
+        // Creates an enhanced fan-out consumer by default.
+        assertEquals(
+                List.of(applicationName),
+                streamClient.getEnhancedFanOutConsumerNames(),
+                "Expected a single enhanced fan-out consumer with an 
application name");
+    }
+
+    @Test
+    void testConsumeSingleMessageFromSingleShard_withoutEnhancedFanOut() {
+        runner.setProperty(ConsumeKinesis.CONSUMER_TYPE, ConsumeKinesis.ConsumerType.SHARED_THROUGHPUT);
+
+        streamClient.createStream(1);
+
+        final String testMessage = "Hello, Kinesis!";
+        streamClient.putRecord("test-partition-key", testMessage);
+
+        runProcessorWithInitAndWaitForFiles(runner, 1);
+
+        runner.assertTransferCount(REL_SUCCESS, 1);
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_SUCCESS);
+        final MockFlowFile flowFile = flowFiles.getFirst();
+
+        flowFile.assertContentEquals(testMessage);
+        flowFile.assertAttributeEquals("aws.kinesis.partition.key", "test-partition-key");
+        assertNotNull(flowFile.getAttribute("aws.kinesis.first.sequence.number"));
+        assertNotNull(flowFile.getAttribute("aws.kinesis.last.sequence.number"));
+        assertNotNull(flowFile.getAttribute("aws.kinesis.shard.id"));
+
+        assertReceiveProvenanceEvents(runner.getProvenanceEvents(), flowFile);
+
+        assertTrue(
+                streamClient.getEnhancedFanOutConsumerNames().isEmpty(),
+                "No enhanced fan-out consumers should be created for Shared 
Throughput consumer type");
     }
 
     @Test
@@ -569,6 +607,28 @@ class ConsumeKinesisIT {
             logger.info("Stream {} is now active", streamName);
         }
 
+        List<String> getEnhancedFanOutConsumerNames() {
+            final String arn = describeStream().streamARN();
+
+            final ListStreamConsumersResponse response = executeWithRetry(
+                    "listStreamConsumers",
+                    () -> kinesisClient.listStreamConsumers(req -> req.streamARN(arn))
+            );
+
+            return response.consumers().stream()
+                    .map(Consumer::consumerName)
+                    .toList();
+        }
+
+        private StreamDescription describeStream() {
+            final DescribeStreamResponse response = executeWithRetry(
+                    "describeStream",
+                    () -> kinesisClient.describeStream(req -> req.streamName(streamName))
+            );
+
+            return response.streamDescription();
+        }
+
         void deleteStream() {
             logger.info("Deleting stream: {}", streamName);
 
@@ -627,9 +687,7 @@ class ConsumeKinesisIT {
 
             while (System.currentTimeMillis() < timeoutMillis) {
                 try {
-                    final DescribeStreamResponse response = kinesisClient.describeStream(req -> req.streamName(streamName));
-
-                    final StreamStatus status = response.streamDescription().streamStatus();
+                    final StreamStatus status = describeStream().streamStatus();
                     if (status == StreamStatus.ACTIVE) {
                         return;
                     }
@@ -653,14 +711,13 @@ class ConsumeKinesisIT {
             throw new IllegalStateException("Stream " + streamName + " did not become active within timeout");
         }
 
-        private void executeWithRetry(final String operation, final Runnable op) {
-            RuntimeException lastException = null;
+        private <T> T executeWithRetry(final String operation, final Callable<T> op) {
+            Exception lastException = null;
 
             for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
                 try {
-                    op.run();
-                    return;
-                } catch (final RuntimeException e) {
+                    return op.call();
+                } catch (final Exception e) {
                     lastException = e;
                     logger.warn("Attempt {} of {} failed for operation {}: {}",
                             attempt, MAX_RETRIES, operation, e.getMessage());
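
For reviewers who want to exercise the new property outside of the bundled integration test, the following is a minimal sketch using the nifi-mock TestRunner. It assumes the nifi-mock and nifi-aws-kinesis modules are on the classpath; the class name ConsumerTypeConfigurationSketch is hypothetical, and the property name and enum value are taken from the diff above.

    import org.apache.nifi.processors.aws.kinesis.ConsumeKinesis;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class ConsumerTypeConfigurationSketch {

        public static void main(final String[] args) {
            // Create a mock-framework runner for the processor, as ConsumeKinesisIT does.
            final TestRunner runner = TestRunners.newTestRunner(ConsumeKinesis.class);

            // "Consumer Type" is the property added by this commit; its allowable values are the
            // ConsumerType enum names. ENHANCED_FAN_OUT is the default, so an explicit setting is
            // only needed for the polling-based Shared Throughput consumer.
            runner.setProperty("Consumer Type", "SHARED_THROUGHPUT");
        }
    }

With SHARED_THROUGHPUT the processor builds a KCL PollingConfig, while ENHANCED_FAN_OUT builds a FanOutConfig registered under the Application Name, as configureRetrievalSpecificConfig in the diff shows.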
