NIFI-1777: Prevent deleting a connection going to a running processor. NIFI-1777: Added unit tests to test processor removal. This closes #357.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2859cd26 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2859cd26 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2859cd26 Branch: refs/heads/NIFI-1654 Commit: 2859cd26a0c2084bff80dece8c1d8cf9ba681b3e Parents: dedf9a5 Author: Pierre Villard <[email protected]> Authored: Fri Apr 15 16:49:47 2016 +0200 Committer: Oleg Zhurakousky <[email protected]> Committed: Mon Apr 18 07:57:12 2016 -0400 ---------------------------------------------------------------------- .../nifi/connectable/StandardConnection.java | 7 ++ .../scheduling/TestProcessorLifecycle.java | 70 ++++++++++++++++++++ 2 files changed, 77 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/2859cd26/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java index 1ef18c0..b9a0245 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java @@ -399,5 +399,12 @@ public final class StandardConnection implements Connection { throw new IllegalStateException("Source of Connection (" + source + ") is running"); } } + + final Connectable dest = destination.get(); + if (dest.isRunning()) { + if 
(!ConnectableType.FUNNEL.equals(dest.getConnectableType())) { + throw new IllegalStateException("Destination of Connection (" + dest + ") is running"); + } + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/2859cd26/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java index 560c4cb..b6962e3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java @@ -23,7 +23,9 @@ import static org.mockito.Mockito.mock; import java.io.File; import java.lang.reflect.Method; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.LinkedList; import java.util.List; @@ -45,6 +47,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.FlowController; @@ -575,6 +578,73 @@ public class TestProcessorLifecycle { } /** + * Test deletion of processor when connected to another + * @throws Exception exception + */ + @Test 
+ public void validateProcessorDeletion() throws Exception { + FlowController fc = this.buildFlowControllerForTest(); + ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); + this.setControllerRootGroup(fc, testGroup); + + ProcessorNode testProcNodeA = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); + testProcNodeA.setProperty("P", "hello"); + testGroup.addProcessor(testProcNodeA); + + ProcessorNode testProcNodeB = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); + testProcNodeB.setProperty("P", "hello"); + testGroup.addProcessor(testProcNodeB); + + Collection<String> relationNames = new ArrayList<String>(); + relationNames.add("relation"); + Connection connection = fc.createConnection(UUID.randomUUID().toString(), Connection.class.getName(), testProcNodeA, testProcNodeB, relationNames); + testGroup.addConnection(connection); + + ProcessScheduler ps = fc.getProcessScheduler(); + ps.startProcessor(testProcNodeA); + ps.startProcessor(testProcNodeB); + + try { + testGroup.removeProcessor(testProcNodeA); + fail(); + } catch (Exception e) { + // should throw exception because processor running + } + + try { + testGroup.removeProcessor(testProcNodeB); + fail(); + } catch (Exception e) { + // should throw exception because processor running + } + + ps.stopProcessor(testProcNodeB); + Thread.sleep(100); + + try { + testGroup.removeProcessor(testProcNodeA); + fail(); + } catch (Exception e) { + // should throw exception because destination processor running + } + + try { + testGroup.removeProcessor(testProcNodeB); + fail(); + } catch (Exception e) { + // should throw exception because source processor running + } + + ps.stopProcessor(testProcNodeA); + Thread.sleep(100); + + testGroup.removeProcessor(testProcNodeA); + testGroup.removeProcessor(testProcNodeB); + testGroup.shutdown(); + fc.shutdown(true); + } + + /** * Scenario where onTrigger() is executed with random delay limited to * 
'delayLimit', yet with guaranteed exit from onTrigger(). */
