awelless commented on code in PR #10526:
URL: https://github.com/apache/nifi/pull/10526#discussion_r2527958840


##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -409,8 +420,32 @@ public void setup(final ProcessContext context) {
         schedulerThread.start();
         // The thread is stopped when kinesisScheduler is shutdown in the 
onStopped method.
 
-        getLogger().info("Started Kinesis Scheduler for stream [{}] with 
application name [{}] and workerId [{}]",
-                streamName, applicationName, workerId);
+        final InitializationResult result;
+        try {
+            result = 
initializationListener.result().get(KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT.getSeconds(),
 SECONDS);
+        } catch (final InterruptedException | ExecutionException | 
TimeoutException e) {
+            if (e instanceof InterruptedException) {
+                Thread.currentThread().interrupt();
+            }
+            cleanUpState();
+            throw new ProcessException(e);
+        }
+
+        switch (result) {
+            case InitializationResult.Success ignored ->
+                    getLogger().info(
+                            "Started Kinesis Scheduler for stream [{}] with 
application name [{}] and workerId [{}]",
+                            streamName, applicationName, workerId);
+            case InitializationResult.Failure failure -> {
+                cleanUpState();
+
+                final ProcessException ex = failure.error()
+                        .map(err -> new ProcessException("Failed to initialize 
the processor.", err))
+                        .orElseGet(() -> new ProcessException("Failed to 
initialize the processor due to an unknown failure. Check application logs for 
more details."));

Review Comment:
   This branch is active only when a scheduler was shutdown, but no 
initialization error was provided. However, I didn't observe this behavior 
while testing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to