http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index ec32cc1..282c50d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -16,7 +16,30 @@ */ package org.apache.nifi.groups; -import com.google.common.collect.Sets; +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import java.util.stream.Collectors; + import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.builder.ToStringBuilder; @@ -29,6 +52,7 @@ import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.ResourceFactory; import org.apache.nifi.authorization.resource.ResourceType; +import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateManagerProvider; @@ -40,6 +64,7 @@ import org.apache.nifi.connectable.LocalPort; import org.apache.nifi.connectable.Port; import org.apache.nifi.connectable.Position; import org.apache.nifi.connectable.Positionable; +import org.apache.nifi.connectable.Size; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; @@ -49,22 +74,58 @@ import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.Snippet; import org.apache.nifi.controller.Template; import org.apache.nifi.controller.exception.ComponentLifeCycleException; +import org.apache.nifi.controller.exception.ProcessorInstantiationException; import org.apache.nifi.controller.label.Label; +import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.scheduling.StandardProcessScheduler; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.ControllerServiceReference; import org.apache.nifi.controller.service.StandardConfigurationContext; import org.apache.nifi.encrypt.StringEncryptor; +import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.logging.LogLevel; import org.apache.nifi.logging.LogRepositoryFactory; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarCloseable; +import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.StandardProcessContext; import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.VariableDescriptor; +import org.apache.nifi.registry.flow.Bundle; +import org.apache.nifi.registry.flow.ConnectableComponent; +import org.apache.nifi.registry.flow.FlowRegistry; +import org.apache.nifi.registry.flow.FlowRegistryClient; +import org.apache.nifi.registry.flow.RemoteFlowCoordinates; +import org.apache.nifi.registry.flow.StandardVersionControlInformation; +import org.apache.nifi.registry.flow.UnknownResourceException; +import org.apache.nifi.registry.flow.VersionControlInformation; +import org.apache.nifi.registry.flow.VersionedConnection; +import org.apache.nifi.registry.flow.VersionedControllerService; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.flow.VersionedFunnel; +import org.apache.nifi.registry.flow.VersionedLabel; +import org.apache.nifi.registry.flow.VersionedPort; +import org.apache.nifi.registry.flow.VersionedProcessGroup; +import org.apache.nifi.registry.flow.VersionedProcessor; +import org.apache.nifi.registry.flow.VersionedRemoteGroupPort; +import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup; +import org.apache.nifi.registry.flow.diff.ComparableDataFlow; +import org.apache.nifi.registry.flow.diff.DifferenceType; +import org.apache.nifi.registry.flow.diff.FlowComparator; +import org.apache.nifi.registry.flow.diff.FlowComparison; +import org.apache.nifi.registry.flow.diff.FlowDifference; +import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow; +import org.apache.nifi.registry.flow.diff.StandardFlowComparator; +import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; import org.apache.nifi.registry.variable.MutableVariableRegistry; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RootGroupPort; +import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor; +import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; +import org.apache.nifi.scheduling.ExecutionNode; +import org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.util.ComponentIdGenerator; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.ReflectionUtils; import org.apache.nifi.web.Revision; @@ -72,23 +133,6 @@ import org.apache.nifi.web.api.dto.TemplateDTO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; - -import static java.util.Objects.requireNonNull; - public final class StandardProcessGroup implements ProcessGroup { private final String id; @@ -96,6 +140,8 @@ public final class StandardProcessGroup implements ProcessGroup { private final AtomicReference<String> name; private final AtomicReference<Position> position; private final AtomicReference<String> comments; + private final AtomicReference<String> versionedComponentId = new AtomicReference<>(); + private final AtomicReference<StandardVersionControlInformation> versionControlInfo = new AtomicReference<>(); private final StandardProcessScheduler scheduler; private final ControllerServiceProvider controllerServiceProvider; @@ -782,7 +828,6 @@ public final class StandardProcessGroup implements ProcessGroup { removed = true; LOG.info("{} removed from flow", processor); - } finally { if (removed) { try { @@ -1935,7 +1980,6 @@ public final class StandardProcessGroup implements ProcessGroup { removed = true; LOG.info("{} removed from {}", service, this); - } finally { if (removed) { try { @@ -2234,7 +2278,7 @@ public final class StandardProcessGroup implements ProcessGroup { @Override public Set<Positionable> findAllPositionables() { - Set<Positionable> positionables = Sets.newHashSet(); + final Set<Positionable> positionables = new HashSet<>(); positionables.addAll(findAllConnectables(this, true)); List<ProcessGroup> allProcessGroups = findAllProcessGroups(); positionables.addAll(allProcessGroups); @@ -2371,7 +2415,7 @@ public final class StandardProcessGroup implements ProcessGroup { connection.verifyCanDelete(); } - for(final ControllerServiceNode cs : controllerServices.values()) { + for (final ControllerServiceNode cs : controllerServices.values()) { cs.verifyCanDelete(); } @@ -2647,6 +2691,31 @@ public final class StandardProcessGroup implements ProcessGroup { } @Override + public Optional<String> getVersionedComponentId() { + return Optional.ofNullable(versionedComponentId.get()); + } + + @Override + public void setVersionedComponentId(final String componentId) { + writeLock.lock(); + try { + final String currentId = versionedComponentId.get(); + + if (currentId == null) { + versionedComponentId.set(componentId); + } else if (currentId.equals(componentId)) { + return; + } else if (componentId == null) { + versionedComponentId.set(null); + } else { + throw new IllegalStateException(this + " is already under version control with a different Versioned Component ID"); + } + } finally { + writeLock.unlock(); + } + } + + @Override public Set<ConfiguredComponent> getComponentsAffectedByVariable(final String variableName) { final Set<ConfiguredComponent> affected = new HashSet<>(); @@ -2736,4 +2805,1072 @@ public final class StandardProcessGroup implements ProcessGroup { } } + @Override + public VersionControlInformation getVersionControlInformation() { + return versionControlInfo.get(); + } + + @Override + public void setVersionControlInformation(final VersionControlInformation versionControlInformation, final Map<String, String> versionedComponentIds) { + final StandardVersionControlInformation svci = new StandardVersionControlInformation(versionControlInformation.getRegistryIdentifier(), + versionControlInformation.getBucketIdentifier(), + versionControlInformation.getFlowIdentifier(), + versionControlInformation.getVersion(), + versionControlInformation.getFlowSnapshot(), + versionControlInformation.getModified().orElse(null), + versionControlInformation.getCurrent().orElse(null)) { + + @Override + public Optional<Boolean> getModified() { + return StandardProcessGroup.this.isModified(); + } + }; + + writeLock.lock(); + try { + updateVersionedComponentIds(this, versionedComponentIds); + this.versionControlInfo.set(svci); + } finally { + writeLock.unlock(); + } + } + + private void updateVersionedComponentIds(final ProcessGroup processGroup, final Map<String, String> versionedComponentIds) { + if (versionedComponentIds == null || versionedComponentIds.isEmpty()) { + return; + } + + processGroup.setVersionedComponentId(versionedComponentIds.get(processGroup.getIdentifier())); + + processGroup.getConnections().stream() + .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier()))); + processGroup.getProcessors().stream() + .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier()))); + processGroup.getInputPorts().stream() + .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier()))); + processGroup.getOutputPorts().stream() + .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier()))); + processGroup.getLabels().stream() + .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier()))); + processGroup.getFunnels().stream() + .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier()))); + processGroup.getControllerServices(false).stream() + .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier()))); + + processGroup.getRemoteProcessGroups().stream() + .forEach(rpg -> { + rpg.setVersionedComponentId(versionedComponentIds.get(rpg.getIdentifier())); + + rpg.getInputPorts().stream() + .forEach(port -> port.setVersionedComponentId(versionedComponentIds.get(port.getIdentifier()))); + + rpg.getOutputPorts().stream() + .forEach(port -> port.setVersionedComponentId(versionedComponentIds.get(port.getIdentifier()))); + }); + + processGroup.getProcessGroups().stream() + .forEach(childGroup -> updateVersionedComponentIds(childGroup, versionedComponentIds)); + } + + + @Override + public void synchronizeWithFlowRegistry(final FlowRegistryClient flowRegistryClient) { + final StandardVersionControlInformation vci = versionControlInfo.get(); + if (vci == null) { + return; + } + + final String registryId = vci.getRegistryIdentifier(); + final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId); + if (flowRegistry == null) { + LOG.error("Unable to synchronize {} with Flow Registry because Process Group was placed under Version Control using Flow Registry " + + "with identifier {} but cannot find any Flow Registry with this identifier", this, registryId); + return; + } + + try { + final int latestVersion = flowRegistry.getLatestVersion(vci.getBucketIdentifier(), vci.getFlowIdentifier()); + + if (latestVersion == vci.getVersion()) { + LOG.debug("{} is currently at the most recent version ({}) of the flow that is under Version Control", this, latestVersion); + vci.setCurrent(true); + } else { + vci.setCurrent(false); + LOG.info("{} is not the most recent version of the flow that is under Version Control; current version is {}; most recent version is {}", + new Object[] {this, vci.getVersion(), latestVersion}); + } + } catch (final IOException | UnknownResourceException e) { + LOG.error("Failed to synchronize {} with Flow Registry because could not determine the most recent version of the Flow in the Flow Registry", this, e); + } + + final VersionedProcessGroup snapshot = vci.getFlowSnapshot(); + if (snapshot == null) { + // We have not yet obtained the snapshot from the Flow Registry, so we need to request the snapshot of our local version of the flow from the Flow Registry. + // This allows us to know whether or not the flow has been modified since it was last synced with the Flow Registry. + try { + final VersionedFlowSnapshot registrySnapshot = flowRegistry.getFlowContents(vci.getBucketIdentifier(), vci.getFlowIdentifier(), vci.getVersion()); + final VersionedProcessGroup registryFlow = registrySnapshot.getFlowContents(); + vci.setFlowSnapshot(registryFlow); + } catch (final IOException | UnknownResourceException e) { + LOG.error("Failed to synchronize {} with Flow Registry because could not retrieve version {} of flow with identifier {} in bucket {}", + new Object[] {this, vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier()}, e); + return; + } + } + } + + + @Override + public void updateFlow(final VersionedFlowSnapshot proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty) { + writeLock.lock(); + try { + verifyCanUpdate(proposedSnapshot, true, verifyNotDirty); // TODO: Should perform more verification... verifyCanDelete, verifyCanUpdate, etc. Recursively if child is under VC also + + final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); + final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient()); + + final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", versionedGroup); + final ComparableDataFlow remoteFlow = new StandardComparableDataFlow("Remote Flow", proposedSnapshot.getFlowContents()); + + final FlowComparator flowComparator = new StandardFlowComparator(localFlow, remoteFlow); + final FlowComparison flowComparison = flowComparator.compare(); + + final Set<String> updatedVersionedComponentIds = flowComparison.getDifferences().stream() + .filter(diff -> diff.getDifferenceType() != DifferenceType.POSITION_CHANGED) + .map(diff -> diff.getComponentA() == null ? diff.getComponentB().getIdentifier() : diff.getComponentA().getIdentifier()) + .collect(Collectors.toSet()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Updating {} to {}; there are {} differences to take into account: {}", this, proposedSnapshot, flowComparison.getDifferences().size(), flowComparison.getDifferences()); + } else { + // TODO: Remove the actual differences from the info level log. It can be extremely verbose. Is here only for testing purposes becuase it's much more convenient + // than having to remember to enable DEBUG level logging every time a full build is done. + LOG.info("Updating {} to {}; there are {} differences to take into account: {}", this, proposedSnapshot, flowComparison.getDifferences().size(), flowComparison.getDifferences()); + } + + updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false); + } catch (final ProcessorInstantiationException pie) { + throw new RuntimeException(pie); + } finally { + writeLock.unlock(); + } + } + + + private void updateProcessGroup(final ProcessGroup group, final VersionedProcessGroup proposed, final String componentIdSeed, + final Set<String> updatedVersionedComponentIds, final boolean updatePosition) throws ProcessorInstantiationException { + + group.setComments(proposed.getComments()); + group.setName(proposed.getName()); + if (updatePosition && proposed.getPosition() != null) { + group.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY())); + } + + // Determine which variables have been added/removed and add/remove them from this group's variable registry. + // We don't worry about if a variable value has changed, because variables are designed to be 'environment specific.' + // As a result, once imported, we won't update variables to match the remote flow, but we will add any missing variables + // and remove any variables that are no longer part of the remote flow. + final Set<String> existingVariableNames = group.getVariableRegistry().getVariableMap().keySet().stream() + .map(VariableDescriptor::getName) + .collect(Collectors.toSet()); + + final Set<String> variablesRemoved = new HashSet<>(existingVariableNames); + variablesRemoved.removeAll(proposed.getVariables().keySet()); + final Map<String, String> updatedVariableMap = new HashMap<>(); + variablesRemoved.forEach(var -> updatedVariableMap.put(var, null)); + + // If any new variables exist in the proposed flow, add those to the variable registry. + for (final Map.Entry<String, String> entry : proposed.getVariables().entrySet()) { + if (!existingVariableNames.contains(entry.getKey())) { + updatedVariableMap.put(entry.getKey(), entry.getValue()); + } + } + + group.setVariables(updatedVariableMap); + + final RemoteFlowCoordinates remoteCoordinates = proposed.getRemoteFlowCoordinates(); + if (remoteCoordinates != null) { + final String registryId = flowController.getFlowRegistryClient().getFlowRegistryId(remoteCoordinates.getRegistryUrl()); + final String bucketId = remoteCoordinates.getBucketId(); + final String flowId = remoteCoordinates.getFlowId(); + final int version = remoteCoordinates.getVersion(); + + final VersionControlInformation vci = new StandardVersionControlInformation(registryId, bucketId, flowId, version, proposed, false, true); + group.setVersionControlInformation(vci, Collections.emptyMap()); + } + + // Child groups + // TODO: Need to take into account if child group is under version control pointing to a different Versioned Flow and if so need to handle it differently. + final Map<String, ProcessGroup> childGroupsByVersionedId = group.getProcessGroups().stream() + .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity())); + final Set<String> childGroupsRemoved = new HashSet<>(childGroupsByVersionedId.keySet()); + + for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) { + final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier()); + + if (childGroup == null) { + final ProcessGroup added = addProcessGroup(proposedChildGroup, componentIdSeed); + LOG.info("Added {} to {}", added, this); + } else { + updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true); + LOG.info("Updated {}", childGroup); + } + + childGroupsRemoved.remove(proposedChildGroup.getIdentifier()); + } + + + // Controller Services + final Map<String, ControllerServiceNode> servicesByVersionedId = group.getControllerServices(false).stream() + .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity())); + final Set<String> controllerServicesRemoved = new HashSet<>(servicesByVersionedId.keySet()); + + for (final VersionedControllerService proposedService : proposed.getControllerServices()) { + final ControllerServiceNode service = servicesByVersionedId.get(proposedService.getIdentifier()); + if (service == null) { + final ControllerServiceNode added = addControllerService(proposedService, componentIdSeed); + LOG.info("Added {} to {}", added, this); + } else if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) { + updateControllerService(service, proposedService); + LOG.info("Updated {}", service); + } + + controllerServicesRemoved.remove(proposedService.getIdentifier()); + } + + + // Funnels + final Map<String, Funnel> funnelsByVersionedId = group.getFunnels().stream() + .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity())); + final Set<String> funnelsRemoved = new HashSet<>(funnelsByVersionedId.keySet()); + + for (final VersionedFunnel proposedFunnel : proposed.getFunnels()) { + final Funnel funnel = funnelsByVersionedId.get(proposedFunnel.getIdentifier()); + if (funnel == null) { + final Funnel added = addFunnel(proposedFunnel, componentIdSeed); + LOG.info("Added {} to {}", added, this); + } else if (updatedVersionedComponentIds.contains(proposedFunnel.getIdentifier())) { + updateFunnel(funnel, proposedFunnel); + LOG.info("Updated {}", funnel); + } else { + funnel.setPosition(new Position(proposedFunnel.getPosition().getX(), proposedFunnel.getPosition().getY())); + } + + funnelsRemoved.remove(proposedFunnel.getIdentifier()); + } + + + // Input Ports + final Map<String, Port> inputPortsByVersionedId = group.getInputPorts().stream() + .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity())); + final Set<String> inputPortsRemoved = new HashSet<>(inputPortsByVersionedId.keySet()); + + for (final VersionedPort proposedPort : proposed.getInputPorts()) { + final Port port = inputPortsByVersionedId.get(proposedPort.getIdentifier()); + if (port == null) { + final Port added = addInputPort(proposedPort, componentIdSeed); + LOG.info("Added {} to {}", added, this); + } else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) { + updatePort(port, proposedPort); + LOG.info("Updated {}", port); + } else { + port.setPosition(new Position(proposedPort.getPosition().getX(), proposedPort.getPosition().getY())); + } + + inputPortsRemoved.remove(proposedPort.getIdentifier()); + } + + // Output Ports + final Map<String, Port> outputPortsByVersionedId = group.getOutputPorts().stream() + .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity())); + final Set<String> outputPortsRemoved = new HashSet<>(outputPortsByVersionedId.keySet()); + + for (final VersionedPort proposedPort : proposed.getOutputPorts()) { + final Port port = outputPortsByVersionedId.get(proposedPort.getIdentifier()); + if (port == null) { + final Port added = addOutputPort(proposedPort, componentIdSeed); + LOG.info("Added {} to {}", added, this); + } else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) { + updatePort(port, proposedPort); + LOG.info("Updated {}", port); + } else { + port.setPosition(new Position(proposedPort.getPosition().getX(), proposedPort.getPosition().getY())); + } + + outputPortsRemoved.remove(proposedPort.getIdentifier()); + } + + + // Labels + final Map<String, Label> labelsByVersionedId = group.getLabels().stream() + .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity())); + final Set<String> labelsRemoved = new HashSet<>(labelsByVersionedId.keySet()); + + for (final VersionedLabel proposedLabel : proposed.getLabels()) { + final Label label = labelsByVersionedId.get(proposedLabel.getIdentifier()); + if (label == null) { + final Label added = addLabel(proposedLabel, componentIdSeed); + LOG.info("Added {} to {}", added, this); + } else if (updatedVersionedComponentIds.contains(proposedLabel.getIdentifier())) { + updateLabel(label, proposedLabel); + LOG.info("Updated {}", label); + } else { + label.setPosition(new Position(proposedLabel.getPosition().getX(), proposedLabel.getPosition().getY())); + } + + labelsRemoved.remove(proposedLabel.getIdentifier()); + } + + + // Processors + final Map<String, ProcessorNode> processorsByVersionedId = group.getProcessors().stream() + .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity())); + final Set<String> processorsRemoved = new HashSet<>(processorsByVersionedId.keySet()); + + for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) { + final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier()); + if (processor == null) { + final ProcessorNode added = addProcessor(proposedProcessor, componentIdSeed); + LOG.info("Added {} to {}", added, this); + } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) { + updateProcessor(processor, proposedProcessor); + LOG.info("Updated {}", processor); + } else { + processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY())); + } + + processorsRemoved.remove(proposedProcessor.getIdentifier()); + } + + + // Remote Groups + final Map<String, RemoteProcessGroup> rpgsByVersionedId = group.getRemoteProcessGroups().stream() + .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity())); + final Set<String> rpgsRemoved = new HashSet<>(rpgsByVersionedId.keySet()); + + for (final VersionedRemoteProcessGroup proposedRpg : proposed.getRemoteProcessGroups()) { + final RemoteProcessGroup rpg = rpgsByVersionedId.get(proposedRpg.getIdentifier()); + if (rpg == null) { + final RemoteProcessGroup added = addRemoteProcessGroup(proposedRpg, componentIdSeed); + LOG.info("Added {} to {}", added, this); + } else if (updatedVersionedComponentIds.contains(proposedRpg.getIdentifier())) { + updateRemoteProcessGroup(rpg, proposedRpg); + LOG.info("Updated {}", rpg); + } else { + rpg.setPosition(new Position(proposedRpg.getPosition().getX(), proposedRpg.getPosition().getY())); + } + + rpgsRemoved.remove(proposedRpg.getIdentifier()); + } + + + // Connections + final Map<String, Connection> connectionsByVersionedId = group.getConnections().stream() + .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity())); + final Set<String> connectionsRemoved = new HashSet<>(connectionsByVersionedId.keySet()); + + for (final VersionedConnection proposedConnection : proposed.getConnections()) { + final Connection connection = connectionsByVersionedId.get(proposedConnection.getIdentifier()); + if (connection == null) { + final Connection added = addConnection(proposedConnection, componentIdSeed); + LOG.info("Added {} to {}", added, this); + } else if (!connection.getSource().isRunning() && !connection.getDestination().isRunning()) { + // If the connection needs to be updated, then the source and destination will already have + // been stopped (else, the validation above would fail). So if the source or the destination is running, + // then we know that we don't need to update the connection. + updateConnection(connection, proposedConnection); + LOG.info("Updated {}", connection); + } + + connectionsRemoved.remove(proposedConnection.getIdentifier()); + } + + // Remove components that exist in the local flow but not the remote flow. + + // Connections must be the first thing to remove, not the last. Otherwise, we will fail + // to remove a component if it has a connection going to it! + for (final String removedVersionedId : connectionsRemoved) { + final Connection connection = connectionsByVersionedId.get(removedVersionedId); + LOG.info("Removing {} from {}", connection, group); + group.removeConnection(connection); + } + + for (final String removedVersionedId : controllerServicesRemoved) { + final ControllerServiceNode service = servicesByVersionedId.get(removedVersionedId); + LOG.info("Removing {} from {}", service, group); + group.removeControllerService(service); + } + + for (final String removedVersionedId : funnelsRemoved) { + final Funnel funnel = funnelsByVersionedId.get(removedVersionedId); + LOG.info("Removing {} from {}", funnel, group); + group.removeFunnel(funnel); + } + + for (final String removedVersionedId : inputPortsRemoved) { + final Port port = inputPortsByVersionedId.get(removedVersionedId); + LOG.info("Removing {} from {}", port, group); + group.removeInputPort(port); + } + + for (final String removedVersionedId : outputPortsRemoved) { + final Port port = outputPortsByVersionedId.get(removedVersionedId); + LOG.info("Removing {} from {}", port, group); + group.removeOutputPort(port); + } + + for (final String removedVersionedId : labelsRemoved) { + final Label label = labelsByVersionedId.get(removedVersionedId); + LOG.info("Removing {} from {}", label, group); + group.removeLabel(label); + } + + for (final String removedVersionedId : processorsRemoved) { + final ProcessorNode processor = processorsByVersionedId.get(removedVersionedId); + LOG.info("Removing {} from {}", processor, group); + group.removeProcessor(processor); + } + + for (final String removedVersionedId : rpgsRemoved) { + final RemoteProcessGroup rpg = rpgsByVersionedId.get(removedVersionedId); + LOG.info("Removing {} from {}", rpg, group); + group.removeRemoteProcessGroup(rpg); + } + + for (final String removedVersionedId : childGroupsRemoved) { + final ProcessGroup childGroup = childGroupsByVersionedId.get(removedVersionedId); + LOG.info("Removing {} from {}", childGroup, group); + group.removeProcessGroup(childGroup); + } + } + + protected String generateUuid(final String componentIdSeed) { + UUID uuid; + if (componentIdSeed == null) { + uuid = ComponentIdGenerator.generateId(); + } else { + try { + UUID seedId = UUID.fromString(componentIdSeed); + uuid = new UUID(seedId.getMostSignificantBits(), componentIdSeed.hashCode()); + } catch (Exception e) { + LOG.warn("Provided 'seed' does not represent UUID. Will not be able to extract most significant bits for ID generation."); + uuid = UUID.nameUUIDFromBytes(componentIdSeed.getBytes(StandardCharsets.UTF_8)); + } + } + + return uuid.toString(); + } + + + private ProcessGroup addProcessGroup(final VersionedProcessGroup proposed, final String componentIdSeed) throws ProcessorInstantiationException { + final ProcessGroup group = flowController.createProcessGroup(generateUuid(componentIdSeed)); + group.setVersionedComponentId(proposed.getIdentifier()); + addProcessGroup(group); + updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true); + return group; + } + + private void updateConnection(final Connection connection, final VersionedConnection proposed) { + connection.setBendPoints(proposed.getBends().stream() + .map(pos -> new Position(pos.getX(), pos.getY())) + .collect(Collectors.toList())); + + connection.setDestination(getConnectable(proposed.getDestination())); + connection.setLabelIndex(proposed.getLabelIndex()); + connection.setName(proposed.getName()); + connection.setRelationships(proposed.getSelectedRelationships().stream() + .map(name -> new Relationship.Builder().name(name).build()) + .collect(Collectors.toSet())); + connection.setZIndex(proposed.getzIndex()); + + final FlowFileQueue queue = connection.getFlowFileQueue(); + queue.setBackPressureDataSizeThreshold(proposed.getBackPressureDataSizeThreshold()); + queue.setBackPressureObjectThreshold(proposed.getBackPressureObjectThreshold()); + queue.setFlowFileExpiration(proposed.getFlowFileExpiration()); + + final List<FlowFilePrioritizer> prioritizers = proposed.getPrioritizers().stream() + .map(prioritizerName -> { + try { + return flowController.createPrioritizer(prioritizerName); + } catch (final Exception e) { + throw new RuntimeException("Failed to create Prioritizer of type " + prioritizerName + " for Connection with ID " + connection.getIdentifier()); + } + }) + .collect(Collectors.toList()); + + queue.setPriorities(prioritizers); + } + + private Connection addConnection(final VersionedConnection proposed, final String componentIdSeed) { + final Connectable source = getConnectable(proposed.getSource()); + if (source == null) { + throw new IllegalArgumentException("Connection has a source with identifier " + proposed.getIdentifier() + + " but no component could be found in the Process Group with a corresponding identifier"); + } + + final Connectable destination = getConnectable(proposed.getDestination()); + if (destination == null) { + throw new IllegalArgumentException("Connection has a destination with identifier " + proposed.getIdentifier() + + " but no component could be found in the Process Group with a corresponding identifier"); + } + + final Connection connection = flowController.createConnection(generateUuid(componentIdSeed), proposed.getName(), source, destination, proposed.getSelectedRelationships()); + connection.setVersionedComponentId(proposed.getIdentifier()); + addConnection(connection); + updateConnection(connection, proposed); + + return connection; + } + + private Connectable getConnectable(final ConnectableComponent connectableComponent) { + final String id = connectableComponent.getId(); + + switch (connectableComponent.getType()) { + case FUNNEL: + return getFunnels().stream() + .filter(component -> component.getVersionedComponentId().isPresent()) + .filter(component -> id.equals(component.getVersionedComponentId().get())) + .findAny() + .orElse(null); + case INPUT_PORT: + return getInputPorts().stream() + .filter(component -> component.getVersionedComponentId().isPresent()) + .filter(component -> id.equals(component.getVersionedComponentId().get())) + .findAny() + .orElse(null); + case OUTPUT_PORT: + return getOutputPorts().stream() + .filter(component -> component.getVersionedComponentId().isPresent()) + .filter(component -> id.equals(component.getVersionedComponentId().get())) + .findAny() + .orElse(null); + case PROCESSOR: + return getProcessors().stream() + .filter(component -> component.getVersionedComponentId().isPresent()) + .filter(component -> id.equals(component.getVersionedComponentId().get())) + .findAny() + .orElse(null); + case REMOTE_INPUT_PORT: { + final String rpgId = connectableComponent.getGroupId(); + final Optional<RemoteProcessGroup> rpgOption = getRemoteProcessGroups().stream() + .filter(component -> component.getVersionedComponentId().isPresent()) + .filter(component -> id.equals(component.getVersionedComponentId().get())) + .findAny(); + + if (!rpgOption.isPresent()) { + throw new IllegalArgumentException("Connection refers to a Port with ID " + id + " within Remote Process Group with ID " + + rpgId + " but could not find a Remote Process Group corresponding to that ID"); + } + + final RemoteProcessGroup rpg = rpgOption.get(); + return rpg.getInputPorts().stream() + .filter(component -> component.getVersionedComponentId().isPresent()) + .filter(component -> id.equals(component.getVersionedComponentId().get())) + .findAny() + .orElse(null); + } + case REMOTE_OUTPUT_PORT: { + final String rpgId = connectableComponent.getGroupId(); + final Optional<RemoteProcessGroup> rpgOption = getRemoteProcessGroups().stream() + .filter(component -> component.getVersionedComponentId().isPresent()) + .filter(component -> id.equals(component.getVersionedComponentId().get())) + .findAny(); + + if (!rpgOption.isPresent()) { + throw new IllegalArgumentException("Connection refers to a Port with ID " + id + " within Remote Process Group with ID " + + rpgId + " but could not find a Remote Process Group corresponding to that ID"); + } + + final RemoteProcessGroup rpg = rpgOption.get(); + return rpg.getOutputPorts().stream() + .filter(component -> component.getVersionedComponentId().isPresent()) + .filter(component -> id.equals(component.getVersionedComponentId().get())) + .findAny() + .orElse(null); + } + } + + return null; + } + + private void updateControllerService(final ControllerServiceNode service, final VersionedControllerService proposed) { + service.setAnnotationData(proposed.getAnnotationData()); + service.setComments(proposed.getComments()); + service.setName(proposed.getName()); + service.setProperties(populatePropertiesMap(service.getProperties(), proposed.getProperties())); + + if (!isEqual(service.getBundleCoordinate(), proposed.getBundle())) { + final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle()); + final List<PropertyDescriptor> descriptors = new ArrayList<>(service.getProperties().keySet()); + final Set<URL> additionalUrls = service.getAdditionalClasspathResources(descriptors); + flowController.reload(service, proposed.getType(), newBundleCoordinate, additionalUrls); + } + } + + private boolean isEqual(final BundleCoordinate coordinate, final Bundle bundle) { + if (!bundle.getGroup().equals(coordinate.getGroup())) { + return false; + } + + if (!bundle.getArtifact().equals(coordinate.getId())) { + return false; + } + + if (!bundle.getVersion().equals(coordinate.getVersion())) { + return false; + } + + return true; + } + + private BundleCoordinate toCoordinate(final Bundle bundle) { + return new BundleCoordinate(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion()); + } + + private ControllerServiceNode addControllerService(final VersionedControllerService proposed, final String componentIdSeed) { + final String type = proposed.getType(); + final String id = generateUuid(componentIdSeed); + + final Bundle bundle = proposed.getBundle(); + final BundleCoordinate coordinate = toCoordinate(bundle); + final boolean firstTimeAdded = true; + final Set<URL> additionalUrls = Collections.emptySet(); + + final ControllerServiceNode newService = flowController.createControllerService(type, id, coordinate, additionalUrls, firstTimeAdded); + newService.setVersionedComponentId(proposed.getIdentifier()); + + addControllerService(newService); + updateControllerService(newService, proposed); + + return newService; + } + + private void updateFunnel(final Funnel funnel, final VersionedFunnel proposed) { + funnel.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY())); + } + + private Funnel addFunnel(final VersionedFunnel proposed, final String componentIdSeed) { + final Funnel funnel = flowController.createFunnel(generateUuid(componentIdSeed)); + funnel.setVersionedComponentId(proposed.getIdentifier()); + addFunnel(funnel); + updateFunnel(funnel, proposed); + + return funnel; + } + + private void updatePort(final Port port, final VersionedPort proposed) { + port.setComments(proposed.getComments()); + port.setName(proposed.getName()); + port.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY())); + } + + private Port addInputPort(final VersionedPort proposed, final String componentIdSeed) { + final Port port = flowController.createLocalInputPort(generateUuid(componentIdSeed), proposed.getName()); + port.setVersionedComponentId(proposed.getIdentifier()); + addInputPort(port); + updatePort(port, proposed); + + return port; + } + + private Port addOutputPort(final VersionedPort proposed, final String componentIdSeed) { + final Port port = flowController.createLocalInputPort(generateUuid(componentIdSeed), proposed.getName()); + port.setVersionedComponentId(proposed.getIdentifier()); + addOutputPort(port); + updatePort(port, proposed); + + return port; + } + + private Label addLabel(final VersionedLabel proposed, final String componentIdSeed) { + final Label label = flowController.createLabel(generateUuid(componentIdSeed), proposed.getLabel()); + label.setVersionedComponentId(proposed.getIdentifier()); + addLabel(label); + updateLabel(label, proposed); + + return label; + } + + private void updateLabel(final Label label, final VersionedLabel proposed) { + label.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY())); + label.setSize(new Size(proposed.getWidth(), proposed.getHeight())); + label.setStyle(proposed.getStyle()); + label.setValue(proposed.getLabel()); + } + + private ProcessorNode addProcessor(final VersionedProcessor proposed, final String componentIdSeed) throws ProcessorInstantiationException { + final BundleCoordinate coordinate = toCoordinate(proposed.getBundle()); + final ProcessorNode procNode = flowController.createProcessor(proposed.getType(), generateUuid(componentIdSeed), coordinate, true); + procNode.setVersionedComponentId(proposed.getIdentifier()); + + addProcessor(procNode); + updateProcessor(procNode, proposed); + + return procNode; + } + + private void updateProcessor(final ProcessorNode processor, final VersionedProcessor proposed) throws ProcessorInstantiationException { + processor.setAnnotationData(proposed.getAnnotationData()); + processor.setBulletinLevel(LogLevel.valueOf(proposed.getBulletinLevel())); + processor.setComments(proposed.getComments()); + processor.setMaxConcurrentTasks(proposed.getConcurrentlySchedulableTaskCount()); + processor.setExecutionNode(ExecutionNode.valueOf(proposed.getExecutionNode())); + processor.setName(proposed.getName()); + processor.setPenalizationPeriod(proposed.getPenaltyDuration()); + processor.setProperties(populatePropertiesMap(processor.getProperties(), proposed.getProperties())); + processor.setRunDuration(proposed.getRunDurationMillis(), TimeUnit.MILLISECONDS); + processor.setScheduldingPeriod(proposed.getSchedulingPeriod()); + processor.setSchedulingStrategy(SchedulingStrategy.valueOf(proposed.getSchedulingStrategy())); + processor.setStyle(proposed.getStyle()); + processor.setYieldPeriod(proposed.getYieldDuration()); + processor.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY())); + + processor.setAutoTerminatedRelationships(proposed.getAutoTerminatedRelationships().stream() + .map(relName -> processor.getRelationship(relName)) + .collect(Collectors.toSet())); + + if (!isEqual(processor.getBundleCoordinate(), proposed.getBundle())) { + final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle()); + final List<PropertyDescriptor> descriptors = new ArrayList<>(processor.getProperties().keySet()); + final Set<URL> additionalUrls = processor.getAdditionalClasspathResources(descriptors); + flowController.reload(processor, proposed.getType(), newBundleCoordinate, additionalUrls); + } + } + + private Map<String, String> populatePropertiesMap(final Map<PropertyDescriptor, String> currentProperties, final Map<String, String> proposedProperties) { + final Map<String, String> fullPropertyMap = new HashMap<>(); + for (final PropertyDescriptor property : currentProperties.keySet()) { + fullPropertyMap.put(property.getName(), null); + } + + fullPropertyMap.putAll(proposedProperties); + return fullPropertyMap; + } + + private RemoteProcessGroup addRemoteProcessGroup(final VersionedRemoteProcessGroup proposed, final String componentIdSeed) { + final RemoteProcessGroup rpg = flowController.createRemoteProcessGroup(generateUuid(componentIdSeed), proposed.getTargetUris()); + rpg.setVersionedComponentId(proposed.getIdentifier()); + + addRemoteProcessGroup(rpg); + updateRemoteProcessGroup(rpg, proposed); + + return rpg; + } + + private void updateRemoteProcessGroup(final RemoteProcessGroup rpg, final VersionedRemoteProcessGroup proposed) { + rpg.setComments(proposed.getComments()); + rpg.setCommunicationsTimeout(proposed.getCommunicationsTimeout()); + rpg.setInputPorts(proposed.getInputPorts().stream() + .map(port -> createPortDescriptor(port)) + .collect(Collectors.toSet())); + rpg.setName(proposed.getName()); + rpg.setNetworkInterface(proposed.getLocalNetworkInterface()); + rpg.setOutputPorts(proposed.getOutputPorts().stream() + .map(port -> createPortDescriptor(port)) + .collect(Collectors.toSet())); + rpg.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY())); + rpg.setProxyHost(proposed.getProxyHost()); + rpg.setProxyPort(proposed.getProxyPort()); + rpg.setProxyUser(proposed.getProxyUser()); + rpg.setTransportProtocol(SiteToSiteTransportProtocol.valueOf(proposed.getTransportProtocol())); + rpg.setYieldDuration(proposed.getYieldDuration()); + } + + private RemoteProcessGroupPortDescriptor createPortDescriptor(final VersionedRemoteGroupPort proposed) { + final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor(); + descriptor.setVersionedComponentId(proposed.getIdentifier()); + descriptor.setBatchCount(proposed.getBatchSize().getCount()); + descriptor.setBatchDuration(proposed.getBatchSize().getDuration()); + descriptor.setBatchSize(proposed.getBatchSize().getSize()); + descriptor.setComments(proposed.getComments()); + descriptor.setConcurrentlySchedulableTaskCount(proposed.getConcurrentlySchedulableTaskCount()); + descriptor.setGroupId(proposed.getGroupId()); + descriptor.setId(UUID.randomUUID().toString()); // TODO: Need to address this issue of port id's + descriptor.setName(proposed.getName()); + descriptor.setUseCompression(proposed.isUseCompression()); + return descriptor; + } + + + public Optional<Boolean> isModified() { + final StandardVersionControlInformation vci = versionControlInfo.get(); + + // If this group is not under version control, then we need to notify the parent + // group (if any) that a modification has taken place. Otherwise, we need to + // compare the current the flow with the 'versioned snapshot' of the flow in order + // to determine if the flows are different. + // We cannot simply say 'if something changed then this flow is different than the versioned snapshot' + // because if we do this, and a user adds a processor then subsequently removes it, then the logic would + // say that the flow is modified. There would be no way to ever go back to the flow not being modified. + // So we have to perform a diff of the flows and see if they are the same. + if (vci == null) { + return Optional.of(Boolean.FALSE); + } + + if (vci.getFlowSnapshot() == null) { + // we haven't retrieved the flow from the Flow Registry yet, so we don't know if it's been modified. + // As a result, we will just return an empty optional + return Optional.empty(); + } + + final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); + final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient()); + + final ComparableDataFlow currentFlow = new ComparableDataFlow() { + @Override + public VersionedProcessGroup getContents() { + return versionedGroup; + } + + @Override + public String getName() { + return "Local Flow"; + } + }; + + final ComparableDataFlow snapshotFlow = new ComparableDataFlow() { + @Override + public VersionedProcessGroup getContents() { + return vci.getFlowSnapshot(); + } + + @Override + public String getName() { + return "Flow Under Version Control"; + } + }; + + final FlowComparator flowComparator = new StandardFlowComparator(currentFlow, snapshotFlow); + final FlowComparison comparison = flowComparator.compare(); + final Set<FlowDifference> differences = comparison.getDifferences(); + final boolean modified = differences.stream() + .filter(diff -> diff.getDifferenceType() != DifferenceType.POSITION_CHANGED) + .filter(diff -> diff.getDifferenceType() != DifferenceType.STYLE_CHANGED) + .findAny() + .isPresent(); + + LOG.debug("There are {} differences between this flow and the versioned snapshot of this flow: {}", differences.size(), differences); + return Optional.of(modified); + } + + + @Override + public void verifyCanUpdate(final VersionedFlowSnapshot updatedFlow, final boolean verifyConnectionRemoval, final boolean verifyNotDirty) { + readLock.lock(); + try { + final VersionControlInformation versionControlInfo = getVersionControlInformation(); + if (versionControlInfo == null) { + throw new IllegalStateException("Cannot update the Version of the flow for " + this + + " because the Process Group is not currently under Version Control"); + } + + if (!versionControlInfo.getFlowIdentifier().equals(updatedFlow.getSnapshotMetadata().getFlowIdentifier())) { + throw new IllegalStateException(this + " is under version control but the given flow does not match the flow that this Process Group is synchronized with"); + } + + if (verifyNotDirty) { + final Optional<Boolean> modifiedOption = versionControlInfo.getModified(); + if (!modifiedOption.isPresent()) { + throw new IllegalStateException(this + " cannot be updated to a different version of the flow because the local flow " + + "has not yet been synchronized with the Flow Registry. The Process Group must be" + + " synched with the Flow Registry before continuing. This will happen periodically in the background, so please try the request again later"); + } + + if (Boolean.TRUE.equals(modifiedOption.get())) { + throw new IllegalStateException("Cannot change the Version of the flow for " + this + + " because the Process Group has been modified since it was last synchronized with the Flow Registry. The Process Group must be" + + " restored to its original form before changing the version"); + } + } + + final VersionedProcessGroup flowContents = updatedFlow.getFlowContents(); + if (verifyConnectionRemoval) { + // Determine which Connections have been removed. + final Map<String, Connection> removedConnectionByVersionedId = new HashMap<>(); + findAllConnections().stream() + .filter(conn -> conn.getVersionedComponentId().isPresent()) + .forEach(conn -> removedConnectionByVersionedId.put(conn.getVersionedComponentId().get(), conn)); + + final Set<String> proposedFlowConnectionIds = new HashSet<>(); + findAllConnectionIds(flowContents, proposedFlowConnectionIds); + + for (final String proposedConnectionId : proposedFlowConnectionIds) { + removedConnectionByVersionedId.remove(proposedConnectionId); + } + + // If any connection that was removed has data in it, throw an IllegalStateException + for (final Connection connection : removedConnectionByVersionedId.values()) { + final FlowFileQueue flowFileQueue = connection.getFlowFileQueue(); + if (!flowFileQueue.isEmpty()) { + throw new IllegalStateException(this + " cannot be updated to the proposed version of the flow because the " + + "proposed version does not contain " + + connection + " and the connection currently has data in the queue."); + } + } + } + + // Determine which input ports were removed from this process group + final Map<String, Port> removedInputPortsByVersionId = new HashMap<>(); + getInputPorts().stream() + .filter(port -> port.getVersionedComponentId().isPresent()) + .forEach(port -> removedInputPortsByVersionId.put(port.getVersionedComponentId().get(), port)); + flowContents.getInputPorts().stream() + .map(VersionedPort::getIdentifier) + .forEach(id -> removedInputPortsByVersionId.remove(id)); + + // Ensure that there are no incoming connections for any Input Port that was removed. + for (final Port inputPort : removedInputPortsByVersionId.values()) { + final List<Connection> incomingConnections = inputPort.getIncomingConnections(); + if (!incomingConnections.isEmpty()) { + throw new IllegalStateException(this + " cannot be updated to the proposed version of the flow because the proposed version does not contain the Input Port " + + inputPort + " and the Input Port currently has an incoming connections"); + } + } + + // Determine which output ports were removed from this process group + final Map<String, Port> removedOutputPortsByVersionId = new HashMap<>(); + getOutputPorts().stream() + .filter(port -> port.getVersionedComponentId().isPresent()) + .forEach(port -> removedOutputPortsByVersionId.put(port.getVersionedComponentId().get(), port)); + flowContents.getOutputPorts().stream() + .map(VersionedPort::getIdentifier) + .forEach(id -> removedOutputPortsByVersionId.remove(id)); + + // Ensure that there are no outgoing connections for any Output Port that was removed. + for (final Port outputPort : removedOutputPortsByVersionId.values()) { + final Set<Connection> outgoingConnections = outputPort.getConnections(); + if (!outgoingConnections.isEmpty()) { + throw new IllegalStateException(this + " cannot be updated to the proposed version of the flow because the proposed version does not contain the Output Port " + + outputPort + " and the Output Port currently has an outgoing connections"); + } + } + + // Find any Process Groups that may have been deleted. If we find any Process Group that was deleted, and that Process Group + // has Templates, then we fail because the Templates have to be removed first. + final Map<String, VersionedProcessGroup> proposedProcessGroups = new HashMap<>(); + findAllProcessGroups(updatedFlow.getFlowContents(), proposedProcessGroups); + + for (final ProcessGroup childGroup : findAllProcessGroups()) { + if (childGroup.getTemplates().isEmpty()) { + continue; + } + + final Optional<String> versionedIdOption = childGroup.getVersionedComponentId(); + if (!versionedIdOption.isPresent()) { + continue; + } + + final String versionedId = versionedIdOption.get(); + if (!proposedProcessGroups.containsKey(versionedId)) { + // Process Group was removed. + throw new IllegalStateException(this + " cannot be updated to the proposed version of the flow because the child " + childGroup + + " that exists locally has one or more Templates, and the proposed flow does not contain this Process Group. " + + "A Process Group cannot be deleted while it contains Templates. Please remove the Templates before attempting to chnage the version of the flow."); + } + } + + + // Ensure that all Processors are instantiate-able. + final Map<String, VersionedProcessor> proposedProcessors = new HashMap<>(); + findAllProcessors(updatedFlow.getFlowContents(), proposedProcessors); + + findAllProcessors().stream() + .filter(proc -> proc.getVersionedComponentId().isPresent()) + .forEach(proc -> proposedProcessors.remove(proc.getVersionedComponentId().get())); + + for (final VersionedProcessor processorToAdd : proposedProcessors.values()) { + final BundleCoordinate coordinate = toCoordinate(processorToAdd.getBundle()); + try { + flowController.createProcessor(processorToAdd.getType(), UUID.randomUUID().toString(), coordinate, false); + } catch (Exception e) { + throw new IllegalArgumentException("Unable to create Processor of type " + processorToAdd.getType(), e); + } + } + + // Ensure that all Controller Services are instantiate-able. + final Map<String, VersionedControllerService> proposedServices = new HashMap<>(); + findAllControllerServices(updatedFlow.getFlowContents(), proposedServices); + + findAllControllerServices().stream() + .filter(service -> service.getVersionedComponentId().isPresent()) + .forEach(service -> proposedServices.remove(service.getVersionedComponentId().get())); + + for (final VersionedControllerService serviceToAdd : proposedServices.values()) { + final BundleCoordinate coordinate = toCoordinate(serviceToAdd.getBundle()); + try { + flowController.createControllerService(serviceToAdd.getType(), UUID.randomUUID().toString(), coordinate, Collections.emptySet(), false); + } catch (Exception e) { + throw new IllegalArgumentException("Unable to create Controller Service of type " + serviceToAdd.getType(), e); + } + } + + // Ensure that all Prioritizers are instantiate-able. + final Map<String, VersionedConnection> proposedConnections = new HashMap<>(); + findAllConnections(updatedFlow.getFlowContents(), proposedConnections); + + findAllConnections().stream() + .filter(conn -> conn.getVersionedComponentId().isPresent()) + .forEach(conn -> proposedConnections.remove(conn.getVersionedComponentId().get())); + + for (final VersionedConnection connectionToAdd : proposedConnections.values()) { + for (final String prioritizerType : connectionToAdd.getPrioritizers()) { + try { + flowController.createPrioritizer(prioritizerType); + } catch (Exception e) { + throw new IllegalArgumentException("Unable to create Prioritizer of type " + prioritizerType, e); + } + } + } + } finally { + readLock.unlock(); + } + } + + private void findAllConnectionIds(final VersionedProcessGroup group, final Set<String> ids) { + for (final VersionedConnection connection : group.getConnections()) { + ids.add(connection.getIdentifier()); + } + + for (final VersionedProcessGroup childGroup : group.getProcessGroups()) { + findAllConnectionIds(childGroup, ids); + } + } + + private void findAllProcessors(final VersionedProcessGroup group, final Map<String, VersionedProcessor> map) { + for (final VersionedProcessor processor : group.getProcessors()) { + map.put(processor.getIdentifier(), processor); + } + + for (final VersionedProcessGroup childGroup : group.getProcessGroups()) { + findAllProcessors(childGroup, map); + } + } + + private void findAllControllerServices(final VersionedProcessGroup group, final Map<String, VersionedControllerService> map) { + for (final VersionedControllerService service : group.getControllerServices()) { + map.put(service.getIdentifier(), service); + } + + for (final VersionedProcessGroup childGroup : group.getProcessGroups()) { + findAllControllerServices(childGroup, map); + } + } + + private void findAllConnections(final VersionedProcessGroup group, final Map<String, VersionedConnection> map) { + for (final VersionedConnection connection : group.getConnections()) { + map.put(connection.getIdentifier(), connection); + } + + for (final VersionedProcessGroup childGroup : group.getProcessGroups()) { + findAllConnections(childGroup, map); + } + } + + private void findAllProcessGroups(final VersionedProcessGroup group, final Map<String, VersionedProcessGroup> map) { + map.put(group.getIdentifier(), group); + + for (final VersionedProcessGroup child : group.getProcessGroups()) { + findAllProcessGroups(child, map); + } + } }
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistryClient.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistryClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistryClient.java new file mode 100644 index 0000000..22ba50b --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistryClient.java @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.UUID; + +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.map.DeserializationConfig; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.util.DefaultPrettyPrinter; + +/** + * A simple file-based implementation of a Flow Registry Client. Rather than interacting + * with an actual Flow Registry, this implementation simply reads flows from disk and writes + * them to disk. It is not meant for any production use but is available for testing purposes. + */ +public class FileBasedFlowRegistryClient implements FlowRegistryClient, FlowRegistry { + private final File directory; + private final Map<String, Set<String>> flowNamesByBucket = new HashMap<>(); + private final JsonFactory jsonFactory = new JsonFactory(); + + public FileBasedFlowRegistryClient(final File directory) throws IOException { + if (!directory.exists() && !directory.mkdirs()) { + throw new IOException("Could not access or create directory " + directory.getAbsolutePath() + " for Flow Registry"); + } + + this.directory = directory; + recoverBuckets(); + } + + private void recoverBuckets() throws IOException { + final File[] bucketDirs = directory.listFiles(); + if (bucketDirs == null) { + throw new IOException("Could not get listing of directory " + directory); + } + + for (final File bucketDir : bucketDirs) { + final File[] flowDirs = bucketDir.listFiles(); + if (flowDirs == null) { + throw new IOException("Could not get listing of directory " + bucketDir); + } + + final Set<String> flowNames = new HashSet<>(); + for (final File flowDir : flowDirs) { + final File propsFile = new File(flowDir, "flow.properties"); + if (!propsFile.exists()) { + continue; + } + + final Properties properties = new Properties(); + try (final InputStream in = new FileInputStream(propsFile)) { + properties.load(in); + } + + final String flowName = properties.getProperty("name"); + if (flowName == null) { + continue; + } + + flowNames.add(flowName); + } + + if (!flowNames.isEmpty()) { + flowNamesByBucket.put(bucketDir.getName(), flowNames); + } + } + } + + @Override + public FlowRegistry getFlowRegistry(final String registryId) { + if (!"default".equals(registryId)) { + return null; + } + + return this; + } + + @Override + public String getURL() { + return directory.toURI().toString(); + } + + @Override + public synchronized VersionedFlow registerVersionedFlow(final VersionedFlow flow) throws IOException, UnknownResourceException { + Objects.requireNonNull(flow); + Objects.requireNonNull(flow.getBucketIdentifier()); + Objects.requireNonNull(flow.getName()); + + // Verify that bucket exists + final File bucketDir = new File(directory, flow.getBucketIdentifier()); + if (!bucketDir.exists()) { + throw new UnknownResourceException("No bucket exists with ID " + flow.getBucketIdentifier()); + } + + // Verify that there is no flow with the same name in that bucket + final Set<String> flowNames = flowNamesByBucket.get(flow.getBucketIdentifier()); + if (flowNames != null && flowNames.contains(flow.getName())) { + throw new IllegalArgumentException("Flow with name '" + flow.getName() + "' already exists for Bucket with ID " + flow.getBucketIdentifier()); + } + + final String flowIdentifier = UUID.randomUUID().toString(); + final File flowDir = new File(bucketDir, flowIdentifier); + if (!flowDir.mkdirs()) { + throw new IOException("Failed to create directory " + flowDir + " for new Flow"); + } + + final File propertiesFile = new File(flowDir, "flow.properties"); + + final Properties flowProperties = new Properties(); + flowProperties.setProperty("name", flow.getName()); + flowProperties.setProperty("created", String.valueOf(flow.getCreatedTimestamp())); + flowProperties.setProperty("description", flow.getDescription()); + flowProperties.setProperty("lastModified", String.valueOf(flow.getModifiedTimestamp())); + + try (final OutputStream out = new FileOutputStream(propertiesFile)) { + flowProperties.store(out, null); + } + + final VersionedFlow response = new VersionedFlow(); + response.setBucketIdentifier(flow.getBucketIdentifier()); + response.setCreatedTimestamp(flow.getCreatedTimestamp()); + response.setDescription(flow.getDescription()); + response.setIdentifier(flowIdentifier); + response.setModifiedTimestamp(flow.getModifiedTimestamp()); + response.setName(flow.getName()); + + return response; + } + + @Override + public synchronized VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final String comments) + throws IOException, UnknownResourceException { + Objects.requireNonNull(flow); + Objects.requireNonNull(flow.getBucketIdentifier()); + Objects.requireNonNull(flow.getName()); + Objects.requireNonNull(snapshot); + + // Verify that the bucket exists + final File bucketDir = new File(directory, flow.getBucketIdentifier()); + if (!bucketDir.exists()) { + throw new UnknownResourceException("No bucket exists with ID " + flow.getBucketIdentifier()); + } + + // Verify that the flow exists + final File flowDir = new File(bucketDir, flow.getIdentifier()); + if (!flowDir.exists()) { + throw new UnknownResourceException("No Flow with ID " + flow.getIdentifier() + " exists for Bucket with ID " + flow.getBucketIdentifier()); + } + + final File[] versionDirs = flowDir.listFiles(); + if (versionDirs == null) { + throw new IOException("Unable to perform listing of directory " + flowDir); + } + + int maxVersion = 0; + for (final File versionDir : versionDirs) { + final String versionName = versionDir.getName(); + + final int version; + try { + version = Integer.parseInt(versionName); + } catch (final NumberFormatException nfe) { + continue; + } + + if (version > maxVersion) { + maxVersion = version; + } + } + + final int snapshotVersion = maxVersion + 1; + final File snapshotDir = new File(flowDir, String.valueOf(snapshotVersion)); + if (!snapshotDir.mkdir()) { + throw new IOException("Could not create directory " + snapshotDir); + } + + final File contentsFile = new File(snapshotDir, "flow.xml"); + + try (final OutputStream out = new FileOutputStream(contentsFile); + final JsonGenerator generator = jsonFactory.createJsonGenerator(out)) { + generator.setCodec(new ObjectMapper()); + generator.setPrettyPrinter(new DefaultPrettyPrinter()); + generator.writeObject(snapshot); + } + + final Properties snapshotProperties = new Properties(); + snapshotProperties.setProperty("comments", comments); + snapshotProperties.setProperty("name", flow.getName()); + final File snapshotPropsFile = new File(snapshotDir, "snapshot.properties"); + try (final OutputStream out = new FileOutputStream(snapshotPropsFile)) { + snapshotProperties.store(out, null); + } + + final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata(); + snapshotMetadata.setBucketIdentifier(flow.getBucketIdentifier()); + snapshotMetadata.setComments(comments); + snapshotMetadata.setFlowIdentifier(flow.getIdentifier()); + snapshotMetadata.setFlowName(flow.getName()); + snapshotMetadata.setTimestamp(System.currentTimeMillis()); + snapshotMetadata.setVersion(snapshotVersion); + + final VersionedFlowSnapshot response = new VersionedFlowSnapshot(); + response.setSnapshotMetadata(snapshotMetadata); + response.setFlowContents(snapshot); + return response; + } + + @Override + public Set<String> getRegistryIdentifiers() { + return Collections.singleton("default"); + } + + @Override + public int getLatestVersion(final String bucketId, final String flowId) throws IOException, UnknownResourceException { + // Verify that the bucket exists + final File bucketDir = new File(directory, bucketId); + if (!bucketDir.exists()) { + throw new UnknownResourceException("No bucket exists with ID " + bucketId); + } + + // Verify that the flow exists + final File flowDir = new File(bucketDir, flowId); + if (!flowDir.exists()) { + throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + bucketId); + } + + final File[] versionDirs = flowDir.listFiles(); + if (versionDirs == null) { + throw new IOException("Unable to perform listing of directory " + flowDir); + } + + int maxVersion = 0; + for (final File versionDir : versionDirs) { + final String versionName = versionDir.getName(); + + final int version; + try { + version = Integer.parseInt(versionName); + } catch (final NumberFormatException nfe) { + continue; + } + + if (version > maxVersion) { + maxVersion = version; + } + } + + return maxVersion; + } + + @Override + public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, int version) throws IOException, UnknownResourceException { + // Verify that the bucket exists + final File bucketDir = new File(directory, bucketId); + if (!bucketDir.exists()) { + throw new UnknownResourceException("No bucket exists with ID " + bucketId); + } + + // Verify that the flow exists + final File flowDir = new File(bucketDir, flowId); + if (!flowDir.exists()) { + throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId); + } + + final File versionDir = new File(flowDir, String.valueOf(version)); + if (!versionDir.exists()) { + throw new UnknownResourceException("Flow with ID " + flowId + " in Bucket with ID " + bucketId + " does not contain a snapshot with version " + version); + } + + final File contentsFile = new File(versionDir, "flow.xml"); + + final VersionedProcessGroup processGroup; + try (final JsonParser parser = jsonFactory.createJsonParser(contentsFile)) { + final ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + parser.setCodec(mapper); + processGroup = parser.readValueAs(VersionedProcessGroup.class); + } + + final Properties properties = new Properties(); + final File snapshotPropsFile = new File(versionDir, "snapshot.properties"); + try (final InputStream in = new FileInputStream(snapshotPropsFile)) { + properties.load(in); + } + + final String comments = properties.getProperty("comments"); + final String flowName = properties.getProperty("name"); + + final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata(); + snapshotMetadata.setBucketIdentifier(bucketId); + snapshotMetadata.setComments(comments); + snapshotMetadata.setFlowIdentifier(flowId); + snapshotMetadata.setFlowName(flowName); + snapshotMetadata.setTimestamp(System.currentTimeMillis()); + snapshotMetadata.setVersion(version); + + final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot(); + snapshot.setFlowContents(processGroup); + snapshot.setSnapshotMetadata(snapshotMetadata); + + return snapshot; + } + + @Override + public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) throws IOException, UnknownResourceException { + // Verify that the bucket exists + final File bucketDir = new File(directory, bucketId); + if (!bucketDir.exists()) { + throw new UnknownResourceException("No bucket exists with ID " + bucketId); + } + + // Verify that the flow exists + final File flowDir = new File(bucketDir, flowId); + if (!flowDir.exists()) { + throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId); + } + + final File flowPropsFile = new File(flowDir, "flow.properties"); + final Properties flowProperties = new Properties(); + try (final InputStream in = new FileInputStream(flowPropsFile)) { + flowProperties.load(in); + } + + final VersionedFlow flow = new VersionedFlow(); + flow.setBucketIdentifier(bucketId); + flow.setCreatedTimestamp(Long.parseLong(flowProperties.getProperty("created"))); + flow.setDescription(flowProperties.getProperty("description")); + flow.setIdentifier(flowId); + flow.setModifiedTimestamp(flowDir.lastModified()); + flow.setName(flowProperties.getProperty("name")); + + final Comparator<VersionedFlowSnapshotMetadata> versionComparator = (a, b) -> Integer.compare(a.getVersion(), b.getVersion()); + + final SortedSet<VersionedFlowSnapshotMetadata> snapshotMetadataSet = new TreeSet<>(versionComparator); + flow.setSnapshotMetadata(snapshotMetadataSet); + + final File[] versionDirs = flowDir.listFiles(); + for (final File file : versionDirs) { + if (!file.isDirectory()) { + continue; + } + + int version; + try { + version = Integer.parseInt(file.getName()); + } catch (final NumberFormatException nfe) { + // not a version. skip. + continue; + } + + final File snapshotPropsFile = new File(file, "snapshot.properties"); + final Properties snapshotProperties = new Properties(); + try (final InputStream in = new FileInputStream(snapshotPropsFile)) { + snapshotProperties.load(in); + } + + final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata(); + metadata.setBucketIdentifier(bucketId); + metadata.setComments(snapshotProperties.getProperty("comments")); + metadata.setFlowIdentifier(flowId); + metadata.setFlowName(snapshotProperties.getProperty("name")); + metadata.setTimestamp(file.lastModified()); + metadata.setVersion(version); + + snapshotMetadataSet.add(metadata); + } + + return flow; + } +}
