Repository: nifi
Updated Branches:
  refs/heads/master a774f1df6 -> 8e6649ba1


NIFI-2776: This closes #2315. When joining a cluster, if a processor is 
stopping but the cluster indicates that the processor should be running, cause 
the processor to start when its last thread finishes.

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8e6649ba
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8e6649ba
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8e6649ba

Branch: refs/heads/master
Commit: 8e6649ba157f1389be4434f4f8e7fc81ae907ffc
Parents: a774f1d
Author: Mark Payne <[email protected]>
Authored: Mon Dec 4 13:29:24 2017 -0500
Committer: joewitt <[email protected]>
Committed: Thu Dec 7 15:45:18 2017 -0500

----------------------------------------------------------------------
 .../nifi/controller/ProcessScheduler.java       |   6 +-
 .../apache/nifi/controller/ProcessorNode.java   |  21 ++--
 .../controller/SchedulingAgentCallback.java     |   4 +-
 .../org/apache/nifi/groups/ProcessGroup.java    |   8 +-
 .../apache/nifi/controller/FlowController.java  |   8 +-
 .../controller/StandardFlowSynchronizer.java    |   2 +-
 .../nifi/controller/StandardProcessorNode.java  | 125 +++++++++++--------
 .../scheduling/StandardProcessScheduler.java    |  14 +--
 .../StandardControllerServiceProvider.java      |   2 +-
 .../nifi/groups/StandardProcessGroup.java       |   6 +-
 .../controller/TestStandardProcessorNode.java   |   6 +-
 .../scheduling/TestProcessorLifecycle.java      |  32 ++---
 .../TestStandardProcessScheduler.java           |   2 +-
 .../service/mock/MockProcessGroup.java          |   2 +-
 .../web/dao/impl/StandardProcessGroupDAO.java   |   2 +-
 .../nifi/web/dao/impl/StandardProcessorDAO.java |   2 +-
 16 files changed, 138 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/8e6649ba/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
