This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 222550e6ab NIFI-15183 Generate unique Worker ID in ConsumeKinesis (#10499)
222550e6ab is described below

commit 222550e6abc0f8f7817b6f242cba79245bd7e511
Author: Alaksiej Ščarbaty <[email protected]>
AuthorDate: Wed Nov 5 18:17:41 2025 +0100

    NIFI-15183 Generate unique Worker ID in ConsumeKinesis (#10499)
    
    Co-authored-by: David Handermann <[email protected]>
    Signed-off-by: David Handermann <[email protected]>
---
 .../processors/aws/kinesis/ConsumeKinesis.java     | 24 +++++++++++++++++++++-
 1 file changed, 23 insertions(+), 1 deletion(-)

diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
index afbc9d58b9..b6b3a138cf 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
@@ -29,6 +29,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.DescribedValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.NodeTypeProvider;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.DataUnit;
@@ -87,6 +88,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
@@ -386,7 +388,7 @@ public class ConsumeKinesis extends AbstractProcessor {
         final ShardRecordProcessorFactory recordProcessorFactory = () -> new 
ConsumeKinesisRecordProcessor(memoryBoundRecordBuffer);
 
         final String applicationName = 
context.getProperty(APPLICATION_NAME).getValue();
-        final String workerId = getIdentifier();
+        final String workerId = generateWorkerId();
         final ConfigsBuilder configsBuilder = new 
ConfigsBuilder(streamTracker, applicationName, kinesisClient, dynamoDbClient, 
cloudWatchClient, workerId, recordProcessorFactory);
 
         final MetricsFactory metricsFactory = configureMetricsFactory(context);
@@ -406,6 +408,9 @@ public class ConsumeKinesis extends AbstractProcessor {
         schedulerThread.setDaemon(true);
         schedulerThread.start();
         // The thread is stopped when kinesisScheduler is shutdown in the 
onStopped method.
+
+        getLogger().info("Started Kinesis Scheduler for stream [{}] with 
application name [{}] and workerId [{}]",
+                streamName, applicationName, workerId);
     }
 
     /**
@@ -477,6 +482,23 @@ public class ConsumeKinesis extends AbstractProcessor {
         };
     }
 
+    /**
+     * Builds the Kinesis worker id from the processor id, appending the node id when running clustered.
+     * @return the processor id when not clustered, otherwise {@code processorId@nodeId}
+     */
+    private String generateWorkerId() {
+        final String processorId = getIdentifier();
+        final NodeTypeProvider nodeTypeProvider = getNodeTypeProvider();
+        if (!nodeTypeProvider.isClustered()) {
+            return processorId;
+        }
+
+        // If a node id is not available for some reason, generating a random UUID helps to avoid collisions.
+        // orElseGet creates the fallback lazily, so no UUID is generated when the node id is present.
+        final String nodeId = nodeTypeProvider.getCurrentNode().orElseGet(() -> UUID.randomUUID().toString());
+        return "%s@%s".formatted(processorId, nodeId);
+    }
+
     private static @Nullable MetricsFactory configureMetricsFactory(final 
ProcessContext context) {
         final MetricsPublishing metricsPublishing = 
context.getProperty(METRICS_PUBLISHING).asAllowableValue(MetricsPublishing.class);
         return switch (metricsPublishing) {

Reply via email to