Repository: incubator-nifi
Updated Branches:
  refs/heads/NIFI-250 42f8e8198 -> c63a8c05f


NIFI-250: Fix the way that services are restored at startup


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/d7210da4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/d7210da4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/d7210da4

Branch: refs/heads/NIFI-250
Commit: d7210da40eda444267133fdfbbf2a8a378590e82
Parents: dddf5e8
Author: Mark Payne <[email protected]>
Authored: Wed Mar 18 09:37:06 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Wed Mar 18 09:37:06 2015 -0400

----------------------------------------------------------------------
 .../nifi/components/PropertyDescriptor.java     |   6 +-
 .../controller/ControllerServiceLookup.java     |  12 ++
 .../nifi/util/MockControllerServiceLookup.java  |   5 +
 .../MockProcessorInitializationContext.java     |   5 +
 .../apache/nifi/util/MockValidationContext.java |   5 +
 .../cluster/manager/impl/WebClusterManager.java |  64 +++++--
 .../apache/nifi/controller/FlowController.java  |   5 +
 .../controller/StandardFlowSynchronizer.java    | 158 ++++++++---------
 .../reporting/AbstractReportingTaskNode.java    |   6 +
 .../reporting/StandardReportingContext.java     |   5 +
 .../StandardReportingInitializationContext.java |   5 +
 .../scheduling/StandardProcessScheduler.java    |  15 +-
 .../service/ControllerServiceLoader.java        | 161 ++++++++++++-----
 ...dControllerServiceInitializationContext.java |   5 +
 .../StandardControllerServiceProvider.java      |  10 +-
 .../nifi/processor/StandardProcessContext.java  |   5 +
 .../service/TestControllerServiceLoader.java    | 175 +++++++++++++++++++
 .../processor/TestStandardPropertyValue.java    |   5 +
 .../cache/server/DistributedCacheServer.java    |   4 +-
 19 files changed, 514 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java
 
b/nifi/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java
index 65ef0ba..e62ff79 100644
--- 
a/nifi/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java
+++ 
b/nifi/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java
@@ -150,9 +150,11 @@ public final class PropertyDescriptor implements 
Comparable<PropertyDescriptor>
                         .build();
                 }
                 
-                if 
(!context.getControllerServiceLookup().isControllerServiceEnabled(controllerService))
 {
+                final String serviceId = controllerService.getIdentifier();
+                if 
(!context.getControllerServiceLookup().isControllerServiceEnabled(serviceId) && 
+                    
!context.getControllerServiceLookup().isControllerServiceEnabling(serviceId)) {
                     return new ValidationResult.Builder()
-                            
.input(context.getControllerServiceLookup().getControllerServiceName(controllerService.getIdentifier()))
+                            
.input(context.getControllerServiceLookup().getControllerServiceName(serviceId))
                             .subject(getName())
                             .valid(false)
                             .explanation("Controller Service " + 
controllerService + " is disabled")

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceLookup.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceLookup.java
 
b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceLookup.java
index 25167ad..4b96f62 100644
--- 
a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceLookup.java
+++ 
b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceLookup.java
@@ -42,6 +42,18 @@ public interface ControllerServiceLookup {
     boolean isControllerServiceEnabled(String serviceIdentifier);
 
     /**
+     * Returns <code>true</code> if the Controller Service with the given
+     * identifier has been enabled but is still in the transitioning state,
+     * otherwise returns <code>false</code>.
+     * If the given identifier is not known by this ControllerServiceLookup,
+     * returns <code>false</code>.
+     * 
+     * @param serviceIdentifier
+     * @return
+     */
+    boolean isControllerServiceEnabling(String serviceIdentifier);
+    
+    /**
      * Returns <code>true</code> if the given Controller Service is enabled,
      * <code>false</code> otherwise. If the given Controller Service is not
      * known by this ControllerServiceLookup, returns <code>false</code>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java
 
b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java
index c804d5f..2734440 100644
--- 
a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java
+++ 
b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java
@@ -77,6 +77,11 @@ public abstract class MockControllerServiceLookup implements 
ControllerServiceLo
     }
 
     @Override
+    public boolean isControllerServiceEnabling(final String serviceIdentifier) 
{
+        return false;
+    }
+    
+    @Override
     public Set<String> getControllerServiceIdentifiers(final Class<? extends 
ControllerService> serviceType) {
         final Set<String> ids = new HashSet<>();
         for (final Map.Entry<String, ControllerServiceConfiguration> entry : 
controllerServiceMap.entrySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java
 
b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java
index 582ffb1..0aa2749 100644
--- 
a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java
+++ 
b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java
@@ -76,4 +76,9 @@ public class MockProcessorInitializationContext implements 
ProcessorInitializati
     public boolean isControllerServiceEnabled(ControllerService service) {
         return context.isControllerServiceEnabled(service);
     }
+
+    @Override
+    public boolean isControllerServiceEnabling(String serviceIdentifier) {
+        return context.isControllerServiceEnabling(serviceIdentifier);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java 
b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
index 6d32d0b..bf6ed1d 100644
--- 
a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
+++ 
b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
@@ -101,4 +101,9 @@ public class MockValidationContext implements 
ValidationContext, ControllerServi
     public boolean isValidationRequired(final ControllerService service) {
         return true;
     }
+
+    @Override
+    public boolean isControllerServiceEnabling(String serviceIdentifier) {
+        return context.isControllerServiceEnabling(serviceIdentifier);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index d3688af..6fe1f80 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -127,6 +127,7 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.Heartbeater;
 import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.StandardFlowSerializer;
 import org.apache.nifi.controller.ValidationContextFactory;
 import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
@@ -219,6 +220,7 @@ import org.xml.sax.SAXException;
 import org.xml.sax.SAXParseException;
 
 import com.sun.jersey.api.client.ClientResponse;
+
 import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
@@ -941,6 +943,9 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
                 final String taskSchedulingPeriod = 
DomUtils.getChild(taskElement, "schedulingPeriod").getTextContent().trim();
                 final String taskClass = DomUtils.getChild(taskElement, 
"class").getTextContent().trim();
 
+                final String scheduleStateValue = 
DomUtils.getChild(taskElement, "scheduledState").getTextContent().trim();
+                final ScheduledState scheduledState = 
ScheduledState.valueOf(scheduleStateValue);
+                
                 //optional task-specific properties
                 for (final Element optionalProperty : 
DomUtils.getChildElementsByTagName(taskElement, "property")) {
                     final String name = optionalProperty.getAttribute("name");
@@ -980,7 +985,23 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
                     reportingTaskNode.setProperty(entry.getKey().getName(), 
entry.getValue());
                 }
 
-                processScheduler.schedule(reportingTaskNode);
+                reportingTaskNode.setScheduledState(scheduledState);
+                if ( ScheduledState.RUNNING.equals(scheduledState) ) {
+                    if ( reportingTaskNode.isValid() ) {
+                        try {
+                            processScheduler.schedule(reportingTaskNode);
+                        } catch (final Exception e) {
+                            logger.error("Failed to start {} due to {}", 
reportingTaskNode, e);
+                            if ( logger.isDebugEnabled() ) {
+                                logger.error("", e);
+                            }
+                        }
+                    } else {
+                        logger.error("Failed to start {} because it is invalid 
due to {}", reportingTaskNode, reportingTaskNode.getValidationErrors());
+                    }
+                }
+                
+                
                 tasks.put(reportingTaskNode.getIdentifier(), 
reportingTaskNode);
             }
         } catch (final SAXException | ParserConfigurationException | 
IOException | DOMException | NumberFormatException | InitializationException t) 
{
@@ -1369,6 +1390,11 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
     }
 
     @Override
+    public boolean isControllerServiceEnabling(final String serviceIdentifier) 
{
+        return 
controllerServiceProvider.isControllerServiceEnabling(serviceIdentifier);
+    }
+    
+    @Override
     public String getControllerServiceName(final String serviceIdentifier) {
        return 
controllerServiceProvider.getControllerServiceName(serviceIdentifier);
     }
@@ -2621,24 +2647,26 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
             final Set<ControllerServiceReferencingComponentDTO> 
nodeReferencingComponents = nodeEntry.getValue();
 
             // go through all the nodes referencing components
-            for (final ControllerServiceReferencingComponentDTO 
nodeReferencingComponent : nodeReferencingComponents) {
-                // handle active thread counts
-                if (nodeReferencingComponent.getActiveThreadCount() != null && 
nodeReferencingComponent.getActiveThreadCount() > 0) {
-                    final Integer current = 
activeThreadCounts.get(nodeReferencingComponent.getId());
-                    if (current == null) {
-                        
activeThreadCounts.put(nodeReferencingComponent.getId(), 
nodeReferencingComponent.getActiveThreadCount());
-                    } else {
-                        
activeThreadCounts.put(nodeReferencingComponent.getId(), 
nodeReferencingComponent.getActiveThreadCount() + current);
+            if ( nodeReferencingComponents != null ) {
+                for (final ControllerServiceReferencingComponentDTO 
nodeReferencingComponent : nodeReferencingComponents) {
+                    // handle active thread counts
+                    if (nodeReferencingComponent.getActiveThreadCount() != 
null && nodeReferencingComponent.getActiveThreadCount() > 0) {
+                        final Integer current = 
activeThreadCounts.get(nodeReferencingComponent.getId());
+                        if (current == null) {
+                            
activeThreadCounts.put(nodeReferencingComponent.getId(), 
nodeReferencingComponent.getActiveThreadCount());
+                        } else {
+                            
activeThreadCounts.put(nodeReferencingComponent.getId(), 
nodeReferencingComponent.getActiveThreadCount() + current);
+                        }
                     }
-                }
-                
-                // handle controller service state
-                final String state = 
states.get(nodeReferencingComponent.getId());
-                if (state == null) {
-                    if 
(ControllerServiceState.DISABLING.name().equals(nodeReferencingComponent.getState()))
 {
-                        states.put(nodeReferencingComponent.getId(), 
ControllerServiceState.DISABLING.name());
-                    } else if 
(ControllerServiceState.ENABLING.name().equals(nodeReferencingComponent.getState()))
 {
-                        states.put(nodeReferencingComponent.getId(), 
ControllerServiceState.ENABLING.name());
+                    
+                    // handle controller service state
+                    final String state = 
states.get(nodeReferencingComponent.getId());
+                    if (state == null) {
+                        if 
(ControllerServiceState.DISABLING.name().equals(nodeReferencingComponent.getState()))
 {
+                            states.put(nodeReferencingComponent.getId(), 
ControllerServiceState.DISABLING.name());
+                        } else if 
(ControllerServiceState.ENABLING.name().equals(nodeReferencingComponent.getState()))
 {
+                            states.put(nodeReferencingComponent.getId(), 
ControllerServiceState.ENABLING.name());
+                        }
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index a04ae3a..26f2369 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2682,6 +2682,11 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     }
 
     @Override
+    public boolean isControllerServiceEnabling(final String serviceIdentifier) 
{
+        return 
controllerServiceProvider.isControllerServiceEnabling(serviceIdentifier);
+    }
+    
+    @Override
     public String getControllerServiceName(final String serviceIdentifier) {
        return 
controllerServiceProvider.getControllerServiceName(serviceIdentifier);
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 2348dcb..087272c 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -214,88 +214,90 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
         logger.trace("Parsing proposed flow bytes as DOM document");
         final Document configuration = parseFlowBytes(proposedFlow.getFlow());
 
-        // attempt to sync controller with proposed flow
-        try {
-            if (configuration != null) {
-                // get the root element
-                final Element rootElement = (Element) 
configuration.getElementsByTagName("flowController").item(0);
-
-                // set controller config
-                logger.trace("Updating flow config");
-                final Integer maxThreadCount = getInteger(rootElement, 
"maxThreadCount");
-                if (maxThreadCount == null) {
-                    
controller.setMaxTimerDrivenThreadCount(getInt(rootElement, 
"maxTimerDrivenThreadCount"));
-                    
controller.setMaxEventDrivenThreadCount(getInt(rootElement, 
"maxEventDrivenThreadCount"));
-                } else {
-                    controller.setMaxTimerDrivenThreadCount(maxThreadCount * 2 
/ 3);
-                    controller.setMaxEventDrivenThreadCount(maxThreadCount / 
3);
-                }
-
-                // get the root group XML element
-                final Element rootGroupElement = (Element) 
rootElement.getElementsByTagName("rootGroup").item(0);
-
-                final Element controllerServicesElement = (Element) 
DomUtils.getChild(rootElement, "controllerServices");
-                   if ( controllerServicesElement != null ) {
-                       final List<Element> serviceElements = 
DomUtils.getChildElementsByTagName(controllerServicesElement, 
"controllerService");
-                       
-                       if ( !initialized || existingFlowEmpty ) {
-                           
ControllerServiceLoader.loadControllerServices(serviceElements, controller, 
encryptor, controller.getBulletinRepository(), autoResumeState);
-                       } else {
-                           for ( final Element serviceElement : 
serviceElements ) {
-                               updateControllerService(controller, 
serviceElement, encryptor);
-                           }
-                       }
-                }
-
-                // if this controller isn't initialized or its emtpy, add the 
root group, otherwise update
-                if (!initialized || existingFlowEmpty) {
-                    logger.trace("Adding root process group");
-                    addProcessGroup(controller, /* parent group */ null, 
rootGroupElement, encryptor);
-                } else {
-                    logger.trace("Updating root process group");
-                    updateProcessGroup(controller, /* parent group */ null, 
rootGroupElement, encryptor);
-                }
-
-                final Element reportingTasksElement = (Element) 
DomUtils.getChild(rootElement, "reportingTasks");
-                if ( reportingTasksElement != null ) {
-                       final List<Element> taskElements = 
DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask");
-                       for ( final Element taskElement : taskElements ) {
-                               if ( !initialized || existingFlowEmpty ) {
-                                       addReportingTask(controller, 
taskElement, encryptor);
-                               } else {
-                                       updateReportingTask(controller, 
taskElement, encryptor);
-                               }
-                       }
+        synchronized (configuration) {
+            // attempt to sync controller with proposed flow
+            try {
+                if (configuration != null) {
+                    // get the root element
+                    final Element rootElement = (Element) 
configuration.getElementsByTagName("flowController").item(0);
+    
+                    // set controller config
+                    logger.trace("Updating flow config");
+                    final Integer maxThreadCount = getInteger(rootElement, 
"maxThreadCount");
+                    if (maxThreadCount == null) {
+                        
controller.setMaxTimerDrivenThreadCount(getInt(rootElement, 
"maxTimerDrivenThreadCount"));
+                        
controller.setMaxEventDrivenThreadCount(getInt(rootElement, 
"maxEventDrivenThreadCount"));
+                    } else {
+                        controller.setMaxTimerDrivenThreadCount(maxThreadCount 
* 2 / 3);
+                        controller.setMaxEventDrivenThreadCount(maxThreadCount 
/ 3);
+                    }
+    
+                    // get the root group XML element
+                    final Element rootGroupElement = (Element) 
rootElement.getElementsByTagName("rootGroup").item(0);
+    
+                    final Element controllerServicesElement = (Element) 
DomUtils.getChild(rootElement, "controllerServices");
+                   if ( controllerServicesElement != null ) {
+                       final List<Element> serviceElements = 
DomUtils.getChildElementsByTagName(controllerServicesElement, 
"controllerService");
+                       
+                       if ( !initialized || existingFlowEmpty ) {
+                           
ControllerServiceLoader.loadControllerServices(serviceElements, controller, 
encryptor, controller.getBulletinRepository(), autoResumeState);
+                       } else {
+                           for ( final Element serviceElement : 
serviceElements ) {
+                               updateControllerService(controller, 
serviceElement, encryptor);
+                           }
+                       }
+                    }
+    
+                    // if this controller isn't initialized or its emtpy, add 
the root group, otherwise update
+                    if (!initialized || existingFlowEmpty) {
+                        logger.trace("Adding root process group");
+                        addProcessGroup(controller, /* parent group */ null, 
rootGroupElement, encryptor);
+                    } else {
+                        logger.trace("Updating root process group");
+                        updateProcessGroup(controller, /* parent group */ 
null, rootGroupElement, encryptor);
+                    }
+    
+                    final Element reportingTasksElement = (Element) 
DomUtils.getChild(rootElement, "reportingTasks");
+                    if ( reportingTasksElement != null ) {
+                       final List<Element> taskElements = 
DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask");
+                       for ( final Element taskElement : taskElements ) {
+                               if ( !initialized || existingFlowEmpty ) {
+                                       addReportingTask(controller, 
taskElement, encryptor);
+                               } else {
+                                       updateReportingTask(controller, 
taskElement, encryptor);
+                               }
+                       }
+                    }
                 }
-            }
-
-            logger.trace("Synching templates");
-            if ((existingTemplates == null || existingTemplates.length == 0) 
&& proposedFlow.getTemplates() != null && proposedFlow.getTemplates().length > 
0) {
-                // need to load templates
-                final TemplateManager templateManager = 
controller.getTemplateManager();
-                final List<Template> proposedTemplateList = 
TemplateManager.parseBytes(proposedFlow.getTemplates());
-                for (final Template template : proposedTemplateList) {
-                    templateManager.addTemplate(template.getDetails());
+    
+                logger.trace("Synching templates");
+                if ((existingTemplates == null || existingTemplates.length == 
0) && proposedFlow.getTemplates() != null && proposedFlow.getTemplates().length 
> 0) {
+                    // need to load templates
+                    final TemplateManager templateManager = 
controller.getTemplateManager();
+                    final List<Template> proposedTemplateList = 
TemplateManager.parseBytes(proposedFlow.getTemplates());
+                    for (final Template template : proposedTemplateList) {
+                        templateManager.addTemplate(template.getDetails());
+                    }
                 }
-            }
-
-            // clear the snippets that are currently in memory
-            logger.trace("Clearing existing snippets");
-            final SnippetManager snippetManager = 
controller.getSnippetManager();
-            snippetManager.clear();
-
-            // if proposed flow has any snippets, load them
-            logger.trace("Loading proposed snippets");
-            final byte[] proposedSnippets = proposedFlow.getSnippets();
-            if (proposedSnippets != null && proposedSnippets.length > 0) {
-                for (final StandardSnippet snippet : 
SnippetManager.parseBytes(proposedSnippets)) {
-                    snippetManager.addSnippet(snippet);
+    
+                // clear the snippets that are currently in memory
+                logger.trace("Clearing existing snippets");
+                final SnippetManager snippetManager = 
controller.getSnippetManager();
+                snippetManager.clear();
+    
+                // if proposed flow has any snippets, load them
+                logger.trace("Loading proposed snippets");
+                final byte[] proposedSnippets = proposedFlow.getSnippets();
+                if (proposedSnippets != null && proposedSnippets.length > 0) {
+                    for (final StandardSnippet snippet : 
SnippetManager.parseBytes(proposedSnippets)) {
+                        snippetManager.addSnippet(snippet);
+                    }
                 }
+    
+                logger.debug("Finished synching flows");
+            } catch (final Exception ex) {
+                throw new FlowSynchronizationException(ex);
             }
-
-            logger.debug("Finished synching flows");
-        } catch (final Exception ex) {
-            throw new FlowSynchronizationException(ex);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index 272c0ba..05b3a06 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@ -245,4 +245,10 @@ public abstract class AbstractReportingTaskNode extends 
AbstractConfiguredCompon
             }
         }
     }
+    
+    
+    @Override
+    public String toString() {
+        return "ReportingTask[id=" + getIdentifier() + ", name=" + getName() + 
"]";
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
index 2b49d76..3d57533 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
@@ -124,6 +124,11 @@ public class StandardReportingContext implements 
ReportingContext, ControllerSer
     public boolean isControllerServiceEnabled(final String serviceIdentifier) {
         return serviceProvider.isControllerServiceEnabled(serviceIdentifier);
     }
+    
+    @Override
+    public boolean isControllerServiceEnabling(final String serviceIdentifier) 
{
+        return serviceProvider.isControllerServiceEnabling(serviceIdentifier);
+    }
 
     @Override
     public ControllerServiceLookup getControllerServiceLookup() {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
index a2cf8ee..435dbd0 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
@@ -94,6 +94,11 @@ public class StandardReportingInitializationContext 
implements ReportingInitiali
     }
 
     @Override
+    public boolean isControllerServiceEnabling(final String serviceIdentifier) 
{
+        return serviceProvider.isControllerServiceEnabling(serviceIdentifier);
+    }
+    
+    @Override
     public ControllerServiceLookup getControllerServiceLookup() {
         return this;
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 087ec68..b005a57 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -19,6 +19,8 @@ package org.apache.nifi.controller.scheduling;
 import static java.util.Objects.requireNonNull;
 
 import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
@@ -32,11 +34,13 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Funnel;
 import org.apache.nifi.connectable.Port;
 import org.apache.nifi.controller.AbstractPort;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.Heartbeater;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ProcessorNode;
@@ -281,13 +285,22 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
         }
 
         final Runnable startProcRunnable = new Runnable() {
-            @SuppressWarnings("deprecation")
             @Override
+            @SuppressWarnings("deprecation")
             public void run() {
                 try (final NarCloseable x = NarCloseable.withNarLoader()) {
                     long lastStopTime = scheduleState.getLastStopTime();
                     final StandardProcessContext processContext = new 
StandardProcessContext(procNode, controllerServiceProvider, encryptor);
 
+                    final Set<String> serviceIds = new HashSet<>();
+                    for ( final PropertyDescriptor descriptor : 
processContext.getProperties().keySet() ) {
+                        final Class<? extends ControllerService> 
serviceDefinition = descriptor.getControllerServiceDefinition();
+                        if ( serviceDefinition != null ) {
+                            final String serviceId = 
processContext.getProperty(descriptor).getValue();
+                            serviceIds.add(serviceId);
+                        }
+                    }
+                    
                     while (true) {
                         try {
                             synchronized (scheduleState) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index 32d9c46..7c2d4df 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -22,15 +22,18 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.FlowFromDOMFactory;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.events.BulletinFactory;
@@ -38,6 +41,8 @@ import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.DomUtils;
 import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.xml.sax.SAXException;
@@ -48,7 +53,7 @@ import org.xml.sax.SAXParseException;
  */
 public class ControllerServiceLoader {
 
-    private static final Log logger = 
LogFactory.getLog(ControllerServiceLoader.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(ControllerServiceLoader.class);
 
 
     public static List<ControllerServiceNode> loadControllerServices(final 
ControllerServiceProvider provider, final InputStream serializedStream, final 
StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean 
autoResumeState) throws IOException {
@@ -89,7 +94,7 @@ public class ControllerServiceLoader {
             });
             
             final Document document = builder.parse(in);
-            final Element controllerServices = 
DomUtils.getChild(document.getDocumentElement(), "controllerServices");
+            final Element controllerServices = document.getDocumentElement();
             final List<Element> serviceElements = 
DomUtils.getChildElementsByTagName(controllerServices, "controllerService");
             return new 
ArrayList<ControllerServiceNode>(loadControllerServices(serviceElements, 
provider, encryptor, bulletinRepo, autoResumeState));
         } catch (SAXException | ParserConfigurationException sxe) {
@@ -98,57 +103,135 @@ public class ControllerServiceLoader {
     }
     
     public static Collection<ControllerServiceNode> 
loadControllerServices(final List<Element> serviceElements, final 
ControllerServiceProvider provider, final StringEncryptor encryptor, final 
BulletinRepository bulletinRepo, final boolean autoResumeState) {
-        final Map<Element, ControllerServiceNode> nodeMap = new HashMap<>();
+        final Map<ControllerServiceNode, Element> nodeMap = new HashMap<>();
         for ( final Element serviceElement : serviceElements ) {
             final ControllerServiceNode serviceNode = 
createControllerService(provider, serviceElement, encryptor);
-            nodeMap.put(serviceElement, serviceNode);
+            // We need to clone the node because it will be used in a separate 
thread below, and 
+            // Element is not thread-safe.
+            nodeMap.put(serviceNode, (Element) serviceElement.cloneNode(true));
         }
-        for ( final Map.Entry<Element, ControllerServiceNode> entry : 
nodeMap.entrySet() ) {
-            configureControllerService(entry.getValue(), entry.getKey(), 
encryptor);
+        for ( final Map.Entry<ControllerServiceNode, Element> entry : 
nodeMap.entrySet() ) {
+            configureControllerService(entry.getKey(), entry.getValue(), 
encryptor);
         }
         
         // Start services
         if ( autoResumeState ) {
-            for ( final Map.Entry<Element, ControllerServiceNode> entry : 
nodeMap.entrySet() ) {
-                final Element controllerServiceElement = entry.getKey();
-                final ControllerServiceNode serviceNode = entry.getValue();
-                
-                final ControllerServiceDTO dto = 
FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
-                final ControllerServiceState state = 
ControllerServiceState.valueOf(dto.getState());
-                final boolean enable = (state == 
ControllerServiceState.ENABLED || state == ControllerServiceState.ENABLING);
-                if (enable) {
-                    try {
-                        provider.enableReferencingServices(serviceNode);
-                    } catch (final Exception e) {
-                        logger.error("Failed to enable " + serviceNode + " due 
to " + e);
-                        if ( logger.isDebugEnabled() ) {
-                            logger.error("", e);
-                        }
+            // determine the order to load the services. We have to ensure 
that if service A references service B, then B
+            // is enabled first, and so on.
+            final Map<String, ControllerServiceNode> idToNodeMap = new 
HashMap<>();
+            for ( final ControllerServiceNode node : nodeMap.keySet() ) {
+                idToNodeMap.put(node.getIdentifier(), node);
+            }
+            
+            // We can have many Controller Services dependent on one another. 
We can have many of these
+            // disparate lists of Controller Services that are dependent on 
one another. We refer to each
+            // of these as a branch.
+            final List<List<ControllerServiceNode>> branches = 
determineEnablingOrder(idToNodeMap);
+            
+            final ExecutorService executor = 
Executors.newFixedThreadPool(Math.min(10, branches.size()));
+            
+            for ( final List<ControllerServiceNode> branch : branches ) {
+                final Runnable enableBranchRunnable = new Runnable() {
+                    @Override
+                    public void run() {
+                        logger.debug("Enabling Controller Service Branch {}", 
branch);
                         
-                        
bulletinRepo.addBulletin(BulletinFactory.createBulletin(
-                                "Controller Service", Severity.ERROR.name(), 
"Could not start services referencing " + serviceNode + " due to " + e));
-                        continue;
-                    }
-                    
-                    try {
-                        provider.enableControllerService(serviceNode);
-                    } catch (final Exception e) {
-                        logger.error("Failed to enable " + serviceNode + " due 
to " + e);
-                        if ( logger.isDebugEnabled() ) {
-                            logger.error("", e);
+                        for ( final ControllerServiceNode serviceNode : branch 
) {
+                            try {
+                                final Element controllerServiceElement = 
nodeMap.get(serviceNode);
+    
+                                final ControllerServiceDTO dto;
+                                synchronized 
(controllerServiceElement.getOwnerDocument()) {
+                                    dto = 
FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
+                                }
+                                
+                                final ControllerServiceState state = 
ControllerServiceState.valueOf(dto.getState());
+                                final boolean enable = (state == 
ControllerServiceState.ENABLED);
+                                if (enable) {
+                                    if ( 
ControllerServiceState.DISABLED.equals(serviceNode.getState()) ) {
+                                        logger.info("Enabling {}", 
serviceNode);
+                                        try {
+                                            
provider.enableControllerService(serviceNode);
+                                        } catch (final Exception e) {
+                                            logger.error("Failed to enable " + 
serviceNode + " due to " + e);
+                                            if ( logger.isDebugEnabled() ) {
+                                                logger.error("", e);
+                                            }
+                                            
+                                            
bulletinRepo.addBulletin(BulletinFactory.createBulletin(
+                                                    "Controller Service", 
Severity.ERROR.name(), "Could not start " + serviceNode + " due to " + e));
+                                        }
+                                    }
+                                    
+                                    // wait for service to finish enabling.
+                                    while ( 
ControllerServiceState.ENABLING.equals(serviceNode.getState()) ) {
+                                        try {
+                                            Thread.sleep(100L);
+                                        } catch (final InterruptedException 
ie) {}
+                                    }
+                                    
+                                    logger.info("State for {} is now {}", 
serviceNode, serviceNode.getState());
+                                }
+                            } catch (final Exception e) {
+                                logger.error("Failed to enable {} due to {}", 
serviceNode, e.toString());
+                                if ( logger.isDebugEnabled() ) {
+                                    logger.error("", e);
+                                }
+                            }
                         }
-                        
-                        
bulletinRepo.addBulletin(BulletinFactory.createBulletin(
-                                "Controller Service", Severity.ERROR.name(), 
"Could not start " + serviceNode + " due to " + e));
                     }
-                }
+                };
+                
+                executor.submit(enableBranchRunnable);
+            }
+            
+            executor.shutdown();
+        }
+        
+        return nodeMap.keySet();
+    }
+    
+    
+    static List<List<ControllerServiceNode>> determineEnablingOrder(final 
Map<String, ControllerServiceNode> serviceNodeMap) {
+        final List<List<ControllerServiceNode>> orderedNodeLists = new 
ArrayList<>();
+        
+        for ( final ControllerServiceNode node : serviceNodeMap.values() ) {
+            if ( orderedNodeLists.contains(node) ) {
+                continue;   // this node is already in the list.
             }
+            
+            final List<ControllerServiceNode> branch = new ArrayList<>();
+            determineEnablingOrder(serviceNodeMap, node, branch, new 
HashSet<ControllerServiceNode>());
+            orderedNodeLists.add(branch);
         }
         
-        return nodeMap.values();
+        return orderedNodeLists;
     }
     
     
+    private static void determineEnablingOrder(final Map<String, 
ControllerServiceNode> serviceNodeMap, final ControllerServiceNode contextNode, 
final List<ControllerServiceNode> orderedNodes, final 
Set<ControllerServiceNode> visited) {
+        if ( visited.contains(contextNode) ) {
+            return;
+        }
+        
+        for ( final Map.Entry<PropertyDescriptor, String> entry : 
contextNode.getProperties().entrySet() ) {
+            if ( entry.getKey().getControllerServiceDefinition() != null ) {
+                final String referencedServiceId = entry.getValue();
+                if ( referencedServiceId != null ) {
+                    final ControllerServiceNode referencedNode = 
serviceNodeMap.get(referencedServiceId);
+                    if ( !orderedNodes.contains(referencedNode) ) {
+                        visited.add(contextNode);
+                        determineEnablingOrder(serviceNodeMap, referencedNode, 
orderedNodes, visited);
+                    }
+                }
+            }
+        }
+
+        if ( !orderedNodes.contains(contextNode) ) {
+            orderedNodes.add(contextNode);
+        }
+    }
+    
     private static ControllerServiceNode createControllerService(final 
ControllerServiceProvider provider, final Element controllerServiceElement, 
final StringEncryptor encryptor) {
         final ControllerServiceDTO dto = 
FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
         

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
index dee8b37..8d46b05 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
@@ -66,6 +66,11 @@ public class StandardControllerServiceInitializationContext 
implements Controlle
     }
     
     @Override
+    public boolean isControllerServiceEnabling(String serviceIdentifier) {
+        return serviceProvider.isControllerServiceEnabling(serviceIdentifier);
+    }
+    
+    @Override
     public String getControllerServiceName(final String serviceIdentifier) {
        return serviceProvider.getControllerServiceName(serviceIdentifier);
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index cca825c..b2d61bd 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -156,7 +156,7 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
             } else {
                 proxiedService = (ControllerService) 
Proxy.newProxyInstance(cl, getInterfaces(controllerServiceClass), 
invocationHandler);
             }
-            logger.info("Create Controller Service of type {} with identifier 
{}", type, id);
+            logger.info("Created Controller Service of type {} with identifier 
{}", type, id);
 
             final ComponentLog serviceLogger = new SimpleProcessLogger(id, 
originalService);
             originalService.initialize(new 
StandardControllerServiceInitializationContext(id, serviceLogger, this));
@@ -308,6 +308,12 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
     }
 
     @Override
+    public boolean isControllerServiceEnabling(final String serviceIdentifier) 
{
+        final ControllerServiceNode node = 
controllerServices.get(serviceIdentifier);
+        return (node == null) ? false : (ControllerServiceState.ENABLING == 
node.getState());
+    }
+    
+    @Override
     public ControllerServiceNode getControllerServiceNode(final String 
serviceIdentifier) {
         return controllerServices.get(serviceIdentifier);
     }
@@ -396,7 +402,7 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
         
         return references;
     }
-    
+
     
     @Override
     public void enableReferencingServices(final ControllerServiceNode 
serviceNode) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
index 8d6b710..d14a459 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
@@ -142,6 +142,11 @@ public class StandardProcessContext implements 
ProcessContext, ControllerService
     }
 
     @Override
+    public boolean isControllerServiceEnabling(final String serviceIdentifier) 
{
+        return 
controllerServiceProvider.isControllerServiceEnabling(serviceIdentifier);
+    }
+    
+    @Override
     public ControllerServiceLookup getControllerServiceLookup() {
         return this;
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestControllerServiceLoader.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestControllerServiceLoader.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestControllerServiceLoader.java
new file mode 100644
index 0000000..9451b07
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestControllerServiceLoader.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.controller.service.mock.ServiceA;
+import org.apache.nifi.controller.service.mock.ServiceB;
+import org.junit.Test;
+
+public class TestControllerServiceLoader {
+    @Test
+    public void testOrderingOfServices() {
+        final StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(null);
+        final ControllerServiceNode serviceNode1 = 
provider.createControllerService(ServiceA.class.getName(), "1", false);
+        final ControllerServiceNode serviceNode2 = 
provider.createControllerService(ServiceB.class.getName(), "2", false);
+
+        serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
+
+        final Map<String, ControllerServiceNode> nodeMap = new 
LinkedHashMap<>();
+        nodeMap.put("1", serviceNode1);
+        nodeMap.put("2", serviceNode2);
+        
+        List<List<ControllerServiceNode>> branches = 
ControllerServiceLoader.determineEnablingOrder(nodeMap);
+        assertEquals(2, branches.size());
+        List<ControllerServiceNode> ordered = branches.get(0);
+        assertEquals(2, ordered.size());
+        assertTrue(ordered.get(0) == serviceNode2);
+        assertTrue(ordered.get(1) == serviceNode1);
+        assertEquals(1, branches.get(1).size());
+        assertTrue(branches.get(1).get(0) == serviceNode2);
+        
+        nodeMap.clear();
+        nodeMap.put("2", serviceNode2);
+        nodeMap.put("1", serviceNode1);
+        
+        branches = ControllerServiceLoader.determineEnablingOrder(nodeMap);
+        assertEquals(2, branches.size());
+        ordered = branches.get(1);
+        assertEquals(2, ordered.size());
+        assertTrue(ordered.get(0) == serviceNode2);
+        assertTrue(ordered.get(1) == serviceNode1);
+        assertEquals(1, branches.get(0).size());
+        assertTrue(branches.get(0).get(0) == serviceNode2);
+        
+        // add circular dependency on self.
+        nodeMap.clear();
+        serviceNode1.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "1");
+        nodeMap.put("1", serviceNode1);
+        nodeMap.put("2", serviceNode2);
+        
+        branches = ControllerServiceLoader.determineEnablingOrder(nodeMap);
+        assertEquals(2, branches.size());
+        ordered = branches.get(0);
+        assertEquals(2, ordered.size());
+        assertTrue(ordered.get(0) == serviceNode2);
+        assertTrue(ordered.get(1) == serviceNode1);
+        
+        nodeMap.clear();
+        nodeMap.put("2", serviceNode2);
+        nodeMap.put("1", serviceNode1);
+        branches = ControllerServiceLoader.determineEnablingOrder(nodeMap);
+        assertEquals(2, branches.size());
+        ordered = branches.get(1);
+        assertEquals(2, ordered.size());
+        assertTrue(ordered.get(0) == serviceNode2);
+        assertTrue(ordered.get(1) == serviceNode1);
+        
+        // Add a circular dependency once removed (1 -> 3 -> 1). In this case, we won't 
actually be able to enable these services, because
+        // each one will always depend on a disabled service and so will never be 
valid.
+        // But we want to ensure that the method returns successfully without 
throwing a StackOverflowError or anything
+        // like that.
+        nodeMap.clear();
+        final ControllerServiceNode serviceNode3 = 
provider.createControllerService(ServiceA.class.getName(), "3", false);
+        serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "3");
+        serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "1");
+        nodeMap.put("1", serviceNode1);
+        nodeMap.put("3", serviceNode3);
+        branches = ControllerServiceLoader.determineEnablingOrder(nodeMap);
+        assertEquals(2, branches.size());
+        ordered = branches.get(0);
+        assertEquals(2, ordered.size());
+        assertTrue(ordered.get(0) == serviceNode3);
+        assertTrue(ordered.get(1) == serviceNode1);
+        
+        nodeMap.clear();
+        nodeMap.put("3", serviceNode3);
+        nodeMap.put("1", serviceNode1);
+        branches = ControllerServiceLoader.determineEnablingOrder(nodeMap);
+        assertEquals(2, branches.size());
+        ordered = branches.get(1);
+        assertEquals(2, ordered.size());
+        assertTrue(ordered.get(0) == serviceNode3);
+        assertTrue(ordered.get(1) == serviceNode1);
+        
+        
+        // Add multiple completely disparate branches.
+        nodeMap.clear();
+        serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
+        final ControllerServiceNode serviceNode4 = 
provider.createControllerService(ServiceB.class.getName(), "4", false);
+        final ControllerServiceNode serviceNode5 = 
provider.createControllerService(ServiceB.class.getName(), "5", false);
+        serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
+        nodeMap.put("1", serviceNode1);
+        nodeMap.put("2", serviceNode2);
+        nodeMap.put("3", serviceNode3);
+        nodeMap.put("4", serviceNode4);
+        nodeMap.put("5", serviceNode5);
+        
+        branches = ControllerServiceLoader.determineEnablingOrder(nodeMap);
+        assertEquals(5, branches.size());
+
+        ordered = branches.get(0);
+        assertEquals(2, ordered.size());
+        assertTrue(ordered.get(0) == serviceNode2);
+        assertTrue(ordered.get(1) == serviceNode1);
+        
+        assertEquals(1, branches.get(1).size());
+        assertTrue(branches.get(1).get(0) == serviceNode2);
+        
+        ordered = branches.get(2);
+        assertEquals(2, ordered.size());
+        assertTrue(ordered.get(0) == serviceNode4);
+        assertTrue(ordered.get(1) == serviceNode3);
+        
+        assertEquals(1, branches.get(3).size());
+        assertTrue(branches.get(3).get(0) == serviceNode4);
+        
+        assertEquals(1, branches.get(4).size());
+        assertTrue(branches.get(4).get(0) == serviceNode5);
+        
+        // create 2 branches both dependent on the same service
+        nodeMap.clear();
+        serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
+        serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
+        nodeMap.put("1", serviceNode1);
+        nodeMap.put("2", serviceNode2);
+        nodeMap.put("3", serviceNode3);
+        
+        branches = ControllerServiceLoader.determineEnablingOrder(nodeMap);
+        assertEquals(3, branches.size());
+        
+        ordered = branches.get(0);
+        assertEquals(2, ordered.size());
+        assertTrue(ordered.get(0) == serviceNode2);
+        assertTrue(ordered.get(1) == serviceNode1);
+        
+        ordered = branches.get(1);
+        assertEquals(1, ordered.size());
+        assertTrue(ordered.get(0) == serviceNode2);
+        
+        ordered = branches.get(2);
+        assertEquals(2, ordered.size());
+        assertTrue(ordered.get(0) == serviceNode2);
+        assertTrue(ordered.get(1) == serviceNode3);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java
index 2c85655..a0bf30d 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java
@@ -167,5 +167,10 @@ public class TestStandardPropertyValue {
         public String getControllerServiceName(String serviceIdentifier) {
                return null;
         }
+        
+        @Override
+        public boolean isControllerServiceEnabling(String serviceIdentifier) {
+            return false;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
index 66b73e2..f2e848f 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
@@ -20,8 +20,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
@@ -89,7 +89,7 @@ public abstract class DistributedCacheServer extends AbstractControllerService {
         }
     }
 
-    @OnShutdown
+    @OnDisabled
     public void shutdownServer() throws IOException {
         if (cacheServer != null) {
             cacheServer.stop();

Reply via email to