Repository: nifi Updated Branches: refs/heads/master cb3aa8f5c -> d64fe416b
NIFI-2032 put back bulletinRepo reporting logic NIFI-2032 polishing (+2 squashed commits) Squashed commits: [4549443] NIFI-2032 fixed 'enableControllerServices' logic added getRequiredControllerServices() operation to ControllerServiceNode [3d95c8e] NIFI-2032 Removed legacy references to bulletin repository in StandardControllerServiceProvider. (+1 squashed commit) Squashed commits: [6402eb9] NIFI-2032 fixed 'enableControllerServices' logic added getRequiredControllerServices() operation to ControllerServiceNode This closes #541. Signed-off-by: Andy LoPresto <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d64fe416 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d64fe416 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d64fe416 Branch: refs/heads/master Commit: d64fe416be6627682c7458ded71ba2ea6e05a899 Parents: cb3aa8f Author: Oleg Zhurakousky <[email protected]> Authored: Fri Jun 17 12:08:26 2016 -0400 Committer: Andy LoPresto <[email protected]> Committed: Mon Jun 20 18:56:41 2016 -0700 ---------------------------------------------------------------------- .../service/ControllerServiceNode.java | 11 ++ .../service/StandardControllerServiceNode.java | 19 +++ .../StandardControllerServiceProvider.java | 119 +++++-------------- .../TestStandardControllerServiceProvider.java | 92 ++++++++++++++ .../nifi/controller/service/mock/ServiceC.java | 55 +++++++++ 5 files changed, 207 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/d64fe416/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java index bd3a42b..c0ff480 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java @@ -20,6 +20,7 @@ import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.groups.ProcessGroup; +import java.util.List; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; @@ -51,6 +52,16 @@ public interface ControllerServiceNode extends ConfiguredComponent { ControllerService getProxiedControllerService(); /** + * Returns the list of services that are required to be enabled before this + * service is enabled. The returned list is flattened and contains both + * immediate and transitive dependencies. + * + * @return list of services required to be enabled before this service is + * enabled + */ + List<ControllerServiceNode> getRequiredControllerServices(); + + /** * <p> * Returns the actual implementation of the Controller Service that this ControllerServiceNode * encapsulates. 
This direct implementation should <strong>NEVER</strong> be passed to another http://git-wip-us.apache.org/repos/asf/nifi/blob/d64fe416/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index 6e29053..05dbc2d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@ -17,10 +17,13 @@ package org.apache.nifi.controller.service; import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Set; +import java.util.Map.Entry; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -35,6 +38,7 @@ import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.ResourceFactory; import org.apache.nifi.authorization.resource.ResourceType; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.AbstractConfiguredComponent; import org.apache.nifi.controller.ConfigurationContext; @@ -165,6 +169,21 @@ 
public class StandardControllerServiceNode extends AbstractConfiguredComponent i } @Override + public List<ControllerServiceNode> getRequiredControllerServices() { + List<ControllerServiceNode> requiredServices = new ArrayList<>(); + for (Entry<PropertyDescriptor, String> pEntry : this.getProperties().entrySet()) { + PropertyDescriptor descriptor = pEntry.getKey(); + if (descriptor.getControllerServiceDefinition() != null && descriptor.isRequired()) { + ControllerServiceNode rNode = this.processGroup.getControllerService(pEntry.getValue()); + requiredServices.add(rNode); + requiredServices.addAll(rNode.getRequiredControllerServices()); + } + } + return requiredServices; + } + + + @Override public void removeReference(final ConfiguredComponent referencingComponent) { writeLock.lock(); try { http://git-wip-us.apache.org/repos/asf/nifi/blob/d64fe416/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index 4e3249f..3861355 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@ -25,14 +25,12 @@ import java.lang.reflect.Proxy; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; +import java.util.Comparator; import 
java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnAdded; @@ -57,6 +55,7 @@ import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.processor.StandardValidationContextFactory; + import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.ObjectHolder; @@ -372,98 +371,40 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi @Override public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes) { - final Set<ControllerServiceNode> servicesToEnable = new HashSet<>(); - // Ensure that all nodes are already disabled - for (final ControllerServiceNode serviceNode : serviceNodes) { - final ControllerServiceState curState = serviceNode.getState(); - if (ControllerServiceState.DISABLED.equals(curState)) { - servicesToEnable.add(serviceNode); - } else { - logger.warn("Cannot enable {} because it is not disabled; current state is {}", serviceNode, curState); + boolean shouldStart = true; + + Iterator<ControllerServiceNode> serviceIter = serviceNodes.iterator(); + while (serviceIter.hasNext() && shouldStart) { + ControllerServiceNode controllerServiceNode = serviceIter.next(); + List<ControllerServiceNode> requiredServices = ((StandardControllerServiceNode) controllerServiceNode).getRequiredControllerServices(); + for (ControllerServiceNode requiredService : requiredServices) { + if (!requiredService.isActive() && !serviceNodes.contains(requiredService)) { + shouldStart = false; + } } } - // determine the order to load the services. 
We have to ensure that if service A references service B, then B - // is enabled first, and so on. - final Map<String, ControllerServiceNode> idToNodeMap = new HashMap<>(); - for (final ControllerServiceNode node : servicesToEnable) { - idToNodeMap.put(node.getIdentifier(), node); - } - - // We can have many Controller Services dependent on one another. We can have many of these - // disparate lists of Controller Services that are dependent on one another. We refer to each - // of these as a branch. - final List<List<ControllerServiceNode>> branches = determineEnablingOrder(idToNodeMap); - - if (branches.isEmpty()) { - logger.info("No Controller Services to enable"); - return; - } else { - logger.info("Will enable {} Controller Services", servicesToEnable.size()); - } - - final Set<ControllerServiceNode> enabledNodes = Collections.synchronizedSet(new HashSet<ControllerServiceNode>()); - final ExecutorService executor = Executors.newFixedThreadPool(Math.min(10, branches.size()), new ThreadFactory() { - @Override - public Thread newThread(final Runnable r) { - final Thread t = Executors.defaultThreadFactory().newThread(r); - t.setDaemon(true); - t.setName("Enable Controller Services"); - return t; - } - }); - - for (final List<ControllerServiceNode> branch : branches) { - final Runnable enableBranchRunnable = new Runnable() { + if (shouldStart) { + List<ControllerServiceNode> services = new ArrayList<>(serviceNodes); + Collections.sort(services, new Comparator<ControllerServiceNode>() { @Override - public void run() { - logger.debug("Enabling Controller Service Branch {}", branch); - - for (final ControllerServiceNode serviceNode : branch) { - try { - if (!enabledNodes.contains(serviceNode)) { - enabledNodes.add(serviceNode); - - logger.info("Enabling {}", serviceNode); - try { - serviceNode.verifyCanEnable(); - processScheduler.enableControllerService(serviceNode); - } catch (final Exception e) { - logger.error("Failed to enable " + serviceNode + " due to " + e); - 
if (logger.isDebugEnabled()) { - logger.error("", e); - } - - if (bulletinRepo != null) { - bulletinRepo.addBulletin(BulletinFactory.createBulletin( - "Controller Service", Severity.ERROR.name(), "Could not start " + serviceNode + " due to " + e)); - } - } - } - - // wait for service to finish enabling. - while (ControllerServiceState.ENABLING.equals(serviceNode.getState())) { - try { - Thread.sleep(100L); - } catch (final InterruptedException ie) { - } - } - - logger.info("State for {} is now {}", serviceNode, serviceNode.getState()); - } catch (final Exception e) { - logger.error("Failed to enable {} due to {}", serviceNode, e.toString()); - if (logger.isDebugEnabled()) { - logger.error("", e); - } - } + public int compare(ControllerServiceNode s1, ControllerServiceNode s2) { + return s2.getRequiredControllerServices().contains(s1) ? -1 : 1; + } + }); + + for (ControllerServiceNode controllerServiceNode : services) { + try { + this.enableControllerService(controllerServiceNode); + } catch (Exception e) { + logger.error("Failed to enable " + controllerServiceNode + " due to " + e); + if (this.bulletinRepo != null) { + this.bulletinRepo.addBulletin(BulletinFactory.createBulletin("Controller Service", + Severity.ERROR.name(), "Could not start " + controllerServiceNode + " due to " + e)); } } - }; - - executor.submit(enableBranchRunnable); + } } - - executor.shutdown(); } static List<List<ControllerServiceNode>> determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap) { http://git-wip-us.apache.org/repos/asf/nifi/blob/d64fe416/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java index 91a2e7a..16f5481 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java @@ -17,9 +17,11 @@ package org.apache.nifi.controller.service; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.beans.PropertyDescriptor; +import java.util.Arrays; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; @@ -39,6 +41,7 @@ import org.apache.nifi.controller.service.mock.DummyProcessor; import org.apache.nifi.controller.service.mock.MockProcessGroup; import org.apache.nifi.controller.service.mock.ServiceA; import org.apache.nifi.controller.service.mock.ServiceB; +import org.apache.nifi.controller.service.mock.ServiceC; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.StandardProcessGroup; import org.apache.nifi.processor.StandardValidationContextFactory; @@ -386,4 +389,93 @@ public class TestStandardControllerServiceProvider { provider.unscheduleReferencingComponents(serviceNode); assertEquals(ScheduledState.STOPPED, procNode.getScheduledState()); } + + @Test + public void validateEnableServices() { + StandardProcessScheduler scheduler = createScheduler(); + FlowController controller = Mockito.mock(FlowController.class); + StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider); + ProcessGroup procGroup = new MockProcessGroup(); + 
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup); + + ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false); + ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", false); + ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false); + ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false); + ControllerServiceNode serviceNode5 = provider.createControllerService(ServiceA.class.getName(), "5", false); + + procGroup.addControllerService(serviceNode1); + procGroup.addControllerService(serviceNode2); + procGroup.addControllerService(serviceNode3); + procGroup.addControllerService(serviceNode4); + procGroup.addControllerService(serviceNode5); + + serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); + serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4"); + serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); + serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4"); + serviceNode5.setProperty(ServiceA.OTHER_SERVICE.getName(), "1"); + + provider.enableControllerServices( + Arrays.asList(new ControllerServiceNode[] { serviceNode1, serviceNode2, serviceNode3, serviceNode4, serviceNode5})); + + assertTrue(serviceNode1.isActive()); + assertTrue(serviceNode2.isActive()); + assertTrue(serviceNode3.isActive()); + assertTrue(serviceNode4.isActive()); + assertTrue(serviceNode5.isActive()); + } + + @Test + public void validateEnableServicesWithDisabledMissingService() { + StandardProcessScheduler scheduler = createScheduler(); + FlowController controller = Mockito.mock(FlowController.class); + StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider); + ProcessGroup procGroup = new MockProcessGroup(); + 
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup); + + ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false); + ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", false); + ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false); + ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false); + ControllerServiceNode serviceNode5 = provider.createControllerService(ServiceA.class.getName(), "5", false); + ControllerServiceNode serviceNode6 = provider.createControllerService(ServiceB.class.getName(), "6", false); + ControllerServiceNode serviceNode7 = provider.createControllerService(ServiceC.class.getName(), "7", false); + + procGroup.addControllerService(serviceNode1); + procGroup.addControllerService(serviceNode2); + procGroup.addControllerService(serviceNode3); + procGroup.addControllerService(serviceNode4); + procGroup.addControllerService(serviceNode5); + procGroup.addControllerService(serviceNode6); + procGroup.addControllerService(serviceNode7); + + serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); + serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4"); + serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); + serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4"); + serviceNode5.setProperty(ServiceA.OTHER_SERVICE.getName(), "6"); + serviceNode7.setProperty(ServiceC.REQ_SERVICE_1.getName(), "2"); + serviceNode7.setProperty(ServiceC.REQ_SERVICE_2.getName(), "3"); + + provider.enableControllerServices(Arrays.asList( + new ControllerServiceNode[] { serviceNode1, serviceNode2, serviceNode3, serviceNode4, serviceNode5, serviceNode7})); + assertFalse(serviceNode1.isActive()); + assertFalse(serviceNode2.isActive()); + assertFalse(serviceNode3.isActive()); + assertFalse(serviceNode4.isActive()); 
+ assertFalse(serviceNode5.isActive()); + assertFalse(serviceNode6.isActive()); + + provider.enableControllerService(serviceNode6); + provider.enableControllerServices(Arrays.asList( + new ControllerServiceNode[] { serviceNode1, serviceNode2, serviceNode3, serviceNode4, serviceNode5 })); + + assertTrue(serviceNode1.isActive()); + assertTrue(serviceNode2.isActive()); + assertTrue(serviceNode3.isActive()); + assertTrue(serviceNode4.isActive()); + assertTrue(serviceNode5.isActive()); + assertTrue(serviceNode6.isActive()); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/d64fe416/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceC.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceC.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceC.java new file mode 100644 index 0000000..8c46bb6 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceC.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.service.mock; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ControllerService; + +public class ServiceC extends AbstractControllerService { + + public static final PropertyDescriptor REQ_SERVICE_1 = new PropertyDescriptor.Builder() + .name("S1") + .identifiesControllerService(ControllerService.class) + .required(true) + .build(); + + public static final PropertyDescriptor REQ_SERVICE_2 = new PropertyDescriptor.Builder() + .name("S2") + .identifiesControllerService(ControllerService.class) + .required(true) + .build(); + + public static final PropertyDescriptor OPT_SERVICE = new PropertyDescriptor.Builder() + .name("S3") + .identifiesControllerService(ControllerService.class) + .required(false) + .build(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(REQ_SERVICE_1); + descriptors.add(REQ_SERVICE_2); + descriptors.add(OPT_SERVICE); + return descriptors; + } + +}
