NIFI-4436: Bug fixes - Checkpoint before allowing multiple Process Groups with same Versioned Component ID and same parent - Ensure that if flow update is cancelled while processors are being stopped/services disabled that we stop waiting for that to occur. Also ensure that if we fail to update flow that we re-enable/restart the processors and services - Updated verbiage to use a ConciseEvolvingDifferenceDescriptor when getting local modifications for a versioned flow - Do not allow outer process group to be saved to flow registry or have local modifications reverted if it has a descendant process group that is under version control and is dirty. Fixed bug where ComponentDifferenceDTO was populated with wrong component id and group id
Signed-off-by: Matt Gilman <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/adacb204 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/adacb204 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/adacb204 Branch: refs/heads/master Commit: adacb204a8b6518a79600253463a36c9f8afaa37 Parents: 3d8b1e4 Author: Mark Payne <[email protected]> Authored: Fri Nov 17 11:02:33 2017 -0500 Committer: Bryan Bende <[email protected]> Committed: Mon Jan 8 12:44:53 2018 -0500 ---------------------------------------------------------------------- .../api/entity/VersionedFlowSnapshotEntity.java | 11 + .../org/apache/nifi/groups/ProcessGroup.java | 27 +- .../apache/nifi/registry/flow/FlowRegistry.java | 9 +- .../apache/nifi/controller/FlowController.java | 54 +++- .../nifi/groups/StandardProcessGroup.java | 143 ++++++++-- .../registry/flow/RestBasedFlowRegistry.java | 17 +- .../service/mock/MockProcessGroup.java | 14 +- .../org/apache/nifi/web/NiFiServiceFacade.java | 34 ++- .../nifi/web/StandardNiFiServiceFacade.java | 93 ++----- .../nifi/web/api/ProcessGroupResource.java | 11 +- .../apache/nifi/web/api/VersionsResource.java | 273 +++++++++---------- .../api/concurrent/AsynchronousWebRequest.java | 8 + .../StandardAsynchronousWebRequest.java | 7 + .../org/apache/nifi/web/api/dto/DtoFactory.java | 15 +- .../apache/nifi/web/dao/ProcessGroupDAO.java | 4 +- .../web/dao/impl/StandardProcessGroupDAO.java | 10 +- .../nifi/web/util/CancellableTimedPause.java | 2 +- .../org/apache/nifi/web/util/SnippetUtils.java | 16 +- 18 files changed, 490 insertions(+), 258 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowSnapshotEntity.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowSnapshotEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowSnapshotEntity.java index 170640d..2faf791 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowSnapshotEntity.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowSnapshotEntity.java @@ -28,6 +28,7 @@ public class VersionedFlowSnapshotEntity extends Entity { private VersionedFlowSnapshot versionedFlowSnapshot; private RevisionDTO processGroupRevision; private String registryId; + private Boolean updateDescendantVersionedFlows; @ApiModelProperty("The versioned flow snapshot") public VersionedFlowSnapshot getVersionedFlowSnapshot() { @@ -55,4 +56,14 @@ public class VersionedFlowSnapshotEntity extends Entity { public void setRegistryId(String registryId) { this.registryId = registryId; } + + @ApiModelProperty("If the Process Group to be updated has a child or descendant Process Group that is also under " + + "Version Control, this specifies whether or not the contents of that child/descendant Process Group should be updated.") + public Boolean getUpdateDescendantVersionedFlows() { + return updateDescendantVersionedFlows; + } + + public void setUpdateDescendantVersionedFlows(Boolean update) { + this.updateDescendantVersionedFlows = update; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index 16b4b5e..d81b7d3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -784,8 +784,10 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi * and the Process Group has been modified since it was last synchronized with the Flow Registry, then this method will * throw an IllegalStateException * @param updateSettings whether or not to update the process group's name and positions + * @param updateDescendantVersionedFlows if a child/descendant Process Group is under Version Control, specifies whether or not to + * update the contents of that Process Group */ - void updateFlow(VersionedFlowSnapshot proposedSnapshot, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings); + void updateFlow(VersionedFlowSnapshot proposedSnapshot, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings, boolean updateDescendantVersionedFlows); /** * Verifies a template with the specified name can be created. 
@@ -848,7 +850,7 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi void verifyCanUpdateVariables(Map<String, String> updatedVariables); /** - * Ensure that the contents of the Process Group can be update to match the given new flow + * Ensures that the contents of the Process Group can be update to match the given new flow * * @param updatedFlow the updated version of the flow * @param verifyConnectionRemoval whether or not to verify that connections that are not present in the updated flow can be removed @@ -860,6 +862,27 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi void verifyCanUpdate(VersionedFlowSnapshot updatedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty); /** + * Ensures that the Process Group can have any local changes reverted + * + * @throws IllegalStateException if the Process Group is not in a state that will allow local changes to be reverted + */ + void verifyCanRevertLocalModifications(); + + /** + * Ensures that the Process Group can have its local modifications shown + * + * @throws IllegalStateException if the Process Group is not in a state that will allow local modifications to be shown + */ + void verifyCanShowLocalModifications(); + + /** + * Ensure that the contents of the Process Group can be saved to a Flow Registry in its current state + * + * @throws IllegalStateException if the Process Group cannot currently be saved to a Flow Registry + */ + void verifyCanSaveToFlowRegistry(String registryId, String bucketId, String flowId); + + /** * Adds the given template to this Process Group * * @param template the template to add http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java index 76f96f2..ae43bb5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java @@ -159,6 +159,9 @@ public interface FlowRegistry { * @param bucketId the ID of the bucket * @param flowId the ID of the flow * @param version the version to retrieve + * @param fetchRemoteFlows if the remote flow has a child Process Group that also tracks to a remote flow, this specifies whether or not + * the child's contents should be fetched. + * @param user the user on whose behalf the flow contents are being retrieved * @return the contents of the Flow from the Flow Registry * * @throws IOException if unable to communicate with the Flow Registry @@ -167,7 +170,7 @@ public interface FlowRegistry { * @throws NullPointerException if any of the arguments is not specified * @throws IllegalArgumentException if the given version is less than 1 */ - VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version, NiFiUser user) throws IOException, NiFiRegistryException; + VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version, boolean fetchRemoteFlows, NiFiUser user) throws IOException, NiFiRegistryException; /** * Retrieves the contents of the Flow with the given Bucket ID, Flow ID, and version, from the Flow Registry @@ -175,6 +178,8 @@ public interface FlowRegistry { * @param bucketId the ID of the bucket * @param flowId the ID of the flow * @param version the version to retrieve + * @param fetchRemoteFlows if the remote flow 
has a child Process Group that also tracks to a remote flow, this specifies whether or not + * the child's contents should be fetched. * @return the contents of the Flow from the Flow Registry * * @throws IOException if unable to communicate with the Flow Registry @@ -183,7 +188,7 @@ public interface FlowRegistry { * @throws NullPointerException if any of the arguments is not specified * @throws IllegalArgumentException if the given version is less than 1 */ - VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version) throws IOException, NiFiRegistryException; + VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version, boolean fetchRemoteFlows) throws IOException, NiFiRegistryException; /** * Retrieves a VersionedFlow by bucket id and flow id http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 5ed5b6e..3909387 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -165,8 +165,11 @@ import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; +import org.apache.nifi.registry.flow.StandardVersionControlInformation; +import 
org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedConnection; import org.apache.nifi.registry.flow.VersionedProcessGroup; +import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; import org.apache.nifi.registry.variable.MutableVariableRegistry; import org.apache.nifi.registry.variable.StandardComponentVariableRegistry; import org.apache.nifi.remote.HttpRemoteSiteListener; @@ -1775,6 +1778,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * processor */ public void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto) throws ProcessorInstantiationException { + instantiateSnippet(group, dto, true); + } + + private void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto, final boolean topLevel) throws ProcessorInstantiationException { writeLock.lock(); try { validateSnippetContents(requireNonNull(group), dto); @@ -1789,6 +1796,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R serviceNode.setAnnotationData(controllerServiceDTO.getAnnotationData()); serviceNode.setComments(controllerServiceDTO.getComments()); serviceNode.setName(controllerServiceDTO.getName()); + if (!topLevel) { + serviceNode.setVersionedComponentId(controllerServiceDTO.getVersionedComponentId()); + } group.addControllerService(serviceNode); } @@ -1812,6 +1822,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } label.setStyle(labelDTO.getStyle()); + if (!topLevel) { + label.setVersionedComponentId(labelDTO.getVersionedComponentId()); + } + group.addLabel(label); } @@ -1819,6 +1833,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R for (final FunnelDTO funnelDTO : dto.getFunnels()) { final Funnel funnel = createFunnel(funnelDTO.getId()); funnel.setPosition(toPosition(funnelDTO.getPosition())); + if (!topLevel) { + 
funnel.setVersionedComponentId(funnelDTO.getVersionedComponentId()); + } + group.addFunnel(funnel); } @@ -1840,6 +1858,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R inputPort = createLocalInputPort(portDTO.getId(), portDTO.getName()); } + if (!topLevel) { + inputPort.setVersionedComponentId(portDTO.getVersionedComponentId()); + } inputPort.setPosition(toPosition(portDTO.getPosition())); inputPort.setProcessGroup(group); inputPort.setComments(portDTO.getComments()); @@ -1861,6 +1882,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R outputPort = createLocalOutputPort(portDTO.getId(), portDTO.getName()); } + if (!topLevel) { + outputPort.setVersionedComponentId(portDTO.getVersionedComponentId()); + } outputPort.setPosition(toPosition(portDTO.getPosition())); outputPort.setProcessGroup(group); outputPort.setComments(portDTO.getComments()); @@ -1876,6 +1900,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R procNode.setPosition(toPosition(processorDTO.getPosition())); procNode.setProcessGroup(group); + if (!topLevel) { + procNode.setVersionedComponentId(processorDTO.getVersionedComponentId()); + } final ProcessorConfigDTO config = processorDTO.getConfig(); procNode.setComments(config.getComments()); @@ -1936,6 +1963,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R remoteGroup.setPosition(toPosition(remoteGroupDTO.getPosition())); remoteGroup.setCommunicationsTimeout(remoteGroupDTO.getCommunicationsTimeout()); remoteGroup.setYieldDuration(remoteGroupDTO.getYieldDuration()); + if (!topLevel) { + remoteGroup.setVersionedComponentId(remoteGroupDTO.getVersionedComponentId()); + } + if (remoteGroupDTO.getTransportProtocol() == null) { remoteGroup.setTransportProtocol(SiteToSiteTransportProtocol.RAW); } else { @@ -1979,6 +2010,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R 
childGroup.setVariables(groupDTO.getVariables()); } + // If this Process Group is 'top level' then we do not set versioned component ID's. + // We do this only if this component is the child of a Versioned Component. + if (!topLevel) { + childGroup.setVersionedComponentId(groupDTO.getVersionedComponentId()); + } + group.addProcessGroup(childGroup); final FlowSnippetDTO contents = groupDTO.getContents(); @@ -1995,7 +2032,18 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R childTemplateDTO.setFunnels(contents.getFunnels()); childTemplateDTO.setRemoteProcessGroups(contents.getRemoteProcessGroups()); childTemplateDTO.setControllerServices(contents.getControllerServices()); - instantiateSnippet(childGroup, childTemplateDTO); + instantiateSnippet(childGroup, childTemplateDTO, false); + + if (groupDTO.getVersionControlInformation() != null) { + final NiFiRegistryFlowMapper flowMapper = new NiFiRegistryFlowMapper(); + final VersionedProcessGroup versionedGroup = flowMapper.mapProcessGroup(childGroup, getFlowRegistryClient(), false); + + final VersionControlInformation vci = StandardVersionControlInformation.Builder + .fromDto(groupDTO.getVersionControlInformation()) + .flowSnapshot(versionedGroup) + .build(); + childGroup.setVersionControlInformation(vci, Collections.emptyMap()); + } } // @@ -2039,6 +2087,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } final Connection connection = createConnection(connectionDTO.getId(), connectionDTO.getName(), source, destination, relationships); + if (!topLevel) { + connection.setVersionedComponentId(connectionDTO.getVersionedComponentId()); + } if (connectionDTO.getBends() != null) { final List<Position> bendPoints = new ArrayList<>(); @@ -2088,6 +2139,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R for (final RemoteProcessGroupPortDTO port : ports) { final StandardRemoteProcessGroupPortDescriptor descriptor = new 
StandardRemoteProcessGroupPortDescriptor(); descriptor.setId(port.getId()); + descriptor.setVersionedComponentId(port.getVersionedComponentId()); descriptor.setTargetId(port.getTargetId()); descriptor.setName(port.getName()); descriptor.setComments(port.getComments()); http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index d1aa4e2..51839d0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -2821,7 +2821,7 @@ public final class StandardProcessGroup implements ProcessGroup { versionControlInformation.getBucketIdentifier(), versionControlInformation.getFlowIdentifier(), versionControlInformation.getVersion(), - versionControlInformation.getFlowSnapshot(), + stripContentsFromRemoteDescendantGroups(versionControlInformation.getFlowSnapshot(), true), versionControlInformation.isModified(), versionControlInformation.isCurrent()) { @@ -2849,6 +2849,51 @@ public final class StandardProcessGroup implements ProcessGroup { } } + private VersionedProcessGroup stripContentsFromRemoteDescendantGroups(final VersionedProcessGroup processGroup, final boolean topLevel) { + if (processGroup == null) { + return null; + } + + final VersionedProcessGroup copy = new VersionedProcessGroup(); + copy.setComments(processGroup.getComments()); + 
copy.setComponentType(processGroup.getComponentType()); + copy.setGroupIdentifier(processGroup.getGroupIdentifier()); + copy.setIdentifier(processGroup.getIdentifier()); + copy.setName(processGroup.getName()); + copy.setPosition(processGroup.getPosition()); + copy.setVersionedFlowCoordinates(topLevel ? null : processGroup.getVersionedFlowCoordinates()); + copy.setConnections(processGroup.getConnections()); + copy.setControllerServices(processGroup.getControllerServices()); + copy.setFunnels(processGroup.getFunnels()); + copy.setInputPorts(processGroup.getInputPorts()); + copy.setOutputPorts(processGroup.getOutputPorts()); + copy.setProcessors(processGroup.getProcessors()); + copy.setRemoteProcessGroups(processGroup.getRemoteProcessGroups()); + copy.setVariables(processGroup.getVariables()); + + final Set<VersionedProcessGroup> copyChildren = new HashSet<>(); + + for (final VersionedProcessGroup childGroup : processGroup.getProcessGroups()) { + if (childGroup.getVersionedFlowCoordinates() == null) { + copyChildren.add(stripContentsFromRemoteDescendantGroups(childGroup, false)); + } else { + final VersionedProcessGroup childCopy = new VersionedProcessGroup(); + childCopy.setComments(childGroup.getComments()); + childCopy.setComponentType(childGroup.getComponentType()); + childCopy.setGroupIdentifier(childGroup.getGroupIdentifier()); + childCopy.setIdentifier(childGroup.getIdentifier()); + childCopy.setName(childGroup.getName()); + childCopy.setPosition(childGroup.getPosition()); + childCopy.setVersionedFlowCoordinates(childGroup.getVersionedFlowCoordinates()); + + copyChildren.add(childCopy); + } + } + + copy.setProcessGroups(copyChildren); + return copy; + } + @Override public void disconnectVersionControl() { writeLock.lock(); @@ -2900,7 +2945,7 @@ public final class StandardProcessGroup implements ProcessGroup { }); processGroup.getProcessGroups().stream() - .filter(childGroup -> childGroup.getVersionControlInformation() != null) + .filter(childGroup -> 
childGroup.getVersionControlInformation() == null) .forEach(childGroup -> applyVersionedComponentIds(childGroup, lookup)); } @@ -2925,7 +2970,7 @@ public final class StandardProcessGroup implements ProcessGroup { // We have not yet obtained the snapshot from the Flow Registry, so we need to request the snapshot of our local version of the flow from the Flow Registry. // This allows us to know whether or not the flow has been modified since it was last synced with the Flow Registry. try { - final VersionedFlowSnapshot registrySnapshot = flowRegistry.getFlowContents(vci.getBucketIdentifier(), vci.getFlowIdentifier(), vci.getVersion()); + final VersionedFlowSnapshot registrySnapshot = flowRegistry.getFlowContents(vci.getBucketIdentifier(), vci.getFlowIdentifier(), vci.getVersion(), false); final VersionedProcessGroup registryFlow = registrySnapshot.getFlowContents(); vci.setFlowSnapshot(registryFlow); } catch (final IOException | NiFiRegistryException e) { @@ -2958,7 +3003,8 @@ public final class StandardProcessGroup implements ProcessGroup { @Override - public void updateFlow(final VersionedFlowSnapshot proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty, final boolean updateSettings) { + public void updateFlow(final VersionedFlowSnapshot proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty, final boolean updateSettings, + final boolean updateDescendantVersionedFlows) { writeLock.lock(); try { verifyCanUpdate(proposedSnapshot, true, verifyNotDirty); @@ -2986,7 +3032,7 @@ public final class StandardProcessGroup implements ProcessGroup { } final Set<String> knownVariables = getKnownVariableNames(); - updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings, knownVariables); + updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings, updateDescendantVersionedFlows, 
knownVariables); } catch (final ProcessorInstantiationException pie) { throw new RuntimeException(pie); } finally { @@ -3013,7 +3059,8 @@ public final class StandardProcessGroup implements ProcessGroup { private void updateProcessGroup(final ProcessGroup group, final VersionedProcessGroup proposed, final String componentIdSeed, - final Set<String> updatedVersionedComponentIds, final boolean updatePosition, final boolean updateName, final Set<String> variablesToSkip) throws ProcessorInstantiationException { + final Set<String> updatedVersionedComponentIds, final boolean updatePosition, final boolean updateName, final boolean updateDescendantVersionedGroups, + final Set<String> variablesToSkip) throws ProcessorInstantiationException { group.setComments(proposed.getComments()); @@ -3033,14 +3080,8 @@ public final class StandardProcessGroup implements ProcessGroup { .map(VariableDescriptor::getName) .collect(Collectors.toSet()); - final Set<String> variablesRemoved = new HashSet<>(existingVariableNames); - - if (proposed.getVariables() != null) { - variablesRemoved.removeAll(proposed.getVariables().keySet()); - } final Map<String, String> updatedVariableMap = new HashMap<>(); - variablesRemoved.forEach(var -> updatedVariableMap.put(var, null)); // If any new variables exist in the proposed flow, add those to the variable registry. 
for (final Map.Entry<String, String> entry : proposed.getVariables().entrySet()) { @@ -3069,6 +3110,7 @@ public final class StandardProcessGroup implements ProcessGroup { .flowId(flowId) .flowName(flowId) // flow id not yet known .version(version) + .flowSnapshot(proposed) .modified(false) .current(true) .build(); @@ -3084,11 +3126,13 @@ public final class StandardProcessGroup implements ProcessGroup { for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) { final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier()); + final VersionedFlowCoordinates childCoordinates = proposedChildGroup.getVersionedFlowCoordinates(); + if (childGroup == null) { final ProcessGroup added = addProcessGroup(group, proposedChildGroup, componentIdSeed, variablesToSkip); LOG.info("Added {} to {}", added, this); - } else { - updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, updateName, variablesToSkip); + } else if (childCoordinates == null || updateDescendantVersionedGroups) { + updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, updateName, updateDescendantVersionedGroups, variablesToSkip); LOG.info("Updated {}", childGroup); } @@ -3367,7 +3411,7 @@ public final class StandardProcessGroup implements ProcessGroup { final ProcessGroup group = flowController.createProcessGroup(generateUuid(componentIdSeed)); group.setVersionedComponentId(proposed.getIdentifier()); group.setParent(destination); - updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true, true, variablesToSkip); + updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true, true, true, variablesToSkip); destination.addProcessGroup(group); return group; } @@ -3739,7 +3783,7 @@ public final class StandardProcessGroup implements ProcessGroup { } final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); - 
final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient(), true); + final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient(), false); final ComparableDataFlow currentFlow = new ComparableDataFlow() { @Override @@ -3765,7 +3809,7 @@ public final class StandardProcessGroup implements ProcessGroup { } }; - final FlowComparator flowComparator = new StandardFlowComparator(currentFlow, snapshotFlow, new EvolvingDifferenceDescriptor()); + final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, new EvolvingDifferenceDescriptor()); final FlowComparison comparison = flowComparator.compare(); final Set<FlowDifference> differences = comparison.getDifferences(); final Set<FlowDifference> functionalDifferences = differences.stream() @@ -4002,4 +4046,69 @@ public final class StandardProcessGroup implements ProcessGroup { findAllProcessGroups(child, map); } } + + @Override + public void verifyCanSaveToFlowRegistry(final String registryId, final String bucketId, final String flowId) { + verifyNoDescendantsWithLocalModifications("be saved to a Flow Registry"); + + final StandardVersionControlInformation vci = versionControlInfo.get(); + if (vci != null) { + if (flowId != null && flowId.equals(vci.getFlowIdentifier())) { + // Flow ID is the same. We want to publish the Process Group as the next version of the Flow. + // In order to do this, we have to ensure that the Process Group is 'current'. + final boolean current = vci.isCurrent(); + if (!current) { + throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier() + + " because the Process Group in the flow is not synchronized with the most recent version of the Flow in the Flow Registry. 
" + + "In order to publish a new version of the Flow, the Process Group must first be in synch with the latest version in the Flow Registry."); + } + + // Flow ID matches. We want to publish the Process Group as the next version of the Flow, so we must + // ensure that all other parameters match as well. + if (!bucketId.equals(vci.getBucketIdentifier())) { + throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier() + + " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request."); + } + + if (!registryId.equals(vci.getRegistryIdentifier())) { + throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier() + + " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request."); + } + } else if (flowId != null) { + // Flow ID is specified but different. This is not allowed, because Flow ID's are automatically generated, + // and if the client is specifying an ID then it is either trying to assign the ID of the Flow or it is + // attempting to save a new version of a different flow. Saving a new version of a different Flow is + // not allowed because the Process Group must be in synch with the latest version of the flow before that + // can be done. 
+ throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier() + + " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request."); + } + } + } + + @Override + public void verifyCanRevertLocalModifications() { + final StandardVersionControlInformation svci = versionControlInfo.get(); + if (svci == null) { + throw new IllegalStateException("Cannot revert local modifications to Process Group because the Process Group is not under Version Control."); + } + + verifyNoDescendantsWithLocalModifications("have its local modifications reverted"); + } + + @Override + public void verifyCanShowLocalModifications() { + + } + + private void verifyNoDescendantsWithLocalModifications(final String action) { + for (final ProcessGroup descendant : findAllProcessGroups()) { + final VersionControlInformation descendantVci = descendant.getVersionControlInformation(); + if (descendantVci != null && descendantVci.isModified()) { + throw new IllegalStateException("Process Group cannot " + action + " because it contains a child or descendant Process Group that is under Version Control and " + + "has local modifications. 
Each descendant Process Group that is under Version Control must first be reverted or have its changes pushed to the Flow Registry before " + + "this action can be performed on the parent Process Group."); + } + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java index 8bf89c6..1d3eec6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java @@ -178,21 +178,24 @@ public class RestBasedFlowRegistry implements FlowRegistry { } @Override - public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version, final NiFiUser user) throws IOException, NiFiRegistryException { + public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version, final boolean fetchRemoteFlows, final NiFiUser user) + throws IOException, NiFiRegistryException { final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user)); final VersionedFlowSnapshot flowSnapshot = snapshotClient.get(bucketId, flowId, version); - final VersionedProcessGroup contents = flowSnapshot.getFlowContents(); - for (final VersionedProcessGroup child : contents.getProcessGroups()) { - populateVersionedContentsRecursively(child, 
user); + if (fetchRemoteFlows) { + final VersionedProcessGroup contents = flowSnapshot.getFlowContents(); + for (final VersionedProcessGroup child : contents.getProcessGroups()) { + populateVersionedContentsRecursively(child, user); + } } return flowSnapshot; } @Override - public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version) throws IOException, NiFiRegistryException { - return getFlowContents(bucketId, flowId, version, null); + public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version, final boolean fetchRemoteFlows) throws IOException, NiFiRegistryException { + return getFlowContents(bucketId, flowId, version, fetchRemoteFlows, null); } private void populateVersionedContentsRecursively(final VersionedProcessGroup group, final NiFiUser user) throws NiFiRegistryException, IOException { @@ -214,7 +217,7 @@ public class RestBasedFlowRegistry implements FlowRegistry { } final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId); - final VersionedFlowSnapshot snapshot = flowRegistry.getFlowContents(bucketId, flowId, version, user); + final VersionedFlowSnapshot snapshot = flowRegistry.getFlowContents(bucketId, flowId, version, true, user); final VersionedProcessGroup contents = snapshot.getFlowContents(); group.setComments(contents.getComments()); http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java index 
db4ac59..ef69906 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java @@ -650,11 +650,15 @@ public class MockProcessGroup implements ProcessGroup { } @Override + public void verifyCanSaveToFlowRegistry(String registryId, String bucketId, String flowId) { + } + + @Override public void synchronizeWithFlowRegistry(FlowRegistryClient flowRegistry) { } @Override - public void updateFlow(VersionedFlowSnapshot proposedFlow, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings) { + public void updateFlow(VersionedFlowSnapshot proposedFlow, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings, boolean updateDescendantVerisonedFlows) { } @Override @@ -666,4 +670,12 @@ public class MockProcessGroup implements ProcessGroup { public void disconnectVersionControl() { this.versionControlInfo = null; } + + @Override + public void verifyCanRevertLocalModifications() { + } + + @Override + public void verifyCanShowLocalModifications() { + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 76cd2c4..02df16b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -1368,11 +1368,13 @@ public interface NiFiServiceFacade { * Retrieves the Versioned Flow Snapshot for the coordinates provided by the given Version Control Information DTO * * @param versionControlInfo the coordinates of the versioned flow + * @param fetchRemoteFlows if the contents of the Versioned Flow that is fetched contain a child/descendant Process Group + * that is also under Version Control, this indicates whether that remote flow should also be fetched * @return the VersionedFlowSnapshot that corresponds to the given coordinates * * @throws ResourceNotFoundException if the Versioned Flow Snapshot could not be found */ - VersionedFlowSnapshot getVersionedFlowSnapshot(VersionControlInformationDTO versionControlInfo) throws IOException; + VersionedFlowSnapshot getVersionedFlowSnapshot(VersionControlInformationDTO versionControlInfo, boolean fetchRemoteFlows) throws IOException; /** * Returns the name of the Flow Registry that is registered with the given ID. 
If no Flow Registry exists with the given ID, will return @@ -1407,6 +1409,28 @@ public interface NiFiServiceFacade { void verifyCanUpdate(String groupId, VersionedFlowSnapshot proposedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty); /** + * Verifies that the Process Group with the given identifier can be saved to the flow registry + * + * @param groupId the ID of the Process Group + * @param registryId the ID of the Flow Registry + * @param bucketId the ID of the bucket + * @param flowId the ID of the flow + * + * @throws IllegalStateException if the Process Group cannot be saved to the flow registry with the coordinates specified + */ + void verifyCanSaveToFlowRegistry(String groupId, String registryId, String bucketId, String flowId); + + /** + * Verifies that the Process Group with the given identifier can have its local modifications reverted to the given VersionedFlowSnapshot + * + * @param groupId the ID of the Process Group + * @param versionedFlowSnapshot the Versioned Flow Snapshot + * + * @throws IllegalStateException if the Process Group cannot have its local modifications reverted + */ + void verifyCanRevertLocalModifications(String groupId, VersionedFlowSnapshot versionedFlowSnapshot); + + /** * Updates the Process group with the given ID to match the new snapshot * * @param revision the revision of the Process Group @@ -1414,10 +1438,12 @@ public interface NiFiServiceFacade { * @param versionControlInfo the Version Control information * @param snapshot the new snapshot * @param componentIdSeed the seed to use for generating new component ID's + * @param updateDescendantVersionedFlows if a child/descendant Process Group is under Version Control, specifies whether or not to + * update the contents of that Process Group * @return the Process Group */ ProcessGroupEntity updateProcessGroup(Revision revision, String groupId, VersionControlInformationDTO versionControlInfo, VersionedFlowSnapshot snapshot, String componentIdSeed, - boolean 
verifyNotModified); + boolean verifyNotModified, boolean updateDescendantVersionedFlows); /** * Updates the Process group with the given ID to match the new snapshot @@ -1429,10 +1455,12 @@ public interface NiFiServiceFacade { * @param snapshot the new snapshot * @param componentIdSeed the seed to use for generating new component ID's * @param updateSettings whether or not the process group's name and position should be updated + * @param updateDescendantVersionedFlows if a child/descendant Process Group is under Version Control, specifies whether or not to + * update the contents of that Process Group * @return the Process Group */ ProcessGroupEntity updateProcessGroupContents(NiFiUser user, Revision revision, String groupId, VersionControlInformationDTO versionControlInfo, VersionedFlowSnapshot snapshot, String componentIdSeed, - boolean verifyNotModified, boolean updateSettings); + boolean verifyNotModified, boolean updateSettings, boolean updateDescendantVersionedFlows); // ---------------------------------------- // Component state methods http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 4d1bbbc..c66aebb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -97,13 +97,12 @@ import 
org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedComponent; import org.apache.nifi.registry.flow.VersionedConnection; import org.apache.nifi.registry.flow.VersionedFlow; -import org.apache.nifi.registry.flow.VersionedFlowCoordinates; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; import org.apache.nifi.registry.flow.VersionedProcessGroup; import org.apache.nifi.registry.flow.diff.ComparableDataFlow; +import org.apache.nifi.registry.flow.diff.ConciseEvolvingDifferenceDescriptor; import org.apache.nifi.registry.flow.diff.DifferenceType; -import org.apache.nifi.registry.flow.diff.EvolvingDifferenceDescriptor; import org.apache.nifi.registry.flow.diff.FlowComparator; import org.apache.nifi.registry.flow.diff.FlowComparison; import org.apache.nifi.registry.flow.diff.FlowDifference; @@ -3751,10 +3750,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } final VersionedFlowSnapshot versionedFlowSnapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketIdentifier(), - versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), NiFiUserUtils.getNiFiUser()); + versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), false, NiFiUserUtils.getNiFiUser()); final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); - final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, true); + final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, false); final VersionedProcessGroup registryGroup = versionedFlowSnapshot.getFlowContents(); final ComparableDataFlow localFlow = new ComparableDataFlow() { @@ -3781,7 +3780,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } }; - final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, new 
EvolvingDifferenceDescriptor()); + final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, new ConciseEvolvingDifferenceDescriptor()); final FlowComparison flowComparison = flowComparator.compare(); final Set<ComponentDifferenceDTO> differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison); @@ -3853,6 +3852,23 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override + public void verifyCanSaveToFlowRegistry(final String groupId, final String registryId, final String bucketId, final String flowId) { + final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); + group.verifyCanSaveToFlowRegistry(registryId, bucketId, flowId); + } + + @Override + public void verifyCanRevertLocalModifications(final String groupId, final VersionedFlowSnapshot versionedFlowSnapshot) { + final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); + group.verifyCanRevertLocalModifications(); + + // verify that the process group can be updated to the given snapshot. We do not verify that connections can + // be removed, because the flow may still be running, and it only matters that the connections can be removed once the components + // have been stopped. 
+ group.verifyCanUpdate(versionedFlowSnapshot, false, false); + } + + @Override public Set<AffectedComponentEntity> getComponentsAffectedByVersionChange(final String processGroupId, final VersionedFlowSnapshot updatedSnapshot, final NiFiUser user) throws IOException { final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId); @@ -4028,7 +4044,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public VersionedFlowSnapshot getVersionedFlowSnapshot(final VersionControlInformationDTO versionControlInfo) throws IOException { + public VersionedFlowSnapshot getVersionedFlowSnapshot(final VersionControlInformationDTO versionControlInfo, final boolean fetchRemoteFlows) throws IOException { final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(versionControlInfo.getRegistryId()); if (flowRegistry == null) { throw new ResourceNotFoundException("Could not find any Flow Registry registered with identifier " + versionControlInfo.getRegistryId()); @@ -4036,15 +4052,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final VersionedFlowSnapshot snapshot; try { - snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion(), NiFiUserUtils.getNiFiUser()); + snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion(), fetchRemoteFlows, NiFiUserUtils.getNiFiUser()); } catch (final NiFiRegistryException e) { throw new IllegalArgumentException("The Flow Registry with ID " + versionControlInfo.getRegistryId() + " reports that no Flow exists with Bucket " + versionControlInfo.getBucketId() + ", Flow " + versionControlInfo.getFlowId() + ", Version " + versionControlInfo.getVersion()); } - // If this Flow has a reference to a remote flow, we need to pull that remote flow as well - populateVersionedChildFlows(snapshot); - return snapshot; } @@ 
-4054,74 +4067,22 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return flowRegistry == null ? flowRegistryId : flowRegistry.getName(); } - private void populateVersionedChildFlows(final VersionedFlowSnapshot snapshot) throws IOException { - final VersionedProcessGroup group = snapshot.getFlowContents(); - - for (final VersionedProcessGroup child : group.getProcessGroups()) { - populateVersionedFlows(child); - } - } - - private void populateVersionedFlows(final VersionedProcessGroup group) throws IOException { - final VersionedFlowCoordinates remoteCoordinates = group.getVersionedFlowCoordinates(); - - if (remoteCoordinates != null) { - final String registryUrl = remoteCoordinates.getRegistryUrl(); - final String registryId = flowRegistryClient.getFlowRegistryId(registryUrl); - if (registryId == null) { - throw new IllegalArgumentException("Process Group with ID " + group.getIdentifier() + " is under Version Control, referencing a Flow Registry at URL [" + registryUrl - + "], but no Flow Registry is currently registered for that URL."); - } - - final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId); - - final VersionedFlowSnapshot childSnapshot; - try { - childSnapshot = flowRegistry.getFlowContents(remoteCoordinates.getBucketId(), remoteCoordinates.getFlowId(), remoteCoordinates.getVersion(), NiFiUserUtils.getNiFiUser()); - } catch (final NiFiRegistryException e) { - throw new IllegalArgumentException("The Flow Registry with ID " + registryId + " reports that no Flow exists with Bucket " - + remoteCoordinates.getBucketId() + ", Flow " + remoteCoordinates.getFlowId() + ", Version " + remoteCoordinates.getVersion()); - } - - final VersionedProcessGroup fetchedGroup = childSnapshot.getFlowContents(); - group.setComments(fetchedGroup.getComments()); - group.setPosition(fetchedGroup.getPosition()); - group.setName(fetchedGroup.getName()); - group.setVariables(fetchedGroup.getVariables()); - - group.setConnections(new 
LinkedHashSet<>(fetchedGroup.getConnections())); - group.setControllerServices(new LinkedHashSet<>(fetchedGroup.getControllerServices())); - group.setFunnels(new LinkedHashSet<>(fetchedGroup.getFunnels())); - group.setInputPorts(new LinkedHashSet<>(fetchedGroup.getInputPorts())); - group.setLabels(new LinkedHashSet<>(fetchedGroup.getLabels())); - group.setOutputPorts(new LinkedHashSet<>(fetchedGroup.getOutputPorts())); - group.setProcessGroups(new LinkedHashSet<>(fetchedGroup.getProcessGroups())); - group.setProcessors(new LinkedHashSet<>(fetchedGroup.getProcessors())); - group.setRemoteProcessGroups(new LinkedHashSet<>(fetchedGroup.getRemoteProcessGroups())); - } - - for (final VersionedProcessGroup child : group.getProcessGroups()) { - populateVersionedFlows(child); - } - } - - @Override public ProcessGroupEntity updateProcessGroup(final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo, - final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified) { + final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); - return updateProcessGroupContents(user, revision, groupId, versionControlInfo, proposedFlowSnapshot, componentIdSeed, verifyNotModified, true); + return updateProcessGroupContents(user, revision, groupId, versionControlInfo, proposedFlowSnapshot, componentIdSeed, verifyNotModified, true, updateDescendantVersionedFlows); } @Override public ProcessGroupEntity updateProcessGroupContents(final NiFiUser user, final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo, - final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings) { + final VersionedFlowSnapshot proposedFlowSnapshot, final 
String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings, final boolean updateDescendantVersionedFlows) { final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(groupId); final RevisionUpdate<ProcessGroupDTO> snapshot = updateComponent(user, revision, processGroupNode, - () -> processGroupDAO.updateProcessGroupFlow(groupId, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings), + () -> processGroupDAO.updateProcessGroupFlow(groupId, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings, updateDescendantVersionedFlows), processGroup -> dtoFactory.createProcessGroupDto(processGroup)); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode); http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index de56a4f..7262a82 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -1644,7 +1644,7 @@ public class ProcessGroupResource extends ApplicationResource { if (versionControlInfo != null) { // Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail. 
// Step 2: Retrieve flow from Flow Registry - final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(versionControlInfo); + final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(versionControlInfo, true); final Bucket bucket = flowSnapshot.getBucket(); final VersionedFlow flow = flowSnapshot.getFlow(); @@ -1653,6 +1653,8 @@ public class ProcessGroupResource extends ApplicationResource { versionControlInfo.setFlowDescription(flow.getDescription()); versionControlInfo.setRegistryName(serviceFacade.getFlowRegistryName(versionControlInfo.getRegistryId())); + versionControlInfo.setModified(false); + versionControlInfo.setCurrent(flowSnapshot.isLatest()); // Step 3: Resolve Bundle info BundleUtils.discoverCompatibleBundles(flowSnapshot.getFlowContents()); @@ -1709,8 +1711,13 @@ public class ProcessGroupResource extends ApplicationResource { final RevisionDTO revisionDto = entity.getRevision(); final String newGroupId = entity.getComponent().getId(); final Revision newGroupRevision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), newGroupId); + + // We don't want the Process Group's position to be updated because we want to keep the position where the user + // placed the Process Group. However, we do want to use the name of the Process Group that is in the Flow Contents. + // To accomplish this, we call updateProcessGroupContents() passing 'true' for the updateSettings flag but null out the position. + flowSnapshot.getFlowContents().setPosition(null); entity = serviceFacade.updateProcessGroupContents(NiFiUserUtils.getNiFiUser(), newGroupRevision, newGroupId, - versionControlInfo, flowSnapshot, getIdGenerationSeed().orElse(null), false, false); + versionControlInfo, flowSnapshot, getIdGenerationSeed().orElse(null), false, true, true); } populateRemainingProcessGroupEntityContent(entity);
