Repository: nifi
Updated Branches:
  refs/heads/master 2a5e21c11 -> 14fef2de1


NIFI-4772: Refactored how the @OnScheduled methods of processors is 
invoked/monitored. The new method does away with the two previously created 
8-thread thread pools and just uses the Timer-Driven thread pool that is used 
by other framework tasks.

NIFI-4772: Introduced a new thread-pool with 2 threads that will be used for 
monitoring lifecycle task. This means that if all threads in the timer-driven 
thead pool are blocked by processors that don't complete their @OnScheduled 
methods, we have a separate thread pool that at least gives us a chance of 
interrupting those threads

NIFI-4772: Remove unused import
Signed-off-by: Matthew Burgess <mattyb...@apache.org>

This closes #2403


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/14fef2de
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/14fef2de
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/14fef2de

Branch: refs/heads/master
Commit: 14fef2de146eab8a42a3b72eda1744378c9584a2
Parents: 2a5e21c
Author: Mark Payne <marka...@hotmail.com>
Authored: Fri Jan 12 14:12:57 2018 -0500
Committer: Matthew Burgess <mattyb...@apache.org>
Committed: Mon Feb 19 09:19:53 2018 -0500

----------------------------------------------------------------------
 .../apache/nifi/controller/FlowController.java  |   8 +-
 .../nifi/controller/StandardProcessorNode.java  | 175 ++++++++++---------
 .../ComponentStartTimeoutException.java         |  24 +++
 .../scheduling/StandardProcessScheduler.java    |  13 +-
 .../controller/TestStandardProcessorNode.java   |   2 +-
 .../scheduling/LongEnablingService.java         |  56 ++++++
 .../scheduling/StandardProcessSchedulerIT.java  |  92 ++++++++++
 .../scheduling/TestProcessorLifecycle.java      |  11 +-
 .../TestStandardProcessScheduler.java           | 161 ++++++++++-------
 .../processors/FailOnScheduledProcessor.java    |  89 ++++++++++
 .../StandardControllerServiceProviderIT.java    | 158 +++++++++++++++++
 .../TestStandardControllerServiceProvider.java  |  75 +-------
 12 files changed, 625 insertions(+), 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/14fef2de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 88dc11c..e68000f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -509,7 +509,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             throw new RuntimeException(e);
         }
 
-        processScheduler = new StandardProcessScheduler(this, encryptor, 
stateManagerProvider, this.nifiProperties);
+        processScheduler = new 
StandardProcessScheduler(timerDrivenEngineRef.get(), this, encryptor, 
stateManagerProvider, this.nifiProperties);
         eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, 
processScheduler);
 
         final ProcessContextFactory contextFactory = new 
ProcessContextFactory(contentRepository, flowFileRepository, 
flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository);
@@ -1661,13 +1661,13 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     /**
      * Updates the number of threads that can be simultaneously used for
      * executing processors.
+     * This method must be called while holding the write lock!
      *
-     * @param maxThreadCount This method must be called while holding the write
-     * lock!
+     * @param maxThreadCount max number of threads
      */
     private void setMaxThreadCount(final int maxThreadCount, final FlowEngine 
engine, final AtomicInteger maxThreads) {
         if (maxThreadCount < 1) {
-            throw new IllegalArgumentException();
+            throw new IllegalArgumentException("Cannot set max number of 
threads to less than 2");
         }
 
         maxThreads.getAndSet(maxThreadCount);

http://git-wip-us.apache.org/repos/asf/nifi/blob/14fef2de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 187b62f..12f4b1e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -18,7 +18,6 @@ package org.apache.nifi.controller;
 
 import static java.util.Objects.requireNonNull;
 
-import java.lang.reflect.InvocationTargetException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -32,11 +31,9 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -131,12 +128,12 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     private final ProcessScheduler processScheduler;
     private long runNanos = 0L;
     private volatile long yieldNanos;
-    private final NiFiProperties nifiProperties;
     private volatile ScheduledState desiredState;
 
     private SchedulingStrategy schedulingStrategy; // guarded by read/write 
lock
                                                    // ??????? NOT any more
     private ExecutionNode executionNode;
+    private final long onScheduleTimeoutMillis;
 
     public StandardProcessorNode(final LoggableComponent<Processor> processor, 
final String uuid,
                                  final ValidationContextFactory 
validationContextFactory, final ProcessScheduler scheduler,
@@ -176,7 +173,9 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
         this.processGroup = new AtomicReference<>();
         processScheduler = scheduler;
         penalizationPeriod = new 
AtomicReference<>(DEFAULT_PENALIZATION_PERIOD);
-        this.nifiProperties = nifiProperties;
+
+        final String timeoutString = 
nifiProperties.getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT);
+        onScheduleTimeoutMillis = timeoutString == null ? 60000 : 
FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS);
 
         schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN;
         executionNode = ExecutionNode.ALL;
@@ -300,6 +299,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     }
 
     @Override
+    @SuppressWarnings("deprecation")
     public boolean isIsolated() {
         return schedulingStrategy == SchedulingStrategy.PRIMARY_NODE_ONLY || 
executionNode == ExecutionNode.PRIMARY;
     }
@@ -465,6 +465,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     }
 
     @Override
