Repository: nifi
Updated Branches:
  refs/heads/master 56f79e1e8 -> c7df94e00


NIFI-1464 life-cycle refactoring part-2

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c7df94e0
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c7df94e0
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c7df94e0

Branch: refs/heads/master
Commit: c7df94e00f5ac456b2ca094841ffee9325c2ea53
Parents: 56f79e1
Author: Oleg Zhurakousky <[email protected]>
Authored: Mon Mar 14 13:29:41 2016 -0400
Committer: joewitt <[email protected]>
Committed: Mon Mar 14 17:13:55 2016 -0400

----------------------------------------------------------------------
 .../apache/nifi/controller/ProcessorNode.java   |  2 +-
 .../controller/SchedulingAgentCallback.java     | 28 ++++++++
 .../nifi/controller/StandardProcessorNode.java  | 71 ++++++++++----------
 .../scheduling/AbstractSchedulingAgent.java     |  7 ++
 .../scheduling/EventDrivenSchedulingAgent.java  |  3 +-
 .../scheduling/QuartzSchedulingAgent.java       |  3 +-
 .../scheduling/StandardProcessScheduler.java    | 27 ++++++--
 .../scheduling/TimerDrivenSchedulingAgent.java  |  3 +-
 8 files changed, 98 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c7df94e0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index 6c37418..7813530 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -155,7 +155,7 @@ public abstract class ProcessorNode extends 
AbstractConfiguredComponent implemen
      *            execute upon successful start of the Processor
      */
     public abstract <T extends ProcessContext & ControllerServiceLookup> void 
start(ScheduledExecutorService scheduler,
-            long administrativeYieldMillis, T processContext, Runnable 
schedulingAgentCallback);
+            long administrativeYieldMillis, T processContext, 
SchedulingAgentCallback schedulingAgentCallback);
 
     /**
      * Will stop the {@link Processor} represented by this {@link 
ProcessorNode}.

http://git-wip-us.apache.org/repos/asf/nifi/blob/c7df94e0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/SchedulingAgentCallback.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/SchedulingAgentCallback.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/SchedulingAgentCallback.java
new file mode 100644
index 0000000..9d66e38
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/SchedulingAgentCallback.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+public interface SchedulingAgentCallback {
+    void postMonitor();
+
+    Future<?> invokeMonitoringTask(Callable<?> task);
+
+    void trigger();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c7df94e0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 9a6eba5..fa7bd30 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -1231,41 +1231,41 @@ public class StandardProcessorNode extends 
ProcessorNode implements Connectable
      */
     @Override
     public <T extends ProcessContext & ControllerServiceLookup> void 
start(final ScheduledExecutorService taskScheduler,
-            final long administrativeYieldMillis, final T processContext, 
final Runnable schedulingAgentCallback) {
+            final long administrativeYieldMillis, final T processContext, 
final SchedulingAgentCallback schedulingAgentCallback) {
         if (!this.isValid()) {
             throw new IllegalStateException( "Processor " + this.getName() + " 
is not in a valid state due to " + this.getValidationErrors());
         }
-
+        final ProcessorLog procLog = new 
SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
         if (this.scheduledState.compareAndSet(ScheduledState.STOPPED, 
ScheduledState.STARTING)) { // will ensure that the Processor represented by 
this node can only be started once
             final Runnable startProcRunnable = new Runnable() {
                 @Override
                 public void run() {
                     try {
-                        final SchedulingContext schedulingContext = new 
StandardSchedulingContext(processContext, getControllerServiceProvider(),
-                                StandardProcessorNode.this, 
processContext.getStateManager());
-                        invokeTaskAsCancelableFuture(taskScheduler, new 
Callable<Void>() {
-                            @SuppressWarnings("deprecation")
+                        invokeTaskAsCancelableFuture(schedulingAgentCallback, 
new Callable<Void>() {
                             @Override
                             public Void call() throws Exception {
                                 try (final NarCloseable nc = 
NarCloseable.withNarLoader()) {
-                                    
ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class, 
org.apache.nifi.processor.annotation.OnScheduled.class, processor, 
schedulingContext);
+                                    SchedulingContext schedulingContext = new 
StandardSchedulingContext(processContext,
+                                            getControllerServiceProvider(), 
StandardProcessorNode.this,
+                                            processContext.getStateManager());
+                                    
ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class,
+                                            
org.apache.nifi.processor.annotation.OnScheduled.class, processor,
+                                            schedulingContext);
                                     return null;
                                 }
                             }
                         });
+
                         if 
(scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) 
{
-                            schedulingAgentCallback.run(); // callback 
provided by StandardProcessScheduler to essentially initiate component's 
onTrigger() cycle
+                            schedulingAgentCallback.trigger(); // callback 
provided by StandardProcessScheduler to essentially initiate component's 
onTrigger() cycle
                         } else { // can only happen if stopProcessor was 
called before service was transitioned to RUNNING state
                             try (final NarCloseable nc = 
NarCloseable.withNarLoader()) {
                                 
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, 
processor, processContext);
                             }
-
                             scheduledState.set(ScheduledState.STOPPED);
                         }
                     } catch (final Exception e) {
                         final Throwable cause = e instanceof 
InvocationTargetException ? e.getCause() : e;
-                        final ProcessorLog procLog = new 
SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
-
                         procLog.error( "{} failed to invoke @OnScheduled 
method due to {}; processor will not be scheduled to run for {}",
                                 new Object[] { 
StandardProcessorNode.this.getProcessor(), cause, administrativeYieldMillis + " 
milliseconds" }, cause);
                         LOG.error("Failed to invoke @OnScheduled method due to 
{}", cause.toString(), cause);
@@ -1281,7 +1281,13 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
             };
             taskScheduler.execute(startProcRunnable);
         } else {
-            LOG.warn("Can not start Processor since it's already in the 
process of being started or it is DISABLED");
+            String procName = this.processor.getClass().getSimpleName();
+            LOG.warn("Can not start '" + procName
+                    + "' since it's already in the process of being started or 
it is DISABLED - "
+                    + scheduledState.get());
+            procLog.warn("Can not start '" + procName
+                    + "' since it's already in the process of being started or 
it is DISABLED - "
+                    + scheduledState.get());
         }
     }
 
@@ -1313,27 +1319,23 @@ public class StandardProcessorNode extends 
ProcessorNode implements Connectable
     @Override
     public <T extends ProcessContext & ControllerServiceLookup> void 
stop(final ScheduledExecutorService scheduler,
             final T processContext, final Callable<Boolean> 
activeThreadMonitorCallback) {
+        LOG.info("Stopping processor: " + this.processor.getClass());
         if (this.scheduledState.compareAndSet(ScheduledState.RUNNING, 
ScheduledState.STOPPING)) { // will ensure that the Processor represented by 
this node can only be stopped once
-            invokeTaskAsCancelableFuture(scheduler, new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    try (final NarCloseable nc = NarCloseable.withNarLoader()) 
{
-                        
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, 
processor, processContext);
-                        return null;
-                    }
-                }
-            });
             // will continue to monitor active threads, invoking OnStopped once
             // there are none
             scheduler.execute(new Runnable() {
+                boolean unscheduled = false;
                 @Override
                 public void run() {
+                    if (!this.unscheduled){
+                        
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, 
processor, processContext);
+                        this.unscheduled = true;
+                    }
                     try {
                         if (activeThreadMonitorCallback.call()) {
                             try (final NarCloseable nc = 
NarCloseable.withNarLoader()) {
                                 
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, 
processContext);
                             }
-
                             scheduledState.set(ScheduledState.STOPPED);
                         } else {
                             scheduler.schedule(this, 100, 
TimeUnit.MILLISECONDS);
@@ -1382,31 +1384,32 @@ public class StandardProcessorNode extends 
ProcessorNode implements Connectable
      * be logged (WARN) informing a user so further actions could be taken.
      * </p>
      */
-    private void invokeTaskAsCancelableFuture(ScheduledExecutorService 
taskScheduler, Callable<Void> task) {
-        Future<Void> executionResult = taskScheduler.submit(task);
-
+    private <T> void invokeTaskAsCancelableFuture(SchedulingAgentCallback 
callback, Callable<T> task) {
         String timeoutString = 
NiFiProperties.getInstance().getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT);
-        long onScheduleTimeout = timeoutString == null ? Long.MAX_VALUE
+        long onScheduleTimeout = timeoutString == null ? 60000
                 : FormatUtils.getTimeDuration(timeoutString.trim(), 
TimeUnit.MILLISECONDS);
-
+        Future<?> taskFuture = callback.invokeMonitoringTask(task);
         try {
-            executionResult.get(onScheduleTimeout, TimeUnit.MILLISECONDS);
+            taskFuture.get(onScheduleTimeout, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
             LOG.warn("Thread was interrupted while waiting for processor '" + 
this.processor.getClass().getSimpleName()
-                    + "' lifecycle operation (OnScheduled or OnUnscheduled) to 
finish.");
+                    + "' lifecycle OnScheduled operation to finish.");
             Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted while executing one of 
processor's OnScheduled tasks.", e);
         } catch (TimeoutException e) {
-            executionResult.cancel(true);
-            LOG.warn("Timed out while waiting for lifecycle operation 
(OnScheduled or OnUnscheduled) of '"
+            taskFuture.cancel(true);
+            LOG.warn("Timed out while waiting for OnScheduled of '"
                     + this.processor.getClass().getSimpleName()
                     + "' processor to finish. An attempt is made to cancel the 
task via Thread.interrupt(). However it does not "
-                    + "guarantee that the task will be canceled since the code 
inside current lifecycle operation (OnScheduled or OnUnscheduled) may "
+                    + "guarantee that the task will be canceled since the code 
inside current OnScheduled operation may "
                     + "have been written to ignore interrupts which may result 
in runaway thread which could lead to more issues "
                     + "eventually requiring NiFi to be restarted. This is 
usually a bug in the target Processor '"
                     + this.processor + "' that needs to be documented, 
reported and eventually fixed.");
+            throw new RuntimeException("Timed out while executing one of 
processor's OnScheduled task.", e);
         } catch (ExecutionException e){
-            throw new RuntimeException(
-                    "Failed while executing one of processor's lifecycle tasks 
(OnScheduled or OnUnscheduled).", e);
+            throw new RuntimeException("Failed while executing one of 
processor's OnScheduled task.", e);
+        } finally {
+            callback.postMonitor();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c7df94e0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java
index 3544dac..8f36e1e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java
@@ -18,6 +18,7 @@ package org.apache.nifi.controller.scheduling;
 
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.engine.FlowEngine;
 
 /**
  * Base implementation of the {@link SchedulingAgent} which encapsulates the
@@ -33,6 +34,12 @@ import org.apache.nifi.controller.ReportingTaskNode;
  */
 abstract class AbstractSchedulingAgent implements SchedulingAgent {
 
+    protected final FlowEngine flowEngine;
+
+    protected AbstractSchedulingAgent(FlowEngine flowEngine) {
+        this.flowEngine = flowEngine;
+    }
+
     @Override
     public void schedule(Connectable connectable, ScheduleState scheduleState) 
{
         scheduleState.setScheduled(true);

http://git-wip-us.apache.org/repos/asf/nifi/blob/c7df94e0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
index 37cab01..228af70 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
@@ -55,7 +55,6 @@ public class EventDrivenSchedulingAgent extends 
AbstractSchedulingAgent {
 
     private static final Logger logger = 
LoggerFactory.getLogger(EventDrivenSchedulingAgent.class);
 
-    private final FlowEngine flowEngine;
     private final ControllerServiceProvider serviceProvider;
     private final StateManagerProvider stateManagerProvider;
     private final EventDrivenWorkerQueue workerQueue;
@@ -70,7 +69,7 @@ public class EventDrivenSchedulingAgent extends 
AbstractSchedulingAgent {
 
     public EventDrivenSchedulingAgent(final FlowEngine flowEngine, final 
ControllerServiceProvider serviceProvider, final StateManagerProvider 
stateManagerProvider,
         final EventDrivenWorkerQueue workerQueue, final ProcessContextFactory 
contextFactory, final int maxThreadCount, final StringEncryptor encryptor) {
-        this.flowEngine = flowEngine;
+        super(flowEngine);
         this.serviceProvider = serviceProvider;
         this.stateManagerProvider = stateManagerProvider;
         this.workerQueue = workerQueue;

http://git-wip-us.apache.org/repos/asf/nifi/blob/c7df94e0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
index 31c1ca4..3f19d28 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
@@ -49,16 +49,15 @@ public class QuartzSchedulingAgent extends 
AbstractSchedulingAgent {
 
     private final FlowController flowController;
     private final ProcessContextFactory contextFactory;
-    private final FlowEngine flowEngine;
     private final StringEncryptor encryptor;
 
     private volatile String adminYieldDuration = "1 sec";
     private final Map<Object, List<AtomicBoolean>> canceledTriggers = new 
HashMap<>();
 
     public QuartzSchedulingAgent(final FlowController flowController, final 
FlowEngine flowEngine, final ProcessContextFactory contextFactory, final 
StringEncryptor enryptor) {
+        super(flowEngine);
         this.flowController = flowController;
         this.contextFactory = contextFactory;
-        this.flowEngine = flowEngine;
         this.encryptor = enryptor;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c7df94e0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index bbaa23b..f7e968e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -23,7 +23,7 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -42,6 +42,7 @@ import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.SchedulingAgentCallback;
 import org.apache.nifi.controller.StandardProcessorNode;
 import org.apache.nifi.controller.annotation.OnConfigured;
 import org.apache.nifi.controller.service.ControllerServiceNode;
@@ -79,7 +80,8 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
     private final ConcurrentMap<SchedulingStrategy, SchedulingAgent> 
strategyAgentMap = new ConcurrentHashMap<>();
     // thread pool for starting/stopping components
 
-    private final ScheduledExecutorService componentLifeCycleThreadPool = 
Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
+    private final ScheduledExecutorService componentLifeCycleThreadPool = new 
FlowEngine(8, "StandardProcessScheduler", true);
+    private final ScheduledExecutorService componentMonitoringThreadPool = new 
FlowEngine(8, "StandardProcessScheduler", true);
 
     private final StringEncryptor encryptor;
 
@@ -160,6 +162,7 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
 
         frameworkTaskExecutor.shutdown();
         componentLifeCycleThreadPool.shutdown();
+        componentMonitoringThreadPool.shutdown();
     }
 
     @Override
@@ -295,14 +298,27 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
         StandardProcessContext processContext = new 
StandardProcessContext(procNode, this.controllerServiceProvider,
                 this.encryptor, getStateManager(procNode.getIdentifier()));
         final ScheduleState scheduleState = 
getScheduleState(requireNonNull(procNode));
-        Runnable schedulingAgentCallback = new Runnable() {
+
+        SchedulingAgentCallback callback = new SchedulingAgentCallback() {
             @Override
-            public void run() {
+            public void trigger() {
                 getSchedulingAgent(procNode).schedule(procNode, scheduleState);
                 heartbeater.heartbeat();
             }
+
+            @Override
+            public Future<?> invokeMonitoringTask(Callable<?> task) {
+                scheduleState.incrementActiveThreadCount();
+                return componentMonitoringThreadPool.submit(task);
+            }
+
+            @Override
+            public void postMonitor() {
+                scheduleState.decrementActiveThreadCount();
+            }
         };
-        procNode.start(this.componentLifeCycleThreadPool, 
this.administrativeYieldMillis, processContext, schedulingAgentCallback);
+
+        procNode.start(this.componentLifeCycleThreadPool, 
this.administrativeYieldMillis, processContext, callback);
     }
 
     /**
@@ -317,6 +333,7 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
         StandardProcessContext processContext = new 
StandardProcessContext(procNode, this.controllerServiceProvider,
                 this.encryptor, getStateManager(procNode.getIdentifier()));
         final ScheduleState state = getScheduleState(procNode);
+
         procNode.stop(this.componentLifeCycleThreadPool, processContext, new 
Callable<Boolean>() {
             @Override
             public Boolean call() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/c7df94e0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
index 76c413f..0436e21 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
@@ -48,15 +48,14 @@ public class TimerDrivenSchedulingAgent extends 
AbstractSchedulingAgent {
     private final long noWorkYieldNanos;
 
     private final FlowController flowController;
-    private final FlowEngine flowEngine;
     private final ProcessContextFactory contextFactory;
     private final StringEncryptor encryptor;
 
     private volatile String adminYieldDuration = "1 sec";
 
     public TimerDrivenSchedulingAgent(final FlowController flowController, 
final FlowEngine flowEngine, final ProcessContextFactory contextFactory, final 
StringEncryptor encryptor) {
+        super(flowEngine);
         this.flowController = flowController;
-        this.flowEngine = flowEngine;
         this.contextFactory = contextFactory;
         this.encryptor = encryptor;
 

Reply via email to