This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new fd41c45291 NIFI-15307 Updated ConsumeKinesis to wait for long
initialization in onTrigger (#10664)
fd41c45291 is described below
commit fd41c45291856d7b37cd424fc2037c2e1f86c664
Author: Alaksiej Ščarbaty <[email protected]>
AuthorDate: Thu Jan 8 22:12:02 2026 +0100
NIFI-15307 Updated ConsumeKinesis to wait for long initialization in
onTrigger (#10664)
Signed-off-by: David Handermann <[email protected]>
---
.../processors/aws/kinesis/ConsumeKinesis.java | 77 +++++++++++++++-------
.../processors/aws/kinesis/ConsumeKinesisTest.java | 16 -----
2 files changed, 53 insertions(+), 40 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
index 8e4342b541..751ac6feef 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java
@@ -100,6 +100,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -169,10 +170,10 @@ public class ConsumeKinesis extends AbstractProcessor {
private static final Duration KINESIS_HTTP_HEALTH_CHECK_PERIOD =
Duration.ofMinutes(1);
/**
- * Using a large enough value to ensure we don't wait infinitely for the
initialization.
- * Actual initialization shouldn't take that long.
+ * How long to wait for a Scheduler initialization to complete in the
OnScheduled method.
+ * If the initialization takes longer than this, the processor will
continue initialization checks in the onTrigger method.
*/
- private static final Duration KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT =
Duration.ofMinutes(15);
+ private static final Duration
KINESIS_SCHEDULER_ON_SCHEDULED_INITIALIZATION_TIMEOUT = Duration.ofSeconds(30);
private static final Duration KINESIS_SCHEDULER_GRACEFUL_SHUTDOWN_TIMEOUT
= Duration.ofMinutes(3);
static final PropertyDescriptor STREAM_NAME = new
PropertyDescriptor.Builder()
@@ -339,6 +340,9 @@ public class ConsumeKinesis extends AbstractProcessor {
private volatile @Nullable ReaderRecordProcessor readerRecordProcessor;
+ private volatile Future<InitializationResult> initializationResultFuture;
+ private final AtomicBoolean initialized = new AtomicBoolean();
+
// An instance filed, so that it can be read in getRelationships.
private volatile ProcessingStrategy processingStrategy =
ProcessingStrategy.from(
PROCESSING_STRATEGY.getDefaultValue());
@@ -418,6 +422,8 @@ public class ConsumeKinesis extends AbstractProcessor {
final RetrievalSpecificConfig retrievalSpecificConfig =
configureRetrievalSpecificConfig(context, kinesisClient, streamName,
applicationName);
final InitializationStateChangeListener initializationListener = new
InitializationStateChangeListener(getLogger());
+ initialized.set(false);
+ initializationResultFuture = initializationListener.result();
kinesisScheduler = new Scheduler(
configsBuilder.checkpointConfig(),
@@ -435,34 +441,20 @@ public class ConsumeKinesis extends AbstractProcessor {
schedulerThread.start();
// The thread is stopped when kinesisScheduler is shutdown in the
onStopped method.
- final InitializationResult result;
try {
- result =
initializationListener.result().get(KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT.getSeconds(),
SECONDS);
- } catch (final InterruptedException | ExecutionException |
TimeoutException e) {
+ final InitializationResult result = initializationResultFuture.get(
+
KINESIS_SCHEDULER_ON_SCHEDULED_INITIALIZATION_TIMEOUT.getSeconds(), SECONDS);
+ checkInitializationResult(result);
+ } catch (final TimeoutException e) {
+ // During a first run the processor will take more time to
initialize. We return from OnSchedule and continue waiting in the onTrigger
method.
+ getLogger().warn("Kinesis Scheduler initialization may take up to
10 minutes on a first run, which is caused by AWS resources initialization");
+ } catch (final InterruptedException | ExecutionException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
cleanUpState();
throw new ProcessException("Initialization failed for stream
[%s]".formatted(streamName), e);
}
-
- switch (result) {
- case InitializationResult.Success ignored ->
- getLogger().info(
- "Started Kinesis Scheduler for stream [{}] with
application name [{}] and workerId [{}]",
- streamName, applicationName, workerId);
- case InitializationResult.Failure failure -> {
- cleanUpState();
-
- final ProcessException ex = failure.error()
- .map(err -> new ProcessException("Initialization
failed for stream [%s]".formatted(streamName), err))
- // This branch is active only when a scheduler was
shutdown, but no initialization error was provided.
- // This behavior isn't typical and wasn't observed.
- .orElseGet(() -> new ProcessException(("Initialization
failed for stream [%s]").formatted(streamName)));
-
- throw ex;
- }
- }
}
/**
@@ -575,6 +567,9 @@ public class ConsumeKinesis extends AbstractProcessor {
@OnStopped
public void onStopped() {
cleanUpState();
+
+ initialized.set(false);
+ initializationResultFuture = null;
}
private void cleanUpState() {
@@ -633,6 +628,16 @@ public class ConsumeKinesis extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ if (!initialized.get()) {
+ if (!initializationResultFuture.isDone()) {
+ getLogger().debug("Waiting for Kinesis Scheduler to finish
initialization");
+ context.yield();
+ return;
+ }
+
+ checkInitializationResult(initializationResultFuture.resultNow());
+ }
+
final Optional<Lease> leaseAcquired =
recordBuffer.acquireBufferLease();
leaseAcquired.ifPresentOrElse(
@@ -641,6 +646,30 @@ public class ConsumeKinesis extends AbstractProcessor {
);
}
+ private void checkInitializationResult(final InitializationResult
initializationResult) {
+ switch (initializationResult) {
+ case InitializationResult.Success ignored -> {
+ final boolean wasInitialized = initialized.getAndSet(true);
+ if (!wasInitialized) {
+ getLogger().info(
+ "Started Kinesis Scheduler for stream [{}] with
application name [{}] and workerId [{}]",
+ streamName, kinesisScheduler.applicationName(),
kinesisScheduler.leaseManagementConfig().workerIdentifier());
+ }
+ }
+ case InitializationResult.Failure failure -> {
+ cleanUpState();
+
+ final ProcessException ex = failure.error()
+ .map(err -> new ProcessException("Initialization
failed for stream [%s]".formatted(streamName), err))
+ // This branch is active only when a scheduler was
shutdown, but no initialization error was provided.
+ // This behavior isn't typical and wasn't observed.
+ .orElseGet(() -> new ProcessException("Initialization
failed for stream [%s]".formatted(streamName)));
+
+ throw ex;
+ }
+ }
+ }
+
private void processRecordsFromBuffer(final ProcessSession session, final
Lease lease) {
try {
final List<KinesisClientRecord> records =
recordBuffer.consumeRecords(lease);
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java
index 0b8323dd1c..3d105cb0bf 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java
@@ -17,7 +17,6 @@
package org.apache.nifi.processors.aws.kinesis;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
import
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
import org.apache.nifi.processors.aws.region.RegionUtil;
import org.apache.nifi.reporting.InitializationException;
@@ -34,8 +33,6 @@ import static
org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.ProcessingSt
import static
org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.REL_PARSE_FAILURE;
import static
org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.REL_SUCCESS;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
class ConsumeKinesisTest {
@@ -64,19 +61,6 @@ class ConsumeKinesisTest {
assertEquals(Set.of(REL_SUCCESS, REL_PARSE_FAILURE), relationships);
}
- @Test
- void failInitializationWithInvalidValues() {
- // KCL Scheduler initialization will fail, as the runner is configured
with placeholder credentials.
-
- // Using the processor object to avoid error wrapping by testRunner.
- final ConsumeKinesis consumeKinesis = (ConsumeKinesis)
testRunner.getProcessor();
- final ProcessException ex = assertThrows(
- ProcessException.class,
- () -> consumeKinesis.setup(testRunner.getProcessContext()));
-
- assertNotNull(ex.getCause(), "The initialization exception is expected
to have a cause");
- }
-
private static TestRunner createTestRunner() {
final TestRunner runner =
TestRunners.newTestRunner(ConsumeKinesis.class);