This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 222550e6ab NIFI-15183 Generate unique Worker ID in ConsumeKinesis
(#10499)
222550e6ab is described below
commit 222550e6abc0f8f7817b6f242cba79245bd7e511
Author: Alaksiej Ščarbaty <[email protected]>
AuthorDate: Wed Nov 5 18:17:41 2025 +0100
NIFI-15183 Generate unique Worker ID in ConsumeKinesis (#10499)
Co-authored-by: David Handermann <[email protected]>
Signed-off-by: David Handermann <[email protected]>
---
.../processors/aws/kinesis/ConsumeKinesis.java | 24 +++++++++++++++++++++-
1 file changed, 23 insertions(+), 1 deletion(-)
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
index afbc9d58b9..b6b3a138cf 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
@@ -29,6 +29,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
@@ -87,6 +88,7 @@ import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
@@ -386,7 +388,7 @@ public class ConsumeKinesis extends AbstractProcessor {
final ShardRecordProcessorFactory recordProcessorFactory = () -> new
ConsumeKinesisRecordProcessor(memoryBoundRecordBuffer);
final String applicationName =
context.getProperty(APPLICATION_NAME).getValue();
- final String workerId = getIdentifier();
+ final String workerId = generateWorkerId();
final ConfigsBuilder configsBuilder = new
ConfigsBuilder(streamTracker, applicationName, kinesisClient, dynamoDbClient,
cloudWatchClient, workerId, recordProcessorFactory);
final MetricsFactory metricsFactory = configureMetricsFactory(context);
@@ -406,6 +408,9 @@ public class ConsumeKinesis extends AbstractProcessor {
schedulerThread.setDaemon(true);
schedulerThread.start();
// The thread is stopped when kinesisScheduler is shutdown in the
onStopped method.
+
+ getLogger().info("Started Kinesis Scheduler for stream [{}] with
application name [{}] and workerId [{}]",
+ streamName, applicationName, workerId);
}
/**
@@ -477,6 +482,23 @@ public class ConsumeKinesis extends AbstractProcessor {
};
}
+    /**
+     * Builds the worker identifier used when configuring the Kinesis scheduler.
+     *
+     * <p>When the instance is not clustered, the processor identifier is returned unchanged.
+     * When it is clustered, the current node identifier is appended in the form
+     * {@code processorId@nodeId} so that workers on different nodes get distinct identifiers.</p>
+     *
+     * @return the processor identifier, suffixed with {@code @} and a node identifier when clustered
+     */
+    private String generateWorkerId() {
+        final String processorId = getIdentifier();
+        final NodeTypeProvider nodeTypeProvider = getNodeTypeProvider();
+
+        if (!nodeTypeProvider.isClustered()) {
+            return processorId;
+        }
+
+        // If a node id is not available for some reason, a random UUID helps to avoid collisions.
+        // orElseGet defers UUID generation to that fallback path instead of running it on every call.
+        final String nodeId = nodeTypeProvider.getCurrentNode()
+                .orElseGet(() -> UUID.randomUUID().toString());
+        return "%s@%s".formatted(processorId, nodeId);
+    }
+
private static @Nullable MetricsFactory configureMetricsFactory(final
ProcessContext context) {
final MetricsPublishing metricsPublishing =
context.getProperty(METRICS_PUBLISHING).asAllowableValue(MetricsPublishing.class);
return switch (metricsPublishing) {