+    @SuppressWarnings("deprecation")
     public synchronized void setScheduldingPeriod(final String 
schedulingPeriod) {
         if (isRunning()) {
             throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
@@ -1312,7 +1313,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
         }
 
         if (starting) { // will ensure that the Processor represented by this 
node can only be started once
-            taskScheduler.execute(() -> initiateStart(taskScheduler, 
administrativeYieldMillis, processContext, schedulingAgentCallback));
+            initiateStart(taskScheduler, administrativeYieldMillis, 
processContext, schedulingAgentCallback);
         } else {
             final String procName = processorRef.get().toString();
             LOG.warn("Cannot start {} because it is not currently stopped. 
Current state is {}", procName, currentState);
@@ -1326,40 +1327,83 @@ public class StandardProcessorNode extends 
ProcessorNode implements Connectable
         final Processor processor = getProcessor();
         final ComponentLog procLog = new 
SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
 
-        try {
-            invokeTaskAsCancelableFuture(schedulingAgentCallback, new 
Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    try (final NarCloseable nc = 
NarCloseable.withComponentNarLoader(processor.getClass(), 
processor.getIdentifier())) {
-                        
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, 
processContext);
-                        return null;
+        final long completionTimestamp = System.currentTimeMillis() + 
onScheduleTimeoutMillis;
+
+        // Create a task to invoke the @OnScheduled annotation of the processor
+        final Callable<Void> startupTask = () -> {
+            LOG.debug("Invoking @OnScheduled methods of {}", processor);
+
+            try (final NarCloseable nc = 
NarCloseable.withComponentNarLoader(processor.getClass(), 
processor.getIdentifier())) {
+                try {
+                    
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, 
processContext);
+
+                    if (scheduledState.compareAndSet(ScheduledState.STARTING, 
ScheduledState.RUNNING)) {
+                        LOG.debug("Successfully completed the @OnScheduled 
methods of {}; will now start triggering processor to run", processor);
+                        schedulingAgentCallback.trigger(); // callback 
provided by StandardProcessScheduler to essentially initiate component's 
onTrigger() cycle
+                    } else {
+                        LOG.debug("Successfully invoked @OnScheduled methods 
of {} but scheduled state is no longer STARTING so will stop processor now", 
processor);
+
+                        // can only happen if stopProcessor was called before 
service was transitioned to RUNNING state
+                        try {
+                            
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, 
processor, processContext);
+                        } finally {
+                            
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, 
processContext);
+                        }
+
+                        scheduledState.set(ScheduledState.STOPPED);
                     }
+                } finally {
+                    schedulingAgentCallback.onTaskComplete();
                 }
-            });
+            } catch (final Exception e) {
+                procLog.error("Failed to properly initialize Processor. If 
still scheduled to run, NiFi will attempt to "
+                    + "initialize and run the Processor again after the 
'Administrative Yield Duration' has elapsed. Failure is due to " + e, e);
 
-            if (scheduledState.compareAndSet(ScheduledState.STARTING, 
ScheduledState.RUNNING)) {
-                schedulingAgentCallback.trigger(); // callback provided by 
StandardProcessScheduler to essentially initiate component's onTrigger() cycle
-            } else { // can only happen if stopProcessor was called before 
service was transitioned to RUNNING state
+                // If processor's task completed Exceptionally, then we want 
to retry initiating the start (if Processor is still scheduled to run).
                 try (final NarCloseable nc = 
NarCloseable.withComponentNarLoader(processor.getClass(), 
processor.getIdentifier())) {
-                    
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, 
processor, processContext);
+                    try {
+                        
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, 
processor, processContext);
+                    } finally {
+                        
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, 
processContext);
+                    }
+                }
+
+                // make sure we only continue retry loop if STOP action wasn't 
initiated
+                if (scheduledState.get() != ScheduledState.STOPPING) {
+                    // re-initiate the entire process
+                    final Runnable initiateStartTask = () -> 
initiateStart(taskScheduler, administrativeYieldMillis, processContext, 
schedulingAgentCallback);
+                    taskScheduler.schedule(initiateStartTask, 
administrativeYieldMillis, TimeUnit.MILLISECONDS);
+                } else {
+                    scheduledState.set(ScheduledState.STOPPED);
                 }
-                scheduledState.set(ScheduledState.STOPPED);
             }
-        } catch (final Exception e) {
-            final Throwable cause = e instanceof InvocationTargetException ? 
e.getCause() : e;
-            procLog.error("{} failed to invoke @OnScheduled method due to {}; 
processor will not be scheduled to run for {} seconds",
-                    new Object[]{StandardProcessorNode.this.getProcessor(), 
cause, administrativeYieldMillis / 1000L}, cause);
-            LOG.error("Failed to invoke @OnScheduled method due to {}", 
cause.toString(), cause);
 
-            
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, 
processor, processContext);
-            
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, 
processContext);
+            return null;
+        };
+
+        // Trigger the task in a background thread.
+        final Future<?> taskFuture = 
schedulingAgentCallback.scheduleTask(startupTask);
+
+        // Trigger a task periodically to check if @OnScheduled task 
completed. Once it has,
+        // this task will call SchedulingAgentCallback#onTaskComplete.
+        // However, if the task times out, we need to be able to cancel the 
monitoring. So, in order
+        // to do this, we use #scheduleWithFixedDelay and then make that 
Future available to the task
+        // itself by placing it into an AtomicReference.
+        final AtomicReference<Future<?>> futureRef = new AtomicReference<>();
+        final Runnable monitoringTask = new Runnable() {
+            @Override
+            public void run() {
+                Future<?> monitoringFuture = futureRef.get();
+                if (monitoringFuture == null) { // Future is not yet 
available. Just return and wait for the next invocation.
+                    return;
+                }
 
-            if (scheduledState.get() != ScheduledState.STOPPING) { // make 
sure we only continue retry loop if STOP action wasn't initiated
-                taskScheduler.schedule(() -> initiateStart(taskScheduler, 
administrativeYieldMillis, processContext, schedulingAgentCallback), 
administrativeYieldMillis, TimeUnit.MILLISECONDS);
-            } else {
-                scheduledState.set(ScheduledState.STOPPED);
+                monitorAsyncTask(taskFuture, monitoringFuture, 
completionTimestamp);
             }
-        }
+        };
+
+        final Future<?> future = 
taskScheduler.scheduleWithFixedDelay(monitoringTask, 1, 10, 
TimeUnit.MILLISECONDS);
+        futureRef.set(future);
     }
 
     /**
@@ -1451,59 +1495,26 @@ public class StandardProcessorNode extends 
ProcessorNode implements Connectable
         return future;
     }
 
-    /**
-     * Will invoke lifecycle operation (OnScheduled or OnUnscheduled)
-     * asynchronously to ensure that it could be interrupted if stop action was
-     * initiated on the processor that may be infinitely blocking in such
-     * operation. While this approach paves the way for further enhancements
-     * related to managing processor'slife-cycle operation at the moment the
-     * interrupt will not happen automatically. This is primarily to preserve
-     * the existing behavior of the NiFi where stop operation can only be
-     * invoked once the processor is started. Unfortunately that could mean 
that
-     * the processor may be blocking indefinitely in lifecycle operation
-     * (OnScheduled or OnUnscheduled). To deal with that a new NiFi property 
has
-     * been introduced <i>nifi.processor.scheduling.timeout</i> which allows 
one
-     * to set the time (in milliseconds) of how long to wait before canceling
-     * such lifecycle operation (OnScheduled or OnUnscheduled) allowing
-     * processor's stop sequence to proceed. The default value for this 
property
-     * is {@link Long#MAX_VALUE}.
-     * <p>
-     * NOTE: Canceling the task does not guarantee that the task will actually
-     * completes (successfully or otherwise), since cancellation of the task
-     * will issue a simple Thread.interrupt(). However code inside of lifecycle
-     * operation (OnScheduled or OnUnscheduled) is written purely and will
-     * ignore thread interrupts you may end up with runaway thread which may
-     * eventually require NiFi reboot. In any event, the above explanation will
-     * be logged (WARN) informing a user so further actions could be taken.
-     * </p>
-     */
-    private <T> void invokeTaskAsCancelableFuture(final 
SchedulingAgentCallback callback, final Callable<T> task) {
-        final Processor processor = processorRef.get().getProcessor();
-        final String timeoutString = 
nifiProperties.getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT);
-        final long onScheduleTimeout = timeoutString == null ? 60000
-                : FormatUtils.getTimeDuration(timeoutString.trim(), 
TimeUnit.MILLISECONDS);
-        final Future<?> taskFuture = callback.scheduleTask(task);
-        try {
-            taskFuture.get(onScheduleTimeout, TimeUnit.MILLISECONDS);
-        } catch (final InterruptedException e) {
-            LOG.warn("Thread was interrupted while waiting for processor '" + 
processor.getClass().getSimpleName()
-                    + "' lifecycle OnScheduled operation to finish.");
-            Thread.currentThread().interrupt();
-            throw new RuntimeException("Interrupted while executing one of 
processor's OnScheduled tasks.", e);
-        } catch (final TimeoutException e) {
+
+    private void monitorAsyncTask(final Future<?> taskFuture, final Future<?> 
monitoringFuture, final long completionTimestamp) {
+        if (taskFuture.isDone()) {
+            monitoringFuture.cancel(false); // stop scheduling this task
+        } else if (System.currentTimeMillis() > completionTimestamp) {
+            // Task timed out. Request an interrupt of the processor task
             taskFuture.cancel(true);
-            LOG.warn("Timed out while waiting for OnScheduled of '"
-                    + processor.getClass().getSimpleName()
-                    + "' processor to finish. An attempt is made to cancel the 
task via Thread.interrupt(). However it does not "
-                    + "guarantee that the task will be canceled since the code 
inside current OnScheduled operation may "
+
+            // Stop monitoring the processor. We have interrupted the thread 
so that's all we can do. If the processor responds to the interrupt, then
+            // it will be re-scheduled. If it does not, then it will either 
keep the thread indefinitely or eventually finish, at which point
+            // the Processor will begin running.
+            monitoringFuture.cancel(false);
+
+            final Processor processor = processorRef.get().getProcessor();
+            LOG.warn("Timed out while waiting for OnScheduled of "
+                + processor + " to finish. An attempt is made to cancel the 
task via Thread.interrupt(). However it does not "
+                + "guarantee that the task will be canceled since the code 
inside current OnScheduled operation may "
                 + "have been written to ignore interrupts which may result in 
a runaway thread. This could lead to more issues, "
-                    + "eventually requiring NiFi to be restarted. This is 
usually a bug in the target Processor '"
-                    + processor + "' that needs to be documented, reported and 
eventually fixed.");
-            throw new RuntimeException("Timed out while executing one of 
processor's OnScheduled task.", e);
-        } catch (final ExecutionException e){
-            throw new RuntimeException("Failed while executing one of 
processor's OnScheduled task.", e);
-        } finally {
-            callback.onTaskComplete();
+                + "eventually requiring NiFi to be restarted. This is usually 
a bug in the target Processor '"
+                + processor + "' that needs to be documented, reported and 
eventually fixed.");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/14fef2de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/exception/ComponentStartTimeoutException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/exception/ComponentStartTimeoutException.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/exception/ComponentStartTimeoutException.java
new file mode 100644
index 0000000..cca8698
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/exception/ComponentStartTimeoutException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.exception;
+
+public class ComponentStartTimeoutException extends Exception {
+    public ComponentStartTimeoutException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/14fef2de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index d08d701..1155bfe 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -77,19 +77,21 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
     private final ConcurrentMap<Object, ScheduleState> scheduleStates = new 
ConcurrentHashMap<>();
     private final ScheduledExecutorService frameworkTaskExecutor;
     private final ConcurrentMap<SchedulingStrategy, SchedulingAgent> 
strategyAgentMap = new ConcurrentHashMap<>();
-    // thread pool for starting/stopping components
 
-    private final ScheduledExecutorService componentLifeCycleThreadPool = new 
FlowEngine(8, "StandardProcessScheduler", true);
-    private final ScheduledExecutorService componentMonitoringThreadPool = new 
FlowEngine(8, "StandardProcessScheduler", true);
+    // thread pool for starting/stopping components
+    private final ScheduledExecutorService componentLifeCycleThreadPool;
+    private final ScheduledExecutorService componentMonitoringThreadPool = new 
FlowEngine(2, "Monitor Processore Lifecycle", true);
 
     private final StringEncryptor encryptor;
 
     public StandardProcessScheduler(
+        final FlowEngine componentLifecycleThreadPool,
             final ControllerServiceProvider controllerServiceProvider,
             final StringEncryptor encryptor,
             final StateManagerProvider stateManagerProvider,
             final NiFiProperties nifiProperties
     ) {
+        this.componentLifeCycleThreadPool = componentLifecycleThreadPool;
         this.controllerServiceProvider = controllerServiceProvider;
         this.encryptor = encryptor;
         this.stateManagerProvider = stateManagerProvider;
@@ -164,7 +166,6 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
 
         frameworkTaskExecutor.shutdown();
         componentLifeCycleThreadPool.shutdown();
-        componentMonitoringThreadPool.shutdown();
     }
 
     @Override
@@ -313,7 +314,7 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
             @Override
             public Future<?> scheduleTask(Callable<?> task) {
                 scheduleState.incrementActiveThreadCount();
-                return componentMonitoringThreadPool.submit(task);
+                return componentLifeCycleThreadPool.submit(task);
             }
 
             @Override
@@ -323,7 +324,7 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
         };
 
         LOG.info("Starting {}", procNode);
-        procNode.start(this.componentLifeCycleThreadPool, 
this.administrativeYieldMillis, processContext, callback, failIfStopping);
+        procNode.start(this.componentMonitoringThreadPool, 
this.administrativeYieldMillis, processContext, callback, failIfStopping);
         return future;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/14fef2de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
index 7b39963..9bff6f6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
@@ -100,7 +100,7 @@ public class TestStandardProcessorNode {
         final LoggableComponent<Processor> loggableComponent = new 
LoggableComponent<>(processor, coordinate, null);
         final StandardProcessorNode procNode = new 
StandardProcessorNode(loggableComponent, uuid, 
createValidationContextFactory(), null, null,
             NiFiProperties.createBasicNiFiProperties(null, null), new 
StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), 
reloadComponent);
-        final ScheduledExecutorService taskScheduler = new FlowEngine(2, 
"TestClasspathResources", true);
+        final ScheduledExecutorService taskScheduler = new FlowEngine(1, 
"TestClasspathResources", true);
 
         final StandardProcessContext processContext = new 
StandardProcessContext(procNode, null, null, null);
         final SchedulingAgentCallback schedulingAgentCallback = new 
SchedulingAgentCallback() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/14fef2de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/LongEnablingService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/LongEnablingService.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/LongEnablingService.java
new file mode 100644
index 0000000..09b824a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/LongEnablingService.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.scheduling;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+
+public class LongEnablingService extends AbstractControllerService {
+
+    private final AtomicInteger enableCounter = new AtomicInteger();
+    private final AtomicInteger disableCounter = new AtomicInteger();
+
+    private volatile long limit;
+
+    @OnEnabled
+    public void enable(final ConfigurationContext context) throws Exception {
+        this.enableCounter.incrementAndGet();
+        Thread.sleep(limit);
+    }
+
+    @OnDisabled
+    public void disable(final ConfigurationContext context) {
+        this.disableCounter.incrementAndGet();
+    }
+
+    public int enableInvocationCount() {
+        return this.enableCounter.get();
+    }
+
+    public int disableInvocationCount() {
+        return this.disableCounter.get();
+    }
+
+    public void setLimit(final long limit) {
+        this.limit = limit;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/14fef2de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/StandardProcessSchedulerIT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/StandardProcessSchedulerIT.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/StandardProcessSchedulerIT.java
new file mode 100644
index 0000000..2d7b22f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/StandardProcessSchedulerIT.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.scheduling;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.controller.service.StandardControllerServiceProvider;
+import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.SystemBundle;
+import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class StandardProcessSchedulerIT {
+    private final StateManagerProvider stateMgrProvider = 
Mockito.mock(StateManagerProvider.class);
+    private VariableRegistry variableRegistry = 
VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY;
+    private FlowController controller;
+    private NiFiProperties nifiProperties;
+    private Bundle systemBundle;
+    private volatile String propsFile = 
TestStandardProcessScheduler.class.getResource("/standardprocessschedulertest.nifi.properties").getFile();
+
+    @Before
+    public void setup() throws InitializationException {
+        this.nifiProperties = 
NiFiProperties.createBasicNiFiProperties(propsFile, null);
+
+        // load the system bundle
+        systemBundle = SystemBundle.create(nifiProperties);
+        ExtensionManager.discoverExtensions(systemBundle, 
Collections.emptySet());
+
+        controller = Mockito.mock(FlowController.class);
+    }
+
+    /**
+     * Validates that the service that is currently in ENABLING state can be
+     * disabled and that its @OnDisabled operation will be invoked as soon as
+     *
+     * @OnEnable finishes.
+     */
+    @Test
+    public void validateLongEnablingServiceCanStillBeDisabled() throws 
Exception {
+        final StandardProcessScheduler scheduler = new 
StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), null, null, 
stateMgrProvider, nifiProperties);
+        final StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(controller, scheduler, null, 
stateMgrProvider, variableRegistry, nifiProperties);
+        final ControllerServiceNode serviceNode = 
provider.createControllerService(LongEnablingService.class.getName(),
+            "1", systemBundle.getBundleDetails().getCoordinate(), null, false);
+        final LongEnablingService ts = (LongEnablingService) 
serviceNode.getControllerServiceImplementation();
+        ts.setLimit(3000);
+        scheduler.enableControllerService(serviceNode);
+        Thread.sleep(2000);
+        assertTrue(serviceNode.isActive());
+        assertEquals(1, ts.enableInvocationCount());
+
+        Thread.sleep(500);
+        scheduler.disableControllerService(serviceNode);
+        assertFalse(serviceNode.isActive());
+        assertEquals(ControllerServiceState.DISABLING, serviceNode.getState());
+        assertEquals(0, ts.disableInvocationCount());
+        // wait a bit. . . Enabling will finish and @OnDisabled will be invoked
+        // automatically
+        Thread.sleep(4000);
+        assertEquals(ControllerServiceState.DISABLED, serviceNode.getState());
+        assertEquals(1, ts.disableInvocationCount());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/14fef2de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
index 1b54c64..d459f5c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
@@ -339,7 +339,7 @@ public class TestProcessorLifecycle {
         TestProcessor testProcessor = (TestProcessor) 
testProcNode.getProcessor();
 
         // sets the scenario for the processor to run
-        int delay = 2000;
+        int delay = 200;
         this.longRunningOnSchedule(testProcessor, delay);
         ProcessScheduler ps = fc.getProcessScheduler();
 
@@ -348,9 +348,10 @@ public class TestProcessorLifecycle {
 
         ps.stopProcessor(testProcNode);
         assertCondition(() -> ScheduledState.STOPPED == 
testProcNode.getScheduledState(), 5000L);
-        assertCondition(() -> testProcessor.operationNames.size() == 2, 8000L);
+        assertCondition(() -> testProcessor.operationNames.size() == 3, 8000L);
         assertEquals("@OnScheduled", testProcessor.operationNames.get(0));
         assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1));
+        assertEquals("@OnStopped", testProcessor.operationNames.get(2));
     }
 
     /**
@@ -442,7 +443,7 @@ public class TestProcessorLifecycle {
      */
     @Test
     public void 
validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninterruptable() 
throws Exception {
-        final FlowControllerAndSystemBundle fcsb = 
this.buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 
sec");
+        final FlowControllerAndSystemBundle fcsb = 
this.buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "1 
sec");
         fc = fcsb.getFlowController();
 
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
@@ -462,7 +463,7 @@ public class TestProcessorLifecycle {
     }
 
     /**
-     * Validates that processor can be stopped if onTrigger() keeps trowing
+     * Validates that processor can be stopped if onTrigger() keeps throwing
      * exceptions.
      */
     @Test
@@ -593,7 +594,7 @@ public class TestProcessorLifecycle {
         testProcNodeB.setProperties(properties);
         testGroup.addProcessor(testProcNodeB);
 
-        Collection<String> relationNames = new ArrayList<String>();
+        Collection<String> relationNames = new ArrayList<>();
         relationNames.add("relation");
         Connection connection = 
fc.createConnection(UUID.randomUUID().toString(), Connection.class.getName(), 
testProcNodeA, testProcNodeB, relationNames);
         testGroup.addConnection(connection);

http://git-wip-us.apache.org/repos/asf/nifi/blob/14fef2de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
index c0b36c9..2497ac7 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
@@ -55,12 +55,14 @@ import org.apache.nifi.controller.ValidationContextFactory;
 import org.apache.nifi.controller.cluster.Heartbeater;
 import 
org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
 import org.apache.nifi.controller.reporting.StandardReportingTaskNode;
+import 
org.apache.nifi.controller.scheduling.processors.FailOnScheduledProcessor;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.controller.service.StandardControllerServiceNode;
 import org.apache.nifi.controller.service.StandardControllerServiceProvider;
 import org.apache.nifi.controller.service.mock.MockProcessGroup;
+import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.nar.ExtensionManager;
@@ -104,13 +106,16 @@ public class TestStandardProcessScheduler {
 
     @Before
     public void setup() throws InitializationException {
-        this.nifiProperties = 
NiFiProperties.createBasicNiFiProperties(propsFile, null);
+        final Map<String, String> overrideProperties = new HashMap<>();
+        overrideProperties.put(NiFiProperties.ADMINISTRATIVE_YIELD_DURATION, 
"2 millis");
+        overrideProperties.put(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, 
"10 millis");
+        this.nifiProperties = 
NiFiProperties.createBasicNiFiProperties(propsFile, overrideProperties);
 
         // load the system bundle
         systemBundle = SystemBundle.create(nifiProperties);
         ExtensionManager.discoverExtensions(systemBundle, 
Collections.emptySet());
 
-        scheduler = new 
StandardProcessScheduler(Mockito.mock(ControllerServiceProvider.class), null, 
stateMgrProvider, nifiProperties);
+        scheduler = new StandardProcessScheduler(new FlowEngine(1, "Unit 
Test", true), Mockito.mock(ControllerServiceProvider.class), null, 
stateMgrProvider, nifiProperties);
         scheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, 
Mockito.mock(SchedulingAgent.class));
 
         reportingTask = new TestReportingTask();
@@ -167,13 +172,13 @@ public class TestStandardProcessScheduler {
         scheduler.schedule(taskNode);
 
         // Let it try to run a few times.
-        Thread.sleep(1000L);
+        Thread.sleep(25L);
 
         scheduler.unschedule(taskNode);
 
         final int attempts = reportingTask.onScheduleAttempts.get();
         // give it a sec to make sure that it's finished running.
-        Thread.sleep(1500L);
+        Thread.sleep(250L);
         final int attemptsAfterStop = reportingTask.onScheduleAttempts.get() - 
attempts;
 
         // allow 1 extra run, due to timing issues that could call it as it's 
being stopped.
@@ -207,7 +212,7 @@ public class TestStandardProcessScheduler {
         scheduler.enableControllerService(service);
         scheduler.startProcessor(procNode, true);
 
-        Thread.sleep(1000L);
+        Thread.sleep(25L);
 
         scheduler.stopProcessor(procNode);
         assertTrue(service.isActive());
@@ -215,7 +220,10 @@ public class TestStandardProcessScheduler {
         scheduler.disableControllerService(service);
         assertTrue(service.getState() == ControllerServiceState.DISABLING);
         assertFalse(service.isActive());
-        Thread.sleep(2000);
+
+        while (service.getState() != ControllerServiceState.DISABLED) {
+            Thread.sleep(5L);
+        }
         assertTrue(service.getState() == ControllerServiceState.DISABLED);
     }
 
@@ -472,37 +480,87 @@ public class TestStandardProcessScheduler {
         assertEquals(0, ts.disableInvocationCount());
     }
 
-    /**
-     * Validates that the service that is currently in ENABLING state can be
-     * disabled and that its @OnDisabled operation will be invoked as soon as
-     *
-     * @OnEnable finishes.
-     */
-    @Test
-    public void validateLongEnablingServiceCanStillBeDisabled() throws 
Exception {
-        final StandardProcessScheduler scheduler = createScheduler();
-        final StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(controller, scheduler, null, 
stateMgrProvider, variableRegistry, nifiProperties);
-        final ControllerServiceNode serviceNode = 
provider.createControllerService(LongEnablingService.class.getName(),
-                "1", systemBundle.getBundleDetails().getCoordinate(), null, 
false);
-        final LongEnablingService ts = (LongEnablingService) 
serviceNode.getControllerServiceImplementation();
-        ts.setLimit(3000);
-        scheduler.enableControllerService(serviceNode);
-        Thread.sleep(2000);
-        assertTrue(serviceNode.isActive());
-        assertEquals(1, ts.enableInvocationCount());
+    // Test that if processor throws Exception in @OnScheduled, it keeps 
getting scheduled
+    @Test(timeout = 10000)
+    public void testProcessorThrowsExceptionOnScheduledRetry() throws 
InterruptedException {
+        final FailOnScheduledProcessor proc = new FailOnScheduledProcessor();
+        proc.setDesiredFailureCount(3);
 
-        Thread.sleep(500);
-        scheduler.disableControllerService(serviceNode);
-        assertFalse(serviceNode.isActive());
-        assertEquals(ControllerServiceState.DISABLING, serviceNode.getState());
-        assertEquals(0, ts.disableInvocationCount());
-        // wait a bit. . . Enabling will finish and @OnDisabled will be invoked
-        // automatically
-        Thread.sleep(4000);
-        assertEquals(ControllerServiceState.DISABLED, serviceNode.getState());
-        assertEquals(1, ts.disableInvocationCount());
+        proc.initialize(new 
StandardProcessorInitializationContext(UUID.randomUUID().toString(), null, 
null, null, nifiProperties));
+        final ReloadComponent reloadComponent = 
Mockito.mock(ReloadComponent.class);
+        final LoggableComponent<Processor> loggableComponent = new 
LoggableComponent<>(proc, systemBundle.getBundleDetails().getCoordinate(), 
null);
+
+        final ProcessorNode procNode = new 
StandardProcessorNode(loggableComponent, UUID.randomUUID().toString(),
+            new StandardValidationContextFactory(controller, variableRegistry),
+            scheduler, controller, nifiProperties, new 
StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), 
reloadComponent);
+
+        rootGroup.addProcessor(procNode);
+
+        scheduler.startProcessor(procNode, true);
+        while (!proc.isSucceess()) {
+            Thread.sleep(5L);
+        }
+
+        assertEquals(3, proc.getOnScheduledInvocationCount());
+    }
+
+    // Test that if processor times out in the @OnScheduled but responds to 
interrupt, it keeps getting scheduled
+    @Test(timeout = 1000000)
+    public void testProcessorTimeOutRespondsToInterrupt() throws 
InterruptedException {
+        final FailOnScheduledProcessor proc = new FailOnScheduledProcessor();
+        proc.setDesiredFailureCount(0);
+        proc.setOnScheduledSleepDuration(20, TimeUnit.MINUTES, true, 1);
+
+        proc.initialize(new 
StandardProcessorInitializationContext(UUID.randomUUID().toString(), null, 
null, null, nifiProperties));
+        final ReloadComponent reloadComponent = 
Mockito.mock(ReloadComponent.class);
+        final LoggableComponent<Processor> loggableComponent = new 
LoggableComponent<>(proc, systemBundle.getBundleDetails().getCoordinate(), 
null);
+
+        final ProcessorNode procNode = new 
StandardProcessorNode(loggableComponent, UUID.randomUUID().toString(),
+            new StandardValidationContextFactory(controller, variableRegistry),
+            scheduler, controller, nifiProperties, new 
StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), 
reloadComponent);
+
+        rootGroup.addProcessor(procNode);
+
+        scheduler.startProcessor(procNode, true);
+        while (!proc.isSucceess()) {
+            Thread.sleep(5L);
+        }
+
+        // The first time that the processor's @OnScheduled method is called, 
it will sleep for 20 minutes. The scheduler should interrupt
+        // that thread and then try again. The second time, the Processor will 
not sleep because setOnScheduledSleepDuration was called
+        // above with iterations = 1
+        assertEquals(2, proc.getOnScheduledInvocationCount());
     }
 
+    // Test that if processor times out in the @OnScheduled and does not 
respond to interrupt, it is not scheduled again
+    @Test(timeout = 10000)
+    public void testProcessorTimeOutNoResponseToInterrupt() throws 
InterruptedException {
+        final FailOnScheduledProcessor proc = new FailOnScheduledProcessor();
+        proc.setDesiredFailureCount(0);
+        proc.setOnScheduledSleepDuration(20, TimeUnit.MINUTES, false, 1);
+
+        proc.initialize(new 
StandardProcessorInitializationContext(UUID.randomUUID().toString(), null, 
null, null, nifiProperties));
+        final ReloadComponent reloadComponent = 
Mockito.mock(ReloadComponent.class);
+        final LoggableComponent<Processor> loggableComponent = new 
LoggableComponent<>(proc, systemBundle.getBundleDetails().getCoordinate(), 
null);
+
+        final ProcessorNode procNode = new 
StandardProcessorNode(loggableComponent, UUID.randomUUID().toString(),
+            new StandardValidationContextFactory(controller, variableRegistry),
+            scheduler, controller, nifiProperties, new 
StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), 
reloadComponent);
+
+        rootGroup.addProcessor(procNode);
+
+        scheduler.startProcessor(procNode, true);
+
+        Thread.sleep(100L);
+        assertEquals(1, proc.getOnScheduledInvocationCount());
+        Thread.sleep(100L);
+        assertEquals(1, proc.getOnScheduledInvocationCount());
+
+        // Allow test to complete.
+        proc.setAllowSleepInterrupt(true);
+    }
+
+
     public static class FailingService extends AbstractControllerService {
 
         @OnEnabled
@@ -512,7 +570,6 @@ public class TestStandardProcessScheduler {
     }
 
     public static class RandomShortDelayEnablingService extends 
AbstractControllerService {
-
         private final Random random = new Random();
 
         @OnEnabled
@@ -526,7 +583,6 @@ public class TestStandardProcessScheduler {
     }
 
     public static class SimpleTestService extends AbstractControllerService {
-
         private final AtomicInteger enableCounter = new AtomicInteger();
         private final AtomicInteger disableCounter = new AtomicInteger();
 
@@ -549,38 +605,7 @@ public class TestStandardProcessScheduler {
         }
     }
 
-    public static class LongEnablingService extends AbstractControllerService {
-
-        private final AtomicInteger enableCounter = new AtomicInteger();
-        private final AtomicInteger disableCounter = new AtomicInteger();
-
-        private volatile long limit;
-
-        @OnEnabled
-        public void enable(final ConfigurationContext context) throws 
Exception {
-            this.enableCounter.incrementAndGet();
-            Thread.sleep(limit);
-        }
-
-        @OnDisabled
-        public void disable(final ConfigurationContext context) {
-            this.disableCounter.incrementAndGet();
-        }
-
-        public int enableInvocationCount() {
-            return this.enableCounter.get();
-        }
-
-        public int disableInvocationCount() {
-            return this.disableCounter.get();
-        }
-
-        public void setLimit(final long limit) {
-            this.limit = limit;
-        }
-    }
-
     private StandardProcessScheduler createScheduler() {
-        return new StandardProcessScheduler(null, null, stateMgrProvider, 
nifiProperties);
+        return new StandardProcessScheduler(new FlowEngine(1, "Unit Test", 
true), null, null, stateMgrProvider, nifiProperties);
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/14fef2de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/processors/FailOnScheduledProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/processors/FailOnScheduledProcessor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/processors/FailOnScheduledProcessor.java
new file mode 100644
index 0000000..acfe390
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/processors/FailOnScheduledProcessor.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.scheduling.processors;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.exception.ProcessException;
+
+public class FailOnScheduledProcessor extends AbstractProcessor {
+
+    private volatile int invocationCount = 0;
+    private volatile int desiredFailureCount = 1;
+    private volatile long onScheduledSleepMillis = 0L;
+    private volatile int onScheduledSleepIterations = 0;
+    private volatile boolean allowSleepInterrupt = true;
+    private final AtomicBoolean succeeded = new AtomicBoolean();
+
+    public void setDesiredFailureCount(final int desiredFailureCount) {
+        this.desiredFailureCount = desiredFailureCount;
+    }
+
+    public void setOnScheduledSleepDuration(final long duration, final 
TimeUnit unit, final boolean allowInterrupt, final int iterations) {
+        this.onScheduledSleepMillis = unit.toMillis(duration);
+        this.onScheduledSleepIterations = iterations;
+        this.allowSleepInterrupt = allowInterrupt;
+    }
+
+    public void setAllowSleepInterrupt(final boolean allow) {
+        this.allowSleepInterrupt = allow;
+    }
+
+    @OnScheduled
+    public void onScheduled() throws InterruptedException {
+        invocationCount++;
+
+        if (invocationCount <= onScheduledSleepIterations && 
onScheduledSleepMillis > 0L) {
+            final long sleepFinish = System.currentTimeMillis() + 
onScheduledSleepMillis;
+
+            while (System.currentTimeMillis() < sleepFinish) {
+                try {
+                    Thread.sleep(Math.max(0, sleepFinish - 
System.currentTimeMillis()));
+                } catch (final InterruptedException ie) {
+                    if (allowSleepInterrupt) {
+                        Thread.currentThread().interrupt();
+                        throw ie;
+                    } else {
+                        continue;
+                    }
+                }
+            }
+        }
+
+        if (invocationCount < desiredFailureCount) {
+            throw new ProcessException("Intentional failure for unit test");
+        } else {
+            succeeded.set(true);
+        }
+    }
+
+    public int getOnScheduledInvocationCount() {
+        return invocationCount;
+    }
+
+    public boolean isSucceess() {
+        return succeeded.get();
+    }
+
+    @Override
+    public void onTrigger(org.apache.nifi.processor.ProcessContext context, 
org.apache.nifi.processor.ProcessSession session) throws ProcessException {
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/14fef2de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderIT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderIT.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderIT.java
new file mode 100644
index 0000000..015ae67
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderIT.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.service;
+
+import static org.junit.Assert.assertTrue;
+
+import java.beans.PropertyDescriptor;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
+import org.apache.nifi.controller.service.mock.MockProcessGroup;
+import org.apache.nifi.controller.service.mock.ServiceA;
+import org.apache.nifi.controller.service.mock.ServiceB;
+import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.SystemBundle;
+import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class StandardControllerServiceProviderIT {
+    private static Bundle systemBundle;
+    private static NiFiProperties niFiProperties;
+    private static VariableRegistry variableRegistry = 
VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY;
+
+    private static StateManagerProvider stateManagerProvider = new 
StateManagerProvider() {
+        @Override
+        public StateManager getStateManager(final String componentId) {
+            return Mockito.mock(StateManager.class);
+        }
+
+        @Override
+        public void shutdown() {
+        }
+
+        @Override
+        public void enableClusterProvider() {
+        }
+
+        @Override
+        public void disableClusterProvider() {
+        }
+
+        @Override
+        public void onComponentRemoved(final String componentId) {
+        }
+    };
+
+    @BeforeClass
+    public static void setNiFiProps() {
+        System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, 
TestStandardControllerServiceProvider.class.getResource("/conf/nifi.properties").getFile());
+        niFiProperties = NiFiProperties.createBasicNiFiProperties(null, null);
+
+        // load the system bundle
+        systemBundle = SystemBundle.create(niFiProperties);
+        ExtensionManager.discoverExtensions(systemBundle, 
Collections.emptySet());
+    }
+
+    /**
+     * We run the same test 1000 times and prior to bug fix (see NIFI-1143) it
+     * would fail on some iteration. For more details please see
+     * {@link PropertyDescriptor}.isDependentServiceEnableable() as well as
+     * https://issues.apache.org/jira/browse/NIFI-1143
+     */
+    @Test(timeout = 120000)
+    public void testConcurrencyWithEnablingReferencingServicesGraph() throws 
InterruptedException {
+        final StandardProcessScheduler scheduler = new 
StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), null, null, 
stateManagerProvider, niFiProperties);
+        for (int i = 0; i < 5000; i++) {
+            testEnableReferencingServicesGraph(scheduler);
+        }
+    }
+
+    public void testEnableReferencingServicesGraph(final 
StandardProcessScheduler scheduler) {
+        final FlowController controller = Mockito.mock(FlowController.class);
+        final ProcessGroup procGroup = new MockProcessGroup(controller);
+        
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
+
+        final StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(controller, scheduler, null, 
stateManagerProvider, variableRegistry, niFiProperties);
+
+        // build a graph of controller services with dependencies as such:
+        //
+        // A -> B -> D
+        // C ---^----^
+        //
+        // In other words, A references B, which references D.
+        // AND
+        // C references B and D.
+        //
+        // So we have to verify that if D is enabled, when we enable its 
referencing services,
+        // we enable C and B, even if we attempt to enable C before B... i.e., 
if we try to enable C, we cannot do so
+        // until B is first enabled so ensure that we enable B first.
+        final ControllerServiceNode serviceNode1 = 
provider.createControllerService(ServiceA.class.getName(), "1",
+            systemBundle.getBundleDetails().getCoordinate(), null, false);
+        final ControllerServiceNode serviceNode2 = 
provider.createControllerService(ServiceA.class.getName(), "2",
+            systemBundle.getBundleDetails().getCoordinate(), null, false);
+        final ControllerServiceNode serviceNode3 = 
provider.createControllerService(ServiceA.class.getName(), "3",
+            systemBundle.getBundleDetails().getCoordinate(), null, false);
+        final ControllerServiceNode serviceNode4 = 
provider.createControllerService(ServiceB.class.getName(), "4",
+            systemBundle.getBundleDetails().getCoordinate(), null, false);
+
+        procGroup.addControllerService(serviceNode1);
+        procGroup.addControllerService(serviceNode2);
+        procGroup.addControllerService(serviceNode3);
+        procGroup.addControllerService(serviceNode4);
+
+        setProperty(serviceNode1, ServiceA.OTHER_SERVICE.getName(), "2");
+        setProperty(serviceNode2, ServiceA.OTHER_SERVICE.getName(), "4");
+        setProperty(serviceNode3, ServiceA.OTHER_SERVICE.getName(), "2");
+        setProperty(serviceNode3, ServiceA.OTHER_SERVICE_2.getName(), "4");
+
+        provider.enableControllerService(serviceNode4);
+        provider.enableReferencingServices(serviceNode4);
+
+        // Verify that the services are either ENABLING or ENABLED, and wait 
for all of them to become ENABLED.
+        // Note that we set a timeout of 10 seconds, in case a bug occurs and 
the services never become ENABLED.
+        final Set<ControllerServiceState> validStates = new HashSet<>();
+        validStates.add(ControllerServiceState.ENABLED);
+        validStates.add(ControllerServiceState.ENABLING);
+
+        while (serviceNode3.getState() != ControllerServiceState.ENABLED || 
serviceNode2.getState() != ControllerServiceState.ENABLED || 
serviceNode1.getState() != ControllerServiceState.ENABLED) {
+            assertTrue(validStates.contains(serviceNode3.getState()));
+            assertTrue(validStates.contains(serviceNode2.getState()));
+            assertTrue(validStates.contains(serviceNode1.getState()));
+        }
+    }
+
+    private void setProperty(ControllerServiceNode serviceNode, String 
propName, String propValue) {
+        Map<String, String> props = new LinkedHashMap<>();
+        props.put(propName, propValue);
+        serviceNode.setProperties(props);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/14fef2de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
index ed335e9..fac04a6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -21,14 +21,11 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.beans.PropertyDescriptor;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -48,6 +45,7 @@ import 
org.apache.nifi.controller.service.mock.MockProcessGroup;
 import org.apache.nifi.controller.service.mock.ServiceA;
 import org.apache.nifi.controller.service.mock.ServiceB;
 import org.apache.nifi.controller.service.mock.ServiceC;
+import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.StandardProcessGroup;
 import org.apache.nifi.nar.ExtensionManager;
@@ -130,7 +128,7 @@ public class TestStandardControllerServiceProvider {
     }
 
     private StandardProcessScheduler createScheduler() {
-        return new StandardProcessScheduler(null, null, stateManagerProvider, 
niFiProperties);
+        return new StandardProcessScheduler(new FlowEngine(1, "Unit Test", 
true), null, null, stateManagerProvider, niFiProperties);
     }
 
     private void setProperty(ControllerServiceNode serviceNode, String 
propName, String propValue) {
@@ -205,75 +203,6 @@ public class TestStandardControllerServiceProvider {
         }
     }
 
-    /**
-     * We run the same test 1000 times and prior to bug fix (see NIFI-1143) it
-     * would fail on some iteration. For more details please see
-     * {@link PropertyDescriptor}.isDependentServiceEnableable() as well as
-     * https://issues.apache.org/jira/browse/NIFI-1143
-     */
-    @Test(timeout = 120000)
-    public void testConcurrencyWithEnablingReferencingServicesGraph() throws 
InterruptedException {
-        final StandardProcessScheduler scheduler = createScheduler();
-        for (int i = 0; i < 5000; i++) {
-            testEnableReferencingServicesGraph(scheduler);
-        }
-    }
-
-    public void testEnableReferencingServicesGraph(final 
StandardProcessScheduler scheduler) {
-        final ProcessGroup procGroup = new MockProcessGroup(controller);
-        final FlowController controller = Mockito.mock(FlowController.class);
-        
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
-
-        final StandardControllerServiceProvider provider =
-                new StandardControllerServiceProvider(controller, scheduler, 
null, stateManagerProvider, variableRegistry, niFiProperties);
-
-        // build a graph of controller services with dependencies as such:
-        //
-        // A -> B -> D
-        // C ---^----^
-        //
-        // In other words, A references B, which references D.
-        // AND
-        // C references B and D.
-        //
-        // So we have to verify that if D is enabled, when we enable its 
referencing services,
-        // we enable C and B, even if we attempt to enable C before B... i.e., 
if we try to enable C, we cannot do so
-        // until B is first enabled so ensure that we enable B first.
-        final ControllerServiceNode serviceNode1 = 
provider.createControllerService(ServiceA.class.getName(), "1",
-                systemBundle.getBundleDetails().getCoordinate(), null, false);
-        final ControllerServiceNode serviceNode2 = 
provider.createControllerService(ServiceA.class.getName(), "2",
-                systemBundle.getBundleDetails().getCoordinate(), null, false);
-        final ControllerServiceNode serviceNode3 = 
provider.createControllerService(ServiceA.class.getName(), "3",
-                systemBundle.getBundleDetails().getCoordinate(), null, false);
-        final ControllerServiceNode serviceNode4 = 
provider.createControllerService(ServiceB.class.getName(), "4",
-                systemBundle.getBundleDetails().getCoordinate(), null, false);
-
-        procGroup.addControllerService(serviceNode1);
-        procGroup.addControllerService(serviceNode2);
-        procGroup.addControllerService(serviceNode3);
-        procGroup.addControllerService(serviceNode4);
-
-        setProperty(serviceNode1, ServiceA.OTHER_SERVICE.getName(), "2");
-        setProperty(serviceNode2, ServiceA.OTHER_SERVICE.getName(), "4");
-        setProperty(serviceNode3, ServiceA.OTHER_SERVICE.getName(), "2");
-        setProperty(serviceNode3, ServiceA.OTHER_SERVICE_2.getName(), "4");
-
-        provider.enableControllerService(serviceNode4);
-        provider.enableReferencingServices(serviceNode4);
-
-        // Verify that the services are either ENABLING or ENABLED, and wait 
for all of them to become ENABLED.
-        // Note that we set a timeout of 10 seconds, in case a bug occurs and 
the services never become ENABLED.
-        final Set<ControllerServiceState> validStates = new HashSet<>();
-        validStates.add(ControllerServiceState.ENABLED);
-        validStates.add(ControllerServiceState.ENABLING);
-
-        while (serviceNode3.getState() != ControllerServiceState.ENABLED || 
serviceNode2.getState() != ControllerServiceState.ENABLED || 
serviceNode1.getState() != ControllerServiceState.ENABLED) {
-            assertTrue(validStates.contains(serviceNode3.getState()));
-            assertTrue(validStates.contains(serviceNode2.getState()));
-            assertTrue(validStates.contains(serviceNode1.getState()));
-        }
-    }
-
     @Test
     public void testOrderingOfServices() {
         final ProcessGroup procGroup = new MockProcessGroup(controller);

Reply via email to