index c6f30b5..4aa4066 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
@@ -40,9 +40,13 @@ public interface ProcessScheduler {
      * is already scheduled to run, does nothing.
      *
      * @param procNode to start
+     * @param failIfStopping If <code>false</code>, and the Processor is in 
the 'STOPPING' state,
+     *            then the Processor will automatically restart itself as soon 
as its last thread finishes. If this
+     *            value is <code>true</code> or if the Processor is in any 
state other than 'STOPPING' or 'RUNNING', then this method
+     *            will throw an {@link IllegalStateException}.
      * @throws IllegalStateException if the Processor is disabled
      */
-    Future<Void> startProcessor(ProcessorNode procNode);
+    Future<Void> startProcessor(ProcessorNode procNode, boolean 
failIfStopping);
 
     /**
      * Stops scheduling the given processor to run and invokes all methods on

http://git-wip-us.apache.org/repos/asf/nifi/blob/8e6649ba/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index ba2e59b..7aad6b4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -160,34 +160,37 @@ public abstract class ProcessorNode extends 
AbstractConfiguredComponent implemen
      * @param administrativeYieldMillis
      *            the amount of milliseconds to wait for administrative yield
      * @param processContext
-     *            the instance of {@link ProcessContext} and
-     *            {@link ControllerServiceLookup}
+     *            the instance of {@link ProcessContext}
      * @param schedulingAgentCallback
      *            the callback provided by the {@link ProcessScheduler} to
      *            execute upon successful start of the Processor
+     * @param failIfStopping If <code>false</code>, and the Processor is in 
the 'STOPPING' state,
+     *            then the Processor will automatically restart itself as soon 
as its last thread finishes. If this
+     *            value is <code>true</code> or if the Processor is in any 
state other than 'STOPPING' or 'RUNNING', then this method
+     *            will throw an {@link IllegalStateException}.
      */
-    public abstract <T extends ProcessContext & ControllerServiceLookup> void 
start(ScheduledExecutorService scheduler,
-            long administrativeYieldMillis, T processContext, 
SchedulingAgentCallback schedulingAgentCallback);
+    public abstract void start(ScheduledExecutorService scheduler,
+        long administrativeYieldMillis, ProcessContext processContext, 
SchedulingAgentCallback schedulingAgentCallback, boolean failIfStopping);
 
     /**
      * Will stop the {@link Processor} represented by this {@link 
ProcessorNode}.
      * Stopping processor typically means invoking its operation that is
      * annotated with @OnUnschedule and then @OnStopped.
      *
-     * @param scheduler
+     * @param processScheduler the ProcessScheduler that can be used to 
re-schedule the processor if need be
+     * @param executor
      *            implementation of {@link ScheduledExecutorService} used to
      *            initiate processor <i>stop</i> task
      * @param processContext
-     *            the instance of {@link ProcessContext} and
-     *            {@link ControllerServiceLookup}
+     *            the instance of {@link ProcessContext}
      * @param schedulingAgent
      *            the SchedulingAgent that is responsible for managing the 
scheduling of the ProcessorNode
      * @param scheduleState
      *            the ScheduleState that can be used to ensure that the 
running state (STOPPED, RUNNING, etc.)
      *            as well as the active thread counts are kept in sync
      */
-    public abstract <T extends ProcessContext & ControllerServiceLookup> 
CompletableFuture<Void> stop(ScheduledExecutorService scheduler,
-        T processContext, SchedulingAgent schedulingAgent, ScheduleState 
scheduleState);
+    public abstract CompletableFuture<Void> stop(ProcessScheduler 
processScheduler, ScheduledExecutorService executor,
+        ProcessContext processContext, SchedulingAgent schedulingAgent, 
ScheduleState scheduleState);
 
     /**
      * Will set the state of the processor to STOPPED which essentially implies

http://git-wip-us.apache.org/repos/asf/nifi/blob/8e6649ba/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/SchedulingAgentCallback.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/SchedulingAgentCallback.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/SchedulingAgentCallback.java
index 9d66e38..31a8745 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/SchedulingAgentCallback.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/SchedulingAgentCallback.java
@@ -20,9 +20,9 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 
 public interface SchedulingAgentCallback {
-    void postMonitor();
+    void onTaskComplete();
 
-    Future<?> invokeMonitoringTask(Callable<?> task);
+    Future<?> scheduleTask(Callable<?> task);
 
     void trigger();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/8e6649ba/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
index bf789f7..0baba23 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -161,10 +161,14 @@ public interface ProcessGroup extends 
ComponentAuthorizable, Positionable {
      * Starts the given Processor
      *
      * @param processor the processor to start
+     * @param failIfStopping If <code>false</code>, and the Processor is in 
the 'STOPPING' state,
+     *            then the Processor will automatically restart itself as soon 
as its last thread finishes. If this
+     *            value is <code>true</code> or if the Processor is in any 
state other than 'STOPPING' or 'RUNNING', then this method
+     *            will throw an {@link IllegalStateException}.
      * @throws IllegalStateException if the processor is not valid, or is
-     * already running
+     *             already running
      */
-    CompletableFuture<Void> startProcessor(ProcessorNode processor);
+    CompletableFuture<Void> startProcessor(ProcessorNode processor, boolean 
failIfStopping);
 
     /**
      * Starts the given Input Port

http://git-wip-us.apache.org/repos/asf/nifi/blob/8e6649ba/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 49c1789..56b2590 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -805,7 +805,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
                     try {
                         if (connectable instanceof ProcessorNode) {
-                            
connectable.getProcessGroup().startProcessor((ProcessorNode) connectable);
+                            
connectable.getProcessGroup().startProcessor((ProcessorNode) connectable, true);
                         } else {
                             startConnectable(connectable);
                         }
@@ -2984,6 +2984,10 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     }
 
     public void startProcessor(final String parentGroupId, final String 
processorId) {
+        startProcessor(parentGroupId, processorId, true);
+    }
+
+    public void startProcessor(final String parentGroupId, final String 
processorId, final boolean failIfStopping) {
         final ProcessGroup group = lookupGroup(parentGroupId);
         final ProcessorNode node = group.getProcessor(processorId);
         if (node == null) {
@@ -2993,7 +2997,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         writeLock.lock();
         try {
             if (initialized.get()) {
-                group.startProcessor(node);
+                group.startProcessor(node, failIfStopping);
             } else {
                 startConnectablesAfterInitialization.add(node);
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/8e6649ba/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 3d07456..3a0b093 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -761,7 +761,7 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
                         case RUNNING:
                             // we want to run now. Make sure processor is not 
disabled and then start it.
                             
procNode.getProcessGroup().enableProcessor(procNode);
-                            
controller.startProcessor(procNode.getProcessGroupIdentifier(), 
procNode.getIdentifier());
+                            
controller.startProcessor(procNode.getProcessGroupIdentifier(), 
procNode.getIdentifier(), false);
                             break;
                         case STOPPED:
                             if (procNode.getScheduledState() == 
ScheduledState.DISABLED) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/8e6649ba/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 36cb62e..88912aa 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -130,6 +130,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     private long runNanos = 0L;
     private volatile long yieldNanos;
     private final NiFiProperties nifiProperties;
+    private volatile ScheduledState desiredState;
 
     private SchedulingStrategy schedulingStrategy; // guarded by read/write 
lock
                                                    // ??????? NOT any more
@@ -1281,68 +1282,81 @@ public class StandardProcessorNode extends 
ProcessorNode implements Connectable
      * </p>
      */
     @Override
-    public <T extends ProcessContext & ControllerServiceLookup> void 
start(final ScheduledExecutorService taskScheduler,
-            final long administrativeYieldMillis, final T processContext, 
final SchedulingAgentCallback schedulingAgentCallback) {
+    public void start(final ScheduledExecutorService taskScheduler, final long 
administrativeYieldMillis, final ProcessContext processContext,
+            final SchedulingAgentCallback schedulingAgentCallback, final 
boolean failIfStopping) {
+
         if (!this.isValid()) {
             throw new IllegalStateException( "Processor " + this.getName() + " 
is not in a valid state due to " + this.getValidationErrors());
         }
         final Processor processor = processorRef.get().getProcessor();
         final ComponentLog procLog = new 
SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
 
-        final boolean starting;
+        ScheduledState currentState;
+        boolean starting;
         synchronized (this) {
-            starting = 
this.scheduledState.compareAndSet(ScheduledState.STOPPED, 
ScheduledState.STARTING);
+            currentState = this.scheduledState.get();
+
+            if (currentState == ScheduledState.STOPPED) {
+                starting = 
this.scheduledState.compareAndSet(ScheduledState.STOPPED, 
ScheduledState.STARTING);
+                if (starting) {
+                    desiredState = ScheduledState.RUNNING;
+                }
+            } else if (currentState == ScheduledState.STOPPING && 
!failIfStopping) {
+                desiredState = ScheduledState.RUNNING;
+                return;
+            } else {
+                starting = false;
+            }
         }
 
         if (starting) { // will ensure that the Processor represented by this 
node can only be started once
-            final Runnable startProcRunnable = new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        invokeTaskAsCancelableFuture(schedulingAgentCallback, 
new Callable<Void>() {
-                            @Override
-                            public Void call() throws Exception {
-                                try (final NarCloseable nc = 
NarCloseable.withComponentNarLoader(processor.getClass(), 
processor.getIdentifier())) {
-                                    
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, 
processContext);
-                                    return null;
-                                }
-                            }
-                        });
+            taskScheduler.execute(() -> initiateStart(taskScheduler, 
administrativeYieldMillis, processContext, schedulingAgentCallback));
+        } else {
+            final String procName = processorRef.get().toString();
+            LOG.warn("Cannot start {} because it is not currently stopped. 
Current state is {}", procName, currentState);
+            procLog.warn("Cannot start {} because it is not currently stopped. 
Current state is {}", new Object[] {procName, currentState});
+        }
+    }
 
-                        if 
(scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) 
{
-                            schedulingAgentCallback.trigger(); // callback 
provided by StandardProcessScheduler to essentially initiate component's 
onTrigger() cycle
-                        } else { // can only happen if stopProcessor was 
called before service was transitioned to RUNNING state
-                            try (final NarCloseable nc = 
NarCloseable.withComponentNarLoader(processor.getClass(), 
processor.getIdentifier())) {
-                                
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, 
processor, processContext);
-                            }
-                            scheduledState.set(ScheduledState.STOPPED);
-                        }
-                    } catch (final Exception e) {
-                        final Throwable cause = e instanceof 
InvocationTargetException ? e.getCause() : e;
-                        procLog.error("{} failed to invoke @OnScheduled method 
due to {}; processor will not be scheduled to run for {} seconds",
-                                new 
Object[]{StandardProcessorNode.this.getProcessor(), cause, 
administrativeYieldMillis / 1000L}, cause);
-                        LOG.error("Failed to invoke @OnScheduled method due to 
{}", cause.toString(), cause);
+    private void initiateStart(final ScheduledExecutorService taskScheduler, 
final long administrativeYieldMillis,
+            final ProcessContext processContext, final SchedulingAgentCallback 
schedulingAgentCallback) {
 
-                        
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, 
processor, processContext);
-                        
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, 
processContext);
+        final Processor processor = getProcessor();
+        final ComponentLog procLog = new 
SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
 
-                        if (scheduledState.get() != ScheduledState.STOPPING) { 
// make sure we only continue retry loop if STOP action wasn't initiated
-                            taskScheduler.schedule(this, 
administrativeYieldMillis, TimeUnit.MILLISECONDS);
-                        } else {
-                            scheduledState.set(ScheduledState.STOPPED);
-                        }
+        try {
+            invokeTaskAsCancelableFuture(schedulingAgentCallback, new 
Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    try (final NarCloseable nc = 
NarCloseable.withComponentNarLoader(processor.getClass(), 
processor.getIdentifier())) {
+                        
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, 
processContext);
+                        return null;
                     }
                 }
-            };
-            taskScheduler.execute(startProcRunnable);
-        } else {
-            final String procName = processorRef.getClass().getSimpleName();
-            LOG.warn("Can not start '" + procName
-                    + "' since it's already in the process of being started or 
it is DISABLED - "
-                    + scheduledState.get());
-            procLog.warn("Can not start '" + procName
-                    + "' since it's already in the process of being started or 
it is DISABLED - "
-                    + scheduledState.get());
+            });
+
+            if (scheduledState.compareAndSet(ScheduledState.STARTING, 
ScheduledState.RUNNING)) {
+                schedulingAgentCallback.trigger(); // callback provided by 
StandardProcessScheduler to essentially initiate component's onTrigger() cycle
+            } else { // can only happen if stopProcessor was called before 
service was transitioned to RUNNING state
+                try (final NarCloseable nc = 
NarCloseable.withComponentNarLoader(processor.getClass(), 
processor.getIdentifier())) {
+                    
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, 
processor, processContext);
+                }
+                scheduledState.set(ScheduledState.STOPPED);
+            }
+        } catch (final Exception e) {
+            final Throwable cause = e instanceof InvocationTargetException ? 
e.getCause() : e;
+            procLog.error("{} failed to invoke @OnScheduled method due to {}; 
processor will not be scheduled to run for {} seconds",
+                    new Object[]{StandardProcessorNode.this.getProcessor(), 
cause, administrativeYieldMillis / 1000L}, cause);
+            LOG.error("Failed to invoke @OnScheduled method due to {}", 
cause.toString(), cause);
+
+            
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, 
processor, processContext);
+            
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, 
processContext);
+
+            if (scheduledState.get() != ScheduledState.STOPPING) { // make 
sure we only continue retry loop if STOP action wasn't initiated
+                taskScheduler.schedule(() -> initiateStart(taskScheduler, 
administrativeYieldMillis, processContext, schedulingAgentCallback), 
administrativeYieldMillis, TimeUnit.MILLISECONDS);
+            } else {
+                scheduledState.set(ScheduledState.STOPPED);
+            }
         }
     }
 
