Repository: nifi
Updated Branches:
  refs/heads/master cee2426df -> 91cdd78eb


NIFI-1668: Modified TestProcessorLifecycle to ensure FlowController is always shut down

This closes #324


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/91cdd78e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/91cdd78e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/91cdd78e

Branch: refs/heads/master
Commit: 91cdd78ebcc11e7997326b9e8f6462ca45098114
Parents: cee2426
Author: Oleg Zhurakousky <[email protected]>
Authored: Mon Apr 4 16:21:25 2016 -0400
Committer: jpercivall <[email protected]>
Committed: Mon Jun 6 13:55:49 2016 -0400

----------------------------------------------------------------------
 .../scheduling/TestProcessorLifecycle.java      | 78 ++++----------------
 1 file changed, 13 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/91cdd78e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
index eb6836f..a42cfca 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
@@ -78,24 +78,26 @@ import org.slf4j.LoggerFactory;
 public class TestProcessorLifecycle {
 
     private static final Logger logger = 
LoggerFactory.getLogger(TestProcessorLifecycle.class);
+    private FlowController fc;
 
     @Before
-    public void before() {
+    public void before() throws Exception {
         System.setProperty("nifi.properties.file.path", 
"src/test/resources/nifi.properties");
         
NiFiProperties.getInstance().setProperty(NiFiProperties.ADMINISTRATIVE_YIELD_DURATION,
 "1 sec");
         
NiFiProperties.getInstance().setProperty(NiFiProperties.STATE_MANAGEMENT_CONFIG_FILE,
 "target/test-classes/state-management.xml");
         
NiFiProperties.getInstance().setProperty(NiFiProperties.STATE_MANAGEMENT_LOCAL_PROVIDER_ID,
 "local-provider");
+        fc = this.buildFlowControllerForTest();
     }
 
     @After
     public void after() throws Exception {
         FileUtils.deleteDirectory(new File("./target/test-repo"));
         FileUtils.deleteDirectory(new File("./target/content_repository"));
+        fc.shutdown(true);
     }
 
     @Test
     public void validateEnableOperation() throws Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
         final ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(),
@@ -112,13 +114,11 @@ public class TestProcessorLifecycle {
         testProcNode.disable();
         assertEquals(ScheduledState.DISABLED, 
testProcNode.getScheduledState());
         assertEquals(ScheduledState.DISABLED, 
testProcNode.getPhysicalScheduledState());
-        fc.shutdown(true);
     }
 
 
     @Test
     public void validateDisableOperation() throws Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
         final ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(),
@@ -136,8 +136,6 @@ public class TestProcessorLifecycle {
         ProcessScheduler ps = fc.getProcessScheduler();
         ps.startProcessor(testProcNode);
         assertEquals(ScheduledState.DISABLED, 
testProcNode.getPhysicalScheduledState());
-
-        fc.shutdown(true);
     }
 
     /**
@@ -146,7 +144,6 @@ public class TestProcessorLifecycle {
      */
     @Test
     public void validateIdempotencyOfProcessorStartOperation() throws 
Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
         final ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@@ -154,32 +151,16 @@ public class TestProcessorLifecycle {
         TestProcessor testProcessor = (TestProcessor) 
testProcNode.getProcessor();
 
         // sets the scenario for the processor to run
-        int randomDelayLimit = 3000;
-        this.randomOnTriggerDelay(testProcessor, randomDelayLimit);
+        this.noop(testProcessor);
         final ProcessScheduler ps = fc.getProcessScheduler();
-        ExecutorService executor = Executors.newCachedThreadPool();
 
-        int startCallsCount = 100;
-        final CountDownLatch countDownCounter = new 
CountDownLatch(startCallsCount);
-        assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
-        for (int i = 0; i < startCallsCount; i++) {
-            executor.execute(new Runnable() {
-                @Override
-                public void run() {
-                    ps.startProcessor(testProcNode);
-                    countDownCounter.countDown();
-                }
-            });
-        }
+        ps.startProcessor(testProcNode);
+        ps.startProcessor(testProcNode);
+        ps.startProcessor(testProcNode);
 
-        assertTrue(countDownCounter.await(2000, TimeUnit.MILLISECONDS));
-        assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
-        // regardless of how many threads attempted to start Processor, it must
-        // only be started once, hence have only single entry for @OnScheduled
+        Thread.sleep(500);
         assertEquals(1, testProcessor.operationNames.size());
         assertEquals("@OnScheduled", testProcessor.operationNames.get(0));
-        fc.shutdown(true);
-        executor.shutdownNow();
     }
 
     /**
@@ -188,7 +169,6 @@ public class TestProcessorLifecycle {
      */
     @Test
     public void validateStopCallsAreMeaninglessIfProcessorNotStarted() throws 
Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
         final ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@@ -202,7 +182,6 @@ public class TestProcessorLifecycle {
         ps.stopProcessor(testProcNode);
         assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
         assertTrue(testProcessor.operationNames.size() == 0);
-        fc.shutdown(true);
     }
 
     /**
@@ -211,7 +190,6 @@ public class TestProcessorLifecycle {
      */
     @Test
     public void validateSuccessfullAndOrderlyShutdown() throws Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
         ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@@ -245,8 +223,6 @@ public class TestProcessorLifecycle {
         assertEquals("@OnScheduled", testProcessor.operationNames.get(0));
         assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1));
         assertEquals("@OnStopped", testProcessor.operationNames.get(2));
-
-        fc.shutdown(true);
     }
 
     /**
@@ -255,7 +231,6 @@ public class TestProcessorLifecycle {
      */
     @Test
     public void 
