exceptionfactory commented on code in PR #10526:
URL: https://github.com/apache/nifi/pull/10526#discussion_r2544618748


##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -409,8 +420,36 @@ public void setup(final ProcessContext context) {
         schedulerThread.start();
         // The thread is stopped when kinesisScheduler is shutdown in the 
onStopped method.
 
-        getLogger().info("Started Kinesis Scheduler for stream [{}] with 
application name [{}] and workerId [{}]",
-                streamName, applicationName, workerId);
+        final InitializationResult result;
+        try {
+            result = 
initializationListener.result().get(KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT.getSeconds(),
 SECONDS);
+        } catch (final InterruptedException | ExecutionException | 
TimeoutException e) {
+            if (e instanceof InterruptedException) {
+                Thread.currentThread().interrupt();
+            }
+            cleanUpState();
+            throw new ProcessException(e);
+        }
+
+        switch (result) {
+            case InitializationResult.Success ignored ->
+                    getLogger().info(
+                            "Started Kinesis Scheduler for stream [{}] with 
application name [{}] and workerId [{}]",
+                            streamName, applicationName, workerId);
+            case InitializationResult.Failure failure -> {
+                cleanUpState();
+
+                final ProcessException ex = failure.error()
+                        .map(err -> new ProcessException("Initialization 
failed for stream [%s]".formatted(streamName), err))
+                        // This branch is active only when a scheduler was 
shutdown, but no initialization error was provided.
+                        // This behavior isn't typical and wasn't observed.
+                        .orElseGet(() -> new ProcessException((
+                                "Initialization failed for stream [%s] due to 
an unknown failure." +
+                                " Check application logs for more 
details").formatted(streamName)));

Review Comment:
   Recommend streamlining this message
   ```suggestion
                                   "Initialization failed for stream 
[%s]").formatted(streamName)));
   ```



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -409,8 +420,36 @@ public void setup(final ProcessContext context) {
         schedulerThread.start();
         // The thread is stopped when kinesisScheduler is shutdown in the 
onStopped method.
 
-        getLogger().info("Started Kinesis Scheduler for stream [{}] with 
application name [{}] and workerId [{}]",
-                streamName, applicationName, workerId);
+        final InitializationResult result;
+        try {
+            result = 
initializationListener.result().get(KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT.getSeconds(),
 SECONDS);
+        } catch (final InterruptedException | ExecutionException | 
TimeoutException e) {
+            if (e instanceof InterruptedException) {
+                Thread.currentThread().interrupt();
+            }
+            cleanUpState();
+            throw new ProcessException(e);

Review Comment:
   A message should be added:
   
   ```suggestion
               throw new ProcessException("Kinesis Scheduler initialization 
failed", e);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to