@@ -1373,11 +1387,12 @@ public class StandardProcessorNode extends 
ProcessorNode implements Connectable
      * </p>
      */
     @Override
-    public <T extends ProcessContext & ControllerServiceLookup> 
CompletableFuture<Void> stop(final ScheduledExecutorService scheduler,
-        final T processContext, final SchedulingAgent schedulingAgent, final 
ScheduleState scheduleState) {
+    public CompletableFuture<Void> stop(final ProcessScheduler 
processScheduler, final ScheduledExecutorService executor, final ProcessContext 
processContext,
+            final SchedulingAgent schedulingAgent, final ScheduleState 
scheduleState) {
 
         final Processor processor = processorRef.get().getProcessor();
         LOG.info("Stopping processor: " + processor.getClass());
+        desiredState = ScheduledState.STOPPED;
 
         final CompletableFuture<Void> future = new CompletableFuture<>();
         if (this.scheduledState.compareAndSet(ScheduledState.RUNNING, 
ScheduledState.STOPPING)) { // will ensure that the Processor represented by 
this node can only be stopped once
@@ -1385,7 +1400,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
 
             // will continue to monitor active threads, invoking OnStopped 
once there are no
             // active threads (with the exception of the thread performing 
shutdown operations)
-            scheduler.execute(new Runnable() {
+            executor.execute(new Runnable() {
                 @Override
                 public void run() {
                     try {
@@ -1407,9 +1422,13 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
                             scheduleState.decrementActiveThreadCount();
                             scheduledState.set(ScheduledState.STOPPED);
                             future.complete(null);
+
+                            if (desiredState == ScheduledState.RUNNING) {
+                                
processScheduler.startProcessor(StandardProcessorNode.this, true);
+                            }
                         } else {
                             // Not all of the active threads have finished. 
Try again in 100 milliseconds.
-                            scheduler.schedule(this, 100, 
TimeUnit.MILLISECONDS);
+                            executor.schedule(this, 100, 
TimeUnit.MILLISECONDS);
                         }
                     } catch (final Exception e) {
                         LOG.warn("Failed while shutting down processor " + 
processor, e);
@@ -1461,7 +1480,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
         final String timeoutString = 
nifiProperties.getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT);
         final long onScheduleTimeout = timeoutString == null ? 60000
                 : FormatUtils.getTimeDuration(timeoutString.trim(), 
TimeUnit.MILLISECONDS);
-        final Future<?> taskFuture = callback.invokeMonitoringTask(task);
+        final Future<?> taskFuture = callback.scheduleTask(task);
         try {
             taskFuture.get(onScheduleTimeout, TimeUnit.MILLISECONDS);
         } catch (final InterruptedException e) {
@@ -1482,7 +1501,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
         } catch (final ExecutionException e){
             throw new RuntimeException("Failed while executing one of 
processor's OnScheduled task.", e);
         } finally {
-            callback.postMonitor();
+            callback.onTaskComplete();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/8e6649ba/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index c7f1581..d08d701 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -297,13 +297,13 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
      * @see StandardProcessorNode#start(ScheduledExecutorService, long, 
org.apache.nifi.processor.ProcessContext, Runnable)
      */
     @Override
-    public synchronized CompletableFuture<Void> startProcessor(final 
ProcessorNode procNode) {
-        StandardProcessContext processContext = new 
StandardProcessContext(procNode, this.controllerServiceProvider,
+    public synchronized CompletableFuture<Void> startProcessor(final 
ProcessorNode procNode, final boolean failIfStopping) {
+        final StandardProcessContext processContext = new 
StandardProcessContext(procNode, this.controllerServiceProvider,
             this.encryptor, getStateManager(procNode.getIdentifier()));
         final ScheduleState scheduleState = 
getScheduleState(requireNonNull(procNode));
 
         final CompletableFuture<Void> future = new CompletableFuture<>();
-        SchedulingAgentCallback callback = new SchedulingAgentCallback() {
+        final SchedulingAgentCallback callback = new SchedulingAgentCallback() 
{
             @Override
             public void trigger() {
                 getSchedulingAgent(procNode).schedule(procNode, scheduleState);
@@ -311,19 +311,19 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
             }
 
             @Override
-            public Future<?> invokeMonitoringTask(Callable<?> task) {
+            public Future<?> scheduleTask(Callable<?> task) {
                 scheduleState.incrementActiveThreadCount();
                 return componentMonitoringThreadPool.submit(task);
             }
 
             @Override
-            public void postMonitor() {
+            public void onTaskComplete() {
                 scheduleState.decrementActiveThreadCount();
             }
         };
 
         LOG.info("Starting {}", procNode);
-        procNode.start(this.componentLifeCycleThreadPool, 
this.administrativeYieldMillis, processContext, callback);
+        procNode.start(this.componentLifeCycleThreadPool, 
this.administrativeYieldMillis, processContext, callback, failIfStopping);
         return future;
     }
 
@@ -341,7 +341,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         final ScheduleState state = getScheduleState(procNode);
 
         LOG.info("Stopping {}", procNode);
-        return procNode.stop(this.componentLifeCycleThreadPool, processContext, getSchedulingAgent(procNode), state);
+        return procNode.stop(this, this.componentLifeCycleThreadPool, processContext, getSchedulingAgent(procNode), state);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/8e6649ba/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 02e190a..48ad849 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -285,7 +285,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
         // start all of the components that are not disabled
         for (final ProcessorNode node : processors) {
             if (node.getScheduledState() != ScheduledState.DISABLED) {
-                node.getProcessGroup().startProcessor(node);
+                node.getProcessGroup().startProcessor(node, true);
                 updated.add(node);
             }
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/8e6649ba/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 1754cf7..ec32cc1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -320,7 +320,7 @@ public final class StandardProcessGroup implements ProcessGroup {
         try {
             
findAllProcessors().stream().filter(SCHEDULABLE_PROCESSORS).forEach(node -> {
                 try {
-                    node.getProcessGroup().startProcessor(node);
+                    node.getProcessGroup().startProcessor(node, true);
                 } catch (final Throwable t) {
                     LOG.error("Unable to start processor {} due to {}", new Object[]{node.getIdentifier(), t});
                 }
@@ -1092,7 +1092,7 @@ public final class StandardProcessGroup implements ProcessGroup {
     }
 
     @Override
-    public CompletableFuture<Void> startProcessor(final ProcessorNode processor) {
+    public CompletableFuture<Void> startProcessor(final ProcessorNode processor, final boolean failIfStopping) {
         readLock.lock();
         try {
             if (getProcessor(processor.getIdentifier()) == null) {
@@ -1106,7 +1106,7 @@ public final class StandardProcessGroup implements ProcessGroup {
                 return CompletableFuture.completedFuture(null);
             }
 
-            return scheduler.startProcessor(processor);
+            return scheduler.startProcessor(processor, failIfStopping);
         } finally {
             readLock.unlock();
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/8e6649ba/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
index 33c33c9..7b39963 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
@@ -105,11 +105,11 @@ public class TestStandardProcessorNode {
         final StandardProcessContext processContext = new StandardProcessContext(procNode, null, null, null);
         final SchedulingAgentCallback schedulingAgentCallback = new SchedulingAgentCallback() {
             @Override
-            public void postMonitor() {
+            public void onTaskComplete() {
             }
 
             @Override
-            public Future<?> invokeMonitoringTask(final Callable<?> task) {
+            public Future<?> scheduleTask(final Callable<?> task) {
                 return taskScheduler.submit(task);
             }
 
@@ -119,7 +119,7 @@ public class TestStandardProcessorNode {
             }
         };
 
-        procNode.start(taskScheduler, 20000L, processContext, schedulingAgentCallback);
+        procNode.start(taskScheduler, 20000L, processContext, schedulingAgentCallback, true);
 
         Thread.sleep(1000L);
         assertEquals(1, processor.onScheduledCount);

http://git-wip-us.apache.org/repos/asf/nifi/blob/8e6649ba/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
index f8f0426..b55e98d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
@@ -160,7 +160,7 @@ public class TestProcessorLifecycle {
         assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState());
 
         ProcessScheduler ps = fc.getProcessScheduler();
-        ps.startProcessor(testProcNode);
+        ps.startProcessor(testProcNode, true);
         assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState());
     }
 
@@ -184,9 +184,9 @@ public class TestProcessorLifecycle {
         this.noop(testProcessor);
         final ProcessScheduler ps = fc.getProcessScheduler();
 
-        ps.startProcessor(testProcNode);
-        ps.startProcessor(testProcNode);
-        ps.startProcessor(testProcNode);
+        ps.startProcessor(testProcNode, true);
+        ps.startProcessor(testProcNode, true);
+        ps.startProcessor(testProcNode, true);
 
         Thread.sleep(500);
         assertCondition(() -> testProcessor.operationNames.size() == 1);
@@ -302,7 +302,7 @@ public class TestProcessorLifecycle {
                 @Override
                 public void run() {
                     LockSupport.parkNanos(random.nextInt(9000000));
-                    ps.startProcessor(testProcNode);
+                    ps.startProcessor(testProcNode, true);
                     countDownCounter.countDown();
                 }
             });
@@ -342,7 +342,7 @@ public class TestProcessorLifecycle {
         this.longRunningOnSchedule(testProcessor, delay);
         ProcessScheduler ps = fc.getProcessScheduler();
 
-        ps.startProcessor(testProcNode);
+        ps.startProcessor(testProcNode, true);
         assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 5000L);
 
         ps.stopProcessor(testProcNode);
@@ -375,7 +375,7 @@ public class TestProcessorLifecycle {
         testProcessor.keepFailingOnScheduledTimes = 2;
         ProcessScheduler ps = fc.getProcessScheduler();
 
-        ps.startProcessor(testProcNode);
+        ps.startProcessor(testProcNode, true);
         assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 10000L);
         ps.stopProcessor(testProcNode);
         assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 2000L);
@@ -404,7 +404,7 @@ public class TestProcessorLifecycle {
         testProcessor.keepFailingOnScheduledTimes = Integer.MAX_VALUE;
         ProcessScheduler ps = fc.getProcessScheduler();
 
-        ps.startProcessor(testProcNode);
+        ps.startProcessor(testProcNode, true);
         assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L);
         ps.stopProcessor(testProcNode);
         assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 2000L);
@@ -429,7 +429,7 @@ public class TestProcessorLifecycle {
         this.blockingInterruptableOnUnschedule(testProcessor);
         ProcessScheduler ps = fc.getProcessScheduler();
 
-        ps.startProcessor(testProcNode);
+        ps.startProcessor(testProcNode, true);
         assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L);
         ps.stopProcessor(testProcNode);
         assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 5000L);
@@ -454,7 +454,7 @@ public class TestProcessorLifecycle {
         this.blockingUninterruptableOnUnschedule(testProcessor);
         ProcessScheduler ps = fc.getProcessScheduler();
 
-        ps.startProcessor(testProcNode);
+        ps.startProcessor(testProcNode, true);
         assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 3000L);
         ps.stopProcessor(testProcNode);
         assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 4000L);
@@ -481,7 +481,7 @@ public class TestProcessorLifecycle {
         testProcessor.generateExceptionOnTrigger = true;
         ProcessScheduler ps = fc.getProcessScheduler();
 
-        ps.startProcessor(testProcNode);
+        ps.startProcessor(testProcNode, true);
         assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L);
         ps.disableProcessor(testProcNode);
         assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L);
@@ -503,7 +503,7 @@ public class TestProcessorLifecycle {
         ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
                 fcsb.getSystemBundle().getBundleDetails().getCoordinate());
         ProcessScheduler ps = fc.getProcessScheduler();
-        ps.startProcessor(testProcNode);
+        ps.startProcessor(testProcNode, true);
         fail();
     }
 
@@ -531,7 +531,7 @@ public class TestProcessorLifecycle {
         testProcessor.withService = true;
 
         ProcessScheduler ps = fc.getProcessScheduler();
-        ps.startProcessor(testProcNode);
+        ps.startProcessor(testProcNode, true);
         fail();
     }
 
@@ -563,7 +563,7 @@ public class TestProcessorLifecycle {
 
         ProcessScheduler ps = fc.getProcessScheduler();
         ps.enableControllerService(testServiceNode);
-        ps.startProcessor(testProcNode);
+        ps.startProcessor(testProcNode, true);
 
         Thread.sleep(500);
         assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
@@ -598,8 +598,8 @@ public class TestProcessorLifecycle {
         testGroup.addConnection(connection);
 
         ProcessScheduler ps = fc.getProcessScheduler();
-        ps.startProcessor(testProcNodeA);
-        ps.startProcessor(testProcNodeB);
+        ps.startProcessor(testProcNodeA, true);
+        ps.startProcessor(testProcNodeB, true);
 
         try {
             testGroup.removeProcessor(testProcNodeA);

http://git-wip-us.apache.org/repos/asf/nifi/blob/8e6649ba/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
index 0c4acd8..314738a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
@@ -206,7 +206,7 @@ public class TestStandardProcessScheduler {
         procNode.setProperties(procProps);
 
         scheduler.enableControllerService(service);
-        scheduler.startProcessor(procNode);
+        scheduler.startProcessor(procNode, true);
 
         Thread.sleep(1000L);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/8e6649ba/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
index 9725ed8..a28eb34 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
@@ -149,7 +149,7 @@ public class MockProcessGroup implements ProcessGroup {
     }
 
     @Override
-    public CompletableFuture<Void> startProcessor(final ProcessorNode processor) {
+    public CompletableFuture<Void> startProcessor(final ProcessorNode processor, final boolean failIfStopping) {
         return CompletableFuture.completedFuture(null);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/8e6649ba/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
index 258af72..ec584de 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
@@ -135,7 +135,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
             final Connectable connectable = group.findLocalConnectable(componentId);
             if (ScheduledState.RUNNING.equals(state)) {
                 if (ConnectableType.PROCESSOR.equals(connectable.getConnectableType())) {
-                    final CompletableFuture<?> processorFuture = connectable.getProcessGroup().startProcessor((ProcessorNode) connectable);
+                    final CompletableFuture<?> processorFuture = connectable.getProcessGroup().startProcessor((ProcessorNode) connectable, true);
                     future = CompletableFuture.allOf(future, processorFuture);
                 } else if (ConnectableType.INPUT_PORT.equals(connectable.getConnectableType())) {
                     connectable.getProcessGroup().startInputPort((Port) connectable);

http://git-wip-us.apache.org/repos/asf/nifi/blob/8e6649ba/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
index 429592c..ffbe21c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
@@ -426,7 +426,7 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
                     // perform the appropriate action
                     switch (purposedScheduledState) {
                         case RUNNING:
-                            parentGroup.startProcessor(processor);
+                            parentGroup.startProcessor(processor, true);
                             break;
                         case STOPPED:
                             switch (processor.getScheduledState()) {

Reply via email to