Repository: nifi
Updated Branches:
  refs/heads/0.x f404d7d21 -> 9feeb7036


NIFI-1668 modified TestProcessorLifecycle to ensure FlowController is always shut down

This closes #324


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9feeb703
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9feeb703
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9feeb703

Branch: refs/heads/0.x
Commit: 9feeb703686088ff34928b5ed9d5cef6e1d2915f
Parents: f404d7d
Author: Oleg Zhurakousky <[email protected]>
Authored: Mon Apr 4 16:21:25 2016 -0400
Committer: jpercivall <[email protected]>
Committed: Mon Jun 6 13:57:15 2016 -0400

----------------------------------------------------------------------
 .../scheduling/TestProcessorLifecycle.java      | 78 ++++----------------
 1 file changed, 13 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/9feeb703/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
index b6962e3..df52516 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
@@ -77,24 +77,26 @@ import org.slf4j.LoggerFactory;
 public class TestProcessorLifecycle {
 
     private static final Logger logger = 
LoggerFactory.getLogger(TestProcessorLifecycle.class);
+    private FlowController fc;
 
     @Before
-    public void before() {
+    public void before() throws Exception {
         System.setProperty("nifi.properties.file.path", 
"src/test/resources/nifi.properties");
         
NiFiProperties.getInstance().setProperty(NiFiProperties.ADMINISTRATIVE_YIELD_DURATION,
 "1 sec");
         
NiFiProperties.getInstance().setProperty(NiFiProperties.STATE_MANAGEMENT_CONFIG_FILE,
 "target/test-classes/state-management.xml");
         
NiFiProperties.getInstance().setProperty(NiFiProperties.STATE_MANAGEMENT_LOCAL_PROVIDER_ID,
 "local-provider");
+        fc = this.buildFlowControllerForTest();
     }
 
     @After
     public void after() throws Exception {
         FileUtils.deleteDirectory(new File("./target/test-repo"));
         FileUtils.deleteDirectory(new File("./target/content_repository"));
+        fc.shutdown(true);
     }
 
     @Test
     public void validateEnableOperation() throws Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
         final ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(),
@@ -111,13 +113,11 @@ public class TestProcessorLifecycle {
         testProcNode.disable();
         assertEquals(ScheduledState.DISABLED, 
testProcNode.getScheduledState());
         assertEquals(ScheduledState.DISABLED, 
testProcNode.getPhysicalScheduledState());
-        fc.shutdown(true);
     }
 
 
     @Test
     public void validateDisableOperation() throws Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
         final ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(),
@@ -135,8 +135,6 @@ public class TestProcessorLifecycle {
         ProcessScheduler ps = fc.getProcessScheduler();
         ps.startProcessor(testProcNode);
         assertEquals(ScheduledState.DISABLED, 
testProcNode.getPhysicalScheduledState());
-
-        fc.shutdown(true);
     }
 
     /**
@@ -145,7 +143,6 @@ public class TestProcessorLifecycle {
      */
     @Test
     public void validateIdempotencyOfProcessorStartOperation() throws 
Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
         final ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@@ -153,32 +150,16 @@ public class TestProcessorLifecycle {
         TestProcessor testProcessor = (TestProcessor) 
testProcNode.getProcessor();
 
         // sets the scenario for the processor to run
-        int randomDelayLimit = 3000;
-        this.randomOnTriggerDelay(testProcessor, randomDelayLimit);
+        this.noop(testProcessor);
         final ProcessScheduler ps = fc.getProcessScheduler();
-        ExecutorService executor = Executors.newCachedThreadPool();
 
-        int startCallsCount = 100;
-        final CountDownLatch countDownCounter = new 
CountDownLatch(startCallsCount);
-        assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
-        for (int i = 0; i < startCallsCount; i++) {
-            executor.execute(new Runnable() {
-                @Override
-                public void run() {
-                    ps.startProcessor(testProcNode);
-                    countDownCounter.countDown();
-                }
-            });
-        }
+        ps.startProcessor(testProcNode);
+        ps.startProcessor(testProcNode);
+        ps.startProcessor(testProcNode);
 
-        assertTrue(countDownCounter.await(2000, TimeUnit.MILLISECONDS));
-        assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
-        // regardless of how many threads attempted to start Processor, it must
-        // only be started once, hence have only single entry for @OnScheduled
+        Thread.sleep(500);
         assertEquals(1, testProcessor.operationNames.size());
         assertEquals("@OnScheduled", testProcessor.operationNames.get(0));
-        fc.shutdown(true);
-        executor.shutdownNow();
     }
 
     /**
@@ -187,7 +168,6 @@ public class TestProcessorLifecycle {
      */
     @Test
     public void validateStopCallsAreMeaninglessIfProcessorNotStarted() throws 
Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
         final ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@@ -201,7 +181,6 @@ public class TestProcessorLifecycle {
         ps.stopProcessor(testProcNode);
         assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
         assertTrue(testProcessor.operationNames.size() == 0);
-        fc.shutdown(true);
     }
 
     /**
@@ -210,7 +189,6 @@ public class TestProcessorLifecycle {
      */
     @Test
     public void validateSuccessfullAndOrderlyShutdown() throws Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
         ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@@ -244,8 +222,6 @@ public class TestProcessorLifecycle {
         assertEquals("@OnScheduled", testProcessor.operationNames.get(0));
         assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1));
         assertEquals("@OnStopped", testProcessor.operationNames.get(2));
-
-        fc.shutdown(true);
     }
 
     /**
@@ -254,7 +230,6 @@ public class TestProcessorLifecycle {
      */
     @Test
     public void 
validateLifecycleOperationOrderWithConcurrentCallsToStartStop() throws 
Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
         final ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@@ -303,7 +278,6 @@ public class TestProcessorLifecycle {
             previousOperation = operationName;
         }
         executor.shutdownNow();
-        fc.shutdown(true);
     }
 
     /**
@@ -311,7 +285,6 @@ public class TestProcessorLifecycle {
      */
     @Test
     public void 
validateProcessorUnscheduledAndStoppedWhenStopIsCalledBeforeProcessorFullyStarted()
 throws Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
         ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@@ -338,7 +311,6 @@ public class TestProcessorLifecycle {
         assertEquals(2, testProcessor.operationNames.size());
         assertEquals("@OnScheduled", testProcessor.operationNames.get(0));
         assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1));
-        fc.shutdown(true);
     }
 
     /**
@@ -347,7 +319,6 @@ public class TestProcessorLifecycle {
      */
     @Test
     public void 
validateProcessScheduledAfterAdministrativeDelayDueToTheOnScheduledException() 
throws Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
         ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@@ -370,7 +341,6 @@ public class TestProcessorLifecycle {
         ps.stopProcessor(testProcNode);
         Thread.sleep(500);
         assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
-        fc.shutdown(true);
     }
 
     /**
@@ -380,7 +350,6 @@ public class TestProcessorLifecycle {
      */
     @Test
     public void validateProcessorCanBeStoppedWhenOnScheduledConstantlyFails() 
throws Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
         ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@@ -403,7 +372,6 @@ public class TestProcessorLifecycle {
         assertTrue(testProcNode.getPhysicalScheduledState() == 
ScheduledState.STOPPING);
         Thread.sleep(500);
         assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
-        fc.shutdown(true);
     }
 
     /**
@@ -413,7 +381,6 @@ public class TestProcessorLifecycle {
     @Test
     public void 
validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyInterruptable() 
throws Exception {
         
NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT,
 "5 sec");
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
         ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@@ -433,7 +400,6 @@ public class TestProcessorLifecycle {
         assertTrue(testProcNode.getPhysicalScheduledState() == 
ScheduledState.STOPPING);
         Thread.sleep(4000);
         assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
-        fc.shutdown(true);
     }
 
     /**
@@ -443,7 +409,6 @@ public class TestProcessorLifecycle {
     @Test
     public void 
validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninterruptable() 
throws Exception {
         
NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT,
 "5 sec");
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
         ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@@ -468,7 +433,6 @@ public class TestProcessorLifecycle {
         assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
         Thread.sleep(4000);
         assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
-        fc.shutdown(true);
     }
 
     /**
@@ -477,7 +441,6 @@ public class TestProcessorLifecycle {
      */
     @Test
     public void validateProcessorCanBeStoppedWhenOnTriggerThrowsException() 
throws Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
         ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@@ -498,7 +461,6 @@ public class TestProcessorLifecycle {
         ps.stopProcessor(testProcNode);
         Thread.sleep(500);
         assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
-        fc.shutdown(true);
     }
 
     /**
@@ -507,17 +469,12 @@ public class TestProcessorLifecycle {
      */
     @Test(expected = IllegalStateException.class)
     public void validateStartFailsOnInvalidProcessorWithMissingProperty() 
throws Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
         ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
         ProcessScheduler ps = fc.getProcessScheduler();
-        try {
-            ps.startProcessor(testProcNode);
-            fail();
-        } finally {
-            fc.shutdown(true);
-        }
+        ps.startProcessor(testProcNode);
+        fail();
     }
 
     /**
@@ -526,7 +483,6 @@ public class TestProcessorLifecycle {
      */
     @Test(expected = IllegalStateException.class)
     public void validateStartFailsOnInvalidProcessorWithDisabledService() 
throws Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
 
@@ -541,12 +497,8 @@ public class TestProcessorLifecycle {
         testProcessor.withService = true;
 
         ProcessScheduler ps = fc.getProcessScheduler();
-        try {
-            ps.startProcessor(testProcNode);
-            fail();
-        } finally {
-            fc.shutdown(true);
-        }
+        ps.startProcessor(testProcNode);
+        fail();
     }
 
     /**
@@ -554,7 +506,6 @@ public class TestProcessorLifecycle {
      */
     @Test
     public void validateStartSucceedsOnProcessorWithEnabledService() throws 
Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
 
@@ -574,7 +525,6 @@ public class TestProcessorLifecycle {
 
         Thread.sleep(500);
         assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
-        fc.shutdown(true);
     }
 
     /**
@@ -583,7 +533,6 @@ public class TestProcessorLifecycle {
      */
     @Test
     public void validateProcessorDeletion() throws Exception {
-        FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
 
@@ -641,7 +590,6 @@ public class TestProcessorLifecycle {
         testGroup.removeProcessor(testProcNodeA);
         testGroup.removeProcessor(testProcNodeB);
         testGroup.shutdown();
-        fc.shutdown(true);
     }
 
     /**

Reply via email to