NIFI-250: Fixed controller service handling on startup

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/800f80bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/800f80bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/800f80bc

Branch: refs/heads/NIFI-250
Commit: 800f80bc05ead5f097c9e948f619e0dd681ca592
Parents: f0c660c
Author: Mark Payne <[email protected]>
Authored: Thu Mar 19 09:28:05 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Thu Mar 19 09:28:05 2015 -0400

----------------------------------------------------------------------
 .../cluster/manager/impl/WebClusterManager.java |   7 +-
 .../service/ControllerServiceProvider.java      |   8 +
 .../apache/nifi/controller/FlowController.java  |  13 +-
 .../scheduling/StandardProcessScheduler.java    |  14 +-
 .../service/ControllerServiceLoader.java        | 124 ++-----------
 .../StandardControllerServiceProvider.java      | 145 ++++++++++++++-
 .../service/TestControllerServiceLoader.java    | 175 -------------------
 .../TestStandardControllerServiceProvider.java  | 158 ++++++++++++++++-
 .../cache/server/AbstractCacheServer.java       |   2 +-
 9 files changed, 347 insertions(+), 299 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/800f80bc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 6fe1f80..37465e9 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -434,7 +434,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         processScheduler.setMaxThreadCount(SchedulingStrategy.TIMER_DRIVEN, 
10);
         processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10);
         
-        controllerServiceProvider = new 
StandardControllerServiceProvider(processScheduler);
+        controllerServiceProvider = new 
StandardControllerServiceProvider(processScheduler, bulletinRepository);
     }
 
     public void start() throws IOException {
@@ -1411,6 +1411,11 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
     }
     
     @Override
+    public void enableControllerServices(final 
Collection<ControllerServiceNode> serviceNodes) {
+        controllerServiceProvider.enableControllerServices(serviceNodes);
+    }
+    
+    @Override
     public void disableControllerService(final ControllerServiceNode 
serviceNode) {
         controllerServiceProvider.disableControllerService(serviceNode);
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/800f80bc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
index aac65dc..1901fb6 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.controller.service;
 
+import java.util.Collection;
 import java.util.Set;
 
 import org.apache.nifi.annotation.lifecycle.OnAdded;
@@ -63,6 +64,13 @@ public interface ControllerServiceProvider extends 
ControllerServiceLookup {
     void enableControllerService(ControllerServiceNode serviceNode);
     
     /**
+     * Enables the collection of services. If a service in this collection 
depends on another service,
+     * the service being depended on must either already be enabled or must be 
in the collection as well.
+     * @param serviceNodes
+     */
+    void enableControllerServices(Collection<ControllerServiceNode> 
serviceNodes);
+    
+    /**
      * Disables the given controller service so that it cannot be used by 
other components. This allows
      * configuration to be updated or allows service to be removed.
      * @param serviceNode

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/800f80bc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 26f2369..fd01711 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -403,7 +403,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
         processScheduler = new StandardProcessScheduler(this, this, encryptor);
         eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, 
processScheduler);
-        controllerServiceProvider = new 
StandardControllerServiceProvider(processScheduler);
+        controllerServiceProvider = new 
StandardControllerServiceProvider(processScheduler, bulletinRepository);
 
         final ProcessContextFactory contextFactory = new 
ProcessContextFactory(contentRepository, flowFileRepository, 
flowFileEventRepository, counterRepositoryRef.get(), provenanceEventRepository);
         processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, 
new EventDrivenSchedulingAgent(
@@ -598,7 +598,10 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                         startConnectable(connectable);
                     }
                 } catch (final Throwable t) {
-                    LOG.error("Unable to start {} due to {}", new 
Object[]{connectable, t});
+                    LOG.error("Unable to start {} due to {}", new 
Object[]{connectable, t.toString()});
+                    if ( LOG.isDebugEnabled() ) {
+                        LOG.error("", t);
+                    }
                 }
             }
 
@@ -2631,11 +2634,15 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     
     @Override
     public void enableControllerService(final ControllerServiceNode 
serviceNode) {
-        serviceNode.verifyCanEnable();
         controllerServiceProvider.enableControllerService(serviceNode);
     }
     
     @Override
+    public void enableControllerServices(final 
Collection<ControllerServiceNode> serviceNodes) {
+        controllerServiceProvider.enableControllerServices(serviceNodes);
+    }
+    
+    @Override
     public void disableControllerService(final ControllerServiceNode 
serviceNode) {
         serviceNode.verifyCanDisable();
         controllerServiceProvider.disableControllerService(serviceNode);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/800f80bc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index b005a57..89850cc 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -281,7 +281,7 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
         }
 
         if (!procNode.isValid()) {
-            throw new IllegalStateException("Processor " + procNode.getName() 
+ " is not in a valid state");
+            throw new IllegalStateException("Processor " + procNode.getName() 
+ " is not in a valid state due to " + procNode.getValidationErrors());
         }
 
         final Runnable startProcRunnable = new Runnable() {
@@ -301,9 +301,18 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
                         }
                     }
                     
-                    while (true) {
+                    attemptOnScheduled: while (true) {
                         try {
                             synchronized (scheduleState) {
+                                for ( final String serviceId : serviceIds ) {
+                                    final boolean enabled = 
processContext.isControllerServiceEnabled(serviceId);
+                                    if ( !enabled ) {
+                                        LOG.debug("Controller Service with ID 
{} is not yet enabled, so will not start {} yet", serviceId, procNode);
+                                        
Thread.sleep(administrativeYieldMillis);
+                                        continue attemptOnScheduled;
+                                    }
+                                }
+                                
                                 // if no longer scheduled to run, then we're 
finished. This can happen, for example,
                                 // if the @OnScheduled method throws an 
Exception and the user stops the processor 
                                 // while we're administratively yielded.
@@ -607,7 +616,6 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
 
     @Override
     public void enableControllerService(final ControllerServiceNode service) {
-        service.verifyCanEnable();
         service.setState(ControllerServiceState.ENABLING);
         final ScheduleState scheduleState = getScheduleState(service);
         

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/800f80bc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index 7c2d4df..1fde670 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -26,19 +26,14 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
 
-import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.FlowFromDOMFactory;
 import org.apache.nifi.encrypt.StringEncryptor;
-import org.apache.nifi.events.BulletinFactory;
 import org.apache.nifi.reporting.BulletinRepository;
-import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.DomUtils;
 import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.slf4j.Logger;
@@ -116,122 +111,29 @@ public class ControllerServiceLoader {
         
         // Start services
         if ( autoResumeState ) {
-            // determine the order to load the services. We have to ensure 
that if service A references service B, then B
-            // is enabled first, and so on.
-            final Map<String, ControllerServiceNode> idToNodeMap = new 
HashMap<>();
-            for ( final ControllerServiceNode node : nodeMap.keySet() ) {
-                idToNodeMap.put(node.getIdentifier(), node);
-            }
-            
-            // We can have many Controller Services dependent on one another. 
We can have many of these
-            // disparate lists of Controller Services that are dependent on 
one another. We refer to each
-            // of these as a branch.
-            final List<List<ControllerServiceNode>> branches = 
determineEnablingOrder(idToNodeMap);
-            
-            final ExecutorService executor = 
Executors.newFixedThreadPool(Math.min(10, branches.size()));
+            final Set<ControllerServiceNode> nodesToEnable = new HashSet<>();
             
-            for ( final List<ControllerServiceNode> branch : branches ) {
-                final Runnable enableBranchRunnable = new Runnable() {
-                    @Override
-                    public void run() {
-                        logger.debug("Enabling Controller Service Branch {}", 
branch);
-                        
-                        for ( final ControllerServiceNode serviceNode : branch 
) {
-                            try {
-                                final Element controllerServiceElement = 
nodeMap.get(serviceNode);
-    
-                                final ControllerServiceDTO dto;
-                                synchronized 
(controllerServiceElement.getOwnerDocument()) {
-                                    dto = 
FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
-                                }
-                                
-                                final ControllerServiceState state = 
ControllerServiceState.valueOf(dto.getState());
-                                final boolean enable = (state == 
ControllerServiceState.ENABLED);
-                                if (enable) {
-                                    if ( 
ControllerServiceState.DISABLED.equals(serviceNode.getState()) ) {
-                                        logger.info("Enabling {}", 
serviceNode);
-                                        try {
-                                            
provider.enableControllerService(serviceNode);
-                                        } catch (final Exception e) {
-                                            logger.error("Failed to enable " + 
serviceNode + " due to " + e);
-                                            if ( logger.isDebugEnabled() ) {
-                                                logger.error("", e);
-                                            }
-                                            
-                                            
bulletinRepo.addBulletin(BulletinFactory.createBulletin(
-                                                    "Controller Service", 
Severity.ERROR.name(), "Could not start " + serviceNode + " due to " + e));
-                                        }
-                                    }
-                                    
-                                    // wait for service to finish enabling.
-                                    while ( 
ControllerServiceState.ENABLING.equals(serviceNode.getState()) ) {
-                                        try {
-                                            Thread.sleep(100L);
-                                        } catch (final InterruptedException 
ie) {}
-                                    }
-                                    
-                                    logger.info("State for {} is now {}", 
serviceNode, serviceNode.getState());
-                                }
-                            } catch (final Exception e) {
-                                logger.error("Failed to enable {} due to {}", 
serviceNode, e.toString());
-                                if ( logger.isDebugEnabled() ) {
-                                    logger.error("", e);
-                                }
-                            }
-                        }
-                    }
-                };
+            for ( final ControllerServiceNode node : nodeMap.keySet() ) {
+                final Element controllerServiceElement = nodeMap.get(node);
+
+                final ControllerServiceDTO dto;
+                synchronized (controllerServiceElement.getOwnerDocument()) {
+                    dto = 
FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
+                }
                 
-                executor.submit(enableBranchRunnable);
+                final ControllerServiceState state = 
ControllerServiceState.valueOf(dto.getState());
+                if (state == ControllerServiceState.ENABLED) {
+                    nodesToEnable.add(node);
+                }
             }
             
-            executor.shutdown();
+            provider.enableControllerServices(nodesToEnable);
         }
         
         return nodeMap.keySet();
     }
     
     
-    static List<List<ControllerServiceNode>> determineEnablingOrder(final 
Map<String, ControllerServiceNode> serviceNodeMap) {
-        final List<List<ControllerServiceNode>> orderedNodeLists = new 
ArrayList<>();
-        
-        for ( final ControllerServiceNode node : serviceNodeMap.values() ) {
-            if ( orderedNodeLists.contains(node) ) {
-                continue;   // this node is already in the list.
-            }
-            
-            final List<ControllerServiceNode> branch = new ArrayList<>();
-            determineEnablingOrder(serviceNodeMap, node, branch, new 
HashSet<ControllerServiceNode>());
-            orderedNodeLists.add(branch);
-        }
-        
-        return orderedNodeLists;
-    }
-    
-    
-    private static void determineEnablingOrder(final Map<String, 
ControllerServiceNode> serviceNodeMap, final ControllerServiceNode contextNode, 
final List<ControllerServiceNode> orderedNodes, final 
Set<ControllerServiceNode> visited) {
-        if ( visited.contains(contextNode) ) {
-            return;
-        }
-        
-        for ( final Map.Entry<PropertyDescriptor, String> entry : 
contextNode.getProperties().entrySet() ) {
-            if ( entry.getKey().getControllerServiceDefinition() != null ) {
-                final String referencedServiceId = entry.getValue();
-                if ( referencedServiceId != null ) {
-                    final ControllerServiceNode referencedNode = 
serviceNodeMap.get(referencedServiceId);
-                    if ( !orderedNodes.contains(referencedNode) ) {
-                        visited.add(contextNode);
-                        determineEnablingOrder(serviceNodeMap, referencedNode, 
orderedNodes, visited);
-                    }
-                }
-            }
-        }
-
-        if ( !orderedNodes.contains(contextNode) ) {
-            orderedNodes.add(contextNode);
-        }
-    }
-    
     private static ControllerServiceNode createControllerService(final 
ControllerServiceProvider provider, final Element controllerServiceElement, 
final StringEncryptor encryptor) {
         final ControllerServiceDTO dto = 
FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
         

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/800f80bc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index b2d61bd..ded55b4 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -23,13 +23,17 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.nifi.annotation.lifecycle.OnAdded;
 import org.apache.nifi.annotation.lifecycle.OnRemoved;
@@ -44,11 +48,14 @@ import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.ValidationContextFactory;
 import org.apache.nifi.controller.exception.ControllerServiceNotFoundException;
 import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
+import org.apache.nifi.events.BulletinFactory;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.processor.SimpleProcessLogger;
 import org.apache.nifi.processor.StandardValidationContextFactory;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.ObjectHolder;
 import org.apache.nifi.util.ReflectionUtils;
 import org.slf4j.Logger;
@@ -64,6 +71,7 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
     private final ProcessScheduler processScheduler;
     private final ConcurrentMap<String, ControllerServiceNode> 
controllerServices;
     private static final Set<Method> validDisabledMethods;
+    private final BulletinRepository bulletinRepo;
 
     static {
         // methods that are okay to be called when the service is disabled.
@@ -77,11 +85,12 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
         validDisabledMethods = Collections.unmodifiableSet(validMethods);
     }
 
-    public StandardControllerServiceProvider(final ProcessScheduler scheduler) 
{
+    public StandardControllerServiceProvider(final ProcessScheduler scheduler, 
final BulletinRepository bulletinRepo) {
         // the following 2 maps must be updated atomically, but we do not lock 
around them because they are modified
         // only in the createControllerService method, and both are modified 
before the method returns
         this.controllerServices = new ConcurrentHashMap<>();
         this.processScheduler = scheduler;
+        this.bulletinRepo = bulletinRepo;
     }
 
     private Class<?>[] getInterfaces(final Class<?> cls) {
@@ -285,6 +294,140 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
     }
     
     @Override
+    public void enableControllerServices(final 
Collection<ControllerServiceNode> serviceNodes) {
+        final Set<ControllerServiceNode> servicesToEnable = new HashSet<>();
+        // Ensure that all nodes are already disabled
+        for ( final ControllerServiceNode serviceNode : serviceNodes ) {
+            final ControllerServiceState curState = serviceNode.getState();
+            if ( ControllerServiceState.DISABLED.equals(curState) ) {
+                servicesToEnable.add(serviceNode);
+            } else {
+                logger.warn("Cannot enable {} because it is not disabled; 
current state is {}", serviceNode, curState);
+            }
+        }
+        
+        // determine the order to load the services. We have to ensure that if 
service A references service B, then B
+        // is enabled first, and so on.
+        final Map<String, ControllerServiceNode> idToNodeMap = new HashMap<>();
+        for ( final ControllerServiceNode node : servicesToEnable ) {
+            idToNodeMap.put(node.getIdentifier(), node);
+        }
+        
+        // We can have many Controller Services dependent on one another. We 
can have many of these
+        // disparate lists of Controller Services that are dependent on one 
another. We refer to each
+        // of these as a branch.
+        final List<List<ControllerServiceNode>> branches = 
determineEnablingOrder(idToNodeMap);
+
+        if ( branches.isEmpty() ) {
+            logger.info("No Controller Services to enable");
+            return;
+        } else {
+            logger.info("Will enable {} Controller Services", 
servicesToEnable.size());
+        }
+        
+        // Mark all services that are configured to be enabled as 'ENABLING'. 
This allows Processors, reporting tasks
+        // to be valid so that they can be scheduled.
+        for ( final List<ControllerServiceNode> branch : branches ) {
+            for ( final ControllerServiceNode nodeToEnable : branch ) {
+                nodeToEnable.setState(ControllerServiceState.ENABLING);
+            }
+        }
+        
+        final Set<ControllerServiceNode> enabledNodes = 
Collections.synchronizedSet(new HashSet<ControllerServiceNode>());
+        final ExecutorService executor = 
Executors.newFixedThreadPool(Math.min(10, branches.size()));
+        for ( final List<ControllerServiceNode> branch : branches ) {
+            final Runnable enableBranchRunnable = new Runnable() {
+                @Override
+                public void run() {
+                    logger.debug("Enabling Controller Service Branch {}", 
branch);
+                    
+                    for ( final ControllerServiceNode serviceNode : branch ) {
+                        try {
+                            if ( !enabledNodes.contains(serviceNode) ) {
+                                enabledNodes.add(serviceNode);
+                                
+                                logger.info("Enabling {}", serviceNode);
+                                try {
+                                    
processScheduler.enableControllerService(serviceNode);
+                                } catch (final Exception e) {
+                                    logger.error("Failed to enable " + 
serviceNode + " due to " + e);
+                                    if ( logger.isDebugEnabled() ) {
+                                        logger.error("", e);
+                                    }
+                                    
+                                    if ( bulletinRepo != null ) {
+                                        
bulletinRepo.addBulletin(BulletinFactory.createBulletin(
+                                            "Controller Service", 
Severity.ERROR.name(), "Could not start " + serviceNode + " due to " + e));
+                                    }
+                                }
+                            }
+                            
+                            // wait for service to finish enabling.
+                            while ( 
ControllerServiceState.ENABLING.equals(serviceNode.getState()) ) {
+                                try {
+                                    Thread.sleep(100L);
+                                } catch (final InterruptedException ie) {}
+                            }
+                            
+                            logger.info("State for {} is now {}", serviceNode, 
serviceNode.getState());
+                        } catch (final Exception e) {
+                            logger.error("Failed to enable {} due to {}", 
serviceNode, e.toString());
+                            if ( logger.isDebugEnabled() ) {
+                                logger.error("", e);
+                            }
+                        }
+                    }
+                }
+            };
+            
+            executor.submit(enableBranchRunnable);
+        }
+        
+        executor.shutdown();
+    }
+    
+    static List<List<ControllerServiceNode>> determineEnablingOrder(final 
Map<String, ControllerServiceNode> serviceNodeMap) {
+        final List<List<ControllerServiceNode>> orderedNodeLists = new 
ArrayList<>();
+        
+        for ( final ControllerServiceNode node : serviceNodeMap.values() ) {
+            if ( orderedNodeLists.contains(node) ) {
+                continue;   // this node is already in the list.
+            }
+            
+            final List<ControllerServiceNode> branch = new ArrayList<>();
+            determineEnablingOrder(serviceNodeMap, node, branch, new 
HashSet<ControllerServiceNode>());
+            orderedNodeLists.add(branch);
+        }
+        
+        return orderedNodeLists;
+    }
+    
+    
+    private static void determineEnablingOrder(final Map<String, 
ControllerServiceNode> serviceNodeMap, final ControllerServiceNode contextNode, 
final List<ControllerServiceNode> orderedNodes, final 
Set<ControllerServiceNode> visited) {
+        if ( visited.contains(contextNode) ) {
+            return;
+        }
+        
+        for ( final Map.Entry<PropertyDescriptor, String> entry : 
contextNode.getProperties().entrySet() ) {
+            if ( entry.getKey().getControllerServiceDefinition() != null ) {
+                final String referencedServiceId = entry.getValue();
+                if ( referencedServiceId != null ) {
+                    final ControllerServiceNode referencedNode = 
serviceNodeMap.get(referencedServiceId);
+                    if ( !orderedNodes.contains(referencedNode) ) {
+                        visited.add(contextNode);
+                        determineEnablingOrder(serviceNodeMap, referencedNode, 
orderedNodes, visited);
+                    }
+                }
+            }
+        }
+
+        if ( !orderedNodes.contains(contextNode) ) {
+            orderedNodes.add(contextNode);
+        }
+    }
+    
+    
+    @Override
     public void disableControllerService(final ControllerServiceNode 
serviceNode) {
         serviceNode.verifyCanDisable();
         processScheduler.disableControllerService(serviceNode);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/800f80bc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestControllerServiceLoader.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestControllerServiceLoader.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestControllerServiceLoader.java
deleted file mode 100644
index 9451b07..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestControllerServiceLoader.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.service;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.nifi.controller.service.mock.ServiceA;
-import org.apache.nifi.controller.service.mock.ServiceB;
-import org.junit.Test;
-
-public class TestControllerServiceLoader {
-    @Test
-    public void testOrderingOfServices() {
-        final StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(null);
-        final ControllerServiceNode serviceNode1 = 
provider.createControllerService(ServiceA.class.getName(), "1", false);
-        final ControllerServiceNode serviceNode2 = 
provider.createControllerService(ServiceB.class.getName(), "2", false);
-
-        serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
-
-        final Map<String, ControllerServiceNode> nodeMap = new 
LinkedHashMap<>();
-        nodeMap.put("1", serviceNode1);
-        nodeMap.put("2", serviceNode2);
-        
-        List<List<ControllerServiceNode>> branches = 
ControllerServiceLoader.determineEnablingOrder(nodeMap);
-        assertEquals(2, branches.size());
-        List<ControllerServiceNode> ordered = branches.get(0);
-        assertEquals(2, ordered.size());
-        assertTrue(ordered.get(0) == serviceNode2);
-        assertTrue(ordered.get(1) == serviceNode1);
-        assertEquals(1, branches.get(1).size());
-        assertTrue(branches.get(1).get(0) == serviceNode2);
-        
-        nodeMap.clear();
-        nodeMap.put("2", serviceNode2);
-        nodeMap.put("1", serviceNode1);
-        
-        branches = ControllerServiceLoader.determineEnablingOrder(nodeMap);
-        assertEquals(2, branches.size());
-        ordered = branches.get(1);
-        assertEquals(2, ordered.size());
-        assertTrue(ordered.get(0) == serviceNode2);
-        assertTrue(ordered.get(1) == serviceNode1);
-        assertEquals(1, branches.get(0).size());
-        assertTrue(branches.get(0).get(0) == serviceNode2);
-        
-        // add circular dependency on self.
-        nodeMap.clear();
-        serviceNode1.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "1");
-        nodeMap.put("1", serviceNode1);
-        nodeMap.put("2", serviceNode2);
-        
-        branches = ControllerServiceLoader.determineEnablingOrder(nodeMap);
-        assertEquals(2, branches.size());
-        ordered = branches.get(0);
-        assertEquals(2, ordered.size());
-        assertTrue(ordered.get(0) == serviceNode2);
-        assertTrue(ordered.get(1) == serviceNode1);
-        
-        nodeMap.clear();
-        nodeMap.put("2", serviceNode2);
-        nodeMap.put("1", serviceNode1);
-        branches = ControllerServiceLoader.determineEnablingOrder(nodeMap);
-        assertEquals(2, branches.size());
-        ordered = branches.get(1);
-        assertEquals(2, ordered.size());
-        assertTrue(ordered.get(0) == serviceNode2);
-        assertTrue(ordered.get(1) == serviceNode1);
-        
-        // add circular dependency once removed. In this case, we won't 
actually be able to enable these because of the
-        // circular dependency because they will never be valid because they 
will always depend on a disabled service.
-        // But we want to ensure that the method returns successfully without 
throwing a StackOverflowException or anything
-        // like that.
-        nodeMap.clear();
-        final ControllerServiceNode serviceNode3 = 
provider.createControllerService(ServiceA.class.getName(), "3", false);
-        serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "3");
-        serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "1");
-        nodeMap.put("1", serviceNode1);
-        nodeMap.put("3", serviceNode3);
-        branches = ControllerServiceLoader.determineEnablingOrder(nodeMap);
-        assertEquals(2, branches.size());
-        ordered = branches.get(0);
-        assertEquals(2, ordered.size());
-        assertTrue(ordered.get(0) == serviceNode3);
-        assertTrue(ordered.get(1) == serviceNode1);
-        
-        nodeMap.clear();
-        nodeMap.put("3", serviceNode3);
-        nodeMap.put("1", serviceNode1);
-        branches = ControllerServiceLoader.determineEnablingOrder(nodeMap);
-        assertEquals(2, branches.size());
-        ordered = branches.get(1);
-        assertEquals(2, ordered.size());
-        assertTrue(ordered.get(0) == serviceNode3);
-        assertTrue(ordered.get(1) == serviceNode1);
-        
-        
-        // Add multiple completely disparate branches.
-        nodeMap.clear();
-        serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
-        final ControllerServiceNode serviceNode4 = 
provider.createControllerService(ServiceB.class.getName(), "4", false);
-        final ControllerServiceNode serviceNode5 = 
provider.createControllerService(ServiceB.class.getName(), "5", false);
-        serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
-        nodeMap.put("1", serviceNode1);
-        nodeMap.put("2", serviceNode2);
-        nodeMap.put("3", serviceNode3);
-        nodeMap.put("4", serviceNode4);
-        nodeMap.put("5", serviceNode5);
-        
-        branches = ControllerServiceLoader.determineEnablingOrder(nodeMap);
-        assertEquals(5, branches.size());
-
-        ordered = branches.get(0);
-        assertEquals(2, ordered.size());
-        assertTrue(ordered.get(0) == serviceNode2);
-        assertTrue(ordered.get(1) == serviceNode1);
-        
-        assertEquals(1, branches.get(1).size());
-        assertTrue(branches.get(1).get(0) == serviceNode2);
-        
-        ordered = branches.get(2);
-        assertEquals(2, ordered.size());
-        assertTrue(ordered.get(0) == serviceNode4);
-        assertTrue(ordered.get(1) == serviceNode3);
-        
-        assertEquals(1, branches.get(3).size());
-        assertTrue(branches.get(3).get(0) == serviceNode4);
-        
-        assertEquals(1, branches.get(4).size());
-        assertTrue(branches.get(4).get(0) == serviceNode5);
-        
-        // create 2 branches both dependent on the same service
-        nodeMap.clear();
-        serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
-        serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
-        nodeMap.put("1", serviceNode1);
-        nodeMap.put("2", serviceNode2);
-        nodeMap.put("3", serviceNode3);
-        
-        branches = ControllerServiceLoader.determineEnablingOrder(nodeMap);
-        assertEquals(3, branches.size());
-        
-        ordered = branches.get(0);
-        assertEquals(2, ordered.size());
-        assertTrue(ordered.get(0) == serviceNode2);
-        assertTrue(ordered.get(1) == serviceNode1);
-        
-        ordered = branches.get(1);
-        assertEquals(1, ordered.size());
-        assertTrue(ordered.get(0) == serviceNode2);
-        
-        ordered = branches.get(2);
-        assertEquals(2, ordered.size());
-        assertTrue(ordered.get(0) == serviceNode2);
-        assertTrue(ordered.get(1) == serviceNode3);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/800f80bc/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
index 46dd885..3dc1752 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -20,6 +20,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import org.apache.nifi.controller.ProcessScheduler;
@@ -68,7 +71,7 @@ public class TestStandardControllerServiceProvider {
     @Test
     public void testDisableControllerService() {
         final ProcessScheduler scheduler = createScheduler();
-        final StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler);
+        final StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null);
         
         final ControllerServiceNode serviceNode = 
provider.createControllerService(ServiceB.class.getName(), "B", false);
         provider.enableControllerService(serviceNode);
@@ -78,7 +81,7 @@ public class TestStandardControllerServiceProvider {
     @Test
     public void testEnableDisableWithReference() {
         final ProcessScheduler scheduler = createScheduler();
-        final StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler);
+        final StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null);
         
         final ControllerServiceNode serviceNodeB = 
provider.createControllerService(ServiceB.class.getName(), "B", false);
         final ControllerServiceNode serviceNodeA = 
provider.createControllerService(ServiceA.class.getName(), "A", false);
@@ -108,7 +111,7 @@ public class TestStandardControllerServiceProvider {
     @Test
     public void testEnableReferencingServicesGraph() {
         final ProcessScheduler scheduler = createScheduler();
-        final StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler);
+        final StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null);
         
         // build a graph of controller services with dependencies as such:
         //
@@ -145,7 +148,7 @@ public class TestStandardControllerServiceProvider {
     @Test
     public void testStartStopReferencingComponents() {
         final ProcessScheduler scheduler = createScheduler();
-        final StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler);
+        final StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null);
         
         // build a graph of reporting tasks and controller services with 
dependencies as such:
         //
@@ -232,4 +235,151 @@ public class TestStandardControllerServiceProvider {
         provider.disableControllerService(serviceNode4);
         assertEquals(ControllerServiceState.DISABLED, serviceNode4.getState());
     }
+    
+    
+    @Test
+    public void testOrderingOfServices() {
+        final StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(null, null);
+        final ControllerServiceNode serviceNode1 = 
provider.createControllerService(ServiceA.class.getName(), "1", false);
+        final ControllerServiceNode serviceNode2 = 
provider.createControllerService(ServiceB.class.getName(), "2", false);
+
+        serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
+
+        final Map<String, ControllerServiceNode> nodeMap = new 
LinkedHashMap<>();
+        nodeMap.put("1", serviceNode1);
+        nodeMap.put("2", serviceNode2);
+        
+        List<List<ControllerServiceNode>> branches = 
StandardControllerServiceProvider.determineEnablingOrder(nodeMap);
+        assertEquals(2, branches.size());
+        List<ControllerServiceNode> ordered = branches.get(0);
+        assertEquals(2, ordered.size());
+        assertTrue(ordered.get(0) == serviceNode2);
+        assertTrue(ordered.get(1) == serviceNode1);
+        assertEquals(1, branches.get(1).size());
+        assertTrue(branches.get(1).get(0) == serviceNode2);
+        
+        nodeMap.clear();
+        nodeMap.put("2", serviceNode2);
+        nodeMap.put("1", serviceNode1);
+        
+        branches = 
StandardControllerServiceProvider.determineEnablingOrder(nodeMap);
+        assertEquals(2, branches.size());
+        ordered = branches.get(1);
+        assertEquals(2, ordered.size());
+        assertTrue(ordered.get(0) == serviceNode2);
+        assertTrue(ordered.get(1) == serviceNode1);
+        assertEquals(1, branches.get(0).size());
+        assertTrue(branches.get(0).get(0) == serviceNode2);
+        
+        // add circular dependency on self.
+        nodeMap.clear();
+        serviceNode1.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "1");
+        nodeMap.put("1", serviceNode1);
+        nodeMap.put("2", serviceNode2);
+        
+        branches = 
StandardControllerServiceProvider.determineEnablingOrder(nodeMap);
+        assertEquals(2, branches.size());
+        ordered = branches.get(0);
+        assertEquals(2, ordered.size());
+        assertTrue(ordered.get(0) == serviceNode2);
+        assertTrue(ordered.get(1) == serviceNode1);
+        
+        nodeMap.clear();
+        nodeMap.put("2", serviceNode2);
+        nodeMap.put("1", serviceNode1);
+        branches = 
StandardControllerServiceProvider.determineEnablingOrder(nodeMap);
+        assertEquals(2, branches.size());
+        ordered = branches.get(1);
+        assertEquals(2, ordered.size());
+        assertTrue(ordered.get(0) == serviceNode2);
+        assertTrue(ordered.get(1) == serviceNode1);
+        
+        // add circular dependency once removed. In this case, we won't 
actually be able to enable these because of the
+        // circular dependency because they will never be valid because they 
will always depend on a disabled service.
+        // But we want to ensure that the method returns successfully without 
throwing a StackOverflowException or anything
+        // like that.
+        nodeMap.clear();
+        final ControllerServiceNode serviceNode3 = 
provider.createControllerService(ServiceA.class.getName(), "3", false);
+        serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "3");
+        serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "1");
+        nodeMap.put("1", serviceNode1);
+        nodeMap.put("3", serviceNode3);
+        branches = 
StandardControllerServiceProvider.determineEnablingOrder(nodeMap);
+        assertEquals(2, branches.size());
+        ordered = branches.get(0);
+        assertEquals(2, ordered.size());
+        assertTrue(ordered.get(0) == serviceNode3);
+        assertTrue(ordered.get(1) == serviceNode1);
+        
+        nodeMap.clear();
+        nodeMap.put("3", serviceNode3);
+        nodeMap.put("1", serviceNode1);
+        branches = 
StandardControllerServiceProvider.determineEnablingOrder(nodeMap);
+        assertEquals(2, branches.size());
+        ordered = branches.get(1);
+        assertEquals(2, ordered.size());
+        assertTrue(ordered.get(0) == serviceNode3);
+        assertTrue(ordered.get(1) == serviceNode1);
+        
+        
+        // Add multiple completely disparate branches.
+        nodeMap.clear();
+        serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
+        final ControllerServiceNode serviceNode4 = 
provider.createControllerService(ServiceB.class.getName(), "4", false);
+        final ControllerServiceNode serviceNode5 = 
provider.createControllerService(ServiceB.class.getName(), "5", false);
+        serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
+        nodeMap.put("1", serviceNode1);
+        nodeMap.put("2", serviceNode2);
+        nodeMap.put("3", serviceNode3);
+        nodeMap.put("4", serviceNode4);
+        nodeMap.put("5", serviceNode5);
+        
+        branches = 
StandardControllerServiceProvider.determineEnablingOrder(nodeMap);
+        assertEquals(5, branches.size());
+
+        ordered = branches.get(0);
+        assertEquals(2, ordered.size());
+        assertTrue(ordered.get(0) == serviceNode2);
+        assertTrue(ordered.get(1) == serviceNode1);
+        
+        assertEquals(1, branches.get(1).size());
+        assertTrue(branches.get(1).get(0) == serviceNode2);
+        
+        ordered = branches.get(2);
+        assertEquals(2, ordered.size());
+        assertTrue(ordered.get(0) == serviceNode4);
+        assertTrue(ordered.get(1) == serviceNode3);
+        
+        assertEquals(1, branches.get(3).size());
+        assertTrue(branches.get(3).get(0) == serviceNode4);
+        
+        assertEquals(1, branches.get(4).size());
+        assertTrue(branches.get(4).get(0) == serviceNode5);
+        
+        // create 2 branches both dependent on the same service
+        nodeMap.clear();
+        serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
+        serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
+        nodeMap.put("1", serviceNode1);
+        nodeMap.put("2", serviceNode2);
+        nodeMap.put("3", serviceNode3);
+        
+        branches = 
StandardControllerServiceProvider.determineEnablingOrder(nodeMap);
+        assertEquals(3, branches.size());
+        
+        ordered = branches.get(0);
+        assertEquals(2, ordered.size());
+        assertTrue(ordered.get(0) == serviceNode2);
+        assertTrue(ordered.get(1) == serviceNode1);
+        
+        ordered = branches.get(1);
+        assertEquals(1, ordered.size());
+        assertTrue(ordered.get(0) == serviceNode2);
+        
+        ordered = branches.get(2);
+        assertEquals(2, ordered.size());
+        assertTrue(ordered.get(0) == serviceNode2);
+        assertTrue(ordered.get(1) == serviceNode3);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/800f80bc/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
 
b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
index a9643ab..a6a2458 100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
@@ -163,7 +163,7 @@ public abstract class AbstractCacheServer implements 
CacheServer {
         stopped = true;
         logger.info("Stopping CacheServer {}", new Object[] { this.identifier 
});
 
-        if (serverSocketChannel != null) {
+        if (serverSocketChannel != null && serverSocketChannel.isOpen()) {
             serverSocketChannel.close();
         }
         // need to close out the created SocketChannels...this is done by 
interrupting

Reply via email to