Repository: nifi Updated Branches: refs/heads/0.x f404d7d21 -> 9feeb7036
NIFI-1668 modified TestProcessorLifecycle to ensure FlowController is always shut down This closes #324 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9feeb703 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9feeb703 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9feeb703 Branch: refs/heads/0.x Commit: 9feeb703686088ff34928b5ed9d5cef6e1d2915f Parents: f404d7d Author: Oleg Zhurakousky <[email protected]> Authored: Mon Apr 4 16:21:25 2016 -0400 Committer: jpercivall <[email protected]> Committed: Mon Jun 6 13:57:15 2016 -0400 ---------------------------------------------------------------------- .../scheduling/TestProcessorLifecycle.java | 78 ++++---------------- 1 file changed, 13 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/9feeb703/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java index b6962e3..df52516 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java @@ -77,24 +77,26 @@ import org.slf4j.LoggerFactory; public class TestProcessorLifecycle { private static final Logger logger = LoggerFactory.getLogger(TestProcessorLifecycle.class); + private FlowController fc; @Before - public void before() { + public void before() throws Exception { System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties"); NiFiProperties.getInstance().setProperty(NiFiProperties.ADMINISTRATIVE_YIELD_DURATION, "1 sec"); NiFiProperties.getInstance().setProperty(NiFiProperties.STATE_MANAGEMENT_CONFIG_FILE, "target/test-classes/state-management.xml"); NiFiProperties.getInstance().setProperty(NiFiProperties.STATE_MANAGEMENT_LOCAL_PROVIDER_ID, "local-provider"); + fc = this.buildFlowControllerForTest(); } @After public void after() throws Exception { FileUtils.deleteDirectory(new File("./target/test-repo")); FileUtils.deleteDirectory(new File("./target/content_repository")); + fc.shutdown(true); } @Test public void validateEnableOperation() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), @@ -111,13 +113,11 @@ public class TestProcessorLifecycle { testProcNode.disable(); assertEquals(ScheduledState.DISABLED, testProcNode.getScheduledState()); assertEquals(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState()); - fc.shutdown(true); } @Test public void validateDisableOperation() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), @@ -135,8 +135,6 @@ public class TestProcessorLifecycle { ProcessScheduler ps = fc.getProcessScheduler(); ps.startProcessor(testProcNode); assertEquals(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState()); - - fc.shutdown(true); } /** @@ -145,7 +143,6 @@ public class TestProcessorLifecycle { */ @Test public void validateIdempotencyOfProcessorStartOperation() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); @@ -153,32 +150,16 @@ public class TestProcessorLifecycle { TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); // sets the scenario for the processor to run - int randomDelayLimit = 3000; - this.randomOnTriggerDelay(testProcessor, randomDelayLimit); + this.noop(testProcessor); final ProcessScheduler ps = fc.getProcessScheduler(); - ExecutorService executor = Executors.newCachedThreadPool(); - int startCallsCount = 100; - final CountDownLatch countDownCounter = new CountDownLatch(startCallsCount); - assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); - for (int i = 0; i < startCallsCount; i++) { - executor.execute(new Runnable() { - @Override - public void run() { - ps.startProcessor(testProcNode); - countDownCounter.countDown(); - } - }); - } + ps.startProcessor(testProcNode); + ps.startProcessor(testProcNode); + ps.startProcessor(testProcNode); - assertTrue(countDownCounter.await(2000, TimeUnit.MILLISECONDS)); - assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING); - // regardless of how many threads attempted to start Processor, it must - // only be started once, hence have only single entry for @OnScheduled + Thread.sleep(500); assertEquals(1, testProcessor.operationNames.size()); assertEquals("@OnScheduled", testProcessor.operationNames.get(0)); - fc.shutdown(true); - executor.shutdownNow(); } /** @@ -187,7 +168,6 @@ public class TestProcessorLifecycle { */ @Test public void validateStopCallsAreMeaninglessIfProcessorNotStarted() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); @@ -201,7 +181,6 @@ public class TestProcessorLifecycle { ps.stopProcessor(testProcNode); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); assertTrue(testProcessor.operationNames.size() == 0); - fc.shutdown(true); } /** @@ -210,7 +189,6 @@ public class TestProcessorLifecycle { */ @Test public void validateSuccessfullAndOrderlyShutdown() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); @@ -244,8 +222,6 @@ public class TestProcessorLifecycle { assertEquals("@OnScheduled", testProcessor.operationNames.get(0)); assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1)); assertEquals("@OnStopped", testProcessor.operationNames.get(2)); - - fc.shutdown(true); } /** @@ -254,7 +230,6 @@ public class TestProcessorLifecycle { */ @Test public void validateLifecycleOperationOrderWithConcurrentCallsToStartStop() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); @@ -303,7 +278,6 @@ public class TestProcessorLifecycle { previousOperation = operationName; } executor.shutdownNow(); - fc.shutdown(true); } /** @@ -311,7 +285,6 @@ public class TestProcessorLifecycle { */ @Test public void validateProcessorUnscheduledAndStoppedWhenStopIsCalledBeforeProcessorFullyStarted() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); @@ -338,7 +311,6 @@ public class TestProcessorLifecycle { assertEquals(2, testProcessor.operationNames.size()); assertEquals("@OnScheduled", testProcessor.operationNames.get(0)); assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1)); - fc.shutdown(true); } /** @@ -347,7 +319,6 @@ public class TestProcessorLifecycle { */ @Test public void validateProcessScheduledAfterAdministrativeDelayDueToTheOnScheduledException() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); @@ -370,7 +341,6 @@ public class TestProcessorLifecycle { ps.stopProcessor(testProcNode); Thread.sleep(500); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); - fc.shutdown(true); } /** @@ -380,7 +350,6 @@ public class TestProcessorLifecycle { */ @Test public void validateProcessorCanBeStoppedWhenOnScheduledConstantlyFails() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); @@ -403,7 +372,6 @@ public class TestProcessorLifecycle { assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING); Thread.sleep(500); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); - fc.shutdown(true); } /** @@ -413,7 +381,6 @@ public class TestProcessorLifecycle { @Test public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyInterruptable() throws Exception { NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec"); - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); @@ -433,7 +400,6 @@ public class TestProcessorLifecycle { assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING); Thread.sleep(4000); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); - fc.shutdown(true); } /** @@ -443,7 +409,6 @@ public class TestProcessorLifecycle { @Test public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninterruptable() throws Exception { NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec"); - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); @@ -468,7 +433,6 @@ public class TestProcessorLifecycle { assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); Thread.sleep(4000); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); - fc.shutdown(true); } /** @@ -477,7 +441,6 @@ public class TestProcessorLifecycle { */ @Test public void validateProcessorCanBeStoppedWhenOnTriggerThrowsException() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); @@ -498,7 +461,6 @@ public class TestProcessorLifecycle { ps.stopProcessor(testProcNode); Thread.sleep(500); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); - fc.shutdown(true); } /** @@ -507,17 +469,12 @@ public class TestProcessorLifecycle { */ @Test(expected = IllegalStateException.class) public void validateStartFailsOnInvalidProcessorWithMissingProperty() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); ProcessScheduler ps = fc.getProcessScheduler(); - try { - ps.startProcessor(testProcNode); - fail(); - } finally { - fc.shutdown(true); - } + ps.startProcessor(testProcNode); + fail(); } /** @@ -526,7 +483,6 @@ public class TestProcessorLifecycle { */ @Test(expected = IllegalStateException.class) public void validateStartFailsOnInvalidProcessorWithDisabledService() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); @@ -541,12 +497,8 @@ public class TestProcessorLifecycle { testProcessor.withService = true; ProcessScheduler ps = fc.getProcessScheduler(); - try { - ps.startProcessor(testProcNode); - fail(); - } finally { - fc.shutdown(true); - } + ps.startProcessor(testProcNode); + fail(); } /** @@ -554,7 +506,6 @@ public class TestProcessorLifecycle { */ @Test public void validateStartSucceedsOnProcessorWithEnabledService() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); @@ -574,7 +525,6 @@ public class TestProcessorLifecycle { Thread.sleep(500); assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING); - fc.shutdown(true); } /** @@ -583,7 +533,6 @@ public class TestProcessorLifecycle { */ @Test public void validateProcessorDeletion() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); @@ -641,7 +590,6 @@ public class TestProcessorLifecycle { testGroup.removeProcessor(testProcNodeA); testGroup.removeProcessor(testProcNodeB); testGroup.shutdown(); - fc.shutdown(true); } /**