validateLifecycleOperationOrderWithConcurrentCallsToStartStop() throws 
Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
         final ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@@ -304,7 +279,6 @@ public class TestProcessorLifecycle {
             previousOperation = operationName;
         }
         executor.shutdownNow();
-        fc.shutdown(true);
     }
 
     /**
@@ -312,7 +286,6 @@ public class TestProcessorLifecycle {
      */
     @Test
     public void 
validateProcessorUnscheduledAndStoppedWhenStopIsCalledBeforeProcessorFullyStarted()
 throws Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
         ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@@ -339,7 +312,6 @@ public class TestProcessorLifecycle {
         assertEquals(2, testProcessor.operationNames.size());
         assertEquals("@OnScheduled", testProcessor.operationNames.get(0));
         assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1));
-        fc.shutdown(true);
     }
 
     /**
@@ -348,7 +320,6 @@ public class TestProcessorLifecycle {
      */
     @Test
     public void 
validateProcessScheduledAfterAdministrativeDelayDueToTheOnScheduledException() 
throws Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
         ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@@ -371,7 +342,6 @@ public class TestProcessorLifecycle {
         ps.stopProcessor(testProcNode);
         Thread.sleep(500);
         assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
-        fc.shutdown(true);
     }
 
     /**
@@ -381,7 +351,6 @@ public class TestProcessorLifecycle {
      */
     @Test
     public void validateProcessorCanBeStoppedWhenOnScheduledConstantlyFails() 
throws Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
         ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@@ -404,7 +373,6 @@ public class TestProcessorLifecycle {
         assertTrue(testProcNode.getPhysicalScheduledState() == 
ScheduledState.STOPPING);
         Thread.sleep(500);
         assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
-        fc.shutdown(true);
     }
 
     /**
@@ -414,7 +382,6 @@ public class TestProcessorLifecycle {
     @Test
     public void 
validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyInterruptable() 
throws Exception {
         
NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT,
 "5 sec");
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
         ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@@ -434,7 +401,6 @@ public class TestProcessorLifecycle {
         assertTrue(testProcNode.getPhysicalScheduledState() == 
ScheduledState.STOPPING);
         Thread.sleep(4000);
         assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
-        fc.shutdown(true);
     }
 
     /**
@@ -444,7 +410,6 @@ public class TestProcessorLifecycle {
     @Test
     public void 
validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninterruptable() 
throws Exception {
         
NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT,
 "5 sec");
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
         ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@@ -469,7 +434,6 @@ public class TestProcessorLifecycle {
         assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
         Thread.sleep(4000);
         assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
-        fc.shutdown(true);
     }
 
     /**
@@ -478,7 +442,6 @@ public class TestProcessorLifecycle {
      */
     @Test
     public void validateProcessorCanBeStoppedWhenOnTriggerThrowsException() 
throws Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
         ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@@ -499,7 +462,6 @@ public class TestProcessorLifecycle {
         ps.stopProcessor(testProcNode);
         Thread.sleep(500);
         assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
-        fc.shutdown(true);
     }
 
     /**
@@ -508,17 +470,12 @@ public class TestProcessorLifecycle {
      */
     @Test(expected = IllegalStateException.class)
     public void validateStartFailsOnInvalidProcessorWithMissingProperty() 
throws Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
         ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
         ProcessScheduler ps = fc.getProcessScheduler();
-        try {
-            ps.startProcessor(testProcNode);
-            fail();
-        } finally {
-            fc.shutdown(true);
-        }
+        ps.startProcessor(testProcNode);
+        fail();
     }
 
     /**
@@ -527,7 +484,6 @@ public class TestProcessorLifecycle {
      */
     @Test(expected = IllegalStateException.class)
     public void validateStartFailsOnInvalidProcessorWithDisabledService() 
throws Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
 
@@ -542,12 +498,8 @@ public class TestProcessorLifecycle {
         testProcessor.withService = true;
 
         ProcessScheduler ps = fc.getProcessScheduler();
-        try {
-            ps.startProcessor(testProcNode);
-            fail();
-        } finally {
-            fc.shutdown(true);
-        }
+        ps.startProcessor(testProcNode);
+        fail();
     }
 
     /**
@@ -555,7 +507,6 @@ public class TestProcessorLifecycle {
      */
     @Test
     public void validateStartSucceedsOnProcessorWithEnabledService() throws 
Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
 
@@ -577,7 +528,6 @@ public class TestProcessorLifecycle {
 
         Thread.sleep(500);
         assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
-        fc.shutdown(true);
     }
 
     /**
@@ -586,7 +536,6 @@ public class TestProcessorLifecycle {
      */
     @Test
     public void validateProcessorDeletion() throws Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
 
@@ -644,7 +593,6 @@ public class TestProcessorLifecycle {
         testGroup.removeProcessor(testProcNodeA);
         testGroup.removeProcessor(testProcNodeB);
         testGroup.shutdown();
-        fc.shutdown(true);
     }
 
     /**

Reply via email to