http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java index fbd3dd7..66abf30 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java @@ -16,14 +16,27 @@ */ package org.apache.nifi.controller.service; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.UUID; import org.apache.nifi.controller.ProcessScheduler; +import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.controller.ScheduledState; +import org.apache.nifi.controller.StandardProcessorNode; +import org.apache.nifi.controller.service.mock.DummyProcessor; import org.apache.nifi.controller.service.mock.ServiceA; import org.apache.nifi.controller.service.mock.ServiceB; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.processor.StandardProcessorInitializationContext; +import org.apache.nifi.processor.StandardValidationContextFactory; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import 
org.mockito.stubbing.Answer; public class TestStandardControllerServiceProvider { @@ -68,11 +81,11 @@ public class TestStandardControllerServiceProvider { @Test - public void testActivateReferencingComponentsGraph() { + public void testEnableReferencingServicesGraph() { final ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler); - // build a graph of components with dependencies as such: + // build a graph of controller services with dependencies as such: // // A -> B -> D // C ---^----^ @@ -81,7 +94,7 @@ public class TestStandardControllerServiceProvider { // AND // C references B and D. // - // So we have to verify that if D is enabled, when we enable its referencing components, + // So we have to verify that if D is enabled, when we enable its referencing services, // we enable C and B, even if we attempt to enable C before B... i.e., if we try to enable C, we cannot do so // until B is first enabled so ensure that we enable B first. 
@@ -96,12 +109,102 @@ public class TestStandardControllerServiceProvider { serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4"); provider.enableControllerService(serviceNode4); - provider.activateReferencingComponents(serviceNode4); - - assertFalse(serviceNode3.isDisabled()); - assertFalse(serviceNode2.isDisabled()); - assertFalse(serviceNode1.isDisabled()); + provider.enableReferencingServices(serviceNode4); + assertEquals(ControllerServiceState.DISABLED, serviceNode3.getState()); + assertEquals(ControllerServiceState.DISABLED, serviceNode2.getState()); + assertEquals(ControllerServiceState.DISABLED, serviceNode1.getState()); } + + @Test + public void testStartStopReferencingComponents() { + final ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler); + + // build a graph of processors and controller services with dependencies as such: + // + // Processor P1 -> A -> B -> D + // Processor P2 -> C ---^----^ + // + // In other words, Processor P1 references Controller Service A, which references B, which references D. + // AND + // Processor P2 references Controller Service C, which references B and D. + // + // So we have to verify that if D is enabled, when we enable its referencing services, + // we enable C and B, even if we attempt to enable C before B... i.e., if we try to enable C, we cannot do so + // until B is first enabled so ensure that we enable B first. 
+ + final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false); + final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", false); + final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false); + final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false); + + final ProcessGroup mockProcessGroup = Mockito.mock(ProcessGroup.class); + Mockito.doAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + final ProcessorNode procNode = (ProcessorNode) invocation.getArguments()[0]; + procNode.verifyCanStart(); + procNode.setScheduledState(ScheduledState.RUNNING); + return null; + } + }).when(mockProcessGroup).startProcessor(Mockito.any(ProcessorNode.class)); + + Mockito.doAnswer(new Answer<Object>() { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable { + final ProcessorNode procNode = (ProcessorNode) invocation.getArguments()[0]; + procNode.verifyCanStop(); + procNode.setScheduledState(ScheduledState.STOPPED); + return null; + } + }).when(mockProcessGroup).stopProcessor(Mockito.any(ProcessorNode.class)); + + final String id1 = UUID.randomUUID().toString(); + final ProcessorNode procNodeA = new StandardProcessorNode(new DummyProcessor(), id1, + new StandardValidationContextFactory(provider), scheduler, provider); + procNodeA.getProcessor().initialize(new StandardProcessorInitializationContext(id1, null, provider)); + procNodeA.setProperty(DummyProcessor.SERVICE.getName(), "1"); + procNodeA.setProcessGroup(mockProcessGroup); + + final String id2 = UUID.randomUUID().toString(); + final ProcessorNode procNodeB = new StandardProcessorNode(new DummyProcessor(),id2, + new StandardValidationContextFactory(provider), scheduler, provider); + procNodeB.getProcessor().initialize(new 
StandardProcessorInitializationContext(id2, null, provider)); + procNodeB.setProperty(DummyProcessor.SERVICE.getName(), "3"); + procNodeB.setProcessGroup(mockProcessGroup); + + serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); + serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4"); + serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); + serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4"); + + provider.enableControllerService(serviceNode4); + provider.enableReferencingServices(serviceNode4); + provider.scheduleReferencingComponents(serviceNode4); + + assertEquals(ControllerServiceState.DISABLED, serviceNode3.getState()); + assertEquals(ControllerServiceState.DISABLED, serviceNode2.getState()); + assertEquals(ControllerServiceState.DISABLED, serviceNode1.getState()); + assertTrue(procNodeA.isRunning()); + assertTrue(procNodeB.isRunning()); + + // stop processors and verify results. + provider.unscheduleReferencingComponents(serviceNode4); + assertFalse(procNodeA.isRunning()); + assertFalse(procNodeB.isRunning()); + assertEquals(ControllerServiceState.DISABLED, serviceNode3.getState()); + assertEquals(ControllerServiceState.DISABLED, serviceNode2.getState()); + assertEquals(ControllerServiceState.DISABLED, serviceNode1.getState()); + + provider.disableReferencingServices(serviceNode4); + assertEquals(ControllerServiceState.DISABLED, serviceNode3.getState()); + assertEquals(ControllerServiceState.DISABLED, serviceNode2.getState()); + assertEquals(ControllerServiceState.DISABLED, serviceNode1.getState()); + assertEquals(ControllerServiceState.ENABLED, serviceNode4.getState()); + + provider.disableControllerService(serviceNode4); + assertEquals(ControllerServiceState.DISABLED, serviceNode4.getState()); + } }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java new file mode 100644 index 0000000..615e172 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.controller.service.mock; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; + +public class DummyProcessor extends AbstractProcessor { + + public static final PropertyDescriptor SERVICE = new PropertyDescriptor.Builder() + .name("Controller Service") + .identifiesControllerService(ControllerService.class) + .required(true) + .build(); + + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(SERVICE); + return descriptors; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java index c55960f..9c2f0e0 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java +++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java @@ -146,9 +146,9 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro if (enabled != null) { if (enabled) { - + serviceProvider.enableReferencingServices(controllerService); } else { - + serviceProvider.disableReferencingServices(controllerService); } } else if (state != null) { try { @@ -156,13 +156,14 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro switch (scheduledState) { case RUNNING: - + serviceProvider.scheduleReferencingComponents(controllerService); break; case STOPPED: - + serviceProvider.unscheduleReferencingComponents(controllerService); break; - default: - + default: + throw new IllegalArgumentException(String.format( + "The specified state (%s) is not valid. Valid options are 'RUNNING' and 'STOPPED'.", state)); } } catch (IllegalArgumentException iae) { throw new IllegalArgumentException(String.format(
