This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 76392ee NIFI-6155: Ensure that any task submitted to FlowEngine
catches Throwable so that the task doesn't die just die silently in the case of
an unexpected error/exception
76392ee is described below
commit 76392ee862294cdf783e20df6c7db39b6dc2f95d
Author: Mark Payne <[email protected]>
AuthorDate: Fri Mar 29 09:25:10 2019 -0400
NIFI-6155: Ensure that any task submitted to FlowEngine catches Throwable
so that the task doesn't die just die silently in the case of an unexpected
error/exception
This closes #3395.
Signed-off-by: Bryan Bende <[email protected]>
---
.../nifi/controller/tasks/ConnectableTask.java | 4 ++
.../java/org/apache/nifi/engine/FlowEngine.java | 50 ++++++++++++++++++++++
2 files changed, 54 insertions(+)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
index 5a49c72..180e2a2 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
@@ -149,6 +149,8 @@ public class ConnectableTask {
}
public InvocationResult invoke() {
+ logger.trace("Triggering {}", connectable);
+
if (scheduleState.isTerminated()) {
return InvocationResult.DO_NOT_YIELD;
}
@@ -165,12 +167,14 @@ public class ConnectableTask {
// Make sure processor has work to do.
if (!isWorkToDo()) {
+ logger.debug("Yielding {} because it has no work to do",
connectable);
return InvocationResult.yield("No work to do");
}
if (numRelationships > 0) {
final int requiredNumberOfAvailableRelationships =
connectable.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships;
if
(!repositoryContext.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships))
{
+ logger.debug("Yielding {} because Backpressure is Applied",
connectable);
return InvocationResult.yield("Backpressure Applied");
}
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
index ffc463f..af63dcc 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
@@ -20,11 +20,14 @@ import org.apache.nifi.nar.NarThreadContextClassLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public final class FlowEngine extends ScheduledThreadPoolExecutor {
@@ -79,6 +82,53 @@ public final class FlowEngine extends
ScheduledThreadPoolExecutor {
super.beforeExecute(thread, runnable);
}
+ @Override
+ public ScheduledFuture<?> schedule(final Runnable command, final long
delay, final TimeUnit unit) {
+ return super.schedule(wrap(command), delay, unit);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command,
final long initialDelay, final long period, final TimeUnit unit) {
+ return super.scheduleAtFixedRate(wrap(command), initialDelay, period,
unit);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command,
final long initialDelay, final long delay, final TimeUnit unit) {
+ return super.scheduleWithFixedDelay(wrap(command), initialDelay,
delay, unit);
+ }
+
+ @Override
+ public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final
long delay, final TimeUnit unit) {
+ return super.schedule(wrap(callable), delay, unit);
+ }
+
+ private Runnable wrap(final Runnable runnable) {
+ return new Runnable() {
+ @Override
+ public void run() {
+ try {
+ runnable.run();
+ } catch (final Throwable t) {
+ logger.error("Uncaught Exception in Runnable task", t);
+ }
+ }
+ };
+ }
+
+ private <T> Callable<T> wrap(final Callable<T> callable) {
+ return new Callable<T>() {
+ @Override
+ public T call() throws Exception {
+ try {
+ return callable.call();
+ } catch (final Throwable t) {
+ logger.error("Uncaught Exception in Callable task", t);
+ throw t;
+ }
+ }
+ };
+ }
+
/**
* Hook method called by the thread that executed the given runnable after
execution of the runnable completed. Logs the fact of completion and any errors
that might have occurred.
*