Repository: incubator-nifi Updated Branches: refs/heads/NIFI-250 42f8e8198 -> c63a8c05f
NIFI-250: Fix the way that services are restored at startup Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/d7210da4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/d7210da4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/d7210da4 Branch: refs/heads/NIFI-250 Commit: d7210da40eda444267133fdfbbf2a8a378590e82 Parents: dddf5e8 Author: Mark Payne <[email protected]> Authored: Wed Mar 18 09:37:06 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Wed Mar 18 09:37:06 2015 -0400 ---------------------------------------------------------------------- .../nifi/components/PropertyDescriptor.java | 6 +- .../controller/ControllerServiceLookup.java | 12 ++ .../nifi/util/MockControllerServiceLookup.java | 5 + .../MockProcessorInitializationContext.java | 5 + .../apache/nifi/util/MockValidationContext.java | 5 + .../cluster/manager/impl/WebClusterManager.java | 64 +++++-- .../apache/nifi/controller/FlowController.java | 5 + .../controller/StandardFlowSynchronizer.java | 158 ++++++++--------- .../reporting/AbstractReportingTaskNode.java | 6 + .../reporting/StandardReportingContext.java | 5 + .../StandardReportingInitializationContext.java | 5 + .../scheduling/StandardProcessScheduler.java | 15 +- .../service/ControllerServiceLoader.java | 161 ++++++++++++----- ...dControllerServiceInitializationContext.java | 5 + .../StandardControllerServiceProvider.java | 10 +- .../nifi/processor/StandardProcessContext.java | 5 + .../service/TestControllerServiceLoader.java | 175 +++++++++++++++++++ .../processor/TestStandardPropertyValue.java | 5 + .../cache/server/DistributedCacheServer.java | 4 +- 19 files changed, 514 insertions(+), 142 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java b/nifi/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java index 65ef0ba..e62ff79 100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java @@ -150,9 +150,11 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor> .build(); } - if (!context.getControllerServiceLookup().isControllerServiceEnabled(controllerService)) { + final String serviceId = controllerService.getIdentifier(); + if (!context.getControllerServiceLookup().isControllerServiceEnabled(serviceId) && + !context.getControllerServiceLookup().isControllerServiceEnabling(serviceId)) { return new ValidationResult.Builder() - .input(context.getControllerServiceLookup().getControllerServiceName(controllerService.getIdentifier())) + .input(context.getControllerServiceLookup().getControllerServiceName(serviceId)) .subject(getName()) .valid(false) .explanation("Controller Service " + controllerService + " is disabled") http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceLookup.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceLookup.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceLookup.java index 25167ad..4b96f62 100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceLookup.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceLookup.java @@ -42,6 +42,18 @@ public interface ControllerServiceLookup { boolean isControllerServiceEnabled(String serviceIdentifier); /** + * Returns <code>true</code> if the Controller Service with the given + * identifier has been enabled but is still in the transitioning state, + * otherwise returns <code>false</code>. + * If the given identifier is not known by this ControllerServiceLookup, + * returns <code>false</code>. + * + * @param serviceIdentifier + * @return + */ + boolean isControllerServiceEnabling(String serviceIdentifier); + + /** * Returns <code>true</code> if the given Controller Service is enabled, * <code>false</code> otherwise. If the given Controller Service is not * known by this ControllerServiceLookup, returns <code>false</code> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java index c804d5f..2734440 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java @@ -77,6 +77,11 @@ public abstract class MockControllerServiceLookup implements ControllerServiceLo } @Override + public boolean isControllerServiceEnabling(final String serviceIdentifier) { + return false; + } + + @Override public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) { final Set<String> ids = new HashSet<>(); for (final Map.Entry<String, ControllerServiceConfiguration> entry : controllerServiceMap.entrySet()) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java index 582ffb1..0aa2749 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java @@ -76,4 +76,9 @@ public class MockProcessorInitializationContext implements ProcessorInitializati public boolean isControllerServiceEnabled(ControllerService service) { return context.isControllerServiceEnabled(service); } + + @Override + public boolean isControllerServiceEnabling(String serviceIdentifier) { + return context.isControllerServiceEnabling(serviceIdentifier); + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java index 6d32d0b..bf6ed1d 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java @@ -101,4 +101,9 @@ public class MockValidationContext implements ValidationContext, ControllerServi public boolean isValidationRequired(final ControllerService service) { return true; } + + @Override + public boolean isControllerServiceEnabling(String serviceIdentifier) { + return context.isControllerServiceEnabling(serviceIdentifier); + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index d3688af..6fe1f80 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -127,6 +127,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.Heartbeater; import org.apache.nifi.controller.ReportingTaskNode; +import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.StandardFlowSerializer; import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.exception.ProcessorLifeCycleException; @@ -219,6 +220,7 @@ import org.xml.sax.SAXException; import org.xml.sax.SAXParseException; import com.sun.jersey.api.client.ClientResponse; + import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; @@ -941,6 +943,9 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C final String taskSchedulingPeriod = DomUtils.getChild(taskElement, "schedulingPeriod").getTextContent().trim(); final String taskClass = DomUtils.getChild(taskElement, "class").getTextContent().trim(); + final String scheduleStateValue = DomUtils.getChild(taskElement, "scheduledState").getTextContent().trim(); + final ScheduledState scheduledState = ScheduledState.valueOf(scheduleStateValue); + //optional task-specific properties for (final Element optionalProperty : DomUtils.getChildElementsByTagName(taskElement, "property")) { final String name = optionalProperty.getAttribute("name"); @@ -980,7 +985,23 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C reportingTaskNode.setProperty(entry.getKey().getName(), entry.getValue()); } - processScheduler.schedule(reportingTaskNode); + reportingTaskNode.setScheduledState(scheduledState); + if ( ScheduledState.RUNNING.equals(scheduledState) ) { + if ( reportingTaskNode.isValid() ) { + try { + processScheduler.schedule(reportingTaskNode); + } catch (final Exception e) { + logger.error("Failed to start {} due to {}", reportingTaskNode, e); + if ( logger.isDebugEnabled() ) { + logger.error("", e); + } + } + } else { + logger.error("Failed to start {} because it is invalid due to {}", reportingTaskNode, reportingTaskNode.getValidationErrors()); + } + } + + tasks.put(reportingTaskNode.getIdentifier(), reportingTaskNode); } } catch (final SAXException | ParserConfigurationException | IOException | DOMException | NumberFormatException | InitializationException t) { @@ -1369,6 +1390,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } @Override + public boolean isControllerServiceEnabling(final String serviceIdentifier) { + return controllerServiceProvider.isControllerServiceEnabling(serviceIdentifier); + } + + @Override public String getControllerServiceName(final String serviceIdentifier) { return controllerServiceProvider.getControllerServiceName(serviceIdentifier); } @@ -2621,24 +2647,26 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C final Set<ControllerServiceReferencingComponentDTO> nodeReferencingComponents = nodeEntry.getValue(); // go through all the nodes referencing components - for (final ControllerServiceReferencingComponentDTO nodeReferencingComponent : nodeReferencingComponents) { - // handle active thread counts - if (nodeReferencingComponent.getActiveThreadCount() != null && nodeReferencingComponent.getActiveThreadCount() > 0) { - final Integer current = activeThreadCounts.get(nodeReferencingComponent.getId()); - if (current == null) { - activeThreadCounts.put(nodeReferencingComponent.getId(), nodeReferencingComponent.getActiveThreadCount()); - } else { - activeThreadCounts.put(nodeReferencingComponent.getId(), nodeReferencingComponent.getActiveThreadCount() + current); + if ( nodeReferencingComponents != null ) { + for (final ControllerServiceReferencingComponentDTO nodeReferencingComponent : nodeReferencingComponents) { + // handle active thread counts + if (nodeReferencingComponent.getActiveThreadCount() != null && nodeReferencingComponent.getActiveThreadCount() > 0) { + final Integer current = activeThreadCounts.get(nodeReferencingComponent.getId()); + if (current == null) { + activeThreadCounts.put(nodeReferencingComponent.getId(), nodeReferencingComponent.getActiveThreadCount()); + } else { + activeThreadCounts.put(nodeReferencingComponent.getId(), nodeReferencingComponent.getActiveThreadCount() + current); + } } - } - - // handle controller service state - final String state = states.get(nodeReferencingComponent.getId()); - if (state == null) { - if (ControllerServiceState.DISABLING.name().equals(nodeReferencingComponent.getState())) { - states.put(nodeReferencingComponent.getId(), ControllerServiceState.DISABLING.name()); - } else if (ControllerServiceState.ENABLING.name().equals(nodeReferencingComponent.getState())) { - states.put(nodeReferencingComponent.getId(), ControllerServiceState.ENABLING.name()); + + // handle controller service state + final String state = states.get(nodeReferencingComponent.getId()); + if (state == null) { + if (ControllerServiceState.DISABLING.name().equals(nodeReferencingComponent.getState())) { + states.put(nodeReferencingComponent.getId(), ControllerServiceState.DISABLING.name()); + } else if (ControllerServiceState.ENABLING.name().equals(nodeReferencingComponent.getState())) { + states.put(nodeReferencingComponent.getId(), ControllerServiceState.ENABLING.name()); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index a04ae3a..26f2369 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -2682,6 +2682,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } @Override + public boolean isControllerServiceEnabling(final String serviceIdentifier) { + return controllerServiceProvider.isControllerServiceEnabling(serviceIdentifier); + } + + @Override public String getControllerServiceName(final String serviceIdentifier) { return controllerServiceProvider.getControllerServiceName(serviceIdentifier); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 2348dcb..087272c 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -214,88 +214,90 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { logger.trace("Parsing proposed flow bytes as DOM document"); final Document configuration = parseFlowBytes(proposedFlow.getFlow()); - // attempt to sync controller with proposed flow - try { - if (configuration != null) { - // get the root element - final Element rootElement = (Element) configuration.getElementsByTagName("flowController").item(0); - - // set controller config - logger.trace("Updating flow config"); - final Integer maxThreadCount = getInteger(rootElement, "maxThreadCount"); - if (maxThreadCount == null) { - controller.setMaxTimerDrivenThreadCount(getInt(rootElement, "maxTimerDrivenThreadCount")); - controller.setMaxEventDrivenThreadCount(getInt(rootElement, "maxEventDrivenThreadCount")); - } else { - controller.setMaxTimerDrivenThreadCount(maxThreadCount * 2 / 3); - controller.setMaxEventDrivenThreadCount(maxThreadCount / 3); - } - - // get the root group XML element - final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0); - - final Element controllerServicesElement = (Element) DomUtils.getChild(rootElement, "controllerServices"); - if ( controllerServicesElement != null ) { - final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService"); - - if ( !initialized || existingFlowEmpty ) { - ControllerServiceLoader.loadControllerServices(serviceElements, controller, encryptor, controller.getBulletinRepository(), autoResumeState); - } else { - for ( final Element serviceElement : serviceElements ) { - updateControllerService(controller, serviceElement, encryptor); - } - } - } - - // if this controller isn't initialized or its emtpy, add the root group, otherwise update - if (!initialized || existingFlowEmpty) { - logger.trace("Adding root process group"); - addProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor); - } else { - logger.trace("Updating root process group"); - updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor); - } - - final Element reportingTasksElement = (Element) DomUtils.getChild(rootElement, "reportingTasks"); - if ( reportingTasksElement != null ) { - final List<Element> taskElements = DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask"); - for ( final Element taskElement : taskElements ) { - if ( !initialized || existingFlowEmpty ) { - addReportingTask(controller, taskElement, encryptor); - } else { - updateReportingTask(controller, taskElement, encryptor); - } - } + synchronized (configuration) { + // attempt to sync controller with proposed flow + try { + if (configuration != null) { + // get the root element + final Element rootElement = (Element) configuration.getElementsByTagName("flowController").item(0); + + // set controller config + logger.trace("Updating flow config"); + final Integer maxThreadCount = getInteger(rootElement, "maxThreadCount"); + if (maxThreadCount == null) { + controller.setMaxTimerDrivenThreadCount(getInt(rootElement, "maxTimerDrivenThreadCount")); + controller.setMaxEventDrivenThreadCount(getInt(rootElement, "maxEventDrivenThreadCount")); + } else { + controller.setMaxTimerDrivenThreadCount(maxThreadCount * 2 / 3); + controller.setMaxEventDrivenThreadCount(maxThreadCount / 3); + } + + // get the root group XML element + final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0); + + final Element controllerServicesElement = (Element) DomUtils.getChild(rootElement, "controllerServices"); + if ( controllerServicesElement != null ) { + final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService"); + + if ( !initialized || existingFlowEmpty ) { + ControllerServiceLoader.loadControllerServices(serviceElements, controller, encryptor, controller.getBulletinRepository(), autoResumeState); + } else { + for ( final Element serviceElement : serviceElements ) { + updateControllerService(controller, serviceElement, encryptor); + } + } + } + + // if this controller isn't initialized or its emtpy, add the root group, otherwise update + if (!initialized || existingFlowEmpty) { + logger.trace("Adding root process group"); + addProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor); + } else { + logger.trace("Updating root process group"); + updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor); + } + + final Element reportingTasksElement = (Element) DomUtils.getChild(rootElement, "reportingTasks"); + if ( reportingTasksElement != null ) { + final List<Element> taskElements = DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask"); + for ( final Element taskElement : taskElements ) { + if ( !initialized || existingFlowEmpty ) { + addReportingTask(controller, taskElement, encryptor); + } else { + updateReportingTask(controller, taskElement, encryptor); + } + } + } } - } - - logger.trace("Synching templates"); - if ((existingTemplates == null || existingTemplates.length == 0) && proposedFlow.getTemplates() != null && proposedFlow.getTemplates().length > 0) { - // need to load templates - final TemplateManager templateManager = controller.getTemplateManager(); - final List<Template> proposedTemplateList = TemplateManager.parseBytes(proposedFlow.getTemplates()); - for (final Template template : proposedTemplateList) { - templateManager.addTemplate(template.getDetails()); + + logger.trace("Synching templates"); + if ((existingTemplates == null || existingTemplates.length == 0) && proposedFlow.getTemplates() != null && proposedFlow.getTemplates().length > 0) { + // need to load templates + final TemplateManager templateManager = controller.getTemplateManager(); + final List<Template> proposedTemplateList = TemplateManager.parseBytes(proposedFlow.getTemplates()); + for (final Template template : proposedTemplateList) { + templateManager.addTemplate(template.getDetails()); + } } - } - - // clear the snippets that are currently in memory - logger.trace("Clearing existing snippets"); - final SnippetManager snippetManager = controller.getSnippetManager(); - snippetManager.clear(); - - // if proposed flow has any snippets, load them - logger.trace("Loading proposed snippets"); - final byte[] proposedSnippets = proposedFlow.getSnippets(); - if (proposedSnippets != null && proposedSnippets.length > 0) { - for (final StandardSnippet snippet : SnippetManager.parseBytes(proposedSnippets)) { - snippetManager.addSnippet(snippet); + + // clear the snippets that are currently in memory + logger.trace("Clearing existing snippets"); + final SnippetManager snippetManager = controller.getSnippetManager(); + snippetManager.clear(); + + // if proposed flow has any snippets, load them + logger.trace("Loading proposed snippets"); + final byte[] proposedSnippets = proposedFlow.getSnippets(); + if (proposedSnippets != null && proposedSnippets.length > 0) { + for (final StandardSnippet snippet : SnippetManager.parseBytes(proposedSnippets)) { + snippetManager.addSnippet(snippet); + } } + + logger.debug("Finished synching flows"); + } catch (final Exception ex) { + throw new FlowSynchronizationException(ex); } - - logger.debug("Finished synching flows"); - } catch (final Exception ex) { - throw new FlowSynchronizationException(ex); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java index 272c0ba..05b3a06 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java @@ -245,4 +245,10 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon } } } + + + @Override + public String toString() { + return "ReportingTask[id=" + getIdentifier() + ", name=" + getName() + "]"; + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java index 2b49d76..3d57533 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java @@ -124,6 +124,11 @@ public class StandardReportingContext implements ReportingContext, ControllerSer public boolean isControllerServiceEnabled(final String serviceIdentifier) { return serviceProvider.isControllerServiceEnabled(serviceIdentifier); } + + @Override + public boolean isControllerServiceEnabling(final String serviceIdentifier) { + return serviceProvider.isControllerServiceEnabling(serviceIdentifier); + } @Override public ControllerServiceLookup getControllerServiceLookup() { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java index a2cf8ee..435dbd0 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java @@ -94,6 +94,11 @@ public class StandardReportingInitializationContext implements ReportingInitiali } @Override + public boolean isControllerServiceEnabling(final String serviceIdentifier) { + return serviceProvider.isControllerServiceEnabling(serviceIdentifier); + } + + @Override public ControllerServiceLookup getControllerServiceLookup() { return this; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 087ec68..b005a57 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -19,6 +19,8 @@ package org.apache.nifi.controller.scheduling; import static java.util.Objects.requireNonNull; import java.lang.reflect.InvocationTargetException; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; @@ -32,11 +34,13 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.Port; import org.apache.nifi.controller.AbstractPort; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.Heartbeater; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; @@ -281,13 +285,22 @@ public final class StandardProcessScheduler implements ProcessScheduler { } final Runnable startProcRunnable = new Runnable() { - @SuppressWarnings("deprecation") @Override + @SuppressWarnings("deprecation") public void run() { try (final NarCloseable x = NarCloseable.withNarLoader()) { long lastStopTime = scheduleState.getLastStopTime(); final StandardProcessContext processContext = new StandardProcessContext(procNode, controllerServiceProvider, encryptor); + final Set<String> serviceIds = new HashSet<>(); + for ( final PropertyDescriptor descriptor : processContext.getProperties().keySet() ) { + final Class<? extends ControllerService> serviceDefinition = descriptor.getControllerServiceDefinition(); + if ( serviceDefinition != null ) { + final String serviceId = processContext.getProperty(descriptor).getValue(); + serviceIds.add(serviceId); + } + } + while (true) { try { synchronized (scheduleState) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java index 32d9c46..7c2d4df 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java @@ -22,15 +22,18 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.FlowFromDOMFactory; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.events.BulletinFactory; @@ -38,6 +41,8 @@ import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.DomUtils; import org.apache.nifi.web.api.dto.ControllerServiceDTO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.xml.sax.SAXException; @@ -48,7 +53,7 @@ import org.xml.sax.SAXParseException; */ public class ControllerServiceLoader { - private static final Log logger = LogFactory.getLog(ControllerServiceLoader.class); + private static final Logger logger = LoggerFactory.getLogger(ControllerServiceLoader.class); public static List<ControllerServiceNode> loadControllerServices(final ControllerServiceProvider provider, final InputStream serializedStream, final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) throws IOException { @@ -89,7 +94,7 @@ public class ControllerServiceLoader { }); final Document document = builder.parse(in); - final Element controllerServices = DomUtils.getChild(document.getDocumentElement(), "controllerServices"); + final Element controllerServices = document.getDocumentElement(); final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServices, "controllerService"); return new ArrayList<ControllerServiceNode>(loadControllerServices(serviceElements, provider, encryptor, bulletinRepo, autoResumeState)); } catch (SAXException | ParserConfigurationException sxe) { @@ -98,57 +103,135 @@ public class ControllerServiceLoader { } public static Collection<ControllerServiceNode> loadControllerServices(final List<Element> serviceElements, final ControllerServiceProvider provider, final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) { - final Map<Element, ControllerServiceNode> nodeMap = new HashMap<>(); + final Map<ControllerServiceNode, Element> nodeMap = new HashMap<>(); for ( final Element serviceElement : serviceElements ) { final ControllerServiceNode serviceNode = createControllerService(provider, serviceElement, encryptor); - nodeMap.put(serviceElement, serviceNode); + // We need to clone the node because it will be used in a separate thread below, and + // Element is not thread-safe. + nodeMap.put(serviceNode, (Element) serviceElement.cloneNode(true)); } - for ( final Map.Entry<Element, ControllerServiceNode> entry : nodeMap.entrySet() ) { - configureControllerService(entry.getValue(), entry.getKey(), encryptor); + for ( final Map.Entry<ControllerServiceNode, Element> entry : nodeMap.entrySet() ) { + configureControllerService(entry.getKey(), entry.getValue(), encryptor); } // Start services if ( autoResumeState ) { - for ( final Map.Entry<Element, ControllerServiceNode> entry : nodeMap.entrySet() ) { - final Element controllerServiceElement = entry.getKey(); - final ControllerServiceNode serviceNode = entry.getValue(); - - final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor); - final ControllerServiceState state = ControllerServiceState.valueOf(dto.getState()); - final boolean enable = (state == ControllerServiceState.ENABLED || state == ControllerServiceState.ENABLING); - if (enable) { - try { - provider.enableReferencingServices(serviceNode); - } catch (final Exception e) { - logger.error("Failed to enable " + serviceNode + " due to " + e); - if ( logger.isDebugEnabled() ) { - logger.error("", e); - } + // determine the order to load the services. We have to ensure that if service A references service B, then B + // is enabled first, and so on. + final Map<String, ControllerServiceNode> idToNodeMap = new HashMap<>(); + for ( final ControllerServiceNode node : nodeMap.keySet() ) { + idToNodeMap.put(node.getIdentifier(), node); + } + + // We can have many Controller Services dependent on one another. We can have many of these + // disparate lists of Controller Services that are dependent on one another. We refer to each + // of these as a branch. + final List<List<ControllerServiceNode>> branches = determineEnablingOrder(idToNodeMap); + + final ExecutorService executor = Executors.newFixedThreadPool(Math.min(10, branches.size())); + + for ( final List<ControllerServiceNode> branch : branches ) { + final Runnable enableBranchRunnable = new Runnable() { + @Override + public void run() { + logger.debug("Enabling Controller Service Branch {}", branch); - bulletinRepo.addBulletin(BulletinFactory.createBulletin( - "Controller Service", Severity.ERROR.name(), "Could not start services referencing " + serviceNode + " due to " + e)); - continue; - } - - try { - provider.enableControllerService(serviceNode); - } catch (final Exception e) { - logger.error("Failed to enable " + serviceNode + " due to " + e); - if ( logger.isDebugEnabled() ) { - logger.error("", e); + for ( final ControllerServiceNode serviceNode : branch ) { + try { + final Element controllerServiceElement = nodeMap.get(serviceNode); + + final ControllerServiceDTO dto; + synchronized (controllerServiceElement.getOwnerDocument()) { + dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor); + } + + final ControllerServiceState state = ControllerServiceState.valueOf(dto.getState()); + final boolean enable = (state == ControllerServiceState.ENABLED); + if (enable) { + if ( ControllerServiceState.DISABLED.equals(serviceNode.getState()) ) { + logger.info("Enabling {}", serviceNode); + try { + provider.enableControllerService(serviceNode); + } catch (final Exception e) { + logger.error("Failed to enable " + serviceNode + " due to " + e); + if ( logger.isDebugEnabled() ) { + logger.error("", e); + } + + bulletinRepo.addBulletin(BulletinFactory.createBulletin( + "Controller Service", Severity.ERROR.name(), "Could not start " + serviceNode + " due to " + e)); + } + } + + // wait for service to finish enabling. + while ( ControllerServiceState.ENABLING.equals(serviceNode.getState()) ) { + try { + Thread.sleep(100L); + } catch (final InterruptedException ie) {} + } + + logger.info("State for {} is now {}", serviceNode, serviceNode.getState()); + } + } catch (final Exception e) { + logger.error("Failed to enable {} due to {}", serviceNode, e.toString()); + if ( logger.isDebugEnabled() ) { + logger.error("", e); + } + } } - - bulletinRepo.addBulletin(BulletinFactory.createBulletin( - "Controller Service", Severity.ERROR.name(), "Could not start " + serviceNode + " due to " + e)); } - } + }; + + executor.submit(enableBranchRunnable); + } + + executor.shutdown(); + } + + return nodeMap.keySet(); + } + + + static List<List<ControllerServiceNode>> determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap) { + final List<List<ControllerServiceNode>> orderedNodeLists = new ArrayList<>(); + + for ( final ControllerServiceNode node : serviceNodeMap.values() ) { + if ( orderedNodeLists.contains(node) ) { + continue; // this node is already in the list. } + + final List<ControllerServiceNode> branch = new ArrayList<>(); + determineEnablingOrder(serviceNodeMap, node, branch, new HashSet<ControllerServiceNode>()); + orderedNodeLists.add(branch); } - return nodeMap.values(); + return orderedNodeLists; } + private static void determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap, final ControllerServiceNode contextNode, final List<ControllerServiceNode> orderedNodes, final Set<ControllerServiceNode> visited) { + if ( visited.contains(contextNode) ) { + return; + } + + for ( final Map.Entry<PropertyDescriptor, String> entry : contextNode.getProperties().entrySet() ) { + if ( entry.getKey().getControllerServiceDefinition() != null ) { + final String referencedServiceId = entry.getValue(); + if ( referencedServiceId != null ) { + final ControllerServiceNode referencedNode = serviceNodeMap.get(referencedServiceId); + if ( !orderedNodes.contains(referencedNode) ) { + visited.add(contextNode); + determineEnablingOrder(serviceNodeMap, referencedNode, orderedNodes, visited); + } + } + } + } + + if ( !orderedNodes.contains(contextNode) ) { + orderedNodes.add(contextNode); + } + } + private static ControllerServiceNode createControllerService(final ControllerServiceProvider provider, final Element controllerServiceElement, final StringEncryptor encryptor) { final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java index dee8b37..8d46b05 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java @@ -66,6 +66,11 @@ public class StandardControllerServiceInitializationContext implements Controlle } @Override + public boolean isControllerServiceEnabling(String serviceIdentifier) { + return serviceProvider.isControllerServiceEnabling(serviceIdentifier); + } + + @Override public String getControllerServiceName(final String serviceIdentifier) { return serviceProvider.getControllerServiceName(serviceIdentifier); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index cca825c..b2d61bd 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@ -156,7 +156,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } else { proxiedService = (ControllerService) Proxy.newProxyInstance(cl, getInterfaces(controllerServiceClass), invocationHandler); } - logger.info("Create Controller Service of type {} with identifier {}", type, id); + logger.info("Created Controller Service of type {} with identifier {}", type, id); final ComponentLog serviceLogger = new SimpleProcessLogger(id, originalService); originalService.initialize(new StandardControllerServiceInitializationContext(id, serviceLogger, this)); @@ -308,6 +308,12 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } @Override + public boolean isControllerServiceEnabling(final String serviceIdentifier) { + final ControllerServiceNode node = controllerServices.get(serviceIdentifier); + return (node == null) ? false : (ControllerServiceState.ENABLING == node.getState()); + } + + @Override public ControllerServiceNode getControllerServiceNode(final String serviceIdentifier) { return controllerServices.get(serviceIdentifier); } @@ -396,7 +402,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi return references; } - + @Override public void enableReferencingServices(final ControllerServiceNode serviceNode) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java index 8d6b710..d14a459 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java @@ -142,6 +142,11 @@ public class StandardProcessContext implements ProcessContext, ControllerService } @Override + public boolean isControllerServiceEnabling(final String serviceIdentifier) { + return controllerServiceProvider.isControllerServiceEnabling(serviceIdentifier); + } + + @Override public ControllerServiceLookup getControllerServiceLookup() { return this; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestControllerServiceLoader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestControllerServiceLoader.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestControllerServiceLoader.java new file mode 100644 index 0000000..9451b07 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestControllerServiceLoader.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.service; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.controller.service.mock.ServiceA; +import org.apache.nifi.controller.service.mock.ServiceB; +import org.junit.Test; + +public class TestControllerServiceLoader { + @Test + public void testOrderingOfServices() { + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(null); + final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false); + final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceB.class.getName(), "2", false); + + serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); + + final Map<String, ControllerServiceNode> nodeMap = new LinkedHashMap<>(); + nodeMap.put("1", serviceNode1); + nodeMap.put("2", serviceNode2); + + List<List<ControllerServiceNode>> branches = ControllerServiceLoader.determineEnablingOrder(nodeMap); + assertEquals(2, branches.size()); + List<ControllerServiceNode> ordered = branches.get(0); + assertEquals(2, ordered.size()); + assertTrue(ordered.get(0) == serviceNode2); + assertTrue(ordered.get(1) == serviceNode1); + assertEquals(1, branches.get(1).size()); + assertTrue(branches.get(1).get(0) == serviceNode2); + + nodeMap.clear(); + nodeMap.put("2", serviceNode2); + nodeMap.put("1", serviceNode1); + + branches = ControllerServiceLoader.determineEnablingOrder(nodeMap); + assertEquals(2, branches.size()); + ordered = branches.get(1); + assertEquals(2, ordered.size()); + assertTrue(ordered.get(0) == serviceNode2); + assertTrue(ordered.get(1) == serviceNode1); + assertEquals(1, branches.get(0).size()); + assertTrue(branches.get(0).get(0) == serviceNode2); + + // add circular dependency on self. + nodeMap.clear(); + serviceNode1.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "1"); + nodeMap.put("1", serviceNode1); + nodeMap.put("2", serviceNode2); + + branches = ControllerServiceLoader.determineEnablingOrder(nodeMap); + assertEquals(2, branches.size()); + ordered = branches.get(0); + assertEquals(2, ordered.size()); + assertTrue(ordered.get(0) == serviceNode2); + assertTrue(ordered.get(1) == serviceNode1); + + nodeMap.clear(); + nodeMap.put("2", serviceNode2); + nodeMap.put("1", serviceNode1); + branches = ControllerServiceLoader.determineEnablingOrder(nodeMap); + assertEquals(2, branches.size()); + ordered = branches.get(1); + assertEquals(2, ordered.size()); + assertTrue(ordered.get(0) == serviceNode2); + assertTrue(ordered.get(1) == serviceNode1); + + // add circular dependency once removed. In this case, we won't actually be able to enable these because of the + // circular dependency because they will never be valid because they will always depend on a disabled service. + // But we want to ensure that the method returns successfully without throwing a StackOverflowException or anything + // like that. + nodeMap.clear(); + final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false); + serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "3"); + serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "1"); + nodeMap.put("1", serviceNode1); + nodeMap.put("3", serviceNode3); + branches = ControllerServiceLoader.determineEnablingOrder(nodeMap); + assertEquals(2, branches.size()); + ordered = branches.get(0); + assertEquals(2, ordered.size()); + assertTrue(ordered.get(0) == serviceNode3); + assertTrue(ordered.get(1) == serviceNode1); + + nodeMap.clear(); + nodeMap.put("3", serviceNode3); + nodeMap.put("1", serviceNode1); + branches = ControllerServiceLoader.determineEnablingOrder(nodeMap); + assertEquals(2, branches.size()); + ordered = branches.get(1); + assertEquals(2, ordered.size()); + assertTrue(ordered.get(0) == serviceNode3); + assertTrue(ordered.get(1) == serviceNode1); + + + // Add multiple completely disparate branches. + nodeMap.clear(); + serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); + final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false); + final ControllerServiceNode serviceNode5 = provider.createControllerService(ServiceB.class.getName(), "5", false); + serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "4"); + nodeMap.put("1", serviceNode1); + nodeMap.put("2", serviceNode2); + nodeMap.put("3", serviceNode3); + nodeMap.put("4", serviceNode4); + nodeMap.put("5", serviceNode5); + + branches = ControllerServiceLoader.determineEnablingOrder(nodeMap); + assertEquals(5, branches.size()); + + ordered = branches.get(0); + assertEquals(2, ordered.size()); + assertTrue(ordered.get(0) == serviceNode2); + assertTrue(ordered.get(1) == serviceNode1); + + assertEquals(1, branches.get(1).size()); + assertTrue(branches.get(1).get(0) == serviceNode2); + + ordered = branches.get(2); + assertEquals(2, ordered.size()); + assertTrue(ordered.get(0) == serviceNode4); + assertTrue(ordered.get(1) == serviceNode3); + + assertEquals(1, branches.get(3).size()); + assertTrue(branches.get(3).get(0) == serviceNode4); + + assertEquals(1, branches.get(4).size()); + assertTrue(branches.get(4).get(0) == serviceNode5); + + // create 2 branches both dependent on the same service + nodeMap.clear(); + serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); + serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); + nodeMap.put("1", serviceNode1); + nodeMap.put("2", serviceNode2); + nodeMap.put("3", serviceNode3); + + branches = ControllerServiceLoader.determineEnablingOrder(nodeMap); + assertEquals(3, branches.size()); + + ordered = branches.get(0); + assertEquals(2, ordered.size()); + assertTrue(ordered.get(0) == serviceNode2); + assertTrue(ordered.get(1) == serviceNode1); + + ordered = branches.get(1); + assertEquals(1, ordered.size()); + assertTrue(ordered.get(0) == serviceNode2); + + ordered = branches.get(2); + assertEquals(2, ordered.size()); + assertTrue(ordered.get(0) == serviceNode2); + assertTrue(ordered.get(1) == serviceNode3); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java index 2c85655..a0bf30d 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java @@ -167,5 +167,10 @@ public class TestStandardPropertyValue { public String getControllerServiceName(String serviceIdentifier) { return null; } + + @Override + public boolean isControllerServiceEnabling(String serviceIdentifier) { + return false; + } } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d7210da4/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java index 66b73e2..f2e848f 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java @@ -20,8 +20,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnEnabled; -import org.apache.nifi.annotation.lifecycle.OnShutdown; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; @@ -89,7 +89,7 @@ public abstract class DistributedCacheServer extends AbstractControllerService { } } - @OnShutdown + @OnDisabled public void shutdownServer() throws IOException { if (cacheServer != null) { cacheServer.stop();
