exceptionfactory commented on code in PR #10526:
URL: https://github.com/apache/nifi/pull/10526#discussion_r2541212178


##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -684,6 +728,49 @@ public void shutdownRequested(final ShutdownRequestedInput 
shutdownRequestedInpu
         }
     }
 
+    private static final class InitializationStateChangeListener implements 
WorkerStateChangeListener {
+
+        private final ComponentLog logger;
+
+        private final CompletableFuture<InitializationResult> resultFuture = 
new CompletableFuture<>();
+
+        private volatile @Nullable Throwable initializationFailure;
+
+        InitializationStateChangeListener(final ComponentLog logger) {
+            this.logger = logger;
+        }
+
+        @Override
+        public void onWorkerStateChange(final WorkerState newState) {
+            logger.info("Processor state changed to: {}", newState);

Review Comment:
   Is this log necessary if the status is also logged above?
   ```suggestion
               logger.info("Worker state changed to [{}]", newState);
   ```



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -684,6 +728,49 @@ public void shutdownRequested(final ShutdownRequestedInput 
shutdownRequestedInpu
         }
     }
 
+    private static final class InitializationStateChangeListener implements 
WorkerStateChangeListener {
+
+        private final ComponentLog logger;
+
+        private final CompletableFuture<InitializationResult> resultFuture = 
new CompletableFuture<>();
+
+        private volatile @Nullable Throwable initializationFailure;
+
+        InitializationStateChangeListener(final ComponentLog logger) {
+            this.logger = logger;
+        }
+
+        @Override
+        public void onWorkerStateChange(final WorkerState newState) {
+            logger.info("Processor state changed to: {}", newState);
+
+            if (newState == WorkerState.STARTED) {
+                resultFuture.complete(new InitializationResult.Success());
+            } else if (newState == WorkerState.SHUT_DOWN) {
+                resultFuture.complete(new 
InitializationResult.Failure(Optional.ofNullable(initializationFailure)));
+            }
+        }
+
+        @Override
+        public void onAllInitializationAttemptsFailed(final Throwable e) {
+            // This method is called before the SHUT_DOWN_STARTED phase.
+            // Memorizing the error until the Scheduler is SHUT_DOWN.
+            initializationFailure = e;

Review Comment:
   Is this logged in the failure handler?



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java:
##########
@@ -57,4 +63,42 @@ void getRelationshipsForRecordProcessingStrategy() {
 
         assertEquals(Set.of(REL_SUCCESS, REL_PARSE_FAILURE), relationships);
     }
+
+    @Test
+    void failInitializationWithInvalidValues() {
+        // With dummy values KCL Scheduler initialization will fail.
+        setDummyValues(testRunner);
+
+        // Using the processor object to avoid error wrapping by testRunner.
+        final ConsumeKinesis consumeKinesis = (ConsumeKinesis) 
testRunner.getProcessor();
+        final ProcessException ex = assertThrows(
+                ProcessException.class,
+                () -> consumeKinesis.setup(testRunner.getProcessContext()));
+
+        assertEquals("Failed to initialize the processor.", ex.getMessage());
+        assertNotNull(ex.getCause());
+    }
+
+    private static void setDummyValues(final TestRunner runner) {

Review Comment:
   ```suggestion
       private static void setExpectedValues(final TestRunner runner) {
   ```



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -409,8 +420,32 @@ public void setup(final ProcessContext context) {
         schedulerThread.start();
         // The thread is stopped when kinesisScheduler is shutdown in the 
onStopped method.
 
-        getLogger().info("Started Kinesis Scheduler for stream [{}] with 
application name [{}] and workerId [{}]",
-                streamName, applicationName, workerId);
+        final InitializationResult result;
+        try {
+            result = 
initializationListener.result().get(KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT.getSeconds(),
 SECONDS);
+        } catch (final InterruptedException | ExecutionException | 
TimeoutException e) {
+            if (e instanceof InterruptedException) {
+                Thread.currentThread().interrupt();
+            }
+            cleanUpState();
+            throw new ProcessException(e);
+        }
+
+        switch (result) {
+            case InitializationResult.Success ignored ->
+                    getLogger().info(
+                            "Started Kinesis Scheduler for stream [{}] with 
application name [{}] and workerId [{}]",
+                            streamName, applicationName, workerId);
+            case InitializationResult.Failure failure -> {
+                cleanUpState();
+
+                final ProcessException ex = failure.error()
+                        .map(err -> new ProcessException("Failed to initialize 
the processor.", err))
+                        .orElseGet(() -> new ProcessException("Failed to 
initialize the processor due to an unknown failure. Check application logs for 
more details."));

Review Comment:
   If that is the case, it seems worth mentioning. Mentioning "check the 
application logs for more details" is not helpful because it already appears in 
the logs, so the message should be shortened.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -409,8 +420,32 @@ public void setup(final ProcessContext context) {
         schedulerThread.start();
         // The thread is stopped when kinesisScheduler is shutdown in the 
onStopped method.
 
-        getLogger().info("Started Kinesis Scheduler for stream [{}] with 
application name [{}] and workerId [{}]",
-                streamName, applicationName, workerId);
+        final InitializationResult result;
+        try {
+            result = 
initializationListener.result().get(KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT.getSeconds(),
 SECONDS);
+        } catch (final InterruptedException | ExecutionException | 
TimeoutException e) {
+            if (e instanceof InterruptedException) {
+                Thread.currentThread().interrupt();
+            }
+            cleanUpState();
+            throw new ProcessException(e);
+        }
+
+        switch (result) {
+            case InitializationResult.Success ignored ->
+                    getLogger().info(
+                            "Started Kinesis Scheduler for stream [{}] with 
application name [{}] and workerId [{}]",
+                            streamName, applicationName, workerId);
+            case InitializationResult.Failure failure -> {
+                cleanUpState();
+
+                final ProcessException ex = failure.error()
+                        .map(err -> new ProcessException("Failed to initialize 
the processor.", err))

Review Comment:
   Recommend removing the trailing `.` and adding the stream name to the 
message.
   ```suggestion
                           .map(err -> new ProcessException("Initialization 
failed for stream [%s]".formatted(streamName), err))
   ```



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java:
##########
@@ -57,4 +63,42 @@ void getRelationshipsForRecordProcessingStrategy() {
 
         assertEquals(Set.of(REL_SUCCESS, REL_PARSE_FAILURE), relationships);
     }
+
+    @Test
+    void failInitializationWithInvalidValues() {
+        // With dummy values KCL Scheduler initialization will fail.
+        setDummyValues(testRunner);
+
+        // Using the processor object to avoid error wrapping by testRunner.
+        final ConsumeKinesis consumeKinesis = (ConsumeKinesis) 
testRunner.getProcessor();
+        final ProcessException ex = assertThrows(
+                ProcessException.class,
+                () -> consumeKinesis.setup(testRunner.getProcessContext()));
+
+        assertEquals("Failed to initialize the processor.", ex.getMessage());

Review Comment:
   Asserting a particular message is generally poor practice because it is tied 
to the implementation details. I recommend removing this assertion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to