NIFI-250: Fixed controller service handling on startup
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/800f80bc Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/800f80bc Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/800f80bc Branch: refs/heads/NIFI-250 Commit: 800f80bc05ead5f097c9e948f619e0dd681ca592 Parents: f0c660c Author: Mark Payne <[email protected]> Authored: Thu Mar 19 09:28:05 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Thu Mar 19 09:28:05 2015 -0400 ---------------------------------------------------------------------- .../cluster/manager/impl/WebClusterManager.java | 7 +- .../service/ControllerServiceProvider.java | 8 + .../apache/nifi/controller/FlowController.java | 13 +- .../scheduling/StandardProcessScheduler.java | 14 +- .../service/ControllerServiceLoader.java | 124 ++----------- .../StandardControllerServiceProvider.java | 145 ++++++++++++++- .../service/TestControllerServiceLoader.java | 175 ------------------- .../TestStandardControllerServiceProvider.java | 158 ++++++++++++++++- .../cache/server/AbstractCacheServer.java | 2 +- 9 files changed, 347 insertions(+), 299 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/800f80bc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index 6fe1f80..37465e9 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -434,7 +434,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C processScheduler.setMaxThreadCount(SchedulingStrategy.TIMER_DRIVEN, 10); processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10); - controllerServiceProvider = new StandardControllerServiceProvider(processScheduler); + controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository); } public void start() throws IOException { @@ -1411,6 +1411,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } @Override + public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes) { + controllerServiceProvider.enableControllerServices(serviceNodes); + } + + @Override public void disableControllerService(final ControllerServiceNode serviceNode) { controllerServiceProvider.disableControllerService(serviceNode); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/800f80bc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java index aac65dc..1901fb6 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.controller.service; +import java.util.Collection; import java.util.Set; import org.apache.nifi.annotation.lifecycle.OnAdded; @@ -63,6 +64,13 @@ public interface ControllerServiceProvider extends ControllerServiceLookup { void enableControllerService(ControllerServiceNode serviceNode); /** + * Enables the collection of services. If a service in this collection depends on another service, + * the service being depended on must either already be enabled or must be in the collection as well. + * @param serviceNodes + */ + void enableControllerServices(Collection<ControllerServiceNode> serviceNodes); + + /** * Disables the given controller service so that it cannot be used by other components. This allows * configuration to be updated or allows service to be removed. * @param serviceNode http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/800f80bc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 26f2369..fd01711 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -403,7 +403,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R processScheduler = new StandardProcessScheduler(this, this, encryptor); eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler); - controllerServiceProvider = new StandardControllerServiceProvider(processScheduler); + controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository); final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceEventRepository); processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, new EventDrivenSchedulingAgent( @@ -598,7 +598,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R startConnectable(connectable); } } catch (final Throwable t) { - LOG.error("Unable to start {} due to {}", new Object[]{connectable, t}); + LOG.error("Unable to start {} due to {}", new Object[]{connectable, t.toString()}); + if ( LOG.isDebugEnabled() ) { + LOG.error("", t); + } } } @@ -2631,11 +2634,15 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R @Override public void enableControllerService(final ControllerServiceNode serviceNode) { - serviceNode.verifyCanEnable(); controllerServiceProvider.enableControllerService(serviceNode); } @Override + public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes) { + controllerServiceProvider.enableControllerServices(serviceNodes); + } + + @Override public void disableControllerService(final ControllerServiceNode serviceNode) { serviceNode.verifyCanDisable(); controllerServiceProvider.disableControllerService(serviceNode); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/800f80bc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index b005a57..89850cc 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -281,7 +281,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { } if (!procNode.isValid()) { - throw new IllegalStateException("Processor " + procNode.getName() + " is not in a valid state"); + throw new IllegalStateException("Processor " + procNode.getName() + " is not in a valid state due to " + procNode.getValidationErrors()); } final Runnable startProcRunnable = new Runnable() { @@ -301,9 +301,18 @@ public final class StandardProcessScheduler implements ProcessScheduler { } } - while (true) { + attemptOnScheduled: while (true) { try { synchronized (scheduleState) { + for ( final String serviceId : serviceIds ) { + final boolean enabled = processContext.isControllerServiceEnabled(serviceId); + if ( !enabled ) { + LOG.debug("Controller Service with ID {} is not yet enabled, so will not start {} yet", serviceId, procNode); + Thread.sleep(administrativeYieldMillis); + continue attemptOnScheduled; + } + } + // if no longer scheduled to run, then we're finished. This can happen, for example, // if the @OnScheduled method throws an Exception and the user stops the processor // while we're administratively yielded. @@ -607,7 +616,6 @@ public final class StandardProcessScheduler implements ProcessScheduler { @Override public void enableControllerService(final ControllerServiceNode service) { - service.verifyCanEnable(); service.setState(ControllerServiceState.ENABLING); final ScheduleState scheduleState = getScheduleState(service); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/800f80bc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java index 7c2d4df..1fde670 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java @@ -26,19 +26,14 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; -import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.FlowFromDOMFactory; import org.apache.nifi.encrypt.StringEncryptor; -import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.reporting.BulletinRepository; -import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.DomUtils; import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.slf4j.Logger; @@ -116,122 +111,29 @@ public class ControllerServiceLoader { // Start services if ( autoResumeState ) { - // determine the order to load the services. We have to ensure that if service A references service B, then B - // is enabled first, and so on. - final Map<String, ControllerServiceNode> idToNodeMap = new HashMap<>(); - for ( final ControllerServiceNode node : nodeMap.keySet() ) { - idToNodeMap.put(node.getIdentifier(), node); - } - - // We can have many Controller Services dependent on one another. We can have many of these - // disparate lists of Controller Services that are dependent on one another. We refer to each - // of these as a branch. - final List<List<ControllerServiceNode>> branches = determineEnablingOrder(idToNodeMap); - - final ExecutorService executor = Executors.newFixedThreadPool(Math.min(10, branches.size())); + final Set<ControllerServiceNode> nodesToEnable = new HashSet<>(); - for ( final List<ControllerServiceNode> branch : branches ) { - final Runnable enableBranchRunnable = new Runnable() { - @Override - public void run() { - logger.debug("Enabling Controller Service Branch {}", branch); - - for ( final ControllerServiceNode serviceNode : branch ) { - try { - final Element controllerServiceElement = nodeMap.get(serviceNode); - - final ControllerServiceDTO dto; - synchronized (controllerServiceElement.getOwnerDocument()) { - dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor); - } - - final ControllerServiceState state = ControllerServiceState.valueOf(dto.getState()); - final boolean enable = (state == ControllerServiceState.ENABLED); - if (enable) { - if ( ControllerServiceState.DISABLED.equals(serviceNode.getState()) ) { - logger.info("Enabling {}", serviceNode); - try { - provider.enableControllerService(serviceNode); - } catch (final Exception e) { - logger.error("Failed to enable " + serviceNode + " due to " + e); - if ( logger.isDebugEnabled() ) { - logger.error("", e); - } - - bulletinRepo.addBulletin(BulletinFactory.createBulletin( - "Controller Service", Severity.ERROR.name(), "Could not start " + serviceNode + " due to " + e)); - } - } - - // wait for service to finish enabling. - while ( ControllerServiceState.ENABLING.equals(serviceNode.getState()) ) { - try { - Thread.sleep(100L); - } catch (final InterruptedException ie) {} - } - - logger.info("State for {} is now {}", serviceNode, serviceNode.getState()); - } - } catch (final Exception e) { - logger.error("Failed to enable {} due to {}", serviceNode, e.toString()); - if ( logger.isDebugEnabled() ) { - logger.error("", e); - } - } - } - } - }; + for ( final ControllerServiceNode node : nodeMap.keySet() ) { + final Element controllerServiceElement = nodeMap.get(node); + + final ControllerServiceDTO dto; + synchronized (controllerServiceElement.getOwnerDocument()) { + dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor); + } - executor.submit(enableBranchRunnable); + final ControllerServiceState state = ControllerServiceState.valueOf(dto.getState()); + if (state == ControllerServiceState.ENABLED) { + nodesToEnable.add(node); + } } - executor.shutdown(); + provider.enableControllerServices(nodesToEnable); } return nodeMap.keySet(); } - static List<List<ControllerServiceNode>> determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap) { - final List<List<ControllerServiceNode>> orderedNodeLists = new ArrayList<>(); - - for ( final ControllerServiceNode node : serviceNodeMap.values() ) { - if ( orderedNodeLists.contains(node) ) { - continue; // this node is already in the list. - } - - final List<ControllerServiceNode> branch = new ArrayList<>(); - determineEnablingOrder(serviceNodeMap, node, branch, new HashSet<ControllerServiceNode>()); - orderedNodeLists.add(branch); - } - - return orderedNodeLists; - } - - - private static void determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap, final ControllerServiceNode contextNode, final List<ControllerServiceNode> orderedNodes, final Set<ControllerServiceNode> visited) { - if ( visited.contains(contextNode) ) { - return; - } - - for ( final Map.Entry<PropertyDescriptor, String> entry : contextNode.getProperties().entrySet() ) { - if ( entry.getKey().getControllerServiceDefinition() != null ) { - final String referencedServiceId = entry.getValue(); - if ( referencedServiceId != null ) { - final ControllerServiceNode referencedNode = serviceNodeMap.get(referencedServiceId); - if ( !orderedNodes.contains(referencedNode) ) { - visited.add(contextNode); - determineEnablingOrder(serviceNodeMap, referencedNode, orderedNodes, visited); - } - } - } - } - - if ( !orderedNodes.contains(contextNode) ) { - orderedNodes.add(contextNode); - } - } - private static ControllerServiceNode createControllerService(final ControllerServiceProvider provider, final Element controllerServiceElement, final StringEncryptor encryptor) { final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/800f80bc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index b2d61bd..ded55b4 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@ -23,13 +23,17 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.annotation.lifecycle.OnRemoved; @@ -44,11 +48,14 @@ import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.exception.ControllerServiceNotFoundException; import org.apache.nifi.controller.exception.ProcessorLifeCycleException; +import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.processor.StandardValidationContextFactory; +import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.ObjectHolder; import org.apache.nifi.util.ReflectionUtils; import org.slf4j.Logger; @@ -64,6 +71,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi private final ProcessScheduler processScheduler; private final ConcurrentMap<String, ControllerServiceNode> controllerServices; private static final Set<Method> validDisabledMethods; + private final BulletinRepository bulletinRepo; static { // methods that are okay to be called when the service is disabled. @@ -77,11 +85,12 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi validDisabledMethods = Collections.unmodifiableSet(validMethods); } - public StandardControllerServiceProvider(final ProcessScheduler scheduler) { + public StandardControllerServiceProvider(final ProcessScheduler scheduler, final BulletinRepository bulletinRepo) { // the following 2 maps must be updated atomically, but we do not lock around them because they are modified // only in the createControllerService method, and both are modified before the method returns this.controllerServices = new ConcurrentHashMap<>(); this.processScheduler = scheduler; + this.bulletinRepo = bulletinRepo; } private Class<?>[] getInterfaces(final Class<?> cls) { @@ -285,6 +294,140 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } @Override + public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes) { + final Set<ControllerServiceNode> servicesToEnable = new HashSet<>(); + // Ensure that all nodes are already disabled + for ( final ControllerServiceNode serviceNode : serviceNodes ) { + final ControllerServiceState curState = serviceNode.getState(); + if ( ControllerServiceState.DISABLED.equals(curState) ) { + servicesToEnable.add(serviceNode); + } else { + logger.warn("Cannot enable {} because it is not disabled; current state is {}", serviceNode, curState); + } + } + + // determine the order to load the services. We have to ensure that if service A references service B, then B + // is enabled first, and so on. + final Map<String, ControllerServiceNode> idToNodeMap = new HashMap<>(); + for ( final ControllerServiceNode node : servicesToEnable ) { + idToNodeMap.put(node.getIdentifier(), node); + } + + // We can have many Controller Services dependent on one another. We can have many of these + // disparate lists of Controller Services that are dependent on one another. We refer to each + // of these as a branch. + final List<List<ControllerServiceNode>> branches = determineEnablingOrder(idToNodeMap); + + if ( branches.isEmpty() ) { + logger.info("No Controller Services to enable"); + return; + } else { + logger.info("Will enable {} Controller Services", servicesToEnable.size()); + } + + // Mark all services that are configured to be enabled as 'ENABLING'. This allows Processors, reporting tasks + // to be valid so that they can be scheduled. + for ( final List<ControllerServiceNode> branch : branches ) { + for ( final ControllerServiceNode nodeToEnable : branch ) { + nodeToEnable.setState(ControllerServiceState.ENABLING); + } + } + + final Set<ControllerServiceNode> enabledNodes = Collections.synchronizedSet(new HashSet<ControllerServiceNode>()); + final ExecutorService executor = Executors.newFixedThreadPool(Math.min(10, branches.size())); + for ( final List<ControllerServiceNode> branch : branches ) { + final Runnable enableBranchRunnable = new Runnable() { + @Override + public void run() { + logger.debug("Enabling Controller Service Branch {}", branch); + + for ( final ControllerServiceNode serviceNode : branch ) { + try { + if ( !enabledNodes.contains(serviceNode) ) { + enabledNodes.add(serviceNode); + + logger.info("Enabling {}", serviceNode); + try { + processScheduler.enableControllerService(serviceNode); + } catch (final Exception e) { + logger.error("Failed to enable " + serviceNode + " due to " + e); + if ( logger.isDebugEnabled() ) { + logger.error("", e); + } + + if ( bulletinRepo != null ) { + bulletinRepo.addBulletin(BulletinFactory.createBulletin( + "Controller Service", Severity.ERROR.name(), "Could not start " + serviceNode + " due to " + e)); + } + } + } + + // wait for service to finish enabling. + while ( ControllerServiceState.ENABLING.equals(serviceNode.getState()) ) { + try { + Thread.sleep(100L); + } catch (final InterruptedException ie) {} + } + + logger.info("State for {} is now {}", serviceNode, serviceNode.getState()); + } catch (final Exception e) { + logger.error("Failed to enable {} due to {}", serviceNode, e.toString()); + if ( logger.isDebugEnabled() ) { + logger.error("", e); + } + } + } + } + }; + + executor.submit(enableBranchRunnable); + } + + executor.shutdown(); + } + + static List<List<ControllerServiceNode>> determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap) { + final List<List<ControllerServiceNode>> orderedNodeLists = new ArrayList<>(); + + for ( final ControllerServiceNode node : serviceNodeMap.values() ) { + if ( orderedNodeLists.contains(node) ) { + continue; // this node is already in the list. + } + + final List<ControllerServiceNode> branch = new ArrayList<>(); + determineEnablingOrder(serviceNodeMap, node, branch, new HashSet<ControllerServiceNode>()); + orderedNodeLists.add(branch); + } + + return orderedNodeLists; + } + + + private static void determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap, final ControllerServiceNode contextNode, final List<ControllerServiceNode> orderedNodes, final Set<ControllerServiceNode> visited) { + if ( visited.contains(contextNode) ) { + return; + } + + for ( final Map.Entry<PropertyDescriptor, String> entry : contextNode.getProperties().entrySet() ) { + if ( entry.getKey().getControllerServiceDefinition() != null ) { + final String referencedServiceId = entry.getValue(); + if ( referencedServiceId != null ) { + final ControllerServiceNode referencedNode = serviceNodeMap.get(referencedServiceId); + if ( !orderedNodes.contains(referencedNode) ) { + visited.add(contextNode); + determineEnablingOrder(serviceNodeMap, referencedNode, orderedNodes, visited); + } + } + } + } + + if ( !orderedNodes.contains(contextNode) ) { + orderedNodes.add(contextNode); + } + } + + + @Override public void disableControllerService(final ControllerServiceNode serviceNode) { serviceNode.verifyCanDisable(); processScheduler.disableControllerService(serviceNode); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/800f80bc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestControllerServiceLoader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestControllerServiceLoader.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestControllerServiceLoader.java deleted file mode 100644 index 9451b07..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestControllerServiceLoader.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller.service; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; - -import org.apache.nifi.controller.service.mock.ServiceA; -import org.apache.nifi.controller.service.mock.ServiceB; -import org.junit.Test; - -public class TestControllerServiceLoader { - @Test - public void testOrderingOfServices() { - final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(null); - final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false); - final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceB.class.getName(), "2", false); - - serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); - - final Map<String, ControllerServiceNode> nodeMap = new LinkedHashMap<>(); - nodeMap.put("1", serviceNode1); - nodeMap.put("2", serviceNode2); - - List<List<ControllerServiceNode>> branches = ControllerServiceLoader.determineEnablingOrder(nodeMap); - assertEquals(2, branches.size()); - List<ControllerServiceNode> ordered = branches.get(0); - assertEquals(2, ordered.size()); - assertTrue(ordered.get(0) == serviceNode2); - assertTrue(ordered.get(1) == serviceNode1); - assertEquals(1, branches.get(1).size()); - assertTrue(branches.get(1).get(0) == serviceNode2); - - nodeMap.clear(); - nodeMap.put("2", serviceNode2); - nodeMap.put("1", serviceNode1); - - branches = ControllerServiceLoader.determineEnablingOrder(nodeMap); - assertEquals(2, branches.size()); - ordered = branches.get(1); - assertEquals(2, ordered.size()); - assertTrue(ordered.get(0) == serviceNode2); - assertTrue(ordered.get(1) == serviceNode1); - assertEquals(1, branches.get(0).size()); - assertTrue(branches.get(0).get(0) == serviceNode2); - - // add circular dependency on self. - nodeMap.clear(); - serviceNode1.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "1"); - nodeMap.put("1", serviceNode1); - nodeMap.put("2", serviceNode2); - - branches = ControllerServiceLoader.determineEnablingOrder(nodeMap); - assertEquals(2, branches.size()); - ordered = branches.get(0); - assertEquals(2, ordered.size()); - assertTrue(ordered.get(0) == serviceNode2); - assertTrue(ordered.get(1) == serviceNode1); - - nodeMap.clear(); - nodeMap.put("2", serviceNode2); - nodeMap.put("1", serviceNode1); - branches = ControllerServiceLoader.determineEnablingOrder(nodeMap); - assertEquals(2, branches.size()); - ordered = branches.get(1); - assertEquals(2, ordered.size()); - assertTrue(ordered.get(0) == serviceNode2); - assertTrue(ordered.get(1) == serviceNode1); - - // add circular dependency once removed. In this case, we won't actually be able to enable these because of the - // circular dependency because they will never be valid because they will always depend on a disabled service. - // But we want to ensure that the method returns successfully without throwing a StackOverflowException or anything - // like that. - nodeMap.clear(); - final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false); - serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "3"); - serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "1"); - nodeMap.put("1", serviceNode1); - nodeMap.put("3", serviceNode3); - branches = ControllerServiceLoader.determineEnablingOrder(nodeMap); - assertEquals(2, branches.size()); - ordered = branches.get(0); - assertEquals(2, ordered.size()); - assertTrue(ordered.get(0) == serviceNode3); - assertTrue(ordered.get(1) == serviceNode1); - - nodeMap.clear(); - nodeMap.put("3", serviceNode3); - nodeMap.put("1", serviceNode1); - branches = ControllerServiceLoader.determineEnablingOrder(nodeMap); - assertEquals(2, branches.size()); - ordered = branches.get(1); - assertEquals(2, ordered.size()); - assertTrue(ordered.get(0) == serviceNode3); - assertTrue(ordered.get(1) == serviceNode1); - - - // Add multiple completely disparate branches. - nodeMap.clear(); - serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); - final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false); - final ControllerServiceNode serviceNode5 = provider.createControllerService(ServiceB.class.getName(), "5", false); - serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "4"); - nodeMap.put("1", serviceNode1); - nodeMap.put("2", serviceNode2); - nodeMap.put("3", serviceNode3); - nodeMap.put("4", serviceNode4); - nodeMap.put("5", serviceNode5); - - branches = ControllerServiceLoader.determineEnablingOrder(nodeMap); - assertEquals(5, branches.size()); - - ordered = branches.get(0); - assertEquals(2, ordered.size()); - assertTrue(ordered.get(0) == serviceNode2); - assertTrue(ordered.get(1) == serviceNode1); - - assertEquals(1, branches.get(1).size()); - assertTrue(branches.get(1).get(0) == serviceNode2); - - ordered = branches.get(2); - assertEquals(2, ordered.size()); - assertTrue(ordered.get(0) == serviceNode4); - assertTrue(ordered.get(1) == serviceNode3); - - assertEquals(1, branches.get(3).size()); - assertTrue(branches.get(3).get(0) == serviceNode4); - - assertEquals(1, branches.get(4).size()); - assertTrue(branches.get(4).get(0) == serviceNode5); - - // create 2 branches both dependent on the same service - nodeMap.clear(); - serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); - serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); - nodeMap.put("1", serviceNode1); - nodeMap.put("2", serviceNode2); - nodeMap.put("3", serviceNode3); - - branches = ControllerServiceLoader.determineEnablingOrder(nodeMap); - assertEquals(3, branches.size()); - - ordered = branches.get(0); - assertEquals(2, ordered.size()); - assertTrue(ordered.get(0) == serviceNode2); - assertTrue(ordered.get(1) == serviceNode1); - - ordered = branches.get(1); - assertEquals(1, ordered.size()); - assertTrue(ordered.get(0) == serviceNode2); - - ordered = branches.get(2); - assertEquals(2, ordered.size()); - assertTrue(ordered.get(0) == serviceNode2); - assertTrue(ordered.get(1) == serviceNode3); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/800f80bc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java index 46dd885..3dc1752 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java @@ -20,6 +20,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import java.util.UUID; import org.apache.nifi.controller.ProcessScheduler; @@ -68,7 +71,7 @@ public class TestStandardControllerServiceProvider { @Test public void testDisableControllerService() { final ProcessScheduler scheduler = createScheduler(); - final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null); final ControllerServiceNode serviceNode = provider.createControllerService(ServiceB.class.getName(), "B", false); provider.enableControllerService(serviceNode); @@ -78,7 +81,7 @@ public class TestStandardControllerServiceProvider { @Test public void testEnableDisableWithReference() { final ProcessScheduler scheduler = createScheduler(); - final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null); final ControllerServiceNode serviceNodeB = provider.createControllerService(ServiceB.class.getName(), "B", false); final ControllerServiceNode serviceNodeA = provider.createControllerService(ServiceA.class.getName(), "A", false); @@ -108,7 +111,7 @@ public class TestStandardControllerServiceProvider { @Test public void testEnableReferencingServicesGraph() { final ProcessScheduler scheduler = createScheduler(); - final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null); // build a graph of controller services with dependencies as such: // @@ -145,7 +148,7 @@ public class TestStandardControllerServiceProvider { @Test public void testStartStopReferencingComponents() { final ProcessScheduler scheduler = createScheduler(); - final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null); // build a graph of reporting tasks and controller services with dependencies as such: // @@ -232,4 +235,151 @@ public class TestStandardControllerServiceProvider { provider.disableControllerService(serviceNode4); assertEquals(ControllerServiceState.DISABLED, serviceNode4.getState()); } + + + @Test + public void testOrderingOfServices() { + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(null, null); + final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false); + final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceB.class.getName(), "2", false); + + serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); + + final Map<String, ControllerServiceNode> nodeMap = new LinkedHashMap<>(); + nodeMap.put("1", serviceNode1); + nodeMap.put("2", serviceNode2); + + List<List<ControllerServiceNode>> branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap); + assertEquals(2, branches.size()); + List<ControllerServiceNode> ordered = branches.get(0); + assertEquals(2, ordered.size()); + assertTrue(ordered.get(0) == serviceNode2); + assertTrue(ordered.get(1) == serviceNode1); + assertEquals(1, branches.get(1).size()); + assertTrue(branches.get(1).get(0) == serviceNode2); + + nodeMap.clear(); + nodeMap.put("2", serviceNode2); + nodeMap.put("1", serviceNode1); + + branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap); + assertEquals(2, branches.size()); + ordered = branches.get(1); + assertEquals(2, ordered.size()); + assertTrue(ordered.get(0) == serviceNode2); + assertTrue(ordered.get(1) == serviceNode1); + assertEquals(1, branches.get(0).size()); + assertTrue(branches.get(0).get(0) == serviceNode2); + + // add circular dependency on self. + nodeMap.clear(); + serviceNode1.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "1"); + nodeMap.put("1", serviceNode1); + nodeMap.put("2", serviceNode2); + + branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap); + assertEquals(2, branches.size()); + ordered = branches.get(0); + assertEquals(2, ordered.size()); + assertTrue(ordered.get(0) == serviceNode2); + assertTrue(ordered.get(1) == serviceNode1); + + nodeMap.clear(); + nodeMap.put("2", serviceNode2); + nodeMap.put("1", serviceNode1); + branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap); + assertEquals(2, branches.size()); + ordered = branches.get(1); + assertEquals(2, ordered.size()); + assertTrue(ordered.get(0) == serviceNode2); + assertTrue(ordered.get(1) == serviceNode1); + + // add circular dependency once removed. In this case, we won't actually be able to enable these because of the + // circular dependency because they will never be valid because they will always depend on a disabled service. + // But we want to ensure that the method returns successfully without throwing a StackOverflowException or anything + // like that. + nodeMap.clear(); + final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false); + serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "3"); + serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "1"); + nodeMap.put("1", serviceNode1); + nodeMap.put("3", serviceNode3); + branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap); + assertEquals(2, branches.size()); + ordered = branches.get(0); + assertEquals(2, ordered.size()); + assertTrue(ordered.get(0) == serviceNode3); + assertTrue(ordered.get(1) == serviceNode1); + + nodeMap.clear(); + nodeMap.put("3", serviceNode3); + nodeMap.put("1", serviceNode1); + branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap); + assertEquals(2, branches.size()); + ordered = branches.get(1); + assertEquals(2, ordered.size()); + assertTrue(ordered.get(0) == serviceNode3); + assertTrue(ordered.get(1) == serviceNode1); + + + // Add multiple completely disparate branches. + nodeMap.clear(); + serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); + final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false); + final ControllerServiceNode serviceNode5 = provider.createControllerService(ServiceB.class.getName(), "5", false); + serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "4"); + nodeMap.put("1", serviceNode1); + nodeMap.put("2", serviceNode2); + nodeMap.put("3", serviceNode3); + nodeMap.put("4", serviceNode4); + nodeMap.put("5", serviceNode5); + + branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap); + assertEquals(5, branches.size()); + + ordered = branches.get(0); + assertEquals(2, ordered.size()); + assertTrue(ordered.get(0) == serviceNode2); + assertTrue(ordered.get(1) == serviceNode1); + + assertEquals(1, branches.get(1).size()); + assertTrue(branches.get(1).get(0) == serviceNode2); + + ordered = branches.get(2); + assertEquals(2, ordered.size()); + assertTrue(ordered.get(0) == serviceNode4); + assertTrue(ordered.get(1) == serviceNode3); + + assertEquals(1, branches.get(3).size()); + assertTrue(branches.get(3).get(0) == serviceNode4); + + assertEquals(1, branches.get(4).size()); + assertTrue(branches.get(4).get(0) == serviceNode5); + + // create 2 branches both dependent on the same service + nodeMap.clear(); + serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); + serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); + nodeMap.put("1", serviceNode1); + nodeMap.put("2", serviceNode2); + nodeMap.put("3", serviceNode3); + + branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap); + assertEquals(3, branches.size()); + + ordered = branches.get(0); + assertEquals(2, ordered.size()); + assertTrue(ordered.get(0) == serviceNode2); + assertTrue(ordered.get(1) == serviceNode1); + + ordered = branches.get(1); + assertEquals(1, ordered.size()); + assertTrue(ordered.get(0) == serviceNode2); + + ordered = branches.get(2); + assertEquals(2, ordered.size()); + assertTrue(ordered.get(0) == serviceNode2); + assertTrue(ordered.get(1) == serviceNode3); + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/800f80bc/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java index a9643ab..a6a2458 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java @@ -163,7 +163,7 @@ public abstract class AbstractCacheServer implements CacheServer { stopped = true; logger.info("Stopping CacheServer {}", new Object[] { this.identifier }); - if (serverSocketChannel != null) { + if (serverSocketChannel != null && serverSocketChannel.isOpen()) { serverSocketChannel.close(); } // need to close out the created SocketChannels...this is done by interrupting
