NIFI-5686: Updated StandardProcessScheduler so that if it fails to schedule a
Reporting Task, it re-schedules the @OnScheduled task instead of looping and
calling Thread.sleep. Previously, when ProcessScheduler.unschedule() was
called, the unschedule task could not execute because the schedule task was
occupying the only thread of the single-threaded Process Scheduler. By
switching the logic to schedule the task for later and return, instead of
calling Thread.sleep and looping, we avoid blocking the one thread in the
thread pool. Also, performed some trivial code cleanup and updated erroneous
links in Java-docs.

NIFI-5686: Fixed unit test in TestSocketLoadBalancedFlowFileQueue; renamed 
TestProcessorLifecycle to ProcessorLifecycleIT as it is testing integration 
between many components and largely focuses on high numbers of concurrent tasks 
to see if it can trigger any threading bugs that may get introduced

NIFI-5686: Extended unit test timeouts
Signed-off-by: Matthew Burgess <[email protected]>

This closes #3062


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/32db43b3
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/32db43b3
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/32db43b3

Branch: refs/heads/master
Commit: 32db43b3069b3c982b9c96576c11b447c229b360
Parents: 218063a
Author: Mark Payne <[email protected]>
Authored: Thu Oct 11 11:45:41 2018 -0400
Committer: Matthew Burgess <[email protected]>
Committed: Mon Oct 15 15:21:05 2018 -0400

----------------------------------------------------------------------
 .../scheduling/StandardProcessScheduler.java    | 110 ++-
 .../TestSocketLoadBalancedFlowFileQueue.java    |  20 +-
 .../scheduling/ProcessorLifecycleIT.java        | 868 +++++++++++++++++++
 .../scheduling/TestProcessorLifecycle.java      | 868 -------------------
 .../TestStandardProcessScheduler.java           |  64 +-
 .../standard/TestHandleHttpRequest.java         |  69 +-
 6 files changed, 996 insertions(+), 1003 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/32db43b3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index b23e763..0459372 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -16,19 +16,6 @@
  */
 package org.apache.nifi.controller.scheduling;
 
-import static java.util.Objects.requireNonNull;
-
-import java.lang.reflect.InvocationTargetException;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
@@ -53,6 +40,7 @@ import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.SimpleProcessLogger;
 import org.apache.nifi.processor.StandardProcessContext;
@@ -64,9 +52,21 @@ import org.apache.nifi.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Objects.requireNonNull;
+
 /**
- * Responsible for scheduling Processors, Ports, and Funnels to run at regular
- * intervals
+ * Responsible for scheduling Processors, Ports, and Funnels to run at regular 
intervals
  */
 public final class StandardProcessScheduler implements ProcessScheduler {
 
@@ -196,44 +196,39 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
                 final long lastStopTime = lifecycleState.getLastStopTime();
                 final ReportingTask reportingTask = 
taskNode.getReportingTask();
 
-                // Continually attempt to start the Reporting Task, and if we 
fail sleep for a bit each time.
-                while (true) {
-                    try {
-                        synchronized (lifecycleState) {
-                            // if no longer scheduled to run, then we're 
finished. This can happen, for example,
-                            // if the @OnScheduled method throws an Exception 
and the user stops the reporting task
-                            // while we're administratively yielded.
-                            // we also check if the schedule state's last 
start time is equal to what it was before.
-                            // if not, then means that the reporting task has 
been stopped and started again, so we should just
-                            // bail; another thread will be responsible for 
invoking the @OnScheduled methods.
-                            if (!lifecycleState.isScheduled() || 
lifecycleState.getLastStopTime() != lastStopTime) {
-                                return;
-                            }
-
-                            try (final NarCloseable x = 
NarCloseable.withComponentNarLoader(reportingTask.getClass(), 
reportingTask.getIdentifier())) {
-                                
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, reportingTask, 
taskNode.getConfigurationContext());
-                            }
-
-                            agent.schedule(taskNode, lifecycleState);
+                // Attempt to start the Reporting Task, and if we fail 
re-schedule the task again after #administrativeYieldMillis milliseconds
+                try {
+                    synchronized (lifecycleState) {
+                        // if no longer scheduled to run, then we're finished. 
This can happen, for example,
+                        // if the @OnScheduled method throws an Exception and 
the user stops the reporting task
+                        // while we're administratively yielded.
+                        // we also check if the schedule state's last start 
time is equal to what it was before.
+                        // if not, then means that the reporting task has been 
stopped and started again, so we should just
+                        // bail; another thread will be responsible for 
invoking the @OnScheduled methods.
+                        if (!lifecycleState.isScheduled() || 
lifecycleState.getLastStopTime() != lastStopTime) {
+                            LOG.debug("Did not complete invocation of 
@OnScheduled task for {} but Lifecycle State is no longer scheduled. Will not 
attempt to invoke task anymore", reportingTask);
                             return;
                         }
-                    } catch (final Exception e) {
-                        final Throwable cause = e instanceof 
InvocationTargetException ? e.getCause() : e;
-                        final ComponentLog componentLog = new 
SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
-                        componentLog.error("Failed to invoke @OnEnabled method 
due to {}", cause);
-
-                        LOG.error("Failed to invoke the On-Scheduled Lifecycle 
methods of {} due to {}; administratively yielding this "
-                                + "ReportingTask and will attempt to schedule 
it again after {}",
-                                new Object[]{reportingTask, e.toString(), 
administrativeYieldDuration}, e);
 
-                        
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, 
reportingTask, taskNode.getConfigurationContext());
-                        
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, 
reportingTask, taskNode.getConfigurationContext());
-
-                        try {
-                            Thread.sleep(administrativeYieldMillis);
-                        } catch (final InterruptedException ie) {
+                        try (final NarCloseable x = 
NarCloseable.withComponentNarLoader(reportingTask.getClass(), 
reportingTask.getIdentifier())) {
+                            
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, reportingTask, 
taskNode.getConfigurationContext());
                         }
+
+                        agent.schedule(taskNode, lifecycleState);
                     }
+                } catch (final Exception e) {
+                    final Throwable cause = e instanceof 
InvocationTargetException ? e.getCause() : e;
+                    final ComponentLog componentLog = new 
SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
+                    componentLog.error("Failed to invoke @OnScheduled method 
due to {}", cause);
+
+                    LOG.error("Failed to invoke the On-Scheduled Lifecycle 
methods of {} due to {}; administratively yielding this "
+                            + "ReportingTask and will attempt to schedule it 
again after {}",
+                            new Object[]{reportingTask, e.toString(), 
administrativeYieldDuration}, e);
+
+                    
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, 
reportingTask, taskNode.getConfigurationContext());
+                    
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, 
reportingTask, taskNode.getConfigurationContext());
+
+                    componentLifeCycleThreadPool.schedule(this, 
administrativeYieldMillis, TimeUnit.MILLISECONDS);
                 }
             }
         };
@@ -262,10 +257,8 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
                 synchronized (lifecycleState) {
                     lifecycleState.setScheduled(false);
 
-                    try {
-                        try (final NarCloseable x = 
NarCloseable.withComponentNarLoader(reportingTask.getClass(), 
reportingTask.getIdentifier())) {
-                            
ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, 
configurationContext);
-                        }
+                    try (final NarCloseable x = 
NarCloseable.withComponentNarLoader(reportingTask.getClass(), 
reportingTask.getIdentifier())) {
+                        
ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, 
configurationContext);
                     } catch (final Exception e) {
                         final Throwable cause = e instanceof 
InvocationTargetException ? e.getCause() : e;
                         final ComponentLog componentLog = new 
SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
@@ -274,11 +267,6 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
                         LOG.error("Failed to invoke the @OnUnscheduled methods 
of {} due to {}; administratively yielding this ReportingTask and will attempt 
to schedule it again after {}",
                                 reportingTask, cause.toString(), 
administrativeYieldDuration);
                         LOG.error("", cause);
-
-                        try {
-                            Thread.sleep(administrativeYieldMillis);
-                        } catch (final InterruptedException ie) {
-                        }
                     }
 
                     agent.unschedule(taskNode, lifecycleState);
@@ -295,10 +283,10 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
 
     /**
      * Starts the given {@link Processor} by invoking its
-     * {@link ProcessorNode#start(ScheduledExecutorService, long, 
org.apache.nifi.processor.ProcessContext, Runnable)}
+     * {@link ProcessorNode#start(ScheduledExecutorService, long, 
ProcessContext, SchedulingAgentCallback, boolean)}
      * method.
      *
-     * @see StandardProcessorNode#start(ScheduledExecutorService, long, 
org.apache.nifi.processor.ProcessContext, Runnable)
+     * @see StandardProcessorNode#start(ScheduledExecutorService, long, 
ProcessContext, SchedulingAgentCallback, boolean)
      */
     @Override
     public synchronized CompletableFuture<Void> startProcessor(final 
ProcessorNode procNode, final boolean failIfStopping) {
@@ -335,10 +323,10 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
 
     /**
      * Stops the given {@link Processor} by invoking its
-     * {@link ProcessorNode#stop(ScheduledExecutorService, 
org.apache.nifi.processor.ProcessContext, SchedulingAgent, LifecycleState)}
+     * {@link ProcessorNode#stop(ProcessScheduler, ScheduledExecutorService, 
ProcessContext, SchedulingAgent, LifecycleState)}
      * method.
      *
-     * @see StandardProcessorNode#stop(ScheduledExecutorService, 
org.apache.nifi.processor.ProcessContext, SchedulingAgent, LifecycleState)
+     * @see StandardProcessorNode#stop(ProcessScheduler, 
ScheduledExecutorService, ProcessContext, SchedulingAgent, LifecycleState)
      */
     @Override
     public synchronized CompletableFuture<Void> stopProcessor(final 
ProcessorNode procNode) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/32db43b3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
index 971770a..bb1ad49 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
@@ -96,12 +96,12 @@ public class TestSocketLoadBalancedFlowFileQueue {
         swapManager = new MockSwapManager();
         eventReporter = EventReporter.NO_OP;
 
-        final NodeIdentifier localNodeIdentifier = createNodeIdentifier();
+        final NodeIdentifier localNodeIdentifier = 
createNodeIdentifier("00000000-0000-0000-0000-000000000000");
 
         nodeIds = new ArrayList<>();
         nodeIds.add(localNodeIdentifier);
-        nodeIds.add(createNodeIdentifier());
-        nodeIds.add(createNodeIdentifier());
+        
nodeIds.add(createNodeIdentifier("11111111-1111-1111-1111-111111111111"));
+        
nodeIds.add(createNodeIdentifier("22222222-2222-2222-2222-222222222222"));
 
         Mockito.doAnswer(new Answer<Set<NodeIdentifier>>() {
             @Override
@@ -128,7 +128,11 @@ public class TestSocketLoadBalancedFlowFileQueue {
     }
 
     private NodeIdentifier createNodeIdentifier() {
-        return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 
nodePort++, "localhost", nodePort++,
+        return createNodeIdentifier(UUID.randomUUID().toString());
+    }
+
+    private NodeIdentifier createNodeIdentifier(final String uuid) {
+        return new NodeIdentifier(uuid, "localhost", nodePort++, "localhost", 
nodePort++,
             "localhost", nodePort++, "localhost", nodePort++, nodePort++, 
true, Collections.emptySet());
     }
 
@@ -339,9 +343,12 @@ public class TestSocketLoadBalancedFlowFileQueue {
         }
     }
 
-    @Test(timeout = 10000)
+
+    @Test(timeout = 30000)
     public void 
testChangeInClusterTopologyTriggersRebalanceOnlyOnRemovedNodeIfNecessary() 
throws InterruptedException {
-        // Create partitioner that sends first 2 FlowFiles to Partition 0, 
next 2 to Partition 1, and then next 4 to Partition 3.
+        // Create partitioner that sends first 1 FlowFile to Partition 0, next 
to Partition 1, and then next 2 to Partition 2.
+        // Then, cycle back to partitions 0 and 1. This will result in 
partitions 0 & 1 getting 1 FlowFile each and Partition 2
+        // getting 2 FlowFiles. Then, when Partition 2 is removed, those 2 
FlowFiles will be rebalanced to Partitions 0 and 1.
         queue.setFlowFilePartitioner(new StaticSequencePartitioner(new int[] 
{0, 1, 2, 2, 0, 1}, false));
 
         for (int i = 0; i < 4; i++) {
@@ -359,6 +366,7 @@ public class TestSocketLoadBalancedFlowFileQueue {
 
         final int[] expectedPartitionSizes = new int[] {2, 2};
         final int[] partitionSizes = new int[2];
+
         while (!Arrays.equals(expectedPartitionSizes, partitionSizes)) {
             Thread.sleep(10L);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/32db43b3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
new file mode 100644
index 0000000..b8fed54
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
@@ -0,0 +1,868 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.scheduling;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.nifi.admin.service.AuditService;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.events.VolatileBulletinRepository;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.SystemBundle;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.provenance.MockProvenanceRepository;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.variable.FileBasedVariableRegistry;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+import java.util.function.Supplier;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Validate Processor's life-cycle operation within the context of
+ * {@link FlowController} and {@link StandardProcessScheduler}
+ */
+public class ProcessorLifecycleIT {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(ProcessorLifecycleIT.class);
+    private static final long SHORT_DELAY_TOLERANCE = 10000L;
+    private static final long MEDIUM_DELAY_TOLERANCE = 15000L;
+    private static final long LONG_DELAY_TOLERANCE = 20000L;
+
+    private FlowController fc;
+    private Map<String, String> properties = new HashMap<>();
+    private volatile String propsFile = 
"src/test/resources/lifecycletest.nifi.properties";
+
+    @Before
+    public void before() throws Exception {
+        properties.put("P", "hello");
+    }
+
+    @After
+    public void after() throws Exception {
+        fc.shutdown(true);
+        FileUtils.deleteDirectory(new File("./target/lifecycletest"));
+    }
+
+    private void assertCondition(final Supplier<Boolean> supplier) {
+        assertCondition(supplier, SHORT_DELAY_TOLERANCE);
+    }
+
+    private void assertCondition(final Supplier<Boolean> supplier, final long 
delayToleranceMillis) {
+        final long startTime = System.currentTimeMillis();
+        while (((System.currentTimeMillis() - startTime) < 
delayToleranceMillis) && !supplier.get()) {
+            try {
+                Thread.sleep(50);
+            } catch (InterruptedException ex) {
+                Thread.interrupted();
+                break;
+            }
+        }
+        assertTrue(supplier.get());
+    }
+
+    @Test
+    public void validateEnableOperation() throws Exception {
+        final FlowControllerAndSystemBundle fcsb = 
this.buildFlowControllerForTest();
+        fc = fcsb.getFlowController();
+        ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
+        this.setControllerRootGroup(fc, testGroup);
+        final ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(),
+                UUID.randomUUID().toString(), 
fcsb.getSystemBundle().getBundleDetails().getCoordinate());
+
+        assertCondition(() -> ScheduledState.STOPPED == 
testProcNode.getScheduledState());
+        assertCondition(() -> ScheduledState.STOPPED == 
testProcNode.getPhysicalScheduledState());
+        // validates idempotency
+        for (int i = 0; i < 2; i++) {
+            testProcNode.enable();
+        }
+        assertCondition(() -> ScheduledState.STOPPED == 
testProcNode.getScheduledState());
+        assertCondition(() -> ScheduledState.STOPPED == 
testProcNode.getPhysicalScheduledState());
+        testProcNode.disable();
+        assertCondition(() -> ScheduledState.DISABLED == 
testProcNode.getScheduledState());
+        assertCondition(() -> ScheduledState.DISABLED == 
testProcNode.getPhysicalScheduledState());
+    }
+
+    @Test
+    public void validateDisableOperation() throws Exception {
+        final FlowControllerAndSystemBundle fcsb = 
this.buildFlowControllerForTest();
+        fc = fcsb.getFlowController();
+
+        ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
+        this.setControllerRootGroup(fc, testGroup);
+        final ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(),
+                UUID.randomUUID().toString(), 
fcsb.getSystemBundle().getBundleDetails().getCoordinate());
+        testProcNode.setProperties(properties);
+        assertCondition(() -> ScheduledState.STOPPED == 
testProcNode.getScheduledState());
+        assertCondition(() -> ScheduledState.STOPPED == 
testProcNode.getPhysicalScheduledState());
+        // validates idempotency
+        for (int i = 0; i < 2; i++) {
+            testProcNode.disable();
+        }
+        assertCondition(() -> ScheduledState.DISABLED == 
testProcNode.getScheduledState());
+        assertCondition(() -> ScheduledState.DISABLED == 
testProcNode.getPhysicalScheduledState());
+
+        ProcessScheduler ps = fc.getProcessScheduler();
+        testProcNode.performValidation();
+        ps.startProcessor(testProcNode, true);
+        assertCondition(() -> ScheduledState.DISABLED == 
testProcNode.getPhysicalScheduledState());
+    }
+
+    /**
+     * Will validate the idempotent nature of processor start operation which
+     * can be called multiple times without any side-effects.
+     */
+    @Test
+    public void validateIdempotencyOfProcessorStartOperation() throws 
Exception {
+        final FlowControllerAndSystemBundle fcsb = 
this.buildFlowControllerForTest();
+        fc = fcsb.getFlowController();
+
+        ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
+        this.setControllerRootGroup(fc, testGroup);
+        final ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+                fcsb.getSystemBundle().getBundleDetails().getCoordinate());
+        testProcNode.setProperties(properties);
+        TestProcessor testProcessor = (TestProcessor) 
testProcNode.getProcessor();
+
+        // sets the scenario for the processor to run
+        this.noop(testProcessor);
+        final ProcessScheduler ps = fc.getProcessScheduler();
+
+        testProcNode.performValidation();
+        ps.startProcessor(testProcNode, true);
+        ps.startProcessor(testProcNode, true);
+        ps.startProcessor(testProcNode, true);
+
+        Thread.sleep(500);
+        assertCondition(() -> testProcessor.operationNames.size() == 1);
+        assertEquals("@OnScheduled", testProcessor.operationNames.get(0));
+    }
+
+    /**
+     * Validates that stop calls are harmless and idempotent if processor is 
not
+     * in STARTING or RUNNING state.
+     */
+    @Test
+    public void validateStopCallsAreMeaninglessIfProcessorNotStarted() throws 
Exception {
+        final FlowControllerAndSystemBundle fcsb = 
this.buildFlowControllerForTest();
+        fc = fcsb.getFlowController();
+        ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
+        this.setControllerRootGroup(fc, testGroup);
+        final ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+                fcsb.getSystemBundle().getBundleDetails().getCoordinate());
+        testProcNode.setProperties(properties);
+        TestProcessor testProcessor = (TestProcessor) 
testProcNode.getProcessor();
+        assertCondition(() -> ScheduledState.STOPPED == 
testProcNode.getScheduledState());
+        // sets the scenario for the processor to run
+        int randomDelayLimit = 3000;
+        this.randomOnTriggerDelay(testProcessor, randomDelayLimit);
+        final ProcessScheduler ps = fc.getProcessScheduler();
+        ps.stopProcessor(testProcNode);
+        assertCondition(() -> ScheduledState.STOPPED == 
testProcNode.getScheduledState());
+        assertTrue(testProcessor.operationNames.isEmpty());
+    }
+
+    /**
+     * Validates the processors start/stop sequence where the order of
+     * operations can only be @OnScheduled, @OnUnscheduled, @OnStopped.
+     */
+    @Test
+    @Ignore
+    public void validateSuccessfulAndOrderlyShutdown() throws Exception {
+        final FlowControllerAndSystemBundle fcsb = 
this.buildFlowControllerForTest();
+        fc = fcsb.getFlowController();
+        ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
+        this.setControllerRootGroup(fc, testGroup);
+        ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+                fcsb.getSystemBundle().getBundleDetails().getCoordinate());
+        testProcNode.setProperties(properties);
+        TestProcessor testProcessor = (TestProcessor) 
testProcNode.getProcessor();
+
+        // sets the scenario for the processor to run
+        int randomDelayLimit = 3000;
+        this.randomOnTriggerDelay(testProcessor, randomDelayLimit);
+
+        testProcNode.setMaxConcurrentTasks(4);
+        testProcNode.setScheduldingPeriod("500 millis");
+        testProcNode.setAutoTerminatedRelationships(Collections.singleton(new 
Relationship.Builder().name("success").build()));
+
+        testGroup.addProcessor(testProcNode);
+
+        fc.startProcessGroup(testGroup.getIdentifier());
+        assertCondition(() -> ScheduledState.RUNNING == 
testProcNode.getScheduledState(), SHORT_DELAY_TOLERANCE);
+
+        fc.stopAllProcessors();
+
+        Thread.sleep(randomDelayLimit); // up to randomDelayLimit, otherwise 
next assertion may fail as the processor still executing
+
+        // validates that regardless of how many running tasks, lifecycle
+        // operation are invoked atomically (once each).
+        assertCondition(() -> ScheduledState.STOPPED == 
testProcNode.getScheduledState(), SHORT_DELAY_TOLERANCE);
+        // . . . hence only 3 operations must be in the list
+        assertCondition(() -> testProcessor.operationNames.size() == 3, 
SHORT_DELAY_TOLERANCE);
+        // . . . and ordered as @OnScheduled, @OnUnscheduled, @OnStopped
+        assertEquals("@OnScheduled", testProcessor.operationNames.get(0));
+        assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1));
+        assertEquals("@OnStopped", testProcessor.operationNames.get(2));
+    }
+
+    /**
+     * Concurrency test that is basically hammers on both stop and start
+     * operation validating their idempotency.
+     */
+    @Test
+    @Ignore
+    public void 
validateLifecycleOperationOrderWithConcurrentCallsToStartStop() throws 
Exception {
+        final FlowControllerAndSystemBundle fcsb = 
this.buildFlowControllerForTest();
+        fc = fcsb.getFlowController();
+
+        ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
+        this.setControllerRootGroup(fc, testGroup);
+        final ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+                fcsb.getSystemBundle().getBundleDetails().getCoordinate());
+        testProcNode.setProperties(properties);
+        TestProcessor testProcessor = (TestProcessor) 
testProcNode.getProcessor();
+
+        // sets the scenario for the processor to run
+        this.noop(testProcessor);
+
+        final ProcessScheduler ps = fc.getProcessScheduler();
+        ExecutorService executor = Executors.newFixedThreadPool(100);
+        int startCallsCount = 10000;
+        final CountDownLatch countDownCounter = new 
CountDownLatch(startCallsCount);
+        assertCondition(() -> ScheduledState.STOPPED == 
testProcNode.getScheduledState());
+        final Random random = new Random();
+        for (int i = 0; i < startCallsCount / 2; i++) {
+            executor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    LockSupport.parkNanos(random.nextInt(9000000));
+                    ps.stopProcessor(testProcNode);
+                    countDownCounter.countDown();
+                }
+            });
+        }
+        for (int i = 0; i < startCallsCount / 2; i++) {
+            executor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    LockSupport.parkNanos(random.nextInt(9000000));
+                    ps.startProcessor(testProcNode, true);
+                    countDownCounter.countDown();
+                }
+            });
+        }
+        assertTrue(countDownCounter.await(1000000, TimeUnit.MILLISECONDS));
+        String previousOperation = null;
+        for (String operationName : testProcessor.operationNames) {
+            if (previousOperation == null || 
previousOperation.equals("@OnStopped")) {
+                assertEquals("@OnScheduled", operationName);
+            } else if (previousOperation.equals("@OnScheduled")) {
+                assertEquals("@OnUnscheduled", operationName);
+            } else if (previousOperation.equals("@OnUnscheduled")) {
+                assertTrue(operationName.equals("@OnStopped") || 
operationName.equals("@OnScheduled"));
+            }
+            previousOperation = operationName;
+        }
+        executor.shutdownNow();
+    }
+
+    /**
+     * Validates that the processor can be stopped before its start sequence has finished.
+     */
+    @Test
+    public void 
validateProcessorUnscheduledAndStoppedWhenStopIsCalledBeforeProcessorFullyStarted()
 throws Exception {
+        final FlowControllerAndSystemBundle fcsb = 
this.buildFlowControllerForTest();
+        fc = fcsb.getFlowController();
+
+        ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
+        this.setControllerRootGroup(fc, testGroup);
+        ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+                fcsb.getSystemBundle().getBundleDetails().getCoordinate());
+        testProcNode.setProperties(properties);
+        TestProcessor testProcessor = (TestProcessor) 
testProcNode.getProcessor();
+
+        // sets the scenario for the processor to run
+        int delay = 200;
+        this.longRunningOnSchedule(testProcessor, delay);
+        ProcessScheduler ps = fc.getProcessScheduler();
+
+        testProcNode.performValidation();
+        ps.startProcessor(testProcNode, true);
+        assertCondition(() -> ScheduledState.RUNNING == 
testProcNode.getScheduledState(), MEDIUM_DELAY_TOLERANCE);
+
+        ps.stopProcessor(testProcNode);
+        assertCondition(() -> ScheduledState.STOPPED == 
testProcNode.getScheduledState(), MEDIUM_DELAY_TOLERANCE);
+        assertCondition(() -> testProcessor.operationNames.size() == 3, 
LONG_DELAY_TOLERANCE);
+        assertEquals("@OnScheduled", testProcessor.operationNames.get(0));
+        assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1));
+        assertEquals("@OnStopped", testProcessor.operationNames.get(2));
+    }
+
+    /**
+     * Validates that the Processor is eventually started once the
+     * {@code @OnScheduled} method stops throwing exceptions, i.e. after the
+     * administrative delay that follows each failed scheduling attempt.
+     */
+    @Test
+    public void 
validateProcessScheduledAfterAdministrativeDelayDueToTheOnScheduledException() 
throws Exception {
+        final FlowControllerAndSystemBundle fcsb = 
this.buildFlowControllerForTest();
+        fc = fcsb.getFlowController();
+
+        ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
+        this.setControllerRootGroup(fc, testGroup);
+        ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+                fcsb.getSystemBundle().getBundleDetails().getCoordinate());
+        testProcNode.setProperties(properties);
+        TestProcessor testProcessor = (TestProcessor) 
testProcNode.getProcessor();
+
+        // sets the scenario for the processor to run
+        this.noop(testProcessor);
+        testProcessor.generateExceptionOnScheduled = true;
+        testProcessor.keepFailingOnScheduledTimes = 2;
+        ProcessScheduler ps = fc.getProcessScheduler();
+
+        testProcNode.performValidation();
+        ps.startProcessor(testProcNode, true);
+        assertCondition(() -> ScheduledState.RUNNING == 
testProcNode.getScheduledState(), LONG_DELAY_TOLERANCE);
+        ps.stopProcessor(testProcNode);
+        assertCondition(() -> ScheduledState.STOPPED == 
testProcNode.getScheduledState(), SHORT_DELAY_TOLERANCE);
+    }
+
+    /**
+     * Validates that Processor can be stopped when @OnScheduled constantly
+     * fails. Basically validates that the re-try loop breaks if the user
+     * initiates stopProcessor.
+     */
+    @Test
+    public void validateProcessorCanBeStoppedWhenOnScheduledConstantlyFails() 
throws Exception {
+        final FlowControllerAndSystemBundle fcsb = 
this.buildFlowControllerForTest();
+        fc = fcsb.getFlowController();
+
+        ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
+        this.setControllerRootGroup(fc, testGroup);
+        ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+                fcsb.getSystemBundle().getBundleDetails().getCoordinate());
+        testProcNode.setProperties(properties);
+        TestProcessor testProcessor = (TestProcessor) 
testProcNode.getProcessor();
+
+        // sets the scenario for the processor to run
+        this.longRunningOnUnschedule(testProcessor, 100);
+        testProcessor.generateExceptionOnScheduled = true;
+        testProcessor.keepFailingOnScheduledTimes = Integer.MAX_VALUE;
+        ProcessScheduler ps = fc.getProcessScheduler();
+
+        testProcNode.performValidation();
+        ps.startProcessor(testProcNode, true);
+        assertCondition(() -> ScheduledState.RUNNING == 
testProcNode.getScheduledState(), SHORT_DELAY_TOLERANCE);
+        ps.stopProcessor(testProcNode);
+        assertCondition(() -> ScheduledState.STOPPED == 
testProcNode.getScheduledState(), SHORT_DELAY_TOLERANCE);
+    }
+
+    /**
+     * Validates that the Processor can be stopped when @OnScheduled blocks
+     * indefinitely but is written to react to thread interrupts
+     */
+    @Test
+    public void 
validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyInterruptable() 
throws Exception {
+        final FlowControllerAndSystemBundle fcsb = 
this.buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 
sec");
+        fc = fcsb.getFlowController();
+
+        ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
+        this.setControllerRootGroup(fc, testGroup);
+        ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+                fcsb.getSystemBundle().getBundleDetails().getCoordinate());
+        testProcNode.setProperties(properties);
+        TestProcessor testProcessor = (TestProcessor) 
testProcNode.getProcessor();
+        // sets the scenario for the processor to run
+        this.blockingInterruptableOnUnschedule(testProcessor);
+        ProcessScheduler ps = fc.getProcessScheduler();
+
+        testProcNode.performValidation();
+        ps.startProcessor(testProcNode, true);
+        assertCondition(() -> ScheduledState.RUNNING == 
testProcNode.getScheduledState(), SHORT_DELAY_TOLERANCE);
+        ps.stopProcessor(testProcNode);
+        assertCondition(() -> ScheduledState.STOPPED == 
testProcNode.getScheduledState(), MEDIUM_DELAY_TOLERANCE);
+    }
+
+    /**
+     * Validates that the Processor can be stopped when @OnScheduled blocks
+     * indefinitely and is written to ignore thread interrupts
+     */
+    @Test
+    public void 
validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninterruptable() 
throws Exception {
+        final FlowControllerAndSystemBundle fcsb = 
this.buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "1 
sec");
+        fc = fcsb.getFlowController();
+
+        ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
+        this.setControllerRootGroup(fc, testGroup);
+        ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+                fcsb.getSystemBundle().getBundleDetails().getCoordinate());
+        testProcNode.setProperties(properties);
+        TestProcessor testProcessor = (TestProcessor) 
testProcNode.getProcessor();
+        // sets the scenario for the processor to run
+        this.blockingUninterruptableOnUnschedule(testProcessor);
+        ProcessScheduler ps = fc.getProcessScheduler();
+
+        testProcNode.performValidation();
+        ps.startProcessor(testProcNode, true);
+        assertCondition(() -> ScheduledState.RUNNING == 
testProcNode.getScheduledState(), MEDIUM_DELAY_TOLERANCE);
+        ps.stopProcessor(testProcNode);
+        assertCondition(() -> ScheduledState.STOPPED == 
testProcNode.getScheduledState(), MEDIUM_DELAY_TOLERANCE);
+    }
+
+    /**
+     * Validates that processor can be stopped if onTrigger() keeps throwing
+     * exceptions.
+     */
+    @Test
+    public void validateProcessorCanBeStoppedWhenOnTriggerThrowsException() 
throws Exception {
+        final FlowControllerAndSystemBundle fcsb = 
this.buildFlowControllerForTest();
+        fc = fcsb.getFlowController();
+
+        ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
+        this.setControllerRootGroup(fc, testGroup);
+        ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+                fcsb.getSystemBundle().getBundleDetails().getCoordinate());
+        testProcNode.setProperties(properties);
+        TestProcessor testProcessor = (TestProcessor) 
testProcNode.getProcessor();
+
+        // sets the scenario for the processor to run
+        this.noop(testProcessor);
+        testProcessor.generateExceptionOnTrigger = true;
+        ProcessScheduler ps = fc.getProcessScheduler();
+
+        testProcNode.performValidation();
+        ps.startProcessor(testProcNode, true);
+        assertCondition(() -> ScheduledState.RUNNING == 
testProcNode.getScheduledState(), LONG_DELAY_TOLERANCE);
+        ps.disableProcessor(testProcNode);
+        assertCondition(() -> ScheduledState.RUNNING == 
testProcNode.getScheduledState(), LONG_DELAY_TOLERANCE);
+        ps.stopProcessor(testProcNode);
+        assertCondition(() -> ScheduledState.STOPPED == 
testProcNode.getScheduledState(), LONG_DELAY_TOLERANCE);
+    }
+
+    /**
+     * Validates that the processor will not be started when
+     * PropertyDescriptor validation fails.
+     */
+    @Test(expected = IllegalStateException.class)
+    public void validateStartFailsOnInvalidProcessorWithMissingProperty() 
throws Exception {
+        final FlowControllerAndSystemBundle fcsb = 
this.buildFlowControllerForTest();
+        fc = fcsb.getFlowController();
+
+        ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
+        this.setControllerRootGroup(fc, testGroup);
+        ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+                fcsb.getSystemBundle().getBundleDetails().getCoordinate());
+        ProcessScheduler ps = fc.getProcessScheduler();
+        ps.startProcessor(testProcNode, true);
+        fail();
+    }
+
+    /**
+     * Validates that the processor will not be started when
+     * ControllerService validation fails (service not enabled).
+     */
+    @Test(expected = IllegalStateException.class)
+    public void validateStartFailsOnInvalidProcessorWithDisabledService() 
throws Exception {
+        final FlowControllerAndSystemBundle fcsb = 
this.buildFlowControllerForTest();
+        fc = fcsb.getFlowController();
+
+        ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
+        this.setControllerRootGroup(fc, testGroup);
+
+        ControllerServiceNode testServiceNode = 
fc.createControllerService(TestService.class.getName(), "serv",
+                fcsb.getSystemBundle().getBundleDetails().getCoordinate(), 
null, true);
+        ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+                fcsb.getSystemBundle().getBundleDetails().getCoordinate());
+
+        properties.put("S", testServiceNode.getIdentifier());
+        testProcNode.setProperties(properties);
+
+        TestProcessor testProcessor = (TestProcessor) 
testProcNode.getProcessor();
+        testProcessor.withService = true;
+
+        ProcessScheduler ps = fc.getProcessScheduler();
+        ps.startProcessor(testProcNode, true);
+        fail();
+    }
+
+    /**
+     * Validates a successful processor start with an enabled ControllerService dependency.
+     */
+    @Test
+    public void validateStartSucceedsOnProcessorWithEnabledService() throws 
Exception {
+        final FlowControllerAndSystemBundle fcsb = 
this.buildFlowControllerForTest();
+        fc = fcsb.getFlowController();
+
+        ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
+        this.setControllerRootGroup(fc, testGroup);
+
+        ControllerServiceNode testServiceNode = 
fc.createControllerService(TestService.class.getName(), "foo",
+                fcsb.getSystemBundle().getBundleDetails().getCoordinate(), 
null, true);
+        testGroup.addControllerService(testServiceNode);
+
+        ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
+                fcsb.getSystemBundle().getBundleDetails().getCoordinate());
+        testGroup.addProcessor(testProcNode);
+
+        properties.put("S", testServiceNode.getIdentifier());
+        testProcNode.setProperties(properties);
+
+        TestProcessor testProcessor = (TestProcessor) 
testProcNode.getProcessor();
+        testProcessor.withService = true;
+        this.noop(testProcessor);
+
+        ProcessScheduler ps = fc.getProcessScheduler();
+        testServiceNode.performValidation();
+        ps.enableControllerService(testServiceNode);
+
+        testProcNode.performValidation();
+        ps.startProcessor(testProcNode, true);
+
+        Thread.sleep(500);
+        assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
+    }
+
+
+    /**
+     * Scenario where onTrigger() is executed with random delay limited to
+     * 'delayLimit', yet with guaranteed exit from onTrigger().
+     */
+    private void randomOnTriggerDelay(TestProcessor testProcessor, int 
delayLimit) {
+        EmptyRunnable emptyRunnable = new EmptyRunnable();
+        RandomOrFixedDelayedRunnable delayedRunnable = new 
RandomOrFixedDelayedRunnable(delayLimit, true);
+        testProcessor.setScenario(emptyRunnable, emptyRunnable, emptyRunnable, 
delayedRunnable);
+    }
+
+    /**
+     * Scenario where @OnScheduled is executed with a fixed delay of
+     * 'delayLimit' milliseconds.
+     */
+    private void longRunningOnSchedule(TestProcessor testProcessor, int 
delayLimit) {
+        EmptyRunnable emptyRunnable = new EmptyRunnable();
+        RandomOrFixedDelayedRunnable delayedRunnable = new 
RandomOrFixedDelayedRunnable(delayLimit, false);
+        testProcessor.setScenario(delayedRunnable, emptyRunnable, 
emptyRunnable, emptyRunnable);
+    }
+
+    /**
+     * Scenario where @OnUnscheduled is executed with a fixed delay of
+     * 'delayLimit' milliseconds.
+     */
+    private void longRunningOnUnschedule(TestProcessor testProcessor, int 
delayLimit) {
+        EmptyRunnable emptyRunnable = new EmptyRunnable();
+        RandomOrFixedDelayedRunnable delayedRunnable = new 
RandomOrFixedDelayedRunnable(delayLimit, false);
+        testProcessor.setScenario(emptyRunnable, delayedRunnable, 
emptyRunnable, emptyRunnable);
+    }
+
+    /**
+     * Scenario where @OnScheduled blocks indefinitely yet is interruptible.
+     */
+    private void blockingInterruptableOnUnschedule(TestProcessor 
testProcessor) {
+        EmptyRunnable emptyRunnable = new EmptyRunnable();
+        BlockingInterruptableRunnable blockingRunnable = new 
BlockingInterruptableRunnable();
+        testProcessor.setScenario(blockingRunnable, emptyRunnable, 
emptyRunnable, emptyRunnable);
+    }
+
+    /**
+     * Scenario where @OnScheduled blocks indefinitely and is un-interruptible.
+     */
+    private void blockingUninterruptableOnUnschedule(TestProcessor 
testProcessor) {
+        EmptyRunnable emptyRunnable = new EmptyRunnable();
+        BlockingUninterruptableRunnable blockingRunnable = new 
BlockingUninterruptableRunnable();
+        testProcessor.setScenario(blockingRunnable, emptyRunnable, 
emptyRunnable, emptyRunnable);
+    }
+
+    /**
+     * Scenario where all tasks are no op.
+     */
+    private void noop(TestProcessor testProcessor) {
+        EmptyRunnable emptyRunnable = new EmptyRunnable();
+        testProcessor.setScenario(emptyRunnable, emptyRunnable, emptyRunnable, 
emptyRunnable);
+    }
+
+    private FlowControllerAndSystemBundle buildFlowControllerForTest(final 
String propKey, final String propValue) throws Exception {
+        final Map<String, String> addProps = new HashMap<>();
+        addProps.put(NiFiProperties.ADMINISTRATIVE_YIELD_DURATION, "1 sec");
+        addProps.put(NiFiProperties.STATE_MANAGEMENT_CONFIG_FILE, 
"target/test-classes/state-management.xml");
+        addProps.put(NiFiProperties.STATE_MANAGEMENT_LOCAL_PROVIDER_ID, 
"local-provider");
+        addProps.put(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, 
MockProvenanceRepository.class.getName());
+        addProps.put("nifi.remote.input.socket.port", "");
+        addProps.put("nifi.remote.input.secure", "");
+        if (propKey != null && propValue != null) {
+            addProps.put(propKey, propValue);
+        }
+        final NiFiProperties nifiProperties = 
NiFiProperties.createBasicNiFiProperties(propsFile, addProps);
+
+        final Bundle systemBundle = SystemBundle.create(nifiProperties);
+        ExtensionManager.discoverExtensions(systemBundle, 
Collections.emptySet());
+
+        final FlowController flowController = 
FlowController.createStandaloneInstance(mock(FlowFileEventRepository.class), 
nifiProperties,
+                mock(Authorizer.class), mock(AuditService.class), null, new 
VolatileBulletinRepository(),
+            new 
FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths()),
+            mock(FlowRegistryClient.class));
+
+        return new FlowControllerAndSystemBundle(flowController, systemBundle);
+    }
+
+    private FlowControllerAndSystemBundle buildFlowControllerForTest() throws 
Exception {
+        return buildFlowControllerForTest(null, null);
+    }
+
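+    /**
+     * Sets the given group as the controller's root group by reflectively invoking setRootGroup, then initializes the flow.
+     */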
+    private void setControllerRootGroup(FlowController controller, 
ProcessGroup processGroup) {
+        try {
+            Method m = FlowController.class.getDeclaredMethod("setRootGroup", 
ProcessGroup.class);
+            m.setAccessible(true);
+            m.invoke(controller, processGroup);
+            controller.initializeFlow();
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to set root group", e);
+        }
+    }
+
+    private static class FlowControllerAndSystemBundle {
+
+        private final FlowController flowController;
+        private final Bundle systemBundle;
+
+        public FlowControllerAndSystemBundle(FlowController flowController, 
Bundle systemBundle) {
+            this.flowController = flowController;
+            this.systemBundle = systemBundle;
+        }
+
+        public FlowController getFlowController() {
+            return flowController;
+        }
+
+        public Bundle getSystemBundle() {
+            return systemBundle;
+        }
+    }
+
+    /**
+     */
+    public static class TestProcessor extends AbstractProcessor {
+        private static final Runnable NOP = () -> {};
+
+        private Runnable onScheduleCallback = NOP;
+        private Runnable onUnscheduleCallback = NOP;
+        private Runnable onStopCallback = NOP;
+        private Runnable onTriggerCallback = NOP;
+
+        private boolean generateExceptionOnScheduled;
+        private boolean generateExceptionOnTrigger;
+
+        private boolean withService;
+
+        private int keepFailingOnScheduledTimes;
+
+        private int onScheduledExceptionCount;
+
+        private final List<String> operationNames = new LinkedList<>();
+
+        void setScenario(Runnable onScheduleCallback, Runnable 
onUnscheduleCallback, Runnable onStopCallback,
+                Runnable onTriggerCallback) {
+            this.onScheduleCallback = onScheduleCallback;
+            this.onUnscheduleCallback = onUnscheduleCallback;
+            this.onStopCallback = onStopCallback;
+            this.onTriggerCallback = onTriggerCallback;
+        }
+
+        @OnScheduled
+        public void schedule(ProcessContext ctx) {
+            this.operationNames.add("@OnScheduled");
+            if (this.generateExceptionOnScheduled
+                    && this.onScheduledExceptionCount++ < 
this.keepFailingOnScheduledTimes) {
+                throw new RuntimeException("Intentional");
+            }
+            this.onScheduleCallback.run();
+        }
+
+        @OnUnscheduled
+        public void unschedule() {
+            this.operationNames.add("@OnUnscheduled");
+            this.onUnscheduleCallback.run();
+        }
+
+        @OnStopped
+        public void stop() {
+            this.operationNames.add("@OnStopped");
+            this.onStopCallback.run();
+        }
+
+        @Override
+        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+            PropertyDescriptor PROP = new PropertyDescriptor.Builder()
+                    .name("P")
+                    .description("Blah Blah")
+                    .required(true)
+                    .addValidator(new Validator() {
+                        @Override
+                        public ValidationResult validate(final String subject, 
final String value, final ValidationContext context) {
+                            return new 
ValidationResult.Builder().subject(subject).input(value).valid(value != null && 
!value.isEmpty()).explanation(subject + " cannot be empty").build();
+                        }
+                    })
+                    .build();
+
+            PropertyDescriptor SERVICE = new PropertyDescriptor.Builder()
+                    .name("S")
+                    .description("Blah Blah")
+                    .required(true)
+                    .identifiesControllerService(ITestservice.class)
+                    .build();
+
+            return this.withService ? Arrays.asList(new 
PropertyDescriptor[]{PROP, SERVICE})
+                    : Arrays.asList(new PropertyDescriptor[]{PROP});
+        }
+
+        @Override
+        public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+            if (this.generateExceptionOnTrigger) {
+                throw new RuntimeException("Intentional");
+            }
+            this.onTriggerCallback.run();
+        }
+    }
+
+    /**
+     */
+    public static class TestService extends AbstractControllerService 
implements ITestservice {
+
+    }
+
+    /**
+     */
+    public static interface ITestservice extends ControllerService {
+
+    }
+
+    /**
+     */
+    private static class EmptyRunnable implements Runnable {
+
+        @Override
+        public void run() {
+
+        }
+    }
+
+    /**
+     */
+    private static class BlockingInterruptableRunnable implements Runnable {
+
+        @Override
+        public void run() {
+            try {
+                Thread.sleep(Long.MAX_VALUE);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     */
+    private static class BlockingUninterruptableRunnable implements Runnable {
+
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    Thread.sleep(Long.MAX_VALUE);
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+        }
+    }
+
+    /**
+     */
+    private static class RandomOrFixedDelayedRunnable implements Runnable {
+
+        private final int delayLimit;
+        private final boolean randomDelay;
+
+        public RandomOrFixedDelayedRunnable(int delayLimit, boolean 
randomDelay) {
+            this.delayLimit = delayLimit;
+            this.randomDelay = randomDelay;
+        }
+        Random random = new Random();
+
+        @Override
+        public void run() {
+            try {
+                if (this.randomDelay) {
+                    Thread.sleep(random.nextInt(this.delayLimit));
+                } else {
+                    Thread.sleep(this.delayLimit);
+                }
+            } catch (InterruptedException e) {
+                logger.warn("Interrupted while sleeping");
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+}

Reply via email to