This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch NIFI-6169-RC1 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit f996b6e07827a2a66e7f1e03dd0fb3732a09f987 Author: Mark Payne <[email protected]> AuthorDate: Fri Mar 29 09:25:10 2019 -0400 NIFI-6155: Ensure that any task submitted to FlowEngine catches Throwable so that the task doesn't just die silently in the case of an unexpected error/exception This closes #3395. Signed-off-by: Bryan Bende <[email protected]> --- .../nifi/controller/tasks/ConnectableTask.java | 4 ++ .../java/org/apache/nifi/engine/FlowEngine.java | 50 ++++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java index 5a49c72..180e2a2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java @@ -149,6 +149,8 @@ public class ConnectableTask { } public InvocationResult invoke() { + logger.trace("Triggering {}", connectable); + if (scheduleState.isTerminated()) { return InvocationResult.DO_NOT_YIELD; } @@ -165,12 +167,14 @@ public class ConnectableTask { // Make sure processor has work to do. if (!isWorkToDo()) { + logger.debug("Yielding {} because it has no work to do", connectable); return InvocationResult.yield("No work to do"); } if (numRelationships > 0) { final int requiredNumberOfAvailableRelationships = connectable.isTriggerWhenAnyDestinationAvailable() ? 
1 : numRelationships; if (!repositoryContext.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships)) { + logger.debug("Yielding {} because Backpressure is Applied", connectable); return InvocationResult.yield("Backpressure Applied"); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java index ffc463f..af63dcc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java @@ -20,11 +20,14 @@ import org.apache.nifi.nar.NarThreadContextClassLoader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public final class FlowEngine extends ScheduledThreadPoolExecutor { @@ -79,6 +82,53 @@ public final class FlowEngine extends ScheduledThreadPoolExecutor { super.beforeExecute(thread, runnable); } + @Override + public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) { + return super.schedule(wrap(command), delay, unit); + } + + @Override + public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) { + return super.scheduleAtFixedRate(wrap(command), initialDelay, period, unit); + } + + @Override + public 
ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) { + return super.scheduleWithFixedDelay(wrap(command), initialDelay, delay, unit); + } + + @Override + public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit) { + return super.schedule(wrap(callable), delay, unit); + } + + private Runnable wrap(final Runnable runnable) { + return new Runnable() { + @Override + public void run() { + try { + runnable.run(); + } catch (final Throwable t) { + logger.error("Uncaught Exception in Runnable task", t); + } + } + }; + } + + private <T> Callable<T> wrap(final Callable<T> callable) { + return new Callable<T>() { + @Override + public T call() throws Exception { + try { + return callable.call(); + } catch (final Throwable t) { + logger.error("Uncaught Exception in Callable task", t); + throw t; + } + } + }; + } + /** * Hook method called by the thread that executed the given runnable after execution of the runnable completed. Logs the fact of completion and any errors that might have occurred. *
