awelless commented on code in PR #10526:
URL: https://github.com/apache/nifi/pull/10526#discussion_r2527958840
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -409,8 +420,32 @@ public void setup(final ProcessContext context) {
schedulerThread.start();
// The thread is stopped when kinesisScheduler is shutdown in the
onStopped method.
- getLogger().info("Started Kinesis Scheduler for stream [{}] with
application name [{}] and workerId [{}]",
- streamName, applicationName, workerId);
+ final InitializationResult result;
+ try {
+ result =
initializationListener.result().get(KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT.getSeconds(),
SECONDS);
+ } catch (final InterruptedException | ExecutionException |
TimeoutException e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ cleanUpState();
+ throw new ProcessException(e);
+ }
+
+ switch (result) {
+ case InitializationResult.Success ignored ->
+ getLogger().info(
+ "Started Kinesis Scheduler for stream [{}] with
application name [{}] and workerId [{}]",
+ streamName, applicationName, workerId);
+ case InitializationResult.Failure failure -> {
+ cleanUpState();
+
+ final ProcessException ex = failure.error()
+ .map(err -> new ProcessException("Failed to initialize
the processor.", err))
+ .orElseGet(() -> new ProcessException("Failed to
initialize the processor due to an unknown failure. Check application logs for
more details."));
Review Comment:
This branch is active only when a scheduler was shutdown, but no
initialization error was provided. However, I didn't observe this behavior
while testing.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]