This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch NIFI-6169-RC1
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit f996b6e07827a2a66e7f1e03dd0fb3732a09f987
Author: Mark Payne <[email protected]>
AuthorDate: Fri Mar 29 09:25:10 2019 -0400

    NIFI-6155: Ensure that any task submitted to FlowEngine catches Throwable 
so that the task doesn't die just die silently in the case of an unexpected 
error/exception
    
    This closes #3395.
    
    Signed-off-by: Bryan Bende <[email protected]>
---
 .../nifi/controller/tasks/ConnectableTask.java     |  4 ++
 .../java/org/apache/nifi/engine/FlowEngine.java    | 50 ++++++++++++++++++++++
 2 files changed, 54 insertions(+)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
index 5a49c72..180e2a2 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
@@ -149,6 +149,8 @@ public class ConnectableTask {
     }
 
     public InvocationResult invoke() {
+        logger.trace("Triggering {}", connectable);
+
         if (scheduleState.isTerminated()) {
             return InvocationResult.DO_NOT_YIELD;
         }
@@ -165,12 +167,14 @@ public class ConnectableTask {
 
         // Make sure processor has work to do.
         if (!isWorkToDo()) {
+            logger.debug("Yielding {} because it has no work to do", 
connectable);
             return InvocationResult.yield("No work to do");
         }
 
         if (numRelationships > 0) {
             final int requiredNumberOfAvailableRelationships = 
connectable.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships;
             if 
(!repositoryContext.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships))
 {
+                logger.debug("Yielding {} because Backpressure is Applied", 
connectable);
                 return InvocationResult.yield("Backpressure Applied");
             }
         }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
index ffc463f..af63dcc 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
@@ -20,11 +20,14 @@ import org.apache.nifi.nar.NarThreadContextClassLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public final class FlowEngine extends ScheduledThreadPoolExecutor {
@@ -79,6 +82,53 @@ public final class FlowEngine extends 
ScheduledThreadPoolExecutor {
         super.beforeExecute(thread, runnable);
     }
 
+    @Override
+    public ScheduledFuture<?> schedule(final Runnable command, final long 
delay, final TimeUnit unit) {
+        return super.schedule(wrap(command), delay, unit);
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, 
final long initialDelay, final long period, final TimeUnit unit) {
+        return super.scheduleAtFixedRate(wrap(command), initialDelay, period, 
unit);
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, 
final long initialDelay, final long delay, final TimeUnit unit) {
+        return super.scheduleWithFixedDelay(wrap(command), initialDelay, 
delay, unit);
+    }
+
+    @Override
+    public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final 
long delay, final TimeUnit unit) {
+        return super.schedule(wrap(callable), delay, unit);
+    }
+
+    private Runnable wrap(final Runnable runnable) {
+        return new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    runnable.run();
+                } catch (final Throwable t) {
+                    logger.error("Uncaught Exception in Runnable task", t);
+                }
+            }
+        };
+    }
+
+    private <T> Callable<T> wrap(final Callable<T> callable) {
+        return new Callable<T>() {
+            @Override
+            public T call() throws Exception {
+                try {
+                    return callable.call();
+                } catch (final Throwable t) {
+                    logger.error("Uncaught Exception in Callable task", t);
+                    throw t;
+                }
+            }
+        };
+    }
+
     /**
      * Hook method called by the thread that executed the given runnable after 
execution of the runnable completed. Logs the fact of completion and any errors 
that might have occurred.
      *

Reply via email to