awelless commented on code in PR #10964:
URL: https://github.com/apache/nifi/pull/10964#discussion_r2910557024


##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -347,545 +335,1198 @@ Specifies the string (interpreted as UTF-8) to use for 
demarcating multiple Kine
     private static final Set<Relationship> RAW_FILE_RELATIONSHIPS = 
Set.of(REL_SUCCESS);
     private static final Set<Relationship> RECORD_FILE_RELATIONSHIPS = 
Set.of(REL_SUCCESS, REL_PARSE_FAILURE);
 
-    private volatile DynamoDbAsyncClient dynamoDbClient;
-    private volatile CloudWatchAsyncClient cloudWatchClient;
-    private volatile KinesisAsyncClient kinesisClient;
-    private volatile Scheduler kinesisScheduler;
-
+    private volatile SdkHttpClient kinesisHttpClient;
+    private volatile SdkHttpClient dynamoHttpClient;
+    private volatile KinesisClient kinesisClient;
+    private volatile DynamoDbClient dynamoDbClient;
+    private volatile SdkAsyncHttpClient asyncHttpClient;
+    private volatile KinesisShardManager shardManager;
+    private volatile KinesisConsumerClient consumerClient;
     private volatile String streamName;
-    private volatile RecordBuffer.ForProcessor<Lease> recordBuffer;
-
-    private volatile @Nullable ReaderRecordProcessor readerRecordProcessor;
-    private volatile @Nullable byte[] demarcatorValue;
+    private volatile int maxRecordsPerRequest;
+    private volatile String initialStreamPosition;
+    private volatile long maxBatchNanos;
+    private volatile long maxBatchBytes;
 
-    private volatile Future<InitializationResult> initializationResultFuture;
-    private final AtomicBoolean initialized = new AtomicBoolean();
-
-    // An instance filed, so that it can be read in getRelationships.
-    private volatile ProcessingStrategy processingStrategy = 
ProcessingStrategy.from(
-            PROCESSING_STRATEGY.getDefaultValue());
+    private volatile ProcessingStrategy processingStrategy = 
ProcessingStrategy.valueOf(PROCESSING_STRATEGY.getDefaultValue());
+    private volatile String efoConsumerArn;
+    private final AtomicLong shardRoundRobinCounter = new AtomicLong();
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return PROPERTY_DESCRIPTORS;
     }
 
-    @Override
-    public void migrateProperties(final PropertyConfiguration config) {
-        ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);
-    }
-
     @Override
     public Set<Relationship> getRelationships() {
         return switch (processingStrategy) {
-            case FLOW_FILE, DEMARCATOR -> RAW_FILE_RELATIONSHIPS;
+            case FLOW_FILE, LINE_DELIMITED, DEMARCATOR -> 
RAW_FILE_RELATIONSHIPS;
             case RECORD -> RECORD_FILE_RELATIONSHIPS;
         };
     }
 
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
         if (descriptor.equals(PROCESSING_STRATEGY)) {
-            processingStrategy = ProcessingStrategy.from(newValue);
+            processingStrategy = ProcessingStrategy.valueOf(newValue);
         }
     }
 
-    @OnScheduled
-    public void setup(final ProcessContext context) {
-        readerRecordProcessor = switch (processingStrategy) {
-            case FLOW_FILE, DEMARCATOR -> null;
-            case RECORD -> createReaderRecordProcessor(context);
-        };
-        demarcatorValue = switch (processingStrategy) {
-            case FLOW_FILE, RECORD -> null;
-            case DEMARCATOR -> {
-                final String demarcatorValue = 
context.getProperty(MESSAGE_DEMARCATOR).getValue();
-                yield demarcatorValue != null ? 
demarcatorValue.getBytes(UTF_8) : new byte[0];
-            }
-        };
+    @Override
+    public void migrateProperties(final PropertyConfiguration config) {
+        ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);
+        config.renameProperty("Max Bytes to Buffer", "Max Batch Size");
+        config.removeProperty("Checkpoint Interval");
+        config.removeProperty("Metrics Publishing");
+    }
 
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
         final Region region = RegionUtil.getRegion(context);
         final AwsCredentialsProvider credentialsProvider = 
context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE)
                 
.asControllerService(AwsCredentialsProviderService.class).getAwsCredentialsProvider();
+        final String endpointOverride = 
context.getProperty(ENDPOINT_OVERRIDE).getValue();
 
-        kinesisClient = KinesisAsyncClient.builder()
-                .region(region)
-                .credentialsProvider(credentialsProvider)
-                .endpointOverride(getKinesisEndpointOverride())
-                .httpClient(createKinesisHttpClient(context))
+        final ClientOverrideConfiguration clientConfig = 
ClientOverrideConfiguration.builder()
+                .apiCallTimeout(API_CALL_TIMEOUT)
+                .apiCallAttemptTimeout(API_CALL_ATTEMPT_TIMEOUT)
                 .build();
 
-        dynamoDbClient = DynamoDbAsyncClient.builder()
+        final KinesisClientBuilder kinesisBuilder = KinesisClient.builder()
                 .region(region)
                 .credentialsProvider(credentialsProvider)
-                .endpointOverride(getDynamoDbEndpointOverride())
-                .httpClient(createHttpClientBuilder(context).build())
-                .build();
+                .overrideConfiguration(clientConfig);
 
-        cloudWatchClient = CloudWatchAsyncClient.builder()
+        final DynamoDbClientBuilder dynamoBuilder = DynamoDbClient.builder()
                 .region(region)
                 .credentialsProvider(credentialsProvider)
-                .endpointOverride(getCloudwatchEndpointOverride())
-                .httpClient(createHttpClientBuilder(context).build())
-                .build();
+                .overrideConfiguration(clientConfig);
 
-        streamName = context.getProperty(STREAM_NAME).getValue();
-        final InitialPositionInStreamExtended initialPositionExtended = 
getInitialPosition(context);
-        final SingleStreamTracker streamTracker = new 
SingleStreamTracker(streamName, initialPositionExtended);
-
-        final long maxBytesToBuffer = 
context.getProperty(MAX_BYTES_TO_BUFFER).asDataSize(DataUnit.B).longValue();
-        final Duration checkpointInterval = 
context.getProperty(CHECKPOINT_INTERVAL).asDuration();
-        final MemoryBoundRecordBuffer memoryBoundRecordBuffer = new 
MemoryBoundRecordBuffer(getLogger(), maxBytesToBuffer, checkpointInterval);
-        recordBuffer = memoryBoundRecordBuffer;
-        final ShardRecordProcessorFactory recordProcessorFactory = () -> new 
ConsumeKinesisRecordProcessor(memoryBoundRecordBuffer);
-
-        final String applicationName = 
context.getProperty(APPLICATION_NAME).getValue();
-        final String workerId = generateWorkerId();
-        final ConfigsBuilder configsBuilder = new 
ConfigsBuilder(streamTracker, applicationName, kinesisClient, dynamoDbClient, 
cloudWatchClient, workerId, recordProcessorFactory);
-
-        final MetricsFactory metricsFactory = configureMetricsFactory(context);
-        final RetrievalSpecificConfig retrievalSpecificConfig = 
configureRetrievalSpecificConfig(context, kinesisClient, streamName, 
applicationName);
-
-        final InitializationStateChangeListener initializationListener = new 
InitializationStateChangeListener(getLogger());
-        initialized.set(false);
-        initializationResultFuture = initializationListener.result();
-
-        kinesisScheduler = new Scheduler(
-                configsBuilder.checkpointConfig(),
-                
configsBuilder.coordinatorConfig().workerStateChangeListener(initializationListener),
-                configsBuilder.leaseManagementConfig(),
-                configsBuilder.lifecycleConfig(),
-                configsBuilder.metricsConfig().metricsFactory(metricsFactory),
-                configsBuilder.processorConfig(),
-                
configsBuilder.retrievalConfig().retrievalSpecificConfig(retrievalSpecificConfig)
-        );
-
-        final String schedulerThreadName = 
"%s-Scheduler-%s".formatted(getClass().getSimpleName(), getIdentifier());
-        final Thread schedulerThread = new Thread(kinesisScheduler, 
schedulerThreadName);
-        schedulerThread.setDaemon(true);
-        schedulerThread.start();
-        // The thread is stopped when kinesisScheduler is shutdown in the 
onStopped method.
-
-        try {
-            final InitializationResult result = initializationResultFuture.get(
-                    
KINESIS_SCHEDULER_ON_SCHEDULED_INITIALIZATION_TIMEOUT.getSeconds(), SECONDS);
-            checkInitializationResult(result);
-        } catch (final TimeoutException e) {
-            // During a first run the processor will take more time to 
initialize. We return from OnSchedule and continue waiting in the onTrigger 
method.
-            getLogger().warn("Kinesis Scheduler initialization may take up to 
10 minutes on a first run, which is caused by AWS resources initialization");
-        } catch (final InterruptedException | ExecutionException e) {
-            if (e instanceof InterruptedException) {
-                Thread.currentThread().interrupt();
-            }
-            cleanUpState();
-            throw new ProcessException("Initialization failed for stream 
[%s]".formatted(streamName), e);
+        if (endpointOverride != null && !endpointOverride.isEmpty()) {
+            final URI endpointUri = URI.create(endpointOverride);
+            kinesisBuilder.endpointOverride(endpointUri);
+            dynamoBuilder.endpointOverride(endpointUri);
         }
-    }
 
-    /**
-     * Creating Kinesis HTTP client, as per
-     * {@link 
software.amazon.kinesis.common.KinesisClientUtil#adjustKinesisClientBuilder(KinesisAsyncClientBuilder)}.
-     */
-    private static SdkAsyncHttpClient createKinesisHttpClient(final 
ProcessContext context) {
-        return createHttpClientBuilder(context)
-                .protocol(Protocol.HTTP2)
-                // Since we're using HTTP/2, multiple concurrent requests will 
reuse the same HTTP connection.
-                // Therefore, the number of real connections is going to be 
relatively small.
-                .maxConcurrency(Integer.MAX_VALUE)
-                .http2Configuration(Http2Configuration.builder()
-                        
.initialWindowSize(KINESIS_HTTP_CLIENT_WINDOW_SIZE_BYTES)
-                        
.healthCheckPingPeriod(KINESIS_HTTP_HEALTH_CHECK_PERIOD)
-                        .build())
-                .build();
-    }
+        final ProxyConfiguration proxyConfig = 
ProxyConfiguration.getConfiguration(context);
+
+        kinesisHttpClient = buildApacheHttpClient(proxyConfig, 
PollingKinesisClient.MAX_CONCURRENT_FETCHES + 10);
+        dynamoHttpClient = buildApacheHttpClient(proxyConfig, 50);
+        kinesisBuilder.httpClient(kinesisHttpClient);
+        dynamoBuilder.httpClient(dynamoHttpClient);
+
+        kinesisClient = kinesisBuilder.build();
+        dynamoDbClient = dynamoBuilder.build();
 
-    private static NettyNioAsyncHttpClient.Builder 
createHttpClientBuilder(final ProcessContext context) {
-        final NettyNioAsyncHttpClient.Builder builder = 
NettyNioAsyncHttpClient.builder()
-                .connectionTimeout(HTTP_CLIENTS_CONNECTION_TIMEOUT)
-                .readTimeout(HTTP_CLIENTS_READ_TIMEOUT);
+        final String checkpointTableName = 
context.getProperty(APPLICATION_NAME).getValue();
+        streamName = context.getProperty(STREAM_NAME).getValue();
+        initialStreamPosition = 
context.getProperty(INITIAL_STREAM_POSITION).getValue();
+        maxBatchNanos = 
context.getProperty(MAX_BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS);
+        maxBatchBytes = 
context.getProperty(MAX_BATCH_SIZE).asDataSize(DataUnit.B).longValue();
+
+        final boolean efoMode = 
ConsumerType.ENHANCED_FAN_OUT.equals(context.getProperty(CONSUMER_TYPE).asAllowableValue(ConsumerType.class));
+        maxRecordsPerRequest = efoMode ? 0 : 
context.getProperty(MAX_RECORDS_PER_REQUEST).asInteger();
+
+        shardManager = createShardManager(kinesisClient, dynamoDbClient, 
getLogger(), checkpointTableName, streamName);
+        shardManager.ensureCheckpointTableExists();
+        consumerClient = createConsumerClient(kinesisClient, getLogger(), 
efoMode);
+
+        final Instant timestampForPosition = resolveTimestampPosition(context);
+        if (timestampForPosition != null) {
+            if (consumerClient instanceof PollingKinesisClient polling) {
+                polling.setTimestampForInitialPosition(timestampForPosition);
+            } else if (consumerClient instanceof EfoKinesisClient efo) {
+                efo.setTimestampForInitialPosition(timestampForPosition);
+            }
+        }
 
-        final ProxyConfigurationService proxyConfigService = 
context.getProperty(PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
-        if (proxyConfigService != null) {
-            final ProxyConfiguration proxyConfig = 
proxyConfigService.getConfiguration();
+        if (efoMode) {
+            final NettyNioAsyncHttpClient.Builder nettyBuilder = 
NettyNioAsyncHttpClient.builder()
+                    .protocol(Protocol.HTTP2)
+                    .maxConcurrency(500)
+                    .connectionAcquisitionTimeout(Duration.ofSeconds(60));
 
-            final 
software.amazon.awssdk.http.nio.netty.ProxyConfiguration.Builder 
proxyConfigBuilder = 
software.amazon.awssdk.http.nio.netty.ProxyConfiguration.builder()
+            if (Proxy.Type.HTTP.equals(proxyConfig.getProxyType())) {
+                final 
software.amazon.awssdk.http.nio.netty.ProxyConfiguration.Builder 
nettyProxyBuilder = 
software.amazon.awssdk.http.nio.netty.ProxyConfiguration.builder()
                     .host(proxyConfig.getProxyServerHost())
                     .port(proxyConfig.getProxyServerPort());
 
-            if (proxyConfig.hasCredential()) {
-                proxyConfigBuilder.username(proxyConfig.getProxyUserName());
-                
proxyConfigBuilder.password(proxyConfig.getProxyUserPassword());
+                if (proxyConfig.hasCredential()) {
+                    nettyProxyBuilder.username(proxyConfig.getProxyUserName());
+                    
nettyProxyBuilder.password(proxyConfig.getProxyUserPassword());
+                }
+
+                nettyBuilder.proxyConfiguration(nettyProxyBuilder.build());
             }
 
-            builder.proxyConfiguration(proxyConfigBuilder.build());
-        }
+            asyncHttpClient = nettyBuilder.build();
 
-        return builder;
-    }
+            final KinesisAsyncClientBuilder asyncBuilder = 
KinesisAsyncClient.builder()
+                    .region(region)
+                    .credentialsProvider(credentialsProvider)
+                    .httpClient(asyncHttpClient);
 
-    private ReaderRecordProcessor createReaderRecordProcessor(final 
ProcessContext context) {
-        final RecordReaderFactory recordReaderFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
-        final RecordSetWriterFactory recordWriterFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-
-        final OutputStrategy outputStrategy = 
context.getProperty(OUTPUT_STRATEGY).asAllowableValue(OutputStrategy.class);
-        final KinesisRecordConverter converter = switch (outputStrategy) {
-            case USE_VALUE -> new ValueRecordConverter();
-            case USE_WRAPPER -> new WrapperRecordConverter();
-            case INJECT_METADATA -> new InjectMetadataRecordConverter();
-        };
+            if (endpointOverride != null && !endpointOverride.isEmpty()) {
+                asyncBuilder.endpointOverride(URI.create(endpointOverride));
+            }
 
-        return new ReaderRecordProcessor(recordReaderFactory, converter, 
recordWriterFactory, getLogger());
+            final String consumerName = 
context.getProperty(APPLICATION_NAME).getValue();
+            consumerClient.initialize(asyncBuilder.build(), streamName, 
consumerName);
+        }
     }
 
-    private static InitialPositionInStreamExtended getInitialPosition(final 
ProcessContext context) {
-        final InitialPosition initialPosition = 
context.getProperty(INITIAL_STREAM_POSITION).asAllowableValue(InitialPosition.class);
-        return switch (initialPosition) {
-            case TRIM_HORIZON ->
-                
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
-            case LATEST -> 
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
-            case AT_TIMESTAMP -> {
-                final String timestampValue = 
context.getProperty(STREAM_POSITION_TIMESTAMP).getValue();
-                final Instant timestamp = Instant.parse(timestampValue);
-                yield 
InitialPositionInStreamExtended.newInitialPositionAtTimestamp(Date.from(timestamp));
-            }
-        };
+    private static Instant resolveTimestampPosition(final ProcessContext 
context) {
+        final InitialPosition position = 
context.getProperty(INITIAL_STREAM_POSITION).asAllowableValue(InitialPosition.class);
+        if (position == InitialPosition.AT_TIMESTAMP) {
+            return 
Instant.parse(context.getProperty(STREAM_POSITION_TIMESTAMP).getValue());
+        }
+        return null;
     }
 
-    private String generateWorkerId() {
-        final String processorId = getIdentifier();
-        final NodeTypeProvider nodeTypeProvider = getNodeTypeProvider();
-
-        final String workerId;
+    /**
+     * Builds an {@link ApacheHttpClient} with the given connection pool size 
and optional proxy
+     * configuration. Each AWS service client (Kinesis, DynamoDB) should 
receive its own HTTP client
+     * so their connection pools are isolated and cannot starve each other 
under high shard counts.
+     */
+    private static SdkHttpClient buildApacheHttpClient(final 
ProxyConfiguration proxyConfig, final int maxConnections) {
+        final ApacheHttpClient.Builder builder = ApacheHttpClient.builder()
+                .maxConnections(maxConnections);
 
-        if (nodeTypeProvider.isClustered()) {
-            // If a node id is not available for some reason, generating a 
random UUID helps to avoid collisions.
-            final String nodeId = 
nodeTypeProvider.getCurrentNode().orElse(UUID.randomUUID().toString());
-            workerId = "%s@%s".formatted(processorId, nodeId);
-        } else {
-            workerId = processorId;
-        }
+        if (Proxy.Type.HTTP.equals(proxyConfig.getProxyType())) {
+            final URI proxyEndpoint = URI.create(String.format("http://%s:%s", 
proxyConfig.getProxyServerHost(), proxyConfig.getProxyServerPort()));
+            final 
software.amazon.awssdk.http.apache.ProxyConfiguration.Builder proxyBuilder =
+                    
software.amazon.awssdk.http.apache.ProxyConfiguration.builder().endpoint(proxyEndpoint);
 
-        return workerId;
-    }
+            if (proxyConfig.hasCredential()) {
+                proxyBuilder.username(proxyConfig.getProxyUserName());
+                proxyBuilder.password(proxyConfig.getProxyUserPassword());
+            }
 
-    private static @Nullable MetricsFactory configureMetricsFactory(final 
ProcessContext context) {
-        final MetricsPublishing metricsPublishing = 
context.getProperty(METRICS_PUBLISHING).asAllowableValue(MetricsPublishing.class);
-        return switch (metricsPublishing) {
-            case DISABLED -> new NullMetricsFactory();
-            case LOGS -> new LogMetricsFactory();
-            case CLOUDWATCH -> null; // If no metrics factory was provided, 
CloudWatch metrics factory is used by default.
-        };
-    }
+            builder.proxyConfiguration(proxyBuilder.build());
+        }
 
-    private static RetrievalSpecificConfig configureRetrievalSpecificConfig(
-            final ProcessContext context,
-            final KinesisAsyncClient kinesisClient,
-            final String streamName,
-            final String applicationName) {
-        final ConsumerType consumerType = 
context.getProperty(CONSUMER_TYPE).asAllowableValue(ConsumerType.class);
-        return switch (consumerType) {
-            case SHARED_THROUGHPUT -> new 
PollingConfig(kinesisClient).streamName(streamName);
-            case ENHANCED_FAN_OUT -> new 
FanOutConfig(kinesisClient).streamName(streamName).applicationName(applicationName);
-        };
+        return builder.build();
     }
 
     @OnStopped
     public void onStopped() {
-        cleanUpState();
+        if (shardManager != null) {
+            shardManager.releaseAllLeases();
+            shardManager.close();
+            shardManager = null;
+        }
 
-        initialized.set(false);
-        initializationResultFuture = null;
-    }
+        if (consumerClient instanceof EfoKinesisClient efo) {
+            efoConsumerArn = efo.getConsumerArn();
+        }
+        if (consumerClient != null) {
+            consumerClient.close();
+            consumerClient = null;
+        }
 
-    private void cleanUpState() {
-        if (kinesisScheduler != null) {
-            shutdownScheduler();
-            kinesisScheduler = null;
+        if (asyncHttpClient != null) {
+            asyncHttpClient.close();
+            asyncHttpClient = null;
         }
 
         if (kinesisClient != null) {
             kinesisClient.close();
             kinesisClient = null;
         }
+
         if (dynamoDbClient != null) {
             dynamoDbClient.close();
             dynamoDbClient = null;
         }
-        if (cloudWatchClient != null) {
-            cloudWatchClient.close();
-            cloudWatchClient = null;
-        }
 
-        recordBuffer = null;
-        readerRecordProcessor = null;
-        demarcatorValue = null;
+        closeQuietly(kinesisHttpClient);
+        kinesisHttpClient = null;
+        closeQuietly(dynamoHttpClient);
+        dynamoHttpClient = null;
     }
 
-    private void shutdownScheduler() {
-        if (kinesisScheduler.shutdownComplete()) {
+    @OnRemoved
+    public void onRemoved(final ProcessContext context) {
+        final String arn = efoConsumerArn;
+        efoConsumerArn = null;
+        if (arn == null) {
             return;
         }
 
-        final long start = System.nanoTime();
-        getLogger().debug("Shutting down Kinesis Scheduler");
+        final Region region = RegionUtil.getRegion(context);
+        final AwsCredentialsProvider credentialsProvider = 
context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE)
+                
.asControllerService(AwsCredentialsProviderService.class).getAwsCredentialsProvider();
+        final String endpointOverride = 
context.getProperty(ENDPOINT_OVERRIDE).getValue();
 
-        boolean gracefulShutdownSucceeded;
-        try {
-            gracefulShutdownSucceeded = 
kinesisScheduler.startGracefulShutdown().get(KINESIS_SCHEDULER_GRACEFUL_SHUTDOWN_TIMEOUT.getSeconds(),
 SECONDS);
-            if (!gracefulShutdownSucceeded) {
-                getLogger().warn("Failed to shutdown Kinesis Scheduler 
gracefully. See the logs for more details");
-            }
-        } catch (final RuntimeException | InterruptedException | 
ExecutionException | TimeoutException e) {
-            if (e instanceof TimeoutException) {
-                getLogger().warn("Failed to shutdown Kinesis Scheduler 
gracefully after {} seconds", 
KINESIS_SCHEDULER_GRACEFUL_SHUTDOWN_TIMEOUT.getSeconds(), e);
-            } else {
-                getLogger().warn("Failed to shutdown Kinesis Scheduler 
gracefully", e);
-            }
-            gracefulShutdownSucceeded = false;
-        }
+        final KinesisClientBuilder builder = KinesisClient.builder()
+                .region(region)
+                .credentialsProvider(credentialsProvider);
 
-        if (!gracefulShutdownSucceeded) {
-            kinesisScheduler.shutdown();
+        if (endpointOverride != null && !endpointOverride.isEmpty()) {
+            builder.endpointOverride(URI.create(endpointOverride));
         }
 
-        final long finish = System.nanoTime();
-        getLogger().debug("Kinesis Scheduler shutdown finished after {} 
seconds", NANOSECONDS.toSeconds(finish - start));
+        try (final KinesisClient tempClient = builder.build()) {
+            
tempClient.deregisterStreamConsumer(DeregisterStreamConsumerRequest.builder()
+                    .consumerARN(arn)
+                    .build());
+            getLogger().info("Deregistered EFO consumer [{}]", arn);
+        } catch (final Exception e) {
+            getLogger().warn("Failed to deregister EFO consumer [{}]; manual 
cleanup may be required", arn, e);
+        }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        if (!initialized.get()) {
-            if (!initializationResultFuture.isDone()) {
-                getLogger().debug("Waiting for Kinesis Scheduler to finish 
initialization");
+        final NodeTypeProvider nodeTypeProvider = getNodeTypeProvider();
+        final int clusterMemberCount = nodeTypeProvider.isClustered() ? 0 : 
Math.max(1, nodeTypeProvider.getClusterMembers().size());
+        shardManager.refreshLeasesIfNecessary(clusterMemberCount);
+        final List<Shard> ownedShards = shardManager.getOwnedShards();
+
+        if (ownedShards.isEmpty()) {
+            context.yield();
+            return;
+        }
+
+        final Set<String> ownedShardIds = new HashSet<>();

Review Comment:
   Nit: 
   ```suggestion
           final Set<String> ownedShardIds = 
HashSet.newHashSet(ownedShards.size());
   ```



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -347,545 +335,1198 @@ Specifies the string (interpreted as UTF-8) to use for 
demarcating multiple Kine
     private static final Set<Relationship> RAW_FILE_RELATIONSHIPS = 
Set.of(REL_SUCCESS);
     private static final Set<Relationship> RECORD_FILE_RELATIONSHIPS = 
Set.of(REL_SUCCESS, REL_PARSE_FAILURE);
 
-    private volatile DynamoDbAsyncClient dynamoDbClient;
-    private volatile CloudWatchAsyncClient cloudWatchClient;
-    private volatile KinesisAsyncClient kinesisClient;
-    private volatile Scheduler kinesisScheduler;
-
+    private volatile SdkHttpClient kinesisHttpClient;
+    private volatile SdkHttpClient dynamoHttpClient;
+    private volatile KinesisClient kinesisClient;
+    private volatile DynamoDbClient dynamoDbClient;
+    private volatile SdkAsyncHttpClient asyncHttpClient;
+    private volatile KinesisShardManager shardManager;
+    private volatile KinesisConsumerClient consumerClient;
     private volatile String streamName;
-    private volatile RecordBuffer.ForProcessor<Lease> recordBuffer;
-
-    private volatile @Nullable ReaderRecordProcessor readerRecordProcessor;
-    private volatile @Nullable byte[] demarcatorValue;
+    private volatile int maxRecordsPerRequest;
+    private volatile String initialStreamPosition;
+    private volatile long maxBatchNanos;
+    private volatile long maxBatchBytes;
 
-    private volatile Future<InitializationResult> initializationResultFuture;
-    private final AtomicBoolean initialized = new AtomicBoolean();
-
-    // An instance filed, so that it can be read in getRelationships.
-    private volatile ProcessingStrategy processingStrategy = 
ProcessingStrategy.from(
-            PROCESSING_STRATEGY.getDefaultValue());
+    private volatile ProcessingStrategy processingStrategy = 
ProcessingStrategy.valueOf(PROCESSING_STRATEGY.getDefaultValue());
+    private volatile String efoConsumerArn;
+    private final AtomicLong shardRoundRobinCounter = new AtomicLong();
 
    /**
     * Returns the static, pre-built list of property descriptors supported by this processor.
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }
 
-    @Override
-    public void migrateProperties(final PropertyConfiguration config) {
-        ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);
-    }
-
     @Override
     public Set<Relationship> getRelationships() {
         return switch (processingStrategy) {
-            case FLOW_FILE, DEMARCATOR -> RAW_FILE_RELATIONSHIPS;
+            case FLOW_FILE, LINE_DELIMITED, DEMARCATOR -> 
RAW_FILE_RELATIONSHIPS;
             case RECORD -> RECORD_FILE_RELATIONSHIPS;
         };
     }
 
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
         if (descriptor.equals(PROCESSING_STRATEGY)) {
-            processingStrategy = ProcessingStrategy.from(newValue);
+            processingStrategy = ProcessingStrategy.valueOf(newValue);
         }
     }
 
-    @OnScheduled
-    public void setup(final ProcessContext context) {
-        readerRecordProcessor = switch (processingStrategy) {
-            case FLOW_FILE, DEMARCATOR -> null;
-            case RECORD -> createReaderRecordProcessor(context);
-        };
-        demarcatorValue = switch (processingStrategy) {
-            case FLOW_FILE, RECORD -> null;
-            case DEMARCATOR -> {
-                final String demarcatorValue = 
context.getProperty(MESSAGE_DEMARCATOR).getValue();
-                yield demarcatorValue != null ? 
demarcatorValue.getBytes(UTF_8) : new byte[0];
-            }
-        };
    /**
     * Migrates configuration written by previous versions of this processor:
     * renames the proxy service property, renames "Max Bytes to Buffer" to "Max Batch Size",
     * and removes properties that no longer exist.
     */
    @Override
    public void migrateProperties(final PropertyConfiguration config) {
        ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);
        // Renamed when buffering moved from a byte budget to a batch-size model
        config.renameProperty("Max Bytes to Buffer", "Max Batch Size");
        // These belonged to the removed KCL Scheduler-based implementation and have no equivalent here
        config.removeProperty("Checkpoint Interval");
        config.removeProperty("Metrics Publishing");
    }
 
    /**
     * Builds the AWS clients and supporting components used for consumption: a synchronous
     * Kinesis client and a DynamoDB client (for shard-lease checkpointing), the shard manager
     * and the consumer client. In Enhanced Fan-Out mode an additional HTTP/2 async Kinesis
     * client is created and the EFO consumer is registered against the stream.
     */
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        final Region region = RegionUtil.getRegion(context);
        final AwsCredentialsProvider credentialsProvider = context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE)
                .asControllerService(AwsCredentialsProviderService.class).getAwsCredentialsProvider();
        final String endpointOverride = context.getProperty(ENDPOINT_OVERRIDE).getValue();

        // Shared request timeouts for both synchronous service clients
        final ClientOverrideConfiguration clientConfig = ClientOverrideConfiguration.builder()
                .apiCallTimeout(API_CALL_TIMEOUT)
                .apiCallAttemptTimeout(API_CALL_ATTEMPT_TIMEOUT)
                .build();

        final KinesisClientBuilder kinesisBuilder = KinesisClient.builder()
                .region(region)
                .credentialsProvider(credentialsProvider)
                .overrideConfiguration(clientConfig);

        final DynamoDbClientBuilder dynamoBuilder = DynamoDbClient.builder()
                .region(region)
                .credentialsProvider(credentialsProvider)
                .overrideConfiguration(clientConfig);

        // Optional endpoint override (e.g. local test endpoints), applied to both services
        if (endpointOverride != null && !endpointOverride.isEmpty()) {
            final URI endpointUri = URI.create(endpointOverride);
            kinesisBuilder.endpointOverride(endpointUri);
            dynamoBuilder.endpointOverride(endpointUri);
        }

        final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(context);

        // Separate HTTP clients so the Kinesis and DynamoDB connection pools are isolated
        kinesisHttpClient = buildApacheHttpClient(proxyConfig, PollingKinesisClient.MAX_CONCURRENT_FETCHES + 10);
        dynamoHttpClient = buildApacheHttpClient(proxyConfig, 50);
        kinesisBuilder.httpClient(kinesisHttpClient);
        dynamoBuilder.httpClient(dynamoHttpClient);

        kinesisClient = kinesisBuilder.build();
        dynamoDbClient = dynamoBuilder.build();

        // The Application Name doubles as the checkpoint table name
        final String checkpointTableName = context.getProperty(APPLICATION_NAME).getValue();
        streamName = context.getProperty(STREAM_NAME).getValue();
        initialStreamPosition = context.getProperty(INITIAL_STREAM_POSITION).getValue();
        maxBatchNanos = context.getProperty(MAX_BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS);
        maxBatchBytes = context.getProperty(MAX_BATCH_SIZE).asDataSize(DataUnit.B).longValue();

        final boolean efoMode = ConsumerType.ENHANCED_FAN_OUT.equals(context.getProperty(CONSUMER_TYPE).asAllowableValue(ConsumerType.class));
        // NOTE(review): 0 appears to signal "not applicable" when EFO pushes events rather than
        // polling — confirm how KinesisConsumerClient interprets it
        maxRecordsPerRequest = efoMode ? 0 : context.getProperty(MAX_RECORDS_PER_REQUEST).asInteger();

        shardManager = createShardManager(kinesisClient, dynamoDbClient, getLogger(), checkpointTableName, streamName);
        shardManager.ensureCheckpointTableExists();
        consumerClient = createConsumerClient(kinesisClient, getLogger(), efoMode);

        // An AT_TIMESTAMP start position needs the concrete timestamp pushed to the client
        final Instant timestampForPosition = resolveTimestampPosition(context);
        if (timestampForPosition != null) {
            if (consumerClient instanceof PollingKinesisClient polling) {
                polling.setTimestampForInitialPosition(timestampForPosition);
            } else if (consumerClient instanceof EfoKinesisClient efo) {
                efo.setTimestampForInitialPosition(timestampForPosition);
            }
        }

        if (efoMode) {
            // Enhanced Fan-Out streams records over HTTP/2, which requires the async Netty client
            final NettyNioAsyncHttpClient.Builder nettyBuilder = NettyNioAsyncHttpClient.builder()
                    .protocol(Protocol.HTTP2)
                    .maxConcurrency(500)
                    .connectionAcquisitionTimeout(Duration.ofSeconds(60));

            if (Proxy.Type.HTTP.equals(proxyConfig.getProxyType())) {
                final software.amazon.awssdk.http.nio.netty.ProxyConfiguration.Builder nettyProxyBuilder = software.amazon.awssdk.http.nio.netty.ProxyConfiguration.builder()
                        .host(proxyConfig.getProxyServerHost())
                        .port(proxyConfig.getProxyServerPort());

                if (proxyConfig.hasCredential()) {
                    nettyProxyBuilder.username(proxyConfig.getProxyUserName());
                    nettyProxyBuilder.password(proxyConfig.getProxyUserPassword());
                }

                nettyBuilder.proxyConfiguration(nettyProxyBuilder.build());
            }

            asyncHttpClient = nettyBuilder.build();

            final KinesisAsyncClientBuilder asyncBuilder = KinesisAsyncClient.builder()
                    .region(region)
                    .credentialsProvider(credentialsProvider)
                    .httpClient(asyncHttpClient);

            if (endpointOverride != null && !endpointOverride.isEmpty()) {
                asyncBuilder.endpointOverride(URI.create(endpointOverride));
            }

            // Registers (or reuses) the EFO consumer named after the Application Name
            final String consumerName = context.getProperty(APPLICATION_NAME).getValue();
            consumerClient.initialize(asyncBuilder.build(), streamName, consumerName);
        }
    }
 
-    private static InitialPositionInStreamExtended getInitialPosition(final 
ProcessContext context) {
-        final InitialPosition initialPosition = 
context.getProperty(INITIAL_STREAM_POSITION).asAllowableValue(InitialPosition.class);
-        return switch (initialPosition) {
-            case TRIM_HORIZON ->
-                
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
-            case LATEST -> 
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
-            case AT_TIMESTAMP -> {
-                final String timestampValue = 
context.getProperty(STREAM_POSITION_TIMESTAMP).getValue();
-                final Instant timestamp = Instant.parse(timestampValue);
-                yield 
InitialPositionInStreamExtended.newInitialPositionAtTimestamp(Date.from(timestamp));
-            }
-        };
+    private static Instant resolveTimestampPosition(final ProcessContext 
context) {
+        final InitialPosition position = 
context.getProperty(INITIAL_STREAM_POSITION).asAllowableValue(InitialPosition.class);
+        if (position == InitialPosition.AT_TIMESTAMP) {
+            return 
Instant.parse(context.getProperty(STREAM_POSITION_TIMESTAMP).getValue());
+        }
+        return null;
     }
 
-    private String generateWorkerId() {
-        final String processorId = getIdentifier();
-        final NodeTypeProvider nodeTypeProvider = getNodeTypeProvider();
-
-        final String workerId;
+    /**
+     * Builds an {@link ApacheHttpClient} with the given connection pool size 
and optional proxy
+     * configuration. Each AWS service client (Kinesis, DynamoDB) should 
receive its own HTTP client
+     * so their connection pools are isolated and cannot starve each other 
under high shard counts.
+     */
+    private static SdkHttpClient buildApacheHttpClient(final 
ProxyConfiguration proxyConfig, final int maxConnections) {
+        final ApacheHttpClient.Builder builder = ApacheHttpClient.builder()
+                .maxConnections(maxConnections);
 
-        if (nodeTypeProvider.isClustered()) {
-            // If a node id is not available for some reason, generating a 
random UUID helps to avoid collisions.
-            final String nodeId = 
nodeTypeProvider.getCurrentNode().orElse(UUID.randomUUID().toString());
-            workerId = "%s@%s".formatted(processorId, nodeId);
-        } else {
-            workerId = processorId;
-        }
+        if (Proxy.Type.HTTP.equals(proxyConfig.getProxyType())) {
+            final URI proxyEndpoint = URI.create(String.format("http://%s:%s";, 
proxyConfig.getProxyServerHost(), proxyConfig.getProxyServerPort()));
+            final 
software.amazon.awssdk.http.apache.ProxyConfiguration.Builder proxyBuilder =
+                    
software.amazon.awssdk.http.apache.ProxyConfiguration.builder().endpoint(proxyEndpoint);
 
-        return workerId;
-    }
+            if (proxyConfig.hasCredential()) {
+                proxyBuilder.username(proxyConfig.getProxyUserName());
+                proxyBuilder.password(proxyConfig.getProxyUserPassword());
+            }
 
-    private static @Nullable MetricsFactory configureMetricsFactory(final 
ProcessContext context) {
-        final MetricsPublishing metricsPublishing = 
context.getProperty(METRICS_PUBLISHING).asAllowableValue(MetricsPublishing.class);
-        return switch (metricsPublishing) {
-            case DISABLED -> new NullMetricsFactory();
-            case LOGS -> new LogMetricsFactory();
-            case CLOUDWATCH -> null; // If no metrics factory was provided, 
CloudWatch metrics factory is used by default.
-        };
-    }
+            builder.proxyConfiguration(proxyBuilder.build());
+        }
 
-    private static RetrievalSpecificConfig configureRetrievalSpecificConfig(
-            final ProcessContext context,
-            final KinesisAsyncClient kinesisClient,
-            final String streamName,
-            final String applicationName) {
-        final ConsumerType consumerType = 
context.getProperty(CONSUMER_TYPE).asAllowableValue(ConsumerType.class);
-        return switch (consumerType) {
-            case SHARED_THROUGHPUT -> new 
PollingConfig(kinesisClient).streamName(streamName);
-            case ENHANCED_FAN_OUT -> new 
FanOutConfig(kinesisClient).streamName(streamName).applicationName(applicationName);
-        };
+        return builder.build();
     }
 
    /**
     * Releases shard leases and tears down all clients when the processor is stopped.
     * Ordering matters: leases are released first so other nodes can claim the shards,
     * then the consumer client is closed (after capturing its EFO consumer ARN so a later
     * {@code @OnRemoved} call can deregister it), then the HTTP and service clients.
     */
    @OnStopped
    public void onStopped() {
        if (shardManager != null) {
            shardManager.releaseAllLeases();
            shardManager.close();
            shardManager = null;
        }

        // Remember the EFO consumer ARN before the client is discarded; onRemoved uses it
        if (consumerClient instanceof EfoKinesisClient efo) {
            efoConsumerArn = efo.getConsumerArn();
        }
        if (consumerClient != null) {
            consumerClient.close();
            consumerClient = null;
        }

        if (asyncHttpClient != null) {
            asyncHttpClient.close();
            asyncHttpClient = null;
        }

        if (kinesisClient != null) {
            kinesisClient.close();
            kinesisClient = null;
        }

        if (dynamoDbClient != null) {
            dynamoDbClient.close();
            dynamoDbClient = null;
        }

        // NOTE(review): if one of the close() calls above throws, the remaining clients are
        // skipped and leak — consider a closeQuietly-style wrapper for all of them
        closeQuietly(kinesisHttpClient);
        kinesisHttpClient = null;
        closeQuietly(dynamoHttpClient);
        dynamoHttpClient = null;
    }
 
-    private void shutdownScheduler() {
-        if (kinesisScheduler.shutdownComplete()) {
+    @OnRemoved
+    public void onRemoved(final ProcessContext context) {
+        final String arn = efoConsumerArn;
+        efoConsumerArn = null;
+        if (arn == null) {
             return;
         }
 
-        final long start = System.nanoTime();
-        getLogger().debug("Shutting down Kinesis Scheduler");
+        final Region region = RegionUtil.getRegion(context);
+        final AwsCredentialsProvider credentialsProvider = 
context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE)
+                
.asControllerService(AwsCredentialsProviderService.class).getAwsCredentialsProvider();
+        final String endpointOverride = 
context.getProperty(ENDPOINT_OVERRIDE).getValue();
 
-        boolean gracefulShutdownSucceeded;
-        try {
-            gracefulShutdownSucceeded = 
kinesisScheduler.startGracefulShutdown().get(KINESIS_SCHEDULER_GRACEFUL_SHUTDOWN_TIMEOUT.getSeconds(),
 SECONDS);
-            if (!gracefulShutdownSucceeded) {
-                getLogger().warn("Failed to shutdown Kinesis Scheduler 
gracefully. See the logs for more details");
-            }
-        } catch (final RuntimeException | InterruptedException | 
ExecutionException | TimeoutException e) {
-            if (e instanceof TimeoutException) {
-                getLogger().warn("Failed to shutdown Kinesis Scheduler 
gracefully after {} seconds", 
KINESIS_SCHEDULER_GRACEFUL_SHUTDOWN_TIMEOUT.getSeconds(), e);
-            } else {
-                getLogger().warn("Failed to shutdown Kinesis Scheduler 
gracefully", e);
-            }
-            gracefulShutdownSucceeded = false;
-        }
+        final KinesisClientBuilder builder = KinesisClient.builder()
+                .region(region)
+                .credentialsProvider(credentialsProvider);
 
-        if (!gracefulShutdownSucceeded) {
-            kinesisScheduler.shutdown();
+        if (endpointOverride != null && !endpointOverride.isEmpty()) {
+            builder.endpointOverride(URI.create(endpointOverride));
         }
 
-        final long finish = System.nanoTime();
-        getLogger().debug("Kinesis Scheduler shutdown finished after {} 
seconds", NANOSECONDS.toSeconds(finish - start));
+        try (final KinesisClient tempClient = builder.build()) {
+            
tempClient.deregisterStreamConsumer(DeregisterStreamConsumerRequest.builder()
+                    .consumerARN(arn)
+                    .build());
+            getLogger().info("Deregistered EFO consumer [{}]", arn);
+        } catch (final Exception e) {
+            getLogger().warn("Failed to deregister EFO consumer [{}]; manual 
cleanup may be required", arn, e);
+        }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        if (!initialized.get()) {
-            if (!initializationResultFuture.isDone()) {
-                getLogger().debug("Waiting for Kinesis Scheduler to finish 
initialization");
+        final NodeTypeProvider nodeTypeProvider = getNodeTypeProvider();
+        final int clusterMemberCount = nodeTypeProvider.isClustered() ? 0 : 
Math.max(1, nodeTypeProvider.getClusterMembers().size());
+        shardManager.refreshLeasesIfNecessary(clusterMemberCount);
+        final List<Shard> ownedShards = shardManager.getOwnedShards();
+
+        if (ownedShards.isEmpty()) {
+            context.yield();
+            return;
+        }
+
+        final Set<String> ownedShardIds = new HashSet<>();
+        for (final Shard shard : ownedShards) {
+            ownedShardIds.add(shard.shardId());
+        }
+
+        consumerClient.removeUnownedShards(ownedShardIds);
+        consumerClient.startFetches(ownedShards, streamName, 
maxRecordsPerRequest, initialStreamPosition, shardManager);
+        consumerClient.logDiagnostics(ownedShards.size(), 
shardManager.getCachedShardCount());
+
+        final Set<String> claimedShards = new HashSet<>();
+        try {
+            final List<ShardFetchResult> consumed = 
consumeRecords(claimedShards);
+            final List<ShardFetchResult> accepted = 
discardRelinquishedResults(consumed, claimedShards);
+
+            if (accepted.isEmpty()) {
+                consumerClient.releaseShards(claimedShards);
+                context.yield();
+                return;
+            }
+
+            final PartitionedBatch batch = 
partitionByShardAndCheckpoint(accepted);
+
+            final WriteResult output;
+            try {
+                output = writeResults(session, context, 
batch.resultsByShard());
+            } catch (final Exception e) {
+                handleWriteFailure(e, accepted, claimedShards, context);
+                return;
+            }
+
+            if (output.produced().isEmpty() && 
output.parseFailures().isEmpty()) {
+                consumerClient.releaseShards(claimedShards);
                 context.yield();
                 return;
             }
 
-            checkInitializationResult(initializationResultFuture.resultNow());
+            session.transfer(output.produced(), REL_SUCCESS);
+            if (!output.parseFailures().isEmpty()) {
+                session.transfer(output.parseFailures(), REL_PARSE_FAILURE);
+                session.adjustCounter("Records Parse Failure", 
output.parseFailures().size(), false);
+            }
+            session.adjustCounter("Records Consumed", 
output.totalRecordCount(), false);
+            final long dedupEvents = 
consumerClient.drainDeduplicatedEventCount();
+            if (dedupEvents > 0) {
+                session.adjustCounter("EFO Deduplicated Events", dedupEvents, 
false);
+            }
+
+            session.commitAsync(
+                () -> {
+                    try {
+                        shardManager.writeCheckpoints(batch.checkpoints());
+                    } finally {
+                        try {
+                            consumerClient.acknowledgeResults(accepted);
+                        } finally {
+                            consumerClient.releaseShards(claimedShards);
+                        }
+                    }
+                },
+                failure -> {
+                    try {
+                        getLogger().error("Session commit failed; resetting 
shard iterators for re-consumption", failure);
+                        consumerClient.rollbackResults(accepted);
+                    } finally {
+                        consumerClient.releaseShards(claimedShards);
+                    }
+                });
+        } catch (final Exception e) {
+            consumerClient.releaseShards(claimedShards);
+            throw e;
         }
+    }
 
-        final Optional<Lease> leaseAcquired = 
recordBuffer.acquireBufferLease();
+    private List<ShardFetchResult> discardRelinquishedResults(final 
List<ShardFetchResult> consumedResults, final Set<String> claimedShards) {
+        final List<ShardFetchResult> accepted = new ArrayList<>();
+        final List<ShardFetchResult> discarded = new ArrayList<>();
+        for (final ShardFetchResult result : consumedResults) {
+            if (shardManager.shouldProcessFetchedResult(result.shardId())) {
+                accepted.add(result);
+            } else {
+                discarded.add(result);
+            }
+        }
+
+        if (!discarded.isEmpty()) {
+            getLogger().debug("Discarding {} fetched shard result(s) for 
relinquished shards", discarded.size());
+            consumerClient.rollbackResults(discarded);
+            for (final ShardFetchResult result : discarded) {
+                claimedShards.remove(result.shardId());
+            }
+            
consumerClient.releaseShards(discarded.stream().map(ShardFetchResult::shardId).toList());
+        }
+
+        return accepted;
+    }
+
+    private PartitionedBatch partitionByShardAndCheckpoint(final 
List<ShardFetchResult> accepted) {
+        final Map<String, List<ShardFetchResult>> resultsByShard = new 
LinkedHashMap<>();
+        for (final ShardFetchResult result : accepted) {
+            resultsByShard.computeIfAbsent(result.shardId(), k -> new 
ArrayList<>()).add(result);
+        }
+        for (final List<ShardFetchResult> shardResults : 
resultsByShard.values()) {
+            
shardResults.sort(Comparator.comparing(ShardFetchResult::firstSequenceNumber));

Review Comment:
   I wonder if we really need to sort the results here.
   Since we're consuming data from a queue in `KinesisConsumerClient`, we 
should have the data ordered by sequence numbers already, right? We operate on 
lists everywhere, so the order is preserved.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -347,545 +335,1198 @@ Specifies the string (interpreted as UTF-8) to use for 
demarcating multiple Kine
     private static final Set<Relationship> RAW_FILE_RELATIONSHIPS = 
Set.of(REL_SUCCESS);
     private static final Set<Relationship> RECORD_FILE_RELATIONSHIPS = 
Set.of(REL_SUCCESS, REL_PARSE_FAILURE);
 
-    private volatile DynamoDbAsyncClient dynamoDbClient;
-    private volatile CloudWatchAsyncClient cloudWatchClient;
-    private volatile KinesisAsyncClient kinesisClient;
-    private volatile Scheduler kinesisScheduler;
-
+    private volatile SdkHttpClient kinesisHttpClient;
+    private volatile SdkHttpClient dynamoHttpClient;
+    private volatile KinesisClient kinesisClient;
+    private volatile DynamoDbClient dynamoDbClient;
+    private volatile SdkAsyncHttpClient asyncHttpClient;
+    private volatile KinesisShardManager shardManager;
+    private volatile KinesisConsumerClient consumerClient;
     private volatile String streamName;
-    private volatile RecordBuffer.ForProcessor<Lease> recordBuffer;
-
-    private volatile @Nullable ReaderRecordProcessor readerRecordProcessor;
-    private volatile @Nullable byte[] demarcatorValue;
+    private volatile int maxRecordsPerRequest;
+    private volatile String initialStreamPosition;
+    private volatile long maxBatchNanos;
+    private volatile long maxBatchBytes;
 
-    private volatile Future<InitializationResult> initializationResultFuture;
-    private final AtomicBoolean initialized = new AtomicBoolean();
-
-    // An instance filed, so that it can be read in getRelationships.
-    private volatile ProcessingStrategy processingStrategy = 
ProcessingStrategy.from(
-            PROCESSING_STRATEGY.getDefaultValue());
+    private volatile ProcessingStrategy processingStrategy = 
ProcessingStrategy.valueOf(PROCESSING_STRATEGY.getDefaultValue());
+    private volatile String efoConsumerArn;
+    private final AtomicLong shardRoundRobinCounter = new AtomicLong();
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return PROPERTY_DESCRIPTORS;
     }
 
-    @Override
-    public void migrateProperties(final PropertyConfiguration config) {
-        ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);
-    }
-
     @Override
     public Set<Relationship> getRelationships() {
         return switch (processingStrategy) {
-            case FLOW_FILE, DEMARCATOR -> RAW_FILE_RELATIONSHIPS;
+            case FLOW_FILE, LINE_DELIMITED, DEMARCATOR -> 
RAW_FILE_RELATIONSHIPS;
             case RECORD -> RECORD_FILE_RELATIONSHIPS;
         };
     }
 
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
         if (descriptor.equals(PROCESSING_STRATEGY)) {
-            processingStrategy = ProcessingStrategy.from(newValue);
+            processingStrategy = ProcessingStrategy.valueOf(newValue);
         }
     }
 
-    @OnScheduled
-    public void setup(final ProcessContext context) {
-        readerRecordProcessor = switch (processingStrategy) {
-            case FLOW_FILE, DEMARCATOR -> null;
-            case RECORD -> createReaderRecordProcessor(context);
-        };
-        demarcatorValue = switch (processingStrategy) {
-            case FLOW_FILE, RECORD -> null;
-            case DEMARCATOR -> {
-                final String demarcatorValue = 
context.getProperty(MESSAGE_DEMARCATOR).getValue();
-                yield demarcatorValue != null ? 
demarcatorValue.getBytes(UTF_8) : new byte[0];
-            }
-        };
+    @Override
+    public void migrateProperties(final PropertyConfiguration config) {
+        ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);
+        config.renameProperty("Max Bytes to Buffer", "Max Batch Size");
+        config.removeProperty("Checkpoint Interval");
+        config.removeProperty("Metrics Publishing");
+    }
 
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
         final Region region = RegionUtil.getRegion(context);
         final AwsCredentialsProvider credentialsProvider = 
context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE)
                 
.asControllerService(AwsCredentialsProviderService.class).getAwsCredentialsProvider();
+        final String endpointOverride = 
context.getProperty(ENDPOINT_OVERRIDE).getValue();
 
-        kinesisClient = KinesisAsyncClient.builder()
-                .region(region)
-                .credentialsProvider(credentialsProvider)
-                .endpointOverride(getKinesisEndpointOverride())
-                .httpClient(createKinesisHttpClient(context))
+        final ClientOverrideConfiguration clientConfig = 
ClientOverrideConfiguration.builder()
+                .apiCallTimeout(API_CALL_TIMEOUT)
+                .apiCallAttemptTimeout(API_CALL_ATTEMPT_TIMEOUT)
                 .build();
 
-        dynamoDbClient = DynamoDbAsyncClient.builder()
+        final KinesisClientBuilder kinesisBuilder = KinesisClient.builder()
                 .region(region)
                 .credentialsProvider(credentialsProvider)
-                .endpointOverride(getDynamoDbEndpointOverride())
-                .httpClient(createHttpClientBuilder(context).build())
-                .build();
+                .overrideConfiguration(clientConfig);
 
-        cloudWatchClient = CloudWatchAsyncClient.builder()
+        final DynamoDbClientBuilder dynamoBuilder = DynamoDbClient.builder()
                 .region(region)
                 .credentialsProvider(credentialsProvider)
-                .endpointOverride(getCloudwatchEndpointOverride())
-                .httpClient(createHttpClientBuilder(context).build())
-                .build();
+                .overrideConfiguration(clientConfig);
 
-        streamName = context.getProperty(STREAM_NAME).getValue();
-        final InitialPositionInStreamExtended initialPositionExtended = 
getInitialPosition(context);
-        final SingleStreamTracker streamTracker = new 
SingleStreamTracker(streamName, initialPositionExtended);
-
-        final long maxBytesToBuffer = 
context.getProperty(MAX_BYTES_TO_BUFFER).asDataSize(DataUnit.B).longValue();
-        final Duration checkpointInterval = 
context.getProperty(CHECKPOINT_INTERVAL).asDuration();
-        final MemoryBoundRecordBuffer memoryBoundRecordBuffer = new 
MemoryBoundRecordBuffer(getLogger(), maxBytesToBuffer, checkpointInterval);
-        recordBuffer = memoryBoundRecordBuffer;
-        final ShardRecordProcessorFactory recordProcessorFactory = () -> new 
ConsumeKinesisRecordProcessor(memoryBoundRecordBuffer);
-
-        final String applicationName = 
context.getProperty(APPLICATION_NAME).getValue();
-        final String workerId = generateWorkerId();
-        final ConfigsBuilder configsBuilder = new 
ConfigsBuilder(streamTracker, applicationName, kinesisClient, dynamoDbClient, 
cloudWatchClient, workerId, recordProcessorFactory);
-
-        final MetricsFactory metricsFactory = configureMetricsFactory(context);
-        final RetrievalSpecificConfig retrievalSpecificConfig = 
configureRetrievalSpecificConfig(context, kinesisClient, streamName, 
applicationName);
-
-        final InitializationStateChangeListener initializationListener = new 
InitializationStateChangeListener(getLogger());
-        initialized.set(false);
-        initializationResultFuture = initializationListener.result();
-
-        kinesisScheduler = new Scheduler(
-                configsBuilder.checkpointConfig(),
-                
configsBuilder.coordinatorConfig().workerStateChangeListener(initializationListener),
-                configsBuilder.leaseManagementConfig(),
-                configsBuilder.lifecycleConfig(),
-                configsBuilder.metricsConfig().metricsFactory(metricsFactory),
-                configsBuilder.processorConfig(),
-                
configsBuilder.retrievalConfig().retrievalSpecificConfig(retrievalSpecificConfig)
-        );
-
-        final String schedulerThreadName = 
"%s-Scheduler-%s".formatted(getClass().getSimpleName(), getIdentifier());
-        final Thread schedulerThread = new Thread(kinesisScheduler, 
schedulerThreadName);
-        schedulerThread.setDaemon(true);
-        schedulerThread.start();
-        // The thread is stopped when kinesisScheduler is shutdown in the 
onStopped method.
-
-        try {
-            final InitializationResult result = initializationResultFuture.get(
-                    
KINESIS_SCHEDULER_ON_SCHEDULED_INITIALIZATION_TIMEOUT.getSeconds(), SECONDS);
-            checkInitializationResult(result);
-        } catch (final TimeoutException e) {
-            // During a first run the processor will take more time to 
initialize. We return from OnSchedule and continue waiting in the onTrigger 
method.
-            getLogger().warn("Kinesis Scheduler initialization may take up to 
10 minutes on a first run, which is caused by AWS resources initialization");
-        } catch (final InterruptedException | ExecutionException e) {
-            if (e instanceof InterruptedException) {
-                Thread.currentThread().interrupt();
-            }
-            cleanUpState();
-            throw new ProcessException("Initialization failed for stream 
[%s]".formatted(streamName), e);
+        if (endpointOverride != null && !endpointOverride.isEmpty()) {
+            final URI endpointUri = URI.create(endpointOverride);
+            kinesisBuilder.endpointOverride(endpointUri);
+            dynamoBuilder.endpointOverride(endpointUri);
         }
-    }
 
-    /**
-     * Creating Kinesis HTTP client, as per
-     * {@link 
software.amazon.kinesis.common.KinesisClientUtil#adjustKinesisClientBuilder(KinesisAsyncClientBuilder)}.
-     */
-    private static SdkAsyncHttpClient createKinesisHttpClient(final 
ProcessContext context) {
-        return createHttpClientBuilder(context)
-                .protocol(Protocol.HTTP2)
-                // Since we're using HTTP/2, multiple concurrent requests will 
reuse the same HTTP connection.
-                // Therefore, the number of real connections is going to be 
relatively small.
-                .maxConcurrency(Integer.MAX_VALUE)
-                .http2Configuration(Http2Configuration.builder()
-                        
.initialWindowSize(KINESIS_HTTP_CLIENT_WINDOW_SIZE_BYTES)
-                        
.healthCheckPingPeriod(KINESIS_HTTP_HEALTH_CHECK_PERIOD)
-                        .build())
-                .build();
-    }
+        final ProxyConfiguration proxyConfig = 
ProxyConfiguration.getConfiguration(context);
+
+        kinesisHttpClient = buildApacheHttpClient(proxyConfig, 
PollingKinesisClient.MAX_CONCURRENT_FETCHES + 10);
+        dynamoHttpClient = buildApacheHttpClient(proxyConfig, 50);
+        kinesisBuilder.httpClient(kinesisHttpClient);
+        dynamoBuilder.httpClient(dynamoHttpClient);
+
+        kinesisClient = kinesisBuilder.build();
+        dynamoDbClient = dynamoBuilder.build();
 
-    private static NettyNioAsyncHttpClient.Builder 
createHttpClientBuilder(final ProcessContext context) {
-        final NettyNioAsyncHttpClient.Builder builder = 
NettyNioAsyncHttpClient.builder()
-                .connectionTimeout(HTTP_CLIENTS_CONNECTION_TIMEOUT)
-                .readTimeout(HTTP_CLIENTS_READ_TIMEOUT);
+        final String checkpointTableName = 
context.getProperty(APPLICATION_NAME).getValue();
+        streamName = context.getProperty(STREAM_NAME).getValue();
+        initialStreamPosition = 
context.getProperty(INITIAL_STREAM_POSITION).getValue();
+        maxBatchNanos = 
context.getProperty(MAX_BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS);
+        maxBatchBytes = 
context.getProperty(MAX_BATCH_SIZE).asDataSize(DataUnit.B).longValue();
+
+        final boolean efoMode = 
ConsumerType.ENHANCED_FAN_OUT.equals(context.getProperty(CONSUMER_TYPE).asAllowableValue(ConsumerType.class));
+        maxRecordsPerRequest = efoMode ? 0 : 
context.getProperty(MAX_RECORDS_PER_REQUEST).asInteger();
+
+        shardManager = createShardManager(kinesisClient, dynamoDbClient, 
getLogger(), checkpointTableName, streamName);
+        shardManager.ensureCheckpointTableExists();
+        consumerClient = createConsumerClient(kinesisClient, getLogger(), 
efoMode);
+
+        final Instant timestampForPosition = resolveTimestampPosition(context);
+        if (timestampForPosition != null) {
+            if (consumerClient instanceof PollingKinesisClient polling) {
+                polling.setTimestampForInitialPosition(timestampForPosition);
+            } else if (consumerClient instanceof EfoKinesisClient efo) {
+                efo.setTimestampForInitialPosition(timestampForPosition);
+            }
+        }
 
-        final ProxyConfigurationService proxyConfigService = 
context.getProperty(PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
-        if (proxyConfigService != null) {
-            final ProxyConfiguration proxyConfig = 
proxyConfigService.getConfiguration();
+        if (efoMode) {
+            final NettyNioAsyncHttpClient.Builder nettyBuilder = 
NettyNioAsyncHttpClient.builder()
+                    .protocol(Protocol.HTTP2)
+                    .maxConcurrency(500)
+                    .connectionAcquisitionTimeout(Duration.ofSeconds(60));
 
-            final 
software.amazon.awssdk.http.nio.netty.ProxyConfiguration.Builder 
proxyConfigBuilder = 
software.amazon.awssdk.http.nio.netty.ProxyConfiguration.builder()
+            if (Proxy.Type.HTTP.equals(proxyConfig.getProxyType())) {
+                final 
software.amazon.awssdk.http.nio.netty.ProxyConfiguration.Builder 
nettyProxyBuilder = 
software.amazon.awssdk.http.nio.netty.ProxyConfiguration.builder()
                     .host(proxyConfig.getProxyServerHost())
                     .port(proxyConfig.getProxyServerPort());
 
-            if (proxyConfig.hasCredential()) {
-                proxyConfigBuilder.username(proxyConfig.getProxyUserName());
-                
proxyConfigBuilder.password(proxyConfig.getProxyUserPassword());
+                if (proxyConfig.hasCredential()) {
+                    nettyProxyBuilder.username(proxyConfig.getProxyUserName());
+                    
nettyProxyBuilder.password(proxyConfig.getProxyUserPassword());
+                }
+
+                nettyBuilder.proxyConfiguration(nettyProxyBuilder.build());
             }
 
-            builder.proxyConfiguration(proxyConfigBuilder.build());
-        }
+            asyncHttpClient = nettyBuilder.build();
 
-        return builder;
-    }
+            final KinesisAsyncClientBuilder asyncBuilder = 
KinesisAsyncClient.builder()
+                    .region(region)
+                    .credentialsProvider(credentialsProvider)
+                    .httpClient(asyncHttpClient);
 
-    private ReaderRecordProcessor createReaderRecordProcessor(final 
ProcessContext context) {
-        final RecordReaderFactory recordReaderFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
-        final RecordSetWriterFactory recordWriterFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-
-        final OutputStrategy outputStrategy = 
context.getProperty(OUTPUT_STRATEGY).asAllowableValue(OutputStrategy.class);
-        final KinesisRecordConverter converter = switch (outputStrategy) {
-            case USE_VALUE -> new ValueRecordConverter();
-            case USE_WRAPPER -> new WrapperRecordConverter();
-            case INJECT_METADATA -> new InjectMetadataRecordConverter();
-        };
+            if (endpointOverride != null && !endpointOverride.isEmpty()) {
+                asyncBuilder.endpointOverride(URI.create(endpointOverride));
+            }
 
-        return new ReaderRecordProcessor(recordReaderFactory, converter, 
recordWriterFactory, getLogger());
+            final String consumerName = 
context.getProperty(APPLICATION_NAME).getValue();
+            consumerClient.initialize(asyncBuilder.build(), streamName, 
consumerName);
+        }
     }
 
-    private static InitialPositionInStreamExtended getInitialPosition(final 
ProcessContext context) {
-        final InitialPosition initialPosition = 
context.getProperty(INITIAL_STREAM_POSITION).asAllowableValue(InitialPosition.class);
-        return switch (initialPosition) {
-            case TRIM_HORIZON ->
-                
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
-            case LATEST -> 
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
-            case AT_TIMESTAMP -> {
-                final String timestampValue = 
context.getProperty(STREAM_POSITION_TIMESTAMP).getValue();
-                final Instant timestamp = Instant.parse(timestampValue);
-                yield 
InitialPositionInStreamExtended.newInitialPositionAtTimestamp(Date.from(timestamp));
-            }
-        };
+    private static Instant resolveTimestampPosition(final ProcessContext 
context) {
+        final InitialPosition position = 
context.getProperty(INITIAL_STREAM_POSITION).asAllowableValue(InitialPosition.class);
+        if (position == InitialPosition.AT_TIMESTAMP) {
+            return 
Instant.parse(context.getProperty(STREAM_POSITION_TIMESTAMP).getValue());
+        }
+        return null;
     }
 
-    private String generateWorkerId() {
-        final String processorId = getIdentifier();
-        final NodeTypeProvider nodeTypeProvider = getNodeTypeProvider();
-
-        final String workerId;
+    /**
+     * Builds an {@link ApacheHttpClient} with the given connection pool size 
and optional proxy
+     * configuration. Each AWS service client (Kinesis, DynamoDB) should 
receive its own HTTP client
+     * so their connection pools are isolated and cannot starve each other 
under high shard counts.
+     */
+    private static SdkHttpClient buildApacheHttpClient(final 
ProxyConfiguration proxyConfig, final int maxConnections) {
+        final ApacheHttpClient.Builder builder = ApacheHttpClient.builder()
+                .maxConnections(maxConnections);
 
-        if (nodeTypeProvider.isClustered()) {
-            // If a node id is not available for some reason, generating a 
random UUID helps to avoid collisions.
-            final String nodeId = 
nodeTypeProvider.getCurrentNode().orElse(UUID.randomUUID().toString());
-            workerId = "%s@%s".formatted(processorId, nodeId);
-        } else {
-            workerId = processorId;
-        }
+        if (Proxy.Type.HTTP.equals(proxyConfig.getProxyType())) {
+            final URI proxyEndpoint = URI.create(String.format("http://%s:%s", 
proxyConfig.getProxyServerHost(), proxyConfig.getProxyServerPort()));
+            final 
software.amazon.awssdk.http.apache.ProxyConfiguration.Builder proxyBuilder =
+                    
software.amazon.awssdk.http.apache.ProxyConfiguration.builder().endpoint(proxyEndpoint);
 
-        return workerId;
-    }
+            if (proxyConfig.hasCredential()) {
+                proxyBuilder.username(proxyConfig.getProxyUserName());
+                proxyBuilder.password(proxyConfig.getProxyUserPassword());
+            }
 
-    private static @Nullable MetricsFactory configureMetricsFactory(final 
ProcessContext context) {
-        final MetricsPublishing metricsPublishing = 
context.getProperty(METRICS_PUBLISHING).asAllowableValue(MetricsPublishing.class);
-        return switch (metricsPublishing) {
-            case DISABLED -> new NullMetricsFactory();
-            case LOGS -> new LogMetricsFactory();
-            case CLOUDWATCH -> null; // If no metrics factory was provided, 
CloudWatch metrics factory is used by default.
-        };
-    }
+            builder.proxyConfiguration(proxyBuilder.build());
+        }
 
-    private static RetrievalSpecificConfig configureRetrievalSpecificConfig(
-            final ProcessContext context,
-            final KinesisAsyncClient kinesisClient,
-            final String streamName,
-            final String applicationName) {
-        final ConsumerType consumerType = 
context.getProperty(CONSUMER_TYPE).asAllowableValue(ConsumerType.class);
-        return switch (consumerType) {
-            case SHARED_THROUGHPUT -> new 
PollingConfig(kinesisClient).streamName(streamName);
-            case ENHANCED_FAN_OUT -> new 
FanOutConfig(kinesisClient).streamName(streamName).applicationName(applicationName);
-        };
+        return builder.build();
     }
 
     @OnStopped
     public void onStopped() {
-        cleanUpState();
+        if (shardManager != null) {
+            shardManager.releaseAllLeases();
+            shardManager.close();
+            shardManager = null;
+        }
 
-        initialized.set(false);
-        initializationResultFuture = null;
-    }
+        if (consumerClient instanceof EfoKinesisClient efo) {
+            efoConsumerArn = efo.getConsumerArn();
+        }
+        if (consumerClient != null) {
+            consumerClient.close();
+            consumerClient = null;
+        }
 
-    private void cleanUpState() {
-        if (kinesisScheduler != null) {
-            shutdownScheduler();
-            kinesisScheduler = null;
+        if (asyncHttpClient != null) {
+            asyncHttpClient.close();
+            asyncHttpClient = null;
         }
 
         if (kinesisClient != null) {
             kinesisClient.close();
             kinesisClient = null;
         }
+
         if (dynamoDbClient != null) {
             dynamoDbClient.close();
             dynamoDbClient = null;
         }
-        if (cloudWatchClient != null) {
-            cloudWatchClient.close();
-            cloudWatchClient = null;
-        }
 
-        recordBuffer = null;
-        readerRecordProcessor = null;
-        demarcatorValue = null;
+        closeQuietly(kinesisHttpClient);
+        kinesisHttpClient = null;
+        closeQuietly(dynamoHttpClient);
+        dynamoHttpClient = null;
     }
 
-    private void shutdownScheduler() {
-        if (kinesisScheduler.shutdownComplete()) {
+    @OnRemoved
+    public void onRemoved(final ProcessContext context) {

Review Comment:
   This approach to deregistering consumers looks good to me.
   What should happen when the processor's `CONSUMER_TYPE` changes? Now we wait 
for the processor to be removed. Furthermore, if the `APPLICATION_NAME` is 
changed, the old consumer will be orphaned. 
   
   Should we deregister the consumer immediately when `CONSUMER_TYPE` changes 
to `SHARED_THROUGHPUT` or when `CONSUMER_TYPE` is efo and `APPLICATION_NAME` 
changes? 



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/PollingKinesisClient.java:
##########
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis;
+
+import org.apache.nifi.logging.ComponentLog;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Shared-throughput Kinesis consumer that runs a continuous background fetch 
loop per shard.
+ * Each owned shard gets its own virtual thread that repeatedly calls 
GetRecords and enqueues
+ * results for the processor, mirroring the producer-consumer architecture of 
the KCL Scheduler.
+ * This keeps data flowing between onTrigger invocations rather than fetching 
on-demand.
+ *
+ * <p>Concurrency is bounded by a semaphore with {@value 
#MAX_CONCURRENT_FETCHES} permits so
+ * that at most that many GetRecords HTTP calls are in flight at any moment, 
preventing
+ * connection-pool exhaustion. Additionally, when the shared result queue 
exceeds
+ * {@value #MAX_QUEUED_RESULTS} entries the fetch loop sleeps until the 
processor drains results.
+ */
+final class PollingKinesisClient extends KinesisConsumerClient {
+
+    private static final long DEFAULT_EMPTY_SHARD_BACKOFF_NANOS = 
TimeUnit.MILLISECONDS.toNanos(500);
+    private static final long DEFAULT_ERROR_BACKOFF_NANOS = 
TimeUnit.SECONDS.toNanos(2);
+    static final int MAX_QUEUED_RESULTS = 500;
+    static final int MAX_CONCURRENT_FETCHES = 50;
+
+    private final ExecutorService fetchExecutor = 
Executors.newVirtualThreadPerTaskExecutor();
+    private final Map<String, PollingShardState> pollingShardStates = new 
ConcurrentHashMap<>();
+    private final Semaphore fetchPermits = new 
Semaphore(MAX_CONCURRENT_FETCHES, true);
+    private final long emptyShardBackoffNanos;
+    private final long errorBackoffNanos;
+    private volatile Instant timestampForInitialPosition;
+
+    PollingKinesisClient(final KinesisClient kinesisClient, final ComponentLog 
logger) {
+        this(kinesisClient, logger, DEFAULT_EMPTY_SHARD_BACKOFF_NANOS, 
DEFAULT_ERROR_BACKOFF_NANOS);
+    }
+
+    PollingKinesisClient(final KinesisClient kinesisClient, final ComponentLog 
logger,
+            final long emptyShardBackoffNanos, final long errorBackoffNanos) {
+        super(kinesisClient, logger);
+        this.emptyShardBackoffNanos = emptyShardBackoffNanos;
+        this.errorBackoffNanos = errorBackoffNanos;
+    }
+
+    void setTimestampForInitialPosition(final Instant timestamp) {
+        this.timestampForInitialPosition = timestamp;
+    }
+
+    @Override
+    void startFetches(final List<Shard> shards, final String streamName, final 
int batchSize,
+            final String initialStreamPosition, final KinesisShardManager 
shardManager) {
+        if (fetchExecutor.isShutdown()) {
+            return;
+        }
+
+        for (final Shard shard : shards) {
+            final String shardId = shard.shardId();
+            final PollingShardState existing = pollingShardStates.get(shardId);
+            if (existing == null) {
+                final PollingShardState state = new PollingShardState();
+                if (pollingShardStates.putIfAbsent(shardId, state) == null && 
state.tryStartLoop()) {
+                    launchFetchLoop(state, shardId, streamName, batchSize, 
initialStreamPosition, shardManager);
+                }
+            } else if (!existing.isExhausted() && !existing.isStopped() && 
!existing.isLoopRunning()
+                    && existing.tryStartLoop()) {
+                logger.warn("Restarting dead fetch loop for shard {}", 
shardId);
+                launchFetchLoop(existing, shardId, streamName, batchSize, 
initialStreamPosition, shardManager);
+            }
+        }
+    }
+
+    @Override
+    boolean hasPendingFetches() {
+        if (hasQueuedResults()) {
+            return true;
+        }
+        for (final PollingShardState state : pollingShardStates.values()) {
+            if (!state.isExhausted() && !state.isStopped()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    void acknowledgeResults(final List<ShardFetchResult> results) {
+    }
+
+    @Override
+    void rollbackResults(final List<ShardFetchResult> results) {
+        for (final ShardFetchResult result : results) {
+            final PollingShardState state = 
pollingShardStates.get(result.shardId());
+            if (state != null) {
+                state.requestReset();

Review Comment:
   We should drain the queue while still holding the shard lease. I.e. here, in 
the `rollbackResults`. 
   Since the reset doesn't happen immediately, there is a window in which the lease 
can be acquired and subsequent records polled from the queue before the reset 
happens. 
[Repro](https://github.com/awelless/nifi/blob/NIFI-15669_Drain_race_condition_repro/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/PollingKinesisClientTest.java#L438).
   
   But we need to make sure that while draining the queue we don't fetch a new 
batch of records. Otherwise we'd have to drain it as well. 



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -272,47 +238,49 @@ Specifies the string (interpreted as UTF-8) to use for 
demarcating multiple Kine
 
     static final PropertyDescriptor STREAM_POSITION_TIMESTAMP = new 
PropertyDescriptor.Builder()
             .name("Stream Position Timestamp")
-            .description("Timestamp position in stream from which to start 
reading Kinesis Records. The timestamp must be in ISO 8601 format.")
+            .description("Timestamp position in stream from which to start 
reading Kinesis Records. Must be in ISO 8601 format.")
             .required(true)
             .addValidator(StandardValidators.ISO8601_INSTANT_VALIDATOR)
             .dependsOn(INITIAL_STREAM_POSITION, InitialPosition.AT_TIMESTAMP)
             .build();
 
-    static final PropertyDescriptor MAX_BYTES_TO_BUFFER = new 
PropertyDescriptor.Builder()
-            .name("Max Bytes to Buffer")
-            .description("""
-                    The maximum size of Kinesis Records that can be buffered 
in memory before being processed by NiFi.
-                    If the buffer size exceeds the limit, the processor will 
stop consuming new records until free space is available.
-
-                    Using a larger value may increase the throughput, but will 
do so at the expense of using more memory.
-                    """)
+    static final PropertyDescriptor MAX_RECORDS_PER_REQUEST = new 
PropertyDescriptor.Builder()
+            .name("Max Records Per Request")
+            .description("The maximum number of records to retrieve per 
GetRecords call. Maximum is 10,000.")
             .required(true)
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .defaultValue("100 MB")
+            .defaultValue("1000")
+            .addValidator(StandardValidators.createLongValidator(1, 10000, 
true))
             .build();
 
-    static final PropertyDescriptor CHECKPOINT_INTERVAL = new 
PropertyDescriptor.Builder()
-            .name("Checkpoint Interval")
+    static final PropertyDescriptor MAX_BATCH_DURATION = new 
PropertyDescriptor.Builder()
+            .name("Max Batch Duration")
             .description("""
-                    Interval between checkpointing consumed Kinesis records. 
To checkpoint records each time the Processor is run, set this value to 0 
seconds.
-
-                    More frequent checkpoint may reduce performance and 
increase DynamoDB costs,
-                    but less frequent checkpointing may result in duplicates 
when a Shard lease is lost or NiFi is restarted.
-                    """)
+                    The maximum amount of time to spend consuming records in a 
single invocation before \
+                    committing the session and checkpointing.""")
             .required(true)
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .defaultValue("5 sec")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .build();
 
-    static final PropertyDescriptor METRICS_PUBLISHING = new 
PropertyDescriptor.Builder()
-            .name("Metrics Publishing")
-            .description("Specifies where Kinesis usage metrics are published 
to.")
+    static final PropertyDescriptor MAX_BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Max Batch Size")
+            .description("""
+                    The maximum amount of data to consume in a single 
invocation before committing the \
+                    session and checkpointing.""")
             .required(true)
-            .allowableValues(MetricsPublishing.class)
-            .defaultValue(MetricsPublishing.DISABLED)
+            .defaultValue("10 MB")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
             .build();
 
-    static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = 
ProxyConfiguration.createProxyConfigPropertyDescriptor(ProxySpec.HTTP, 
ProxySpec.HTTP_AUTH);
+    static final PropertyDescriptor ENDPOINT_OVERRIDE = new 
PropertyDescriptor.Builder()

Review Comment:
   I see. Then should we have separate endpoint properties for Kinesis and DynamoDB?
   In LocalStack all services share the same endpoint, but that might not be the 
case in production scenarios.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -347,545 +335,1198 @@ Specifies the string (interpreted as UTF-8) to use for 
demarcating multiple Kine
     private static final Set<Relationship> RAW_FILE_RELATIONSHIPS = 
Set.of(REL_SUCCESS);
     private static final Set<Relationship> RECORD_FILE_RELATIONSHIPS = 
Set.of(REL_SUCCESS, REL_PARSE_FAILURE);
 
-    private volatile DynamoDbAsyncClient dynamoDbClient;
-    private volatile CloudWatchAsyncClient cloudWatchClient;
-    private volatile KinesisAsyncClient kinesisClient;
-    private volatile Scheduler kinesisScheduler;
-
+    private volatile SdkHttpClient kinesisHttpClient;
+    private volatile SdkHttpClient dynamoHttpClient;
+    private volatile KinesisClient kinesisClient;
+    private volatile DynamoDbClient dynamoDbClient;
+    private volatile SdkAsyncHttpClient asyncHttpClient;
+    private volatile KinesisShardManager shardManager;
+    private volatile KinesisConsumerClient consumerClient;
     private volatile String streamName;
-    private volatile RecordBuffer.ForProcessor<Lease> recordBuffer;
-
-    private volatile @Nullable ReaderRecordProcessor readerRecordProcessor;
-    private volatile @Nullable byte[] demarcatorValue;
+    private volatile int maxRecordsPerRequest;
+    private volatile String initialStreamPosition;
+    private volatile long maxBatchNanos;
+    private volatile long maxBatchBytes;
 
-    private volatile Future<InitializationResult> initializationResultFuture;
-    private final AtomicBoolean initialized = new AtomicBoolean();
-
-    // An instance filed, so that it can be read in getRelationships.
-    private volatile ProcessingStrategy processingStrategy = 
ProcessingStrategy.from(
-            PROCESSING_STRATEGY.getDefaultValue());
+    private volatile ProcessingStrategy processingStrategy = 
ProcessingStrategy.valueOf(PROCESSING_STRATEGY.getDefaultValue());
+    private volatile String efoConsumerArn;
+    private final AtomicLong shardRoundRobinCounter = new AtomicLong();
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return PROPERTY_DESCRIPTORS;
     }
 
-    @Override
-    public void migrateProperties(final PropertyConfiguration config) {
-        ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);
-    }
-
     @Override
     public Set<Relationship> getRelationships() {
         return switch (processingStrategy) {
-            case FLOW_FILE, DEMARCATOR -> RAW_FILE_RELATIONSHIPS;
+            case FLOW_FILE, LINE_DELIMITED, DEMARCATOR -> 
RAW_FILE_RELATIONSHIPS;
             case RECORD -> RECORD_FILE_RELATIONSHIPS;
         };
     }
 
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
         if (descriptor.equals(PROCESSING_STRATEGY)) {
-            processingStrategy = ProcessingStrategy.from(newValue);
+            processingStrategy = ProcessingStrategy.valueOf(newValue);
         }
     }
 
-    @OnScheduled
-    public void setup(final ProcessContext context) {
-        readerRecordProcessor = switch (processingStrategy) {
-            case FLOW_FILE, DEMARCATOR -> null;
-            case RECORD -> createReaderRecordProcessor(context);
-        };
-        demarcatorValue = switch (processingStrategy) {
-            case FLOW_FILE, RECORD -> null;
-            case DEMARCATOR -> {
-                final String demarcatorValue = 
context.getProperty(MESSAGE_DEMARCATOR).getValue();
-                yield demarcatorValue != null ? 
demarcatorValue.getBytes(UTF_8) : new byte[0];
-            }
-        };
+    @Override
+    public void migrateProperties(final PropertyConfiguration config) {
+        ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);
+        config.renameProperty("Max Bytes to Buffer", "Max Batch Size");

Review Comment:
   Should we really migrate `Max Bytes to Buffer` to `Max Batch Size`? These are 
different properties, and the old 100 MB default for the buffer size might be far 
too large as a batch size.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to