awelless commented on code in PR #10526:
URL: https://github.com/apache/nifi/pull/10526#discussion_r2541784484


##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -409,8 +420,32 @@ public void setup(final ProcessContext context) {
         schedulerThread.start();
         // The thread is stopped when kinesisScheduler is shutdown in the 
onStopped method.
 
-        getLogger().info("Started Kinesis Scheduler for stream [{}] with 
application name [{}] and workerId [{}]",
-                streamName, applicationName, workerId);
+        final InitializationResult result;
+        try {
+            result = 
initializationListener.result().get(KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT.getSeconds(),
 SECONDS);
+        } catch (final InterruptedException | ExecutionException | 
TimeoutException e) {
+            if (e instanceof InterruptedException) {
+                Thread.currentThread().interrupt();
+            }
+            cleanUpState();
+            throw new ProcessException(e);
+        }
+
+        switch (result) {
+            case InitializationResult.Success ignored ->
+                    getLogger().info(
+                            "Started Kinesis Scheduler for stream [{}] with 
application name [{}] and workerId [{}]",
+                            streamName, applicationName, workerId);
+            case InitializationResult.Failure failure -> {
+                cleanUpState();
+
+                final ProcessException ex = failure.error()
+                        .map(err -> new ProcessException("Failed to initialize 
the processor.", err))
+                        .orElseGet(() -> new ProcessException("Failed to 
initialize the processor due to an unknown failure. Check application logs for 
more details."));

Review Comment:
   The reason behind "check the application logs for more details" is that the 
error text is visible in a bulletin. 
   When an initialization error is present, it's logged and, therefore 
available in the bulletin too.
   
   When error is not available for some reason, the bulletin will just mention 
that initialization failed. However, additional details may be present in 
application logs, which aren't available in NiFi canvas.
   
   Is it expected for the NiFi users to inspect application logs as well? If 
so, then the "check the application logs for more details" part is not needed.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -684,6 +728,49 @@ public void shutdownRequested(final ShutdownRequestedInput 
shutdownRequestedInpu
         }
     }
 
+    private static final class InitializationStateChangeListener implements 
WorkerStateChangeListener {
+
+        private final ComponentLog logger;
+
+        private final CompletableFuture<InitializationResult> resultFuture = 
new CompletableFuture<>();
+
+        private volatile @Nullable Throwable initializationFailure;
+
+        InitializationStateChangeListener(final ComponentLog logger) {
+            this.logger = logger;
+        }
+
+        @Override
+        public void onWorkerStateChange(final WorkerState newState) {
+            logger.info("Processor state changed to: {}", newState);
+
+            if (newState == WorkerState.STARTED) {
+                resultFuture.complete(new InitializationResult.Success());
+            } else if (newState == WorkerState.SHUT_DOWN) {
+                resultFuture.complete(new 
InitializationResult.Failure(Optional.ofNullable(initializationFailure)));
+            }
+        }
+
+        @Override
+        public void onAllInitializationAttemptsFailed(final Throwable e) {
+            // This method is called before the SHUT_DOWN_STARTED phase.
+            // Memorizing the error until the Scheduler is SHUT_DOWN.
+            initializationFailure = e;

Review Comment:
   This error is passed to the `ProcessException` in the `OnScheduled` method. 
[Line 
443](https://github.com/apache/nifi/pull/10526/files#diff-affbd29ecc52fdbcb9b91e24893b0c94cb7e07f53cf7a7afd9ba9c065283474eR443).



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java:
##########
@@ -57,4 +63,42 @@ void getRelationshipsForRecordProcessingStrategy() {
 
         assertEquals(Set.of(REL_SUCCESS, REL_PARSE_FAILURE), relationships);
     }
+
+    @Test
+    void failInitializationWithInvalidValues() {
+        // With dummy values KCL Scheduler initialization will fail.
+        setDummyValues(testRunner);
+
+        // Using the processor object to avoid error wrapping by testRunner.
+        final ConsumeKinesis consumeKinesis = (ConsumeKinesis) 
testRunner.getProcessor();
+        final ProcessException ex = assertThrows(
+                ProcessException.class,
+                () -> consumeKinesis.setup(testRunner.getProcessContext()));
+
+        assertEquals("Failed to initialize the processor.", ex.getMessage());
+        assertNotNull(ex.getCause());
+    }
+
+    private static void setDummyValues(final TestRunner runner) {

Review Comment:
   I changed this method to `createTestRunner` instead



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -684,6 +728,49 @@ public void shutdownRequested(final ShutdownRequestedInput 
shutdownRequestedInpu
         }
     }
 
+    private static final class InitializationStateChangeListener implements 
WorkerStateChangeListener {
+
+        private final ComponentLog logger;
+
+        private final CompletableFuture<InitializationResult> resultFuture = 
new CompletableFuture<>();
+
+        private volatile @Nullable Throwable initializationFailure;
+
+        InitializationStateChangeListener(final ComponentLog logger) {
+            this.logger = logger;
+        }
+
+        @Override
+        public void onWorkerStateChange(final WorkerState newState) {
+            logger.info("Processor state changed to: {}", newState);

Review Comment:
   > status is also logged above
   
   Do you mean the logs in the `OnScheduled` method? 
   Currently there are 5 worker states: `CREATED, INITIALIZING, STARTED, 
SHUT_DOWN_STARTED, SHUT_DOWN`. The logs in `OnScheduled` only appear for either 
`STARTED` or `SHUT_DOWN` states. This log more low level and is emitted for 
each state. It can help to track a low-level status changes of a KCL worker.
   
   Perhaps, debug level would be a more appropriate here?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to