This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new fd41c45291 NIFI-15307 Updated ConsumeKinesis to wait for long initialization in onTrigger (#10664)
fd41c45291 is described below

commit fd41c45291856d7b37cd424fc2037c2e1f86c664
Author: Alaksiej Ščarbaty <[email protected]>
AuthorDate: Thu Jan 8 22:12:02 2026 +0100

    NIFI-15307 Updated ConsumeKinesis to wait for long initialization in onTrigger (#10664)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../processors/aws/kinesis/ConsumeKinesis.java     | 77 +++++++++++++++-------
 .../processors/aws/kinesis/ConsumeKinesisTest.java | 16 -----
 2 files changed, 53 insertions(+), 40 deletions(-)

diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
index 8e4342b541..751ac6feef 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
@@ -100,6 +100,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -169,10 +170,10 @@ public class ConsumeKinesis extends AbstractProcessor {
     private static final Duration KINESIS_HTTP_HEALTH_CHECK_PERIOD = 
Duration.ofMinutes(1);
 
     /**
-     * Using a large enough value to ensure we don't wait infinitely for the 
initialization.
-     * Actual initialization shouldn't take that long.
+     * How long to wait for a Scheduler initialization to complete in the 
OnScheduled method.
+     * If the initialization takes longer than this, the processor will 
continue initialization checks in the onTrigger method.
      */
-    private static final Duration KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT = 
Duration.ofMinutes(15);
+    private static final Duration 
KINESIS_SCHEDULER_ON_SCHEDULED_INITIALIZATION_TIMEOUT = Duration.ofSeconds(30);
     private static final Duration KINESIS_SCHEDULER_GRACEFUL_SHUTDOWN_TIMEOUT 
= Duration.ofMinutes(3);
 
     static final PropertyDescriptor STREAM_NAME = new 
PropertyDescriptor.Builder()
@@ -339,6 +340,9 @@ public class ConsumeKinesis extends AbstractProcessor {
 
     private volatile @Nullable ReaderRecordProcessor readerRecordProcessor;
 
+    private volatile Future<InitializationResult> initializationResultFuture;
+    private final AtomicBoolean initialized = new AtomicBoolean();
+
     // An instance filed, so that it can be read in getRelationships.
     private volatile ProcessingStrategy processingStrategy = 
ProcessingStrategy.from(
             PROCESSING_STRATEGY.getDefaultValue());
@@ -418,6 +422,8 @@ public class ConsumeKinesis extends AbstractProcessor {
         final RetrievalSpecificConfig retrievalSpecificConfig = 
configureRetrievalSpecificConfig(context, kinesisClient, streamName, 
applicationName);
 
         final InitializationStateChangeListener initializationListener = new 
InitializationStateChangeListener(getLogger());
+        initialized.set(false);
+        initializationResultFuture = initializationListener.result();
 
         kinesisScheduler = new Scheduler(
                 configsBuilder.checkpointConfig(),
@@ -435,34 +441,20 @@ public class ConsumeKinesis extends AbstractProcessor {
         schedulerThread.start();
         // The thread is stopped when kinesisScheduler is shutdown in the 
onStopped method.
 
-        final InitializationResult result;
         try {
-            result = 
initializationListener.result().get(KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT.getSeconds(),
 SECONDS);
-        } catch (final InterruptedException | ExecutionException | 
TimeoutException e) {
+            final InitializationResult result = initializationResultFuture.get(
+                    
KINESIS_SCHEDULER_ON_SCHEDULED_INITIALIZATION_TIMEOUT.getSeconds(), SECONDS);
+            checkInitializationResult(result);
+        } catch (final TimeoutException e) {
+            // During a first run the processor will take more time to 
initialize. We return from OnSchedule and continue waiting in the onTrigger 
method.
+            getLogger().warn("Kinesis Scheduler initialization may take up to 
10 minutes on a first run, which is caused by AWS resources initialization");
+        } catch (final InterruptedException | ExecutionException e) {
             if (e instanceof InterruptedException) {
                 Thread.currentThread().interrupt();
             }
             cleanUpState();
             throw new ProcessException("Initialization failed for stream 
[%s]".formatted(streamName), e);
         }
-
-        switch (result) {
-            case InitializationResult.Success ignored ->
-                getLogger().info(
-                        "Started Kinesis Scheduler for stream [{}] with 
application name [{}] and workerId [{}]",
-                        streamName, applicationName, workerId);
-            case InitializationResult.Failure failure -> {
-                cleanUpState();
-
-                final ProcessException ex = failure.error()
-                        .map(err -> new ProcessException("Initialization 
failed for stream [%s]".formatted(streamName), err))
-                        // This branch is active only when a scheduler was 
shutdown, but no initialization error was provided.
-                        // This behavior isn't typical and wasn't observed.
-                        .orElseGet(() -> new ProcessException(("Initialization 
failed for stream [%s]").formatted(streamName)));
-
-                throw ex;
-            }
-        }
     }
 
     /**
@@ -575,6 +567,9 @@ public class ConsumeKinesis extends AbstractProcessor {
     @OnStopped
     public void onStopped() {
         cleanUpState();
+
+        initialized.set(false);
+        initializationResultFuture = null;
     }
 
     private void cleanUpState() {
@@ -633,6 +628,16 @@ public class ConsumeKinesis extends AbstractProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        if (!initialized.get()) {
+            if (!initializationResultFuture.isDone()) {
+                getLogger().debug("Waiting for Kinesis Scheduler to finish 
initialization");
+                context.yield();
+                return;
+            }
+
+            checkInitializationResult(initializationResultFuture.resultNow());
+        }
+
         final Optional<Lease> leaseAcquired = 
recordBuffer.acquireBufferLease();
 
         leaseAcquired.ifPresentOrElse(
@@ -641,6 +646,30 @@ public class ConsumeKinesis extends AbstractProcessor {
         );
     }
 
+    private void checkInitializationResult(final InitializationResult 
initializationResult) {
+        switch (initializationResult) {
+            case InitializationResult.Success ignored -> {
+                final boolean wasInitialized = initialized.getAndSet(true);
+                if (!wasInitialized) {
+                    getLogger().info(
+                            "Started Kinesis Scheduler for stream [{}] with 
application name [{}] and workerId [{}]",
+                            streamName, kinesisScheduler.applicationName(), 
kinesisScheduler.leaseManagementConfig().workerIdentifier());
+                }
+            }
+            case InitializationResult.Failure failure -> {
+                cleanUpState();
+
+                final ProcessException ex = failure.error()
+                        .map(err -> new ProcessException("Initialization 
failed for stream [%s]".formatted(streamName), err))
+                        // This branch is active only when a scheduler was 
shutdown, but no initialization error was provided.
+                        // This behavior isn't typical and wasn't observed.
+                        .orElseGet(() -> new ProcessException("Initialization 
failed for stream [%s]".formatted(streamName)));
+
+                throw ex;
+            }
+        }
+    }
+
     private void processRecordsFromBuffer(final ProcessSession session, final 
Lease lease) {
         try {
             final List<KinesisClientRecord> records = 
recordBuffer.consumeRecords(lease);
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java
index 0b8323dd1c..3d105cb0bf 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java
@@ -17,7 +17,6 @@
 package org.apache.nifi.processors.aws.kinesis;
 
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
 import 
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
 import org.apache.nifi.processors.aws.region.RegionUtil;
 import org.apache.nifi.reporting.InitializationException;
@@ -34,8 +33,6 @@ import static 
org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.ProcessingSt
 import static 
org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.REL_PARSE_FAILURE;
 import static 
org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.REL_SUCCESS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 
 class ConsumeKinesisTest {
 
@@ -64,19 +61,6 @@ class ConsumeKinesisTest {
         assertEquals(Set.of(REL_SUCCESS, REL_PARSE_FAILURE), relationships);
     }
 
-    @Test
-    void failInitializationWithInvalidValues() {
-        // KCL Scheduler initialization will fail, as the runner is configured 
with placeholder credentials.
-
-        // Using the processor object to avoid error wrapping by testRunner.
-        final ConsumeKinesis consumeKinesis = (ConsumeKinesis) 
testRunner.getProcessor();
-        final ProcessException ex = assertThrows(
-                ProcessException.class,
-                () -> consumeKinesis.setup(testRunner.getProcessContext()));
-
-        assertNotNull(ex.getCause(), "The initialization exception is expected 
to have a cause");
-    }
-
     private static TestRunner createTestRunner() {
         final TestRunner runner = 
TestRunners.newTestRunner(ConsumeKinesis.class);
 

Reply via email to