This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 14412f0ca1 NIFI-15269 Removed concurrency limit in ConsumeKinesis HTTP client (#10575)
14412f0ca1 is described below
commit 14412f0ca14e77a33a09659a32fcc8e3f4e07e60
Author: Alaksiej Ščarbaty <[email protected]>
AuthorDate: Mon Dec 1 14:41:55 2025 +0100
NIFI-15269 Removed concurrency limit in ConsumeKinesis HTTP client (#10575)
Signed-off-by: David Handermann <[email protected]>
---
.../org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java | 10 +++-------
1 file changed, 3 insertions(+), 7 deletions(-)
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
index 3ac4443eb2..b173cfab46 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
@@ -160,10 +160,6 @@ public class ConsumeKinesis extends AbstractProcessor {
private static final Duration HTTP_CLIENTS_CONNECTION_TIMEOUT = Duration.ofSeconds(30);
private static final Duration HTTP_CLIENTS_READ_TIMEOUT = Duration.ofMinutes(3);
- /**
- * Best balance between throughput and CPU usage by KCL.
- */
- private static final int KINESIS_HTTP_CLIENT_CONCURRENCY_PER_TASK = 16;
private static final int KINESIS_HTTP_CLIENT_WINDOW_SIZE_BYTES = 512 * 1024; // 512 KiB
private static final Duration KINESIS_HTTP_HEALTH_CHECK_PERIOD = Duration.ofMinutes(1);
@@ -455,11 +451,11 @@ public class ConsumeKinesis extends AbstractProcessor {
* {@link software.amazon.kinesis.common.KinesisClientUtil#adjustKinesisClientBuilder(KinesisAsyncClientBuilder)}.
*/
private static SdkAsyncHttpClient createKinesisHttpClient(final ProcessContext context) {
- final int maxConcurrency = KINESIS_HTTP_CLIENT_CONCURRENCY_PER_TASK * context.getMaxConcurrentTasks();
-
return createHttpClientBuilder(context)
.protocol(Protocol.HTTP2)
- .maxConcurrency(maxConcurrency)
+ // Since we're using HTTP/2, multiple concurrent requests will reuse the same HTTP connection.
+ // Therefore, the number of real connections is going to be relatively small.
+ .maxConcurrency(Integer.MAX_VALUE)
.http2Configuration(Http2Configuration.builder()
.initialWindowSize(KINESIS_HTTP_CLIENT_WINDOW_SIZE_BYTES)
.healthCheckPingPeriod(KINESIS_HTTP_HEALTH_CHECK_PERIOD)