exceptionfactory commented on code in PR #10526:
URL: https://github.com/apache/nifi/pull/10526#discussion_r2544618748
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -409,8 +420,36 @@ public void setup(final ProcessContext context) {
schedulerThread.start();
// The thread is stopped when kinesisScheduler is shutdown in the
onStopped method.
- getLogger().info("Started Kinesis Scheduler for stream [{}] with
application name [{}] and workerId [{}]",
- streamName, applicationName, workerId);
+ final InitializationResult result;
+ try {
+ result =
initializationListener.result().get(KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT.getSeconds(),
SECONDS);
+ } catch (final InterruptedException | ExecutionException |
TimeoutException e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ cleanUpState();
+ throw new ProcessException(e);
+ }
+
+ switch (result) {
+ case InitializationResult.Success ignored ->
+ getLogger().info(
+ "Started Kinesis Scheduler for stream [{}] with
application name [{}] and workerId [{}]",
+ streamName, applicationName, workerId);
+ case InitializationResult.Failure failure -> {
+ cleanUpState();
+
+ final ProcessException ex = failure.error()
+ .map(err -> new ProcessException("Initialization
failed for stream [%s]".formatted(streamName), err))
+ // This branch is active only when a scheduler was
shutdown, but no initialization error was provided.
+ // This behavior isn't typical and wasn't observed.
+ .orElseGet(() -> new ProcessException((
+ "Initialization failed for stream [%s] due to
an unknown failure." +
+ " Check application logs for more
details").formatted(streamName)));
Review Comment:
Recommend streamlining this message
```suggestion
"Initialization failed for stream
[%s]").formatted(streamName)));
```
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -409,8 +420,36 @@ public void setup(final ProcessContext context) {
schedulerThread.start();
// The thread is stopped when kinesisScheduler is shutdown in the
onStopped method.
- getLogger().info("Started Kinesis Scheduler for stream [{}] with
application name [{}] and workerId [{}]",
- streamName, applicationName, workerId);
+ final InitializationResult result;
+ try {
+ result =
initializationListener.result().get(KINESIS_SCHEDULER_INITIALIZATION_TIMEOUT.getSeconds(),
SECONDS);
+ } catch (final InterruptedException | ExecutionException |
TimeoutException e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ cleanUpState();
+ throw new ProcessException(e);
Review Comment:
A message should be added:
```suggestion
throw new ProcessException("Kinesis Scheduler initialization
failed", e);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]