NIFI-4436: Ensure that on save, we assign a Versioned Component Identifier to inner process groups that are tracking to remote flows, if they don't have one. This would occur, for instance, if a Process Group was imported into an existing group (or copied/moved into it) and then the existing group was saved.
NIFI-4436: Fixed a bug that caused a flow not to successfully change version if a connection is added to an existing component and that component is running at time of version change NIFI-4436: Fixed bug with ordering of controller services being enabled and disabled NIFI-4436: Fixed bug that prevented local input and output ports from being stopped and started as needed NIFI-4436: Fixed bugs around referencing controller services that are at a higher level than the versioned flow NIFI-4436: Ensure that we clear components from FlowController's cache when removed and that they are added to cache when created. NIFI-4436: Fixed error message coming back if component is invalid when trying to be restarted/re-enabled NIFI-4436: Addressed issue with children of a removed process group not being considered 'affected components' and as a result not being stopped/disabled/restarted/re-enabled This closes #2219. Signed-off-by: Matt Gilman <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b6117743 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b6117743 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b6117743 Branch: refs/heads/master Commit: b6117743d4c1c1a37a16ba746b9edbbdd276d69f Parents: fa996cd Author: Mark Payne <[email protected]> Authored: Thu Jan 4 16:09:02 2018 -0500 Committer: Bryan Bende <[email protected]> Committed: Mon Jan 8 13:10:13 2018 -0500 ---------------------------------------------------------------------- .../nifi/web/api/dto/AffectedComponentDTO.java | 3 + .../service/ControllerServiceProvider.java | 22 ++- .../apache/nifi/controller/FlowController.java | 11 ++ .../StandardControllerServiceProvider.java | 127 ++++++++++++++- .../nifi/groups/StandardProcessGroup.java | 42 ++--- .../TestStandardProcessScheduler.java | 17 +- .../TestStandardControllerServiceProvider.java | 9 +- .../org/apache/nifi/web/NiFiServiceFacade.java | 21 ++- .../apache/nifi/web/ResumeFlowException.java | 31 ++++ .../nifi/web/StandardNiFiServiceFacade.java | 160 ++++++++++++++++--- .../apache/nifi/web/api/VersionsResource.java | 33 +++- .../org/apache/nifi/web/api/dto/DtoFactory.java | 27 ++++ .../apache/nifi/web/dao/ProcessGroupDAO.java | 7 +- .../web/dao/impl/StandardProcessGroupDAO.java | 54 +++---- .../nifi/web/util/AffectedComponentUtils.java | 9 ++ .../nifi/web/util/LocalComponentLifecycle.java | 45 ++++++ 16 files changed, 520 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/b6117743/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/AffectedComponentDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/AffectedComponentDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/AffectedComponentDTO.java index 95024ca..dd7c6c4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/AffectedComponentDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/AffectedComponentDTO.java @@ -26,6 +26,8 @@ import java.util.Collection; public class AffectedComponentDTO { public static final String COMPONENT_TYPE_PROCESSOR = "PROCESSOR"; public static final String COMPONENT_TYPE_CONTROLLER_SERVICE = "CONTROLLER_SERVICE"; + public static final String COMPONENT_TYPE_INPUT_PORT = "INPUT_PORT"; + public static final String COMPONENT_TYPE_OUTPUT_PORT = "OUTPUT_PORT"; public static final String COMPONENT_TYPE_REMOTE_INPUT_PORT = "REMOTE_INPUT_PORT"; public static final String COMPONENT_TYPE_REMOTE_OUTPUT_PORT = "REMOTE_OUTPUT_PORT"; @@ -58,6 +60,7 @@ public class AffectedComponentDTO { @ApiModelProperty(value = "The type of this component", allowableValues = COMPONENT_TYPE_PROCESSOR + "," + COMPONENT_TYPE_CONTROLLER_SERVICE + ", " + + COMPONENT_TYPE_INPUT_PORT + ", " + COMPONENT_TYPE_OUTPUT_PORT + ", " + COMPONENT_TYPE_REMOTE_INPUT_PORT + ", " + COMPONENT_TYPE_REMOTE_OUTPUT_PORT) public String getReferenceType() { return referenceType; http://git-wip-us.apache.org/repos/asf/nifi/blob/b6117743/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java index 010ecdf..ae5416c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java @@ -20,6 +20,7 @@ import java.net.URL; import java.util.Collection; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.bundle.BundleCoordinate; @@ -84,6 +85,16 @@ public interface ControllerServiceProvider extends ControllerServiceLookup { void enableControllerServices(Collection<ControllerServiceNode> serviceNodes); /** + * Enables the collection of services in the background. If a service in this collection + * depends on another service, the service being depended on must either already be enabled + * or must be in the collection as well. + * + * @param serviceNodes the nodes + * @return a Future that can be used to cancel the task or wait until it is completed + */ + Future<Void> enableControllerServicesAsync(Collection<ControllerServiceNode> serviceNodes); + + /** * Disables the given controller service so that it cannot be used by other * components. This allows configuration to be updated or allows service to * be removed. @@ -93,8 +104,17 @@ public interface ControllerServiceProvider extends ControllerServiceLookup { CompletableFuture<Void> disableControllerService(ControllerServiceNode serviceNode); /** + * Disables the collection of services in the background. If any of the services given is referenced + * by another service, then that other service must either be disabled or be in the given collection. + * + * @param serviceNodes the nodes the disable + * @return a Future that can be used to cancel the task or wait until it is completed + */ + Future<Void> disableControllerServicesAsync(Collection<ControllerServiceNode> serviceNodes); + + /** * @return a Set of all Controller Services that exist for this service - * provider + * provider */ Set<ControllerServiceNode> getAllControllerServices(); http://git-wip-us.apache.org/repos/asf/nifi/blob/b6117743/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index eb4b8b9..88dc11c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -245,6 +245,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -3610,12 +3611,22 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } @Override + public Future<Void> enableControllerServicesAsync(final Collection<ControllerServiceNode> serviceNodes) { + return controllerServiceProvider.enableControllerServicesAsync(serviceNodes); + } + + @Override public CompletableFuture<Void> disableControllerService(final ControllerServiceNode serviceNode) { serviceNode.verifyCanDisable(); return controllerServiceProvider.disableControllerService(serviceNode); } @Override + public Future<Void> disableControllerServicesAsync(final Collection<ControllerServiceNode> serviceNodes) { + return controllerServiceProvider.disableControllerServicesAsync(serviceNodes); + } + + @Override public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) { controllerServiceProvider.verifyCanEnableReferencingServices(serviceNode); } http://git-wip-us.apache.org/repos/asf/nifi/blob/b6117743/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index 48ad849..b4d9e8b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import org.apache.commons.lang3.ClassUtils; @@ -50,13 +51,13 @@ import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.LoggableComponent; -import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.exception.ComponentLifeCycleException; import org.apache.nifi.controller.exception.ControllerServiceInstantiationException; +import org.apache.nifi.controller.scheduling.StandardProcessScheduler; import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.logging.ComponentLog; @@ -78,7 +79,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi private static final Logger logger = LoggerFactory.getLogger(StandardControllerServiceProvider.class); - private final ProcessScheduler processScheduler; + private final StandardProcessScheduler processScheduler; private final BulletinRepository bulletinRepo; private final StateManagerProvider stateManagerProvider; private final VariableRegistry variableRegistry; @@ -87,7 +88,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi private final ConcurrentMap<String, ControllerServiceNode> serviceCache = new ConcurrentHashMap<>(); - public StandardControllerServiceProvider(final FlowController flowController, final ProcessScheduler scheduler, final BulletinRepository bulletinRepo, + public StandardControllerServiceProvider(final FlowController flowController, final StandardProcessScheduler scheduler, final BulletinRepository bulletinRepo, final StateManagerProvider stateManagerProvider, final VariableRegistry variableRegistry, final NiFiProperties nifiProperties) { this.flowController = flowController; @@ -384,6 +385,74 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } } + @Override + public Future<Void> enableControllerServicesAsync(final Collection<ControllerServiceNode> serviceNodes) { + final CompletableFuture<Void> future = new CompletableFuture<>(); + processScheduler.submitFrameworkTask(() -> { + enableControllerServices(serviceNodes, future); + future.complete(null); + }); + + return future; + } + + private void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes, final CompletableFuture<Void> completableFuture) { + // validate that we are able to start all of the services. + Iterator<ControllerServiceNode> serviceIter = serviceNodes.iterator(); + while (serviceIter.hasNext()) { + ControllerServiceNode controllerServiceNode = serviceIter.next(); + List<ControllerServiceNode> requiredServices = ((StandardControllerServiceNode) controllerServiceNode).getRequiredControllerServices(); + for (ControllerServiceNode requiredService : requiredServices) { + if (!requiredService.isActive() && !serviceNodes.contains(requiredService)) { + logger.error("Cannot enable {} because it has a dependency on {}, which is not enabled", controllerServiceNode, requiredService); + completableFuture.completeExceptionally(new IllegalStateException("Cannot enable " + controllerServiceNode + + " because it has a dependency on " + requiredService + ", which is not enabled")); + return; + } + } + } + + for (final ControllerServiceNode controllerServiceNode : serviceNodes) { + if (completableFuture.isCancelled()) { + return; + } + + try { + if (!controllerServiceNode.isActive()) { + final Future<Void> future = enableControllerServiceDependenciesFirst(controllerServiceNode); + + while (true) { + try { + future.get(1, TimeUnit.SECONDS); + logger.debug("Successfully enabled {}; service state = {}", controllerServiceNode, controllerServiceNode.getState()); + break; + } catch (final TimeoutException e) { + if (completableFuture.isCancelled()) { + return; + } + } catch (final Exception e) { + logger.warn("Failed to enable service {}", controllerServiceNode, e); + completableFuture.completeExceptionally(e); + + if (this.bulletinRepo != null) { + this.bulletinRepo.addBulletin(BulletinFactory.createBulletin("Controller Service", + Severity.ERROR.name(), "Could not enable " + controllerServiceNode + " due to " + e)); + } + + return; + } + } + } + } catch (Exception e) { + logger.error("Failed to enable " + controllerServiceNode, e); + if (this.bulletinRepo != null) { + this.bulletinRepo.addBulletin(BulletinFactory.createBulletin("Controller Service", + Severity.ERROR.name(), "Could not start " + controllerServiceNode + " due to " + e)); + } + } + } + } + private Future<Void> enableControllerServiceDependenciesFirst(ControllerServiceNode serviceNode) { final Map<ControllerServiceNode, Future<Void>> futures = new HashMap<>(); @@ -461,6 +530,58 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } @Override + public Future<Void> disableControllerServicesAsync(final Collection<ControllerServiceNode> serviceNodes) { + final CompletableFuture<Void> future = new CompletableFuture<>(); + processScheduler.submitFrameworkTask(() -> { + disableControllerServices(serviceNodes, future); + future.complete(null); + }); + + return future; + } + + private void disableControllerServices(final Collection<ControllerServiceNode> serviceNodes, final CompletableFuture<Void> future) { + final Set<ControllerServiceNode> serviceNodeSet = new HashSet<>(serviceNodes); + + // Verify that for each Controller Service given, any service that references it is either disabled or is also in the given collection + for (final ControllerServiceNode serviceNode : serviceNodes) { + final List<ControllerServiceNode> references = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class); + for (final ControllerServiceNode reference : references) { + if (reference.isActive()) { + try { + reference.verifyCanDisable(serviceNodeSet); + } catch (final Exception e) { + future.completeExceptionally(e); + } + } + } + } + + for (final ControllerServiceNode serviceNode : serviceNodes) { + if (serviceNode.isActive()) { + disableReferencingServices(serviceNode); + final CompletableFuture<?> serviceFuture = disableControllerService(serviceNode); + + while (true) { + try { + serviceFuture.get(1, TimeUnit.SECONDS); + break; + } catch (final TimeoutException e) { + if (future.isCancelled()) { + return; + } + + continue; + } catch (final Exception e) { + logger.error("Failed to disable {}", serviceNode, e); + future.completeExceptionally(e); + } + } + } + } + } + + @Override public ControllerService getControllerService(final String serviceIdentifier) { final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier); return node == null ? null : node.getProxiedControllerService(); http://git-wip-us.apache.org/repos/asf/nifi/blob/b6117743/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 9418f40..4cd6e2a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -738,7 +738,8 @@ public final class StandardProcessGroup implements ProcessGroup { } for (final ControllerServiceNode cs : group.getControllerServices(false)) { - group.removeControllerService(cs); + // Must go through Controller Service here because we need to ensure that it is removed from the cache + flowController.removeControllerService(cs); } for (final ProcessGroup childGroup : new ArrayList<>(group.getProcessGroups())) { @@ -3158,9 +3159,13 @@ public final class StandardProcessGroup implements ProcessGroup { .forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier()))); }); - processGroup.getProcessGroups().stream() - .filter(childGroup -> childGroup.getVersionControlInformation() == null) - .forEach(childGroup -> applyVersionedComponentIds(childGroup, lookup)); + for (final ProcessGroup childGroup : processGroup.getProcessGroups()) { + if (childGroup.getVersionControlInformation() == null) { + applyVersionedComponentIds(childGroup, lookup); + } else if (!childGroup.getVersionedComponentId().isPresent()) { + childGroup.setVersionedComponentId(lookup.apply(childGroup.getIdentifier())); + } + } } @@ -3242,7 +3247,7 @@ public final class StandardProcessGroup implements ProcessGroup { final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", versionedGroup); final ComparableDataFlow remoteFlow = new StandardComparableDataFlow("Remote Flow", proposedSnapshot.getFlowContents()); - final FlowComparator flowComparator = new StandardFlowComparator(localFlow, remoteFlow, getAncestorGroupServiceIds(), new StaticDifferenceDescriptor()); + final FlowComparator flowComparator = new StandardFlowComparator(remoteFlow, localFlow, getAncestorGroupServiceIds(), new StaticDifferenceDescriptor()); final FlowComparison flowComparison = flowComparator.compare(); final Set<String> updatedVersionedComponentIds = new HashSet<>(); @@ -3387,7 +3392,6 @@ public final class StandardProcessGroup implements ProcessGroup { .map(VariableDescriptor::getName) .collect(Collectors.toSet()); - final Map<String, String> updatedVariableMap = new HashMap<>(); // If any new variables exist in the proposed flow, add those to the variable registry. @@ -3477,6 +3481,7 @@ public final class StandardProcessGroup implements ProcessGroup { if (childGroup == null) { final ProcessGroup added = addProcessGroup(group, proposedChildGroup, componentIdSeed, variablesToSkip); + flowController.onProcessGroupAdded(added); added.findAllRemoteProcessGroups().stream().forEach(RemoteProcessGroup::initialize); LOG.info("Added {} to {}", added, this); } else if (childCoordinates == null || updateDescendantVersionedGroups) { @@ -3496,6 +3501,7 @@ public final class StandardProcessGroup implements ProcessGroup { final Funnel funnel = funnelsByVersionedId.get(proposedFunnel.getIdentifier()); if (funnel == null) { final Funnel added = addFunnel(group, proposedFunnel, componentIdSeed); + flowController.onFunnelAdded(added); LOG.info("Added {} to {}", added, this); } else if (updatedVersionedComponentIds.contains(proposedFunnel.getIdentifier())) { updateFunnel(funnel, proposedFunnel); @@ -3517,6 +3523,7 @@ public final class StandardProcessGroup implements ProcessGroup { final Port port = inputPortsByVersionedId.get(proposedPort.getIdentifier()); if (port == null) { final Port added = addInputPort(group, proposedPort, componentIdSeed); + flowController.onInputPortAdded(added); LOG.info("Added {} to {}", added, this); } else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) { updatePort(port, proposedPort); @@ -3537,6 +3544,7 @@ public final class StandardProcessGroup implements ProcessGroup { final Port port = outputPortsByVersionedId.get(proposedPort.getIdentifier()); if (port == null) { final Port added = addOutputPort(group, proposedPort, componentIdSeed); + flowController.onOutputPortAdded(added); LOG.info("Added {} to {}", added, this); } else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) { updatePort(port, proposedPort); @@ -3580,6 +3588,7 @@ public final class StandardProcessGroup implements ProcessGroup { final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier()); if (processor == null) { final ProcessorNode added = addProcessor(group, proposedProcessor, componentIdSeed); + flowController.onProcessorAdded(added); final Set<Relationship> proposedAutoTerminated = proposedProcessor.getAutoTerminatedRelationships() == null ? Collections.emptySet() : proposedProcessor.getAutoTerminatedRelationships().stream() @@ -3638,6 +3647,7 @@ public final class StandardProcessGroup implements ProcessGroup { final Connection connection = connectionsByVersionedId.get(proposedConnection.getIdentifier()); if (connection == null) { final Connection added = addConnection(group, proposedConnection, componentIdSeed); + flowController.onConnectionAdded(added); LOG.info("Added {} to {}", added, this); } else if (isUpdateable(connection)) { // If the connection needs to be updated, then the source and destination will already have @@ -3658,6 +3668,7 @@ public final class StandardProcessGroup implements ProcessGroup { final Connection connection = connectionsByVersionedId.get(removedVersionedId); LOG.info("Removing {} from {}", connection, group); group.removeConnection(connection); + flowController.onConnectionRemoved(connection); } // Once the appropriate connections have been removed, we may now update Processors' auto-terminated relationships. @@ -3670,7 +3681,8 @@ public final class StandardProcessGroup implements ProcessGroup { for (final String removedVersionedId : controllerServicesRemoved) { final ControllerServiceNode service = servicesByVersionedId.get(removedVersionedId); LOG.info("Removing {} from {}", service, group); - group.removeControllerService(service); + // Must remove Controller Service through Flow Controller in order to remove from cache + flowController.removeControllerService(service); } for (final String removedVersionedId : funnelsRemoved) { @@ -4065,13 +4077,6 @@ public final class StandardProcessGroup implements ProcessGroup { // to the instance ID of the Controller Service. final String serviceVersionedComponentId = entry.getValue(); String instanceId = getServiceInstanceId(serviceVersionedComponentId, group); - if (instanceId == null) { - // We didn't find the instance ID based on the Versioned Component ID. So we want to just - // leave the value set to whatever it currently is, if it's currently set. - final PropertyDescriptor propertyDescriptor = new PropertyDescriptor.Builder().name(entry.getKey()).build(); - instanceId = currentProperties.get(propertyDescriptor); - } - value = instanceId == null ? serviceVersionedComponentId : instanceId; } else { value = entry.getValue(); @@ -4085,13 +4090,9 @@ public final class StandardProcessGroup implements ProcessGroup { } private String getServiceInstanceId(final String serviceVersionedComponentId, final ProcessGroup group) { - for (final ControllerServiceNode serviceNode : group.getControllerServices(false)) { + for (final ControllerServiceNode serviceNode : group.getControllerServices(true)) { final Optional<String> optionalVersionedId = serviceNode.getVersionedComponentId(); - if (!optionalVersionedId.isPresent()) { - continue; - } - - final String versionedId = optionalVersionedId.get(); + final String versionedId = optionalVersionedId.orElseGet(() -> UUID.nameUUIDFromBytes(serviceNode.getIdentifier().getBytes(StandardCharsets.UTF_8)).toString()); if (versionedId.equals(serviceVersionedComponentId)) { return serviceNode.getIdentifier(); } @@ -4319,7 +4320,6 @@ public final class StandardProcessGroup implements ProcessGroup { } } - // Ensure that all Processors are instantiate-able. final Map<String, VersionedProcessor> proposedProcessors = new HashMap<>(); findAllProcessors(updatedFlow.getFlowContents(), proposedProcessors); http://git-wip-us.apache.org/repos/asf/nifi/blob/b6117743/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java index 314738a..c0b36c9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java @@ -47,7 +47,6 @@ import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.LoggableComponent; -import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReloadComponent; import org.apache.nifi.controller.ReportingTaskNode; @@ -269,7 +268,7 @@ public class TestStandardProcessScheduler { */ @Test public void validateServiceEnablementLogicHappensOnlyOnce() throws Exception { - final ProcessScheduler scheduler = createScheduler(); + final StandardProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(), "1", systemBundle.getBundleDetails().getCoordinate(), null, false); @@ -308,7 +307,7 @@ public class TestStandardProcessScheduler { */ @Test public void validateDisabledServiceCantBeDisabled() throws Exception { - final ProcessScheduler scheduler = createScheduler(); + final StandardProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(), "1", systemBundle.getBundleDetails().getCoordinate(), null, false); @@ -346,7 +345,7 @@ public class TestStandardProcessScheduler { */ @Test public void validateEnabledServiceCanOnlyBeDisabledOnce() throws Exception { - final ProcessScheduler scheduler = createScheduler(); + final StandardProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(), "1", systemBundle.getBundleDetails().getCoordinate(), null, false); @@ -380,7 +379,7 @@ public class TestStandardProcessScheduler { @Test public void validateDisablingOfTheFailedService() throws Exception { - final ProcessScheduler scheduler = createScheduler(); + final StandardProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); final ControllerServiceNode serviceNode = provider.createControllerService(FailingService.class.getName(), "1", systemBundle.getBundleDetails().getCoordinate(), null, false); @@ -412,7 +411,7 @@ public class TestStandardProcessScheduler { @Test @Ignore public void validateEnabledDisableMultiThread() throws Exception { - final ProcessScheduler scheduler = createScheduler(); + final StandardProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); final ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 200; i++) { @@ -455,7 +454,7 @@ public class TestStandardProcessScheduler { */ @Test public void validateNeverEnablingServiceCanStillBeDisabled() throws Exception { - final ProcessScheduler scheduler = createScheduler(); + final StandardProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(), "1", systemBundle.getBundleDetails().getCoordinate(), null, false); @@ -481,7 +480,7 @@ public class TestStandardProcessScheduler { */ @Test public void validateLongEnablingServiceCanStillBeDisabled() throws Exception { - final ProcessScheduler scheduler = createScheduler(); + final StandardProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(), "1", systemBundle.getBundleDetails().getCoordinate(), null, false); @@ -581,7 +580,7 @@ public class TestStandardProcessScheduler { } } - private ProcessScheduler createScheduler() { + private StandardProcessScheduler createScheduler() { return new StandardProcessScheduler(null, null, stateMgrProvider, nifiProperties); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/b6117743/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java index 0d15143..ed335e9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java @@ -38,7 +38,6 @@ import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.LoggableComponent; -import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReloadComponent; import org.apache.nifi.controller.ScheduledState; @@ -146,7 +145,7 @@ public class TestStandardControllerServiceProvider { final FlowController controller = Mockito.mock(FlowController.class); Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup); - final ProcessScheduler scheduler = createScheduler(); + final StandardProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider, variableRegistry, niFiProperties); @@ -162,7 +161,7 @@ public class TestStandardControllerServiceProvider { final FlowController controller = Mockito.mock(FlowController.class); Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(group); - final ProcessScheduler scheduler = createScheduler(); + final StandardProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider, variableRegistry, niFiProperties); @@ -214,13 +213,13 @@ public class TestStandardControllerServiceProvider { */ @Test(timeout = 120000) public void testConcurrencyWithEnablingReferencingServicesGraph() throws InterruptedException { - final ProcessScheduler scheduler = createScheduler(); + final StandardProcessScheduler scheduler = createScheduler(); for (int i = 0; i < 5000; i++) { testEnableReferencingServicesGraph(scheduler); } } - public void testEnableReferencingServicesGraph(final ProcessScheduler scheduler) { + public void testEnableReferencingServicesGraph(final StandardProcessScheduler scheduler) { final ProcessGroup procGroup = new MockProcessGroup(controller); final FlowController controller = Mockito.mock(FlowController.class); Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup); http://git-wip-us.apache.org/repos/asf/nifi/blob/b6117743/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index ab96747..165af45 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -116,6 +116,7 @@ import org.apache.nifi.web.api.entity.VersionControlInformationEntity; import org.apache.nifi.web.api.entity.VersionedFlowEntity; import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity; +import java.util.Collection; import java.util.Date; import java.util.List; import java.util.Map; @@ -778,6 +779,15 @@ public interface NiFiServiceFacade { PortEntity getInputPort(String inputPortId); /** + * Gets an input port as it is available to the given user + * + * @param inputPortId The input port id + * @param user the user + * @return port + */ + PortEntity getInputPort(String inputPortId, NiFiUser user); + + /** * Gets all input ports in a given group. * * @param groupId The id of the group @@ -847,6 +857,15 @@ public interface NiFiServiceFacade { PortEntity getOutputPort(String outputPortId); /** + * Gets an output port as it is available to the given user + * + * @param outputPortId The output port id + * @param user the user + * @return port + */ + PortEntity getOutputPort(String outputPortId, NiFiUser user); + + /** * Gets all output ports in a given group. * * @param groupId The id of the group @@ -1008,7 +1027,7 @@ public interface NiFiServiceFacade { * @param state the state * @param serviceIds the id's of the services */ - void verifyActivateControllerServices(String processGroupId, ControllerServiceState state, Set<String> serviceIds); + void verifyActivateControllerServices(String processGroupId, ControllerServiceState state, Collection<String> serviceIds); /** * Enables or disables the controller services with the given IDs & Revisions on behalf of the currently logged in user http://git-wip-us.apache.org/repos/asf/nifi/blob/b6117743/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/ResumeFlowException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/ResumeFlowException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/ResumeFlowException.java new file mode 100644 index 0000000..8e1e123 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/ResumeFlowException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web; + +/** + * Thrown whenever a flow cannot be resumed due to validation error, etc. + */ +public class ResumeFlowException extends Exception { + public ResumeFlowException(final String message, final Throwable cause) { + super(message, cause); + } + + public ResumeFlowException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b6117743/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 1ccced2..6e0e9d2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -16,7 +16,9 @@ */ package org.apache.nifi.web; -import com.google.common.collect.Sets; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; + import org.apache.commons.collections4.CollectionUtils; import org.apache.nifi.action.Action; import org.apache.nifi.action.Component; @@ -271,8 +273,8 @@ import org.apache.nifi.web.util.SnippetUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Response; +import com.google.common.collect.Sets; + import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -484,8 +486,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public void verifyActivateControllerServices(final String groupId, final ControllerServiceState state, final Set<String> serviceIds) { - processGroupDAO.verifyActivateControllerServices(groupId, state, serviceIds); + public void verifyActivateControllerServices(final String groupId, final ControllerServiceState state, final Collection<String> serviceIds) { + processGroupDAO.verifyActivateControllerServices(state, serviceIds); } @Override @@ -1016,7 +1018,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public RevisionUpdate<ActivateControllerServicesEntity> update() { // schedule the components - processGroupDAO.activateControllerServices(processGroupId, state, serviceRevisions.keySet()); + processGroupDAO.activateControllerServices(state, serviceRevisions.keySet()); // update the revisions final Map<String, Revision> updatedRevisions = new HashMap<>(); @@ -3289,8 +3291,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } private PortEntity createInputPortEntity(final Port port) { + return createInputPortEntity(port, NiFiUserUtils.getNiFiUser()); + } + + private PortEntity createInputPortEntity(final Port port, final NiFiUser user) { final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(port.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port, user); final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(port.getIdentifier())); final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier())); final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); @@ -3298,8 +3304,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } private PortEntity createOutputPortEntity(final Port port) { + return createOutputPortEntity(port, NiFiUserUtils.getNiFiUser()); + } + + private PortEntity createOutputPortEntity(final Port port, final NiFiUser user) { final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(port.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port, user); final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(port.getIdentifier())); final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier())); final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); @@ -3409,6 +3419,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override + public PortEntity getInputPort(final String inputPortId, final NiFiUser user) { + final Port port = inputPortDAO.getPort(inputPortId); + return createInputPortEntity(port, user); + } + + @Override public PortStatusEntity getInputPortStatus(final String inputPortId) { final Port inputPort = inputPortDAO.getPort(inputPortId); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(inputPort); @@ -3423,6 +3439,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override + public PortEntity getOutputPort(final String outputPortId, final NiFiUser user) { + final Port port = outputPortDAO.getPort(outputPortId); + return createOutputPortEntity(port, user); + } + + @Override public PortStatusEntity getOutputPortStatus(final String outputPortId) { final Port outputPort = outputPortDAO.getPort(outputPortId); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(outputPort); @@ -3974,28 +3996,95 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { }) .collect(Collectors.toCollection(HashSet::new)); - final Map<String, List<Connection>> connectionsByVersionedId = group.findAllConnections().stream() - .filter(conn -> conn.getVersionedComponentId().isPresent()) - .collect(Collectors.groupingBy(conn -> conn.getVersionedComponentId().get())); + for (final FlowDifference difference : comparison.getDifferences()) { + final VersionedComponent localComponent = difference.getComponentA(); + if (localComponent == null) { + continue; + } + // If any Process Group is removed, consider all components below that Process Group as an affected component + if (difference.getDifferenceType() == DifferenceType.COMPONENT_REMOVED && localComponent.getComponentType() == org.apache.nifi.registry.flow.ComponentType.PROCESS_GROUP) { + final String localGroupId = ((InstantiatedVersionedProcessGroup) localComponent).getInstanceId(); + final ProcessGroup localGroup = processGroupDAO.getProcessGroup(localGroupId); + + localGroup.findAllProcessors().stream() + .map(comp -> createAffectedComponentEntity(comp, user)) + .forEach(affectedComponents::add); + localGroup.findAllFunnels().stream() + .map(comp -> createAffectedComponentEntity(comp, user)) + .forEach(affectedComponents::add); + localGroup.findAllInputPorts().stream() + .map(comp -> createAffectedComponentEntity(comp, user)) + .forEach(affectedComponents::add); + localGroup.findAllOutputPorts().stream() + .map(comp -> createAffectedComponentEntity(comp, user)) + .forEach(affectedComponents::add); + localGroup.findAllRemoteProcessGroups().stream() + .flatMap(rpg -> Stream.concat(rpg.getInputPorts().stream(), rpg.getOutputPorts().stream())) + .map(comp -> createAffectedComponentEntity(comp, user)) + .forEach(affectedComponents::add); + localGroup.findAllControllerServices().stream() + .map(comp -> createAffectedComponentEntity(comp, user)) + .forEach(affectedComponents::add); + } + + if (localComponent.getComponentType() == org.apache.nifi.registry.flow.ComponentType.CONTROLLER_SERVICE) { + final String serviceId = ((InstantiatedVersionedControllerService) localComponent).getInstanceId(); + final ControllerServiceNode serviceNode = controllerServiceDAO.getControllerService(serviceId); + + final List<ControllerServiceNode> referencingServices = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class); + for (final ControllerServiceNode referencingService : referencingServices) { + affectedComponents.add(createAffectedComponentEntity(referencingService, user)); + } + + final List<ProcessorNode> referencingProcessors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class); + for (final ProcessorNode referencingProcessor : referencingProcessors) { + affectedComponents.add(createAffectedComponentEntity(referencingProcessor, user)); + } + } + } + + // Create a map of all connectable components by versioned component ID to the connectable component itself + final Map<String, List<Connectable>> connectablesByVersionId = new HashMap<>(); + mapToConnectableId(group.findAllFunnels(), connectablesByVersionId); + mapToConnectableId(group.findAllInputPorts(), connectablesByVersionId); + mapToConnectableId(group.findAllOutputPorts(), connectablesByVersionId); + mapToConnectableId(group.findAllProcessors(), connectablesByVersionId); + + final List<RemoteGroupPort> remotePorts = new ArrayList<>(); + for (final RemoteProcessGroup rpg : group.findAllRemoteProcessGroups()) { + remotePorts.addAll(rpg.getInputPorts()); + remotePorts.addAll(rpg.getOutputPorts()); + } + mapToConnectableId(remotePorts, connectablesByVersionId); + + // If any connection is added or modified, we need to stop both the source (if it exists in the flow currently) + // and the destination (if it exists in the flow currently). for (final FlowDifference difference : comparison.getDifferences()) { VersionedComponent component = difference.getComponentA(); if (component == null) { component = difference.getComponentB(); } - if (component.getComponentType() == org.apache.nifi.registry.flow.ComponentType.CONNECTION) { - final VersionedConnection connection = (VersionedConnection) component; + if (component.getComponentType() != org.apache.nifi.registry.flow.ComponentType.CONNECTION) { + continue; + } - final String versionedConnectionId = connection.getIdentifier(); - final List<Connection> instances = connectionsByVersionedId.get(versionedConnectionId); - if (instances == null) { - continue; + final VersionedConnection connection = (VersionedConnection) component; + + final String sourceVersionedId = connection.getSource().getId(); + final List<Connectable> sources = connectablesByVersionId.get(sourceVersionedId); + if (sources != null) { + for (final Connectable source : sources) { + affectedComponents.add(createAffectedComponentEntity(source, user)); } + } - for (final Connection instance : instances) { - affectedComponents.add(createAffectedComponentEntity(instance.getSource(), user)); - affectedComponents.add(createAffectedComponentEntity(instance.getDestination(), user)); + final String destinationVersionId = connection.getDestination().getId(); + final List<Connectable> destinations = connectablesByVersionId.get(destinationVersionId); + if (destinations != null) { + for (final Connectable destination : destinations) { + affectedComponents.add(createAffectedComponentEntity(destination, user)); } } } @@ -4003,6 +4092,18 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return affectedComponents; } + private void mapToConnectableId(final Collection<? extends Connectable> connectables, final Map<String, List<Connectable>> destination) { + for (final Connectable connectable : connectables) { + final Optional<String> versionedId = connectable.getVersionedComponentId(); + if (!versionedId.isPresent()) { + continue; + } + + final List<Connectable> byVersionedId = destination.computeIfAbsent(versionedId.get(), key -> new ArrayList<>()); + byVersionedId.add(connectable); + } + } + private AffectedComponentEntity createAffectedComponentEntity(final Connectable connectable, final NiFiUser user) { final AffectedComponentEntity entity = new AffectedComponentEntity(); @@ -4023,6 +4124,25 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return entity; } + private AffectedComponentEntity createAffectedComponentEntity(final ControllerServiceNode serviceNode, final NiFiUser user) { + final AffectedComponentEntity entity = new AffectedComponentEntity(); + entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(serviceNode.getIdentifier()))); + entity.setId(serviceNode.getIdentifier()); + + final Authorizable authorizable = authorizableLookup.getControllerService(serviceNode.getIdentifier()).getAuthorizable(); + final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable, user); + entity.setPermissions(permissionsDto); + + final AffectedComponentDTO dto = new AffectedComponentDTO(); + dto.setId(serviceNode.getIdentifier()); + dto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE); + dto.setProcessGroupId(serviceNode.getProcessGroupIdentifier()); + dto.setState(serviceNode.getState().name()); + + entity.setComponent(dto); + return entity; + } + private AffectedComponentEntity createAffectedComponentEntity(final InstantiatedVersionedComponent instance, final String componentTypeName, final String componentState, final NiFiUser user) { final AffectedComponentEntity entity = new AffectedComponentEntity(); entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(instance.getInstanceId()))); http://git-wip-us.apache.org/repos/asf/nifi/blob/b6117743/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java index b106948..d8c8ddf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java @@ -45,6 +45,7 @@ import org.apache.nifi.registry.flow.VersionedProcessGroup; import org.apache.nifi.util.BundleUtils; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.ResourceNotFoundException; +import org.apache.nifi.web.ResumeFlowException; import org.apache.nifi.web.Revision; import org.apache.nifi.web.api.concurrent.AsyncRequestManager; import org.apache.nifi.web.api.concurrent.AsynchronousWebRequest; @@ -1128,6 +1129,11 @@ public class VersionsResource extends ApplicationResource { idGenerationSeed, true, true); vcur.markComplete(updatedVersionControlEntity); + } catch (final ResumeFlowException rfe) { + // Treat ResumeFlowException differently because we don't want to include a message that we couldn't update the flow + // since in this case the flow was successfully updated - we just couldn't re-enable the components. + logger.error(rfe.getMessage(), rfe); + vcur.setFailureReason(rfe.getMessage()); } catch (final Exception e) { logger.error("Failed to update flow to new version", e); vcur.setFailureReason("Failed to update flow to new version due to " + e); @@ -1301,6 +1307,11 @@ public class VersionsResource extends ApplicationResource { idGenerationSeed, false, true); vcur.markComplete(updatedVersionControlEntity); + } catch (final ResumeFlowException rfe) { + // Treat ResumeFlowException differently because we don't want to include a message that we couldn't update the flow + // since in this case the flow was successfully updated - we just couldn't re-enable the components. + logger.error(rfe.getMessage(), rfe); + vcur.setFailureReason(rfe.getMessage()); } catch (final Exception e) { logger.error("Failed to update flow to new version", e); vcur.setFailureReason("Failed to update flow to new version due to " + e.getMessage()); @@ -1333,13 +1344,15 @@ public class VersionsResource extends ApplicationResource { private VersionControlInformationEntity updateFlowVersion(final String groupId, final ComponentLifecycle componentLifecycle, final URI exampleUri, final Set<AffectedComponentEntity> affectedComponents, final NiFiUser user, final boolean replicateRequest, final Revision revision, final VersionControlInformationEntity requestEntity, final VersionedFlowSnapshot flowSnapshot, final AsynchronousWebRequest<VersionControlInformationEntity> asyncRequest, final String idGenerationSeed, - final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) throws LifecycleManagementException { + final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) throws LifecycleManagementException, ResumeFlowException { // Steps 6-7: Determine which components must be stopped and stop them. final Set<String> stoppableReferenceTypes = new HashSet<>(); stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR); stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT); stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_REMOTE_OUTPUT_PORT); + stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_INPUT_PORT); + stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_OUTPUT_PORT); final Set<AffectedComponentEntity> runningComponents = affectedComponents.stream() .filter(dto -> stoppableReferenceTypes.contains(dto.getComponent().getReferenceType())) @@ -1459,7 +1472,14 @@ public class VersionsResource extends ApplicationResource { asyncRequest.setCancelCallback(enableServicesPause::cancel); final Set<AffectedComponentEntity> servicesToEnable = getUpdatedEntities(enabledServices, user); logger.info("Successfully updated flow; re-enabling {} Controller Services", servicesToEnable.size()); - componentLifecycle.activateControllerServices(exampleUri, user, groupId, servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause); + + try { + componentLifecycle.activateControllerServices(exampleUri, user, groupId, servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause); + } catch (final IllegalStateException ise) { + // Component Lifecycle will re-enable the Controller Services only if they are valid. If IllegalStateException gets thrown, we need to provide + // a more intelligent error message as to exactly what happened, rather than indicate that the flow could not be updated. + throw new ResumeFlowException("Failed to re-enable Controller Services because " + ise.getMessage(), ise); + } } if (!asyncRequest.isCancelled()) { @@ -1474,7 +1494,14 @@ public class VersionsResource extends ApplicationResource { final CancellableTimedPause startComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); asyncRequest.setCancelCallback(startComponentsPause::cancel); logger.info("Restarting {} Processors", componentsToStart.size()); - componentLifecycle.scheduleComponents(exampleUri, user, groupId, componentsToStart, ScheduledState.RUNNING, startComponentsPause); + + try { + componentLifecycle.scheduleComponents(exampleUri, user, groupId, componentsToStart, ScheduledState.RUNNING, startComponentsPause); + } catch (final IllegalStateException ise) { + // Component Lifecycle will restart the Processors only if they are valid. If IllegalStateException gets thrown, we need to provide + // a more intelligent error message as to exactly what happened, rather than indicate that the flow could not be updated. + throw new ResumeFlowException("Failed to restart components because " + ise.getMessage(), ise); + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/b6117743/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 4198303..3d9f521 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -179,6 +179,7 @@ import org.apache.nifi.web.api.entity.ComponentReferenceEntity; import org.apache.nifi.web.api.entity.ConnectionStatusSnapshotEntity; import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.FlowBreadcrumbEntity; +import org.apache.nifi.web.api.entity.PortEntity; import org.apache.nifi.web.api.entity.PortStatusSnapshotEntity; import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; @@ -1799,6 +1800,32 @@ public final class DtoFactory { return component; } + public AffectedComponentEntity createAffectedComponentEntity(final PortEntity portEntity, final String referenceType) { + if (portEntity == null) { + return null; + } + + final AffectedComponentEntity component = new AffectedComponentEntity(); + component.setBulletins(portEntity.getBulletins()); + component.setId(portEntity.getId()); + component.setPermissions(portEntity.getPermissions()); + component.setPosition(portEntity.getPosition()); + component.setRevision(portEntity.getRevision()); + component.setUri(portEntity.getUri()); + + final PortDTO portDto = portEntity.getComponent(); + final AffectedComponentDTO componentDto = new AffectedComponentDTO(); + componentDto.setId(portDto.getId()); + componentDto.setName(portDto.getName()); + componentDto.setProcessGroupId(portDto.getParentGroupId()); + componentDto.setReferenceType(referenceType); + componentDto.setState(portDto.getState()); + componentDto.setValidationErrors(portDto.getValidationErrors()); + component.setComponent(componentDto); + + return component; + } + public AffectedComponentEntity createAffectedComponentEntity(final ControllerServiceEntity serviceEntity) { if (serviceEntity == null) { return null; http://git-wip-us.apache.org/repos/asf/nifi/blob/b6117743/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java index 459acfc..7582420 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java @@ -25,6 +25,7 @@ import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.api.dto.VariableRegistryDTO; import org.apache.nifi.web.api.dto.VersionControlInformationDTO; +import java.util.Collection; import java.util.Map; import java.util.Set; import java.util.concurrent.Future; @@ -74,11 +75,10 @@ public interface ProcessGroupDAO { /** * Verifies the specified controller services can be modified * - * @param groupId the ID of the process group * @param state the desired state * @param serviceIds the ID's of the controller services */ - void verifyActivateControllerServices(String groupId, ControllerServiceState state, Set<String> serviceIds); + void verifyActivateControllerServices(ControllerServiceState state, Collection<String> serviceIds); /** * Schedules the components in the specified process group. @@ -93,11 +93,10 @@ public interface ProcessGroupDAO { /** * Enables or disables the controller services in the specified process group * - * @param groupId the id of the group * @param state the desired state * @param serviceIds the ID's of the services to enable or disable */ - Future<Void> activateControllerServices(String groupId, ControllerServiceState state, Set<String> serviceIds); + Future<Void> activateControllerServices(ControllerServiceState state, Collection<String> serviceIds); /** * Updates the specified process group. http://git-wip-us.apache.org/repos/asf/nifi/blob/b6117743/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java index 5bbb56f..e1d9e69 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java @@ -41,8 +41,10 @@ import org.apache.nifi.web.api.dto.VersionControlInformationDTO; import org.apache.nifi.web.api.entity.VariableEntity; import org.apache.nifi.web.dao.ProcessGroupDAO; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -130,18 +132,18 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou } @Override - public void verifyActivateControllerServices(final String groupId, final ControllerServiceState state, final Set<String> serviceIds) { - final ProcessGroup group = locateProcessGroup(flowController, groupId); - - group.findAllControllerServices().stream() - .filter(service -> serviceIds.contains(service.getIdentifier())) - .forEach(service -> { - if (state == ControllerServiceState.ENABLED) { - service.verifyCanEnable(); - } else { - service.verifyCanDisable(); - } - }); + public void verifyActivateControllerServices(final ControllerServiceState state, final Collection<String> serviceIds) { + final Set<ControllerServiceNode> serviceNodes = serviceIds.stream() + .map(flowController::getControllerServiceNode) + .collect(Collectors.toSet()); + + for (final ControllerServiceNode serviceNode : serviceNodes) { + if (state == ControllerServiceState.ENABLED) { + serviceNode.verifyCanEnable(serviceNodes); + } else { + serviceNode.verifyCanDisable(serviceNodes); + } + } } @Override @@ -201,26 +203,16 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou } @Override - public Future<Void> activateControllerServices(final String groupId, final ControllerServiceState state, final Set<String> serviceIds) { - final ProcessGroup group = locateProcessGroup(flowController, groupId); - - CompletableFuture<Void> future = CompletableFuture.completedFuture(null); - for (final String serviceId : serviceIds) { - final ControllerServiceNode serviceNode = group.findControllerService(serviceId, true, true); - if (serviceNode == null) { - throw new ResourceNotFoundException("Could not find Controller Service with identifier " + serviceId); - } - - if (ControllerServiceState.ENABLED.equals(state)) { - final CompletableFuture<Void> serviceFuture = flowController.enableControllerService(serviceNode); - future = CompletableFuture.allOf(future, serviceFuture); - } else { - final CompletableFuture<Void> serviceFuture = flowController.disableControllerService(serviceNode); - future = CompletableFuture.allOf(future, serviceFuture); - } + public Future<Void> activateControllerServices(final ControllerServiceState state, final Collection<String> serviceIds) { + final List<ControllerServiceNode> serviceNodes = serviceIds.stream() + .map(flowController::getControllerServiceNode) + .collect(Collectors.toList()); + + if (state == ControllerServiceState.ENABLED) { + return flowController.enableControllerServicesAsync(serviceNodes); + } else { + return flowController.disableControllerServicesAsync(serviceNodes); } - - return future; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/b6117743/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java index a801dcb..f257bb1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java @@ -27,6 +27,7 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; import org.apache.nifi.web.api.entity.AffectedComponentEntity; import org.apache.nifi.web.api.entity.ControllerServiceEntity; +import org.apache.nifi.web.api.entity.PortEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; @@ -39,6 +40,14 @@ public class AffectedComponentUtils { case AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR: final ProcessorEntity procEntity = serviceFacade.getProcessor(componentEntity.getId(), user); return dtoFactory.createAffectedComponentEntity(procEntity); + case AffectedComponentDTO.COMPONENT_TYPE_INPUT_PORT: { + final PortEntity portEntity = serviceFacade.getInputPort(componentEntity.getId(), user); + return dtoFactory.createAffectedComponentEntity(portEntity, AffectedComponentDTO.COMPONENT_TYPE_INPUT_PORT); + } + case AffectedComponentDTO.COMPONENT_TYPE_OUTPUT_PORT: { + final PortEntity portEntity = serviceFacade.getOutputPort(componentEntity.getId(), user); + return dtoFactory.createAffectedComponentEntity(portEntity, AffectedComponentDTO.COMPONENT_TYPE_OUTPUT_PORT); + } case AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE: final ControllerServiceEntity serviceEntity = serviceFacade.getControllerService(componentEntity.getId(), user); return dtoFactory.createAffectedComponentEntity(serviceEntity); http://git-wip-us.apache.org/repos/asf/nifi/blob/b6117743/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java index e005d28..1c7e82d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java @@ -18,7 +18,9 @@ package org.apache.nifi.web.util; import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ScheduledState; +import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.Revision; @@ -33,6 +35,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.URI; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; @@ -228,6 +233,46 @@ public class LocalComponentLifecycle implements ComponentLifecycle { waitForControllerServiceState(processGroupId, affectedServices, ControllerServiceState.DISABLED, pause, user); } + static List<List<ControllerServiceNode>> determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap) { + final List<List<ControllerServiceNode>> orderedNodeLists = new ArrayList<>(); + + for (final ControllerServiceNode node : serviceNodeMap.values()) { + final List<ControllerServiceNode> branch = new ArrayList<>(); + determineEnablingOrder(serviceNodeMap, node, branch, new HashSet<ControllerServiceNode>()); + orderedNodeLists.add(branch); + } + + return orderedNodeLists; + } + + private static void determineEnablingOrder( + final Map<String, ControllerServiceNode> serviceNodeMap, + final ControllerServiceNode contextNode, + final List<ControllerServiceNode> orderedNodes, + final Set<ControllerServiceNode> visited) { + if (visited.contains(contextNode)) { + return; + } + + for (final Map.Entry<PropertyDescriptor, String> entry : contextNode.getProperties().entrySet()) { + if (entry.getKey().getControllerServiceDefinition() != null) { + final String referencedServiceId = entry.getValue(); + if (referencedServiceId != null) { + final ControllerServiceNode referencedNode = serviceNodeMap.get(referencedServiceId); + if (!orderedNodes.contains(referencedNode)) { + visited.add(contextNode); + determineEnablingOrder(serviceNodeMap, referencedNode, orderedNodes, visited); + } + } + } + } + + if (!orderedNodes.contains(contextNode)) { + orderedNodes.add(contextNode); + } + } + + /** * Periodically polls the process group with the given ID, waiting for all controller services whose ID's are given to have the given Controller Service State. *
