http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java index 3684f04..f2a207e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java @@ -17,21 +17,6 @@ package org.apache.nifi.web.api; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; -import java.util.stream.Collectors; - import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; @@ -92,10 +77,24 @@ import org.apache.nifi.web.util.AffectedComponentUtils; import org.apache.nifi.web.util.CancellableTimedPause; import org.apache.nifi.web.util.ComponentLifecycle; import org.apache.nifi.web.util.LifecycleManagementException; -import org.apache.nifi.web.util.Pause; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.Collectors; + import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; @@ -454,51 +453,15 @@ public class VersionsResource extends ApplicationResource { super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, false, true, true); }, () -> { - final VersionControlInformationEntity entity = serviceFacade.getVersionControlInformation(groupId); - if (entity != null) { - final String flowId = requestEntity.getVersionedFlow().getFlowId(); - if (flowId != null && flowId.equals(entity.getVersionControlInformation().getFlowId())) { - // Flow ID is the same. We want to publish the Process Group as the next version of the Flow. - // In order to do this, we have to ensure that the Process Group is 'current'. - final Boolean current = entity.getVersionControlInformation().getCurrent(); - if (current == null) { - throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + groupId - + " because it is not yet known whether or not this Process Group is the most recent version of the flow. " - + "Please try the request again after the Process Group has been synchronized with the Flow Registry."); - } - - if (current == Boolean.FALSE) { - throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + groupId - + " because the Process Group in the flow is not synchronized with the most recent version of the Flow in the Flow Registry. " - + "In order to publish a new version of the Flow, the Process Group must first be in synch with the latest version in the Flow Registry."); - } - - // Flow ID matches. We want to publish the Process Group as the next version of the Flow, so we must - // ensure that all other parameters match as well. - if (!requestEntity.getVersionedFlow().getBucketId().equals(entity.getVersionControlInformation().getBucketId())) { - throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + groupId - + " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request."); - } - - if (!requestEntity.getVersionedFlow().getRegistryId().equals(entity.getVersionControlInformation().getRegistryId())) { - throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + groupId - + " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request."); - } - - } else if (flowId != null) { - // Flow ID is specified but different. This is not allowed, because Flow ID's are automatically generated, - // and if the client is specifying an ID then it is either trying to assign the ID of the Flow or it is - // attempting to save a new version of a different flow. Saving a new version of a different Flow is - // not allowed because the Process Group must be in synch with the latest version of the flow before that - // can be done. - throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + groupId - + " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request."); - } - } + final VersionedFlowDTO versionedFlow = requestEntity.getVersionedFlow(); + final String registryId = versionedFlow.getRegistryId(); + final String bucketId = versionedFlow.getBucketId(); + final String flowId = versionedFlow.getFlowId(); + serviceFacade.verifyCanSaveToFlowRegistry(groupId, registryId, bucketId, flowId); }, (rev, flowEntity) -> { // Register the current flow with the Flow Registry. - final VersionControlComponentMappingEntity mappingEntity = serviceFacade.registerFlowWithFlowRegistry(groupId, requestEntity); + final VersionControlComponentMappingEntity mappingEntity = serviceFacade.registerFlowWithFlowRegistry(groupId, flowEntity); // Update the Process Group's Version Control Information final VersionControlInformationEntity responseEntity = serviceFacade.setVersionControlInformation(rev, groupId, @@ -756,7 +719,8 @@ public class VersionsResource extends ApplicationResource { versionControlInfoDto.setRegistryId(requestEntity.getRegistryId()); versionControlInfoDto.setRegistryName(serviceFacade.getFlowRegistryName(requestEntity.getRegistryId())); - final ProcessGroupEntity updatedGroup = serviceFacade.updateProcessGroup(rev, groupId, versionControlInfoDto, flowSnapshot, getIdGenerationSeed().orElse(null), false); + final ProcessGroupEntity updatedGroup = serviceFacade.updateProcessGroup(rev, groupId, versionControlInfoDto, flowSnapshot, getIdGenerationSeed().orElse(null), false, + entity.getUpdateDescendantVersionedFlows()); final VersionControlInformationDTO updatedVci = updatedGroup.getComponent().getVersionControlInformation(); final VersionControlInformationEntity responseEntity = new VersionControlInformationEntity(); @@ -1039,7 +1003,7 @@ public class VersionsResource extends ApplicationResource { // 14. Re-Start all Processors, Funnels, Ports that are affected and not removed. // Step 0: Get the Versioned Flow Snapshot from the Flow Registry - final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation()); + final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation(), true); // The flow in the registry may not contain the same versions of components that we have in our flow. As a result, we need to update // the flow snapshot to contain compatible bundles. @@ -1085,7 +1049,7 @@ public class VersionsResource extends ApplicationResource { final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> { try { final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, componentLifecycle, exampleUri, - affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, true); + affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, true, true); vcur.markComplete(updatedVersionControlEntity); } catch (final LifecycleManagementException e) { @@ -1188,7 +1152,7 @@ public class VersionsResource extends ApplicationResource { final String idGenerationSeed = getIdGenerationSeed().orElse(null); // Step 0: Get the Versioned Flow Snapshot from the Flow Registry - final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation()); + final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation(), false); // The flow in the registry may not contain the same versions of components that we have in our flow. As a result, we need to update // the flow snapshot to contain compatible bundles. @@ -1221,8 +1185,7 @@ public class VersionsResource extends ApplicationResource { () -> { // Step 3: Verify that all components in the snapshot exist on all nodes // Step 4: Verify that Process Group is already under version control. If not, must start Version Control instead of updating flow - // Step 5: Verify that Process Group is not 'dirty' - serviceFacade.verifyCanUpdate(groupId, flowSnapshot, false, false); + serviceFacade.verifyCanRevertLocalModifications(groupId, flowSnapshot); }, (revision, processGroupEntity) -> { // Ensure that the information passed in is correct @@ -1254,7 +1217,7 @@ public class VersionsResource extends ApplicationResource { final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> { try { final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, componentLifecycle, exampleUri, - affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, false); + affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, false, false); vcur.markComplete(updatedVersionControlEntity); } catch (final LifecycleManagementException e) { @@ -1288,7 +1251,7 @@ public class VersionsResource extends ApplicationResource { private VersionControlInformationEntity updateFlowVersion(final String groupId, final ComponentLifecycle componentLifecycle, final URI exampleUri, final Set<AffectedComponentEntity> affectedComponents, final NiFiUser user, final boolean replicateRequest, final VersionControlInformationEntity requestEntity, final VersionedFlowSnapshot flowSnapshot, final AsynchronousWebRequest<VersionControlInformationEntity> asyncRequest, final String idGenerationSeed, - final boolean verifyNotModified) throws LifecycleManagementException { + final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) throws LifecycleManagementException { // Steps 6-7: Determine which components must be stopped and stop them. final Set<String> stoppableReferenceTypes = new HashSet<>(); @@ -1302,7 +1265,8 @@ public class VersionsResource extends ApplicationResource { .collect(Collectors.toSet()); logger.info("Stopping {} Processors", runningComponents.size()); - final Pause stopComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); + final CancellableTimedPause stopComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); + asyncRequest.setCancelCallback(stopComponentsPause::cancel); componentLifecycle.scheduleComponents(exampleUri, user, groupId, runningComponents, ScheduledState.STOPPED, stopComponentsPause); if (asyncRequest.isCancelled()) { @@ -1317,7 +1281,8 @@ public class VersionsResource extends ApplicationResource { .collect(Collectors.toSet()); logger.info("Disabling {} Controller Services", enabledServices.size()); - final Pause disableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); + final CancellableTimedPause disableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); + asyncRequest.setCancelCallback(disableServicesPause::cancel); componentLifecycle.activateControllerServices(exampleUri, user, groupId, enabledServices, ControllerServiceState.DISABLED, disableServicesPause); if (asyncRequest.isCancelled()) { @@ -1328,96 +1293,113 @@ public class VersionsResource extends ApplicationResource { logger.info("Updating Process Group with ID {} to version {} of the Versioned Flow", groupId, flowSnapshot.getSnapshotMetadata().getVersion()); // If replicating request, steps 10-12 are performed on each node individually, and this is accomplished // by replicating a PUT to /nifi-api/versions/process-groups/{groupId} - if (replicateRequest) { + try { + if (replicateRequest) { - final URI updateUri; - try { - updateUri = new URI(exampleUri.getScheme(), exampleUri.getUserInfo(), exampleUri.getHost(), - exampleUri.getPort(), "/nifi-api/versions/process-groups/" + groupId, null, exampleUri.getFragment()); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } + final URI updateUri; + try { + updateUri = new URI(exampleUri.getScheme(), exampleUri.getUserInfo(), exampleUri.getHost(), + exampleUri.getPort(), "/nifi-api/versions/process-groups/" + groupId, null, exampleUri.getFragment()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } - final Map<String, String> headers = new HashMap<>(); - headers.put("content-type", MediaType.APPLICATION_JSON); + final Map<String, String> headers = new HashMap<>(); + headers.put("content-type", MediaType.APPLICATION_JSON); - final VersionedFlowSnapshotEntity snapshotEntity = new VersionedFlowSnapshotEntity(); - snapshotEntity.setProcessGroupRevision(requestEntity.getProcessGroupRevision()); - snapshotEntity.setRegistryId(requestEntity.getVersionControlInformation().getRegistryId()); - snapshotEntity.setVersionedFlow(flowSnapshot); + final VersionedFlowSnapshotEntity snapshotEntity = new VersionedFlowSnapshotEntity(); + snapshotEntity.setProcessGroupRevision(requestEntity.getProcessGroupRevision()); + snapshotEntity.setRegistryId(requestEntity.getVersionControlInformation().getRegistryId()); + snapshotEntity.setVersionedFlow(flowSnapshot); + snapshotEntity.setUpdateDescendantVersionedFlows(updateDescendantVersionedFlows); - final NodeResponse clusterResponse; - try { - if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { - clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, updateUri, snapshotEntity, headers).awaitMergedResponse(); - } else { - clusterResponse = getRequestReplicator().forwardToCoordinator( - getClusterCoordinatorNode(), user, HttpMethod.PUT, updateUri, snapshotEntity, headers).awaitMergedResponse(); + final NodeResponse clusterResponse; + try { + logger.debug("Replicating PUT request to {} for user {}", updateUri, user); + + if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { + clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, updateUri, snapshotEntity, headers).awaitMergedResponse(); + } else { + clusterResponse = getRequestReplicator().forwardToCoordinator( + getClusterCoordinatorNode(), user, HttpMethod.PUT, updateUri, snapshotEntity, headers).awaitMergedResponse(); + } + } catch (final InterruptedException ie) { + logger.warn("Interrupted while replicating PUT request to {} for user {}", updateUri, user); + Thread.currentThread().interrupt(); + throw new LifecycleManagementException("Interrupted while updating flows across cluster", ie); } - } catch (final InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new LifecycleManagementException("Interrupted while updating flows across cluster", ie); - } - final int disableServicesStatus = clusterResponse.getStatus(); - if (disableServicesStatus != Status.OK.getStatusCode()) { - final String explanation = getResponseEntity(clusterResponse, String.class); - throw new LifecycleManagementException("Failed to update Flow on all nodes in cluster due to " + explanation); + final int updateFlowStatus = clusterResponse.getStatus(); + if (updateFlowStatus != Status.OK.getStatusCode()) { + final String explanation = getResponseEntity(clusterResponse, String.class); + logger.error("Failed to update flow across cluster when replicating PUT request to {} for user {}. Received {} response with explanation: {}", + updateUri, user, updateFlowStatus, explanation); + throw new LifecycleManagementException("Failed to update Flow on all nodes in cluster due to " + explanation); + } + + } else { + // Step 10: Ensure that if any connection exists in the flow and does not exist in the proposed snapshot, + // that it has no data in it. Ensure that no Input Port was removed, unless it currently has no incoming connections. + // Ensure that no Output Port was removed, unless it currently has no outgoing connections. + serviceFacade.verifyCanUpdate(groupId, flowSnapshot, true, verifyNotModified); + + // Step 11-12. Update Process Group to the new flow and update variable registry with any Variables that were added or removed + final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision(); + final Revision revision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), groupId); + final VersionControlInformationDTO requestVci = requestEntity.getVersionControlInformation(); + + final Bucket bucket = flowSnapshot.getBucket(); + final VersionedFlow flow = flowSnapshot.getFlow(); + + final VersionedFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata(); + final VersionControlInformationDTO vci = new VersionControlInformationDTO(); + vci.setBucketId(metadata.getBucketIdentifier()); + vci.setBucketName(bucket.getName()); + vci.setCurrent(flowSnapshot.isLatest()); + vci.setFlowDescription(flow.getDescription()); + vci.setFlowId(flow.getIdentifier()); + vci.setFlowName(flow.getName()); + vci.setGroupId(groupId); + vci.setModified(false); + vci.setRegistryId(requestVci.getRegistryId()); + vci.setRegistryName(serviceFacade.getFlowRegistryName(requestVci.getRegistryId())); + vci.setVersion(metadata.getVersion()); + + serviceFacade.updateProcessGroupContents(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified, false, updateDescendantVersionedFlows); } + } finally { + if (!asyncRequest.isCancelled()) { + if (logger.isDebugEnabled()) { + logger.debug("Re-Enabling {} Controller Services: {}", enabledServices.size(), enabledServices); + } - } else { - // Step 10: Ensure that if any connection exists in the flow and does not exist in the proposed snapshot, - // that it has no data in it. Ensure that no Input Port was removed, unless it currently has no incoming connections. - // Ensure that no Output Port was removed, unless it currently has no outgoing connections. - serviceFacade.verifyCanUpdate(groupId, flowSnapshot, true, verifyNotModified); - - // Step 11-12. Update Process Group to the new flow and update variable registry with any Variables that were added or removed - final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision(); - final Revision revision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), groupId); - final VersionControlInformationDTO requestVci = requestEntity.getVersionControlInformation(); - - final Bucket bucket = flowSnapshot.getBucket(); - final VersionedFlow flow = flowSnapshot.getFlow(); - - final VersionedFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata(); - final VersionControlInformationDTO vci = new VersionControlInformationDTO(); - vci.setBucketId(metadata.getBucketIdentifier()); - vci.setBucketName(bucket.getName()); - vci.setCurrent(flowSnapshot.isLatest()); - vci.setFlowDescription(flow.getDescription()); - vci.setFlowId(flow.getIdentifier()); - vci.setFlowName(flow.getName()); - vci.setGroupId(groupId); - vci.setModified(false); - vci.setRegistryId(requestVci.getRegistryId()); - vci.setRegistryName(serviceFacade.getFlowRegistryName(requestVci.getRegistryId())); - vci.setVersion(metadata.getVersion()); - - serviceFacade.updateProcessGroupContents(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified, false); - } + asyncRequest.update(new Date(), "Re-Enabling Controller Services", 60); - if (asyncRequest.isCancelled()) { - return null; - } - asyncRequest.update(new Date(), "Re-Enabling Controller Services", 60); + // Step 13. Re-enable all disabled controller services + final CancellableTimedPause enableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); + asyncRequest.setCancelCallback(enableServicesPause::cancel); + final Set<AffectedComponentEntity> servicesToEnable = getUpdatedEntities(enabledServices, user); + logger.info("Successfully updated flow; re-enabling {} Controller Services", servicesToEnable.size()); + componentLifecycle.activateControllerServices(exampleUri, user, groupId, servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause); + } - // Step 13. Re-enable all disabled controller services - final Pause enableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); - final Set<AffectedComponentEntity> servicesToEnable = getUpdatedEntities(enabledServices, user); - logger.info("Successfully updated flow; re-enabling {} Controller Services", servicesToEnable.size()); - componentLifecycle.activateControllerServices(exampleUri, user, groupId, servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause); + if (!asyncRequest.isCancelled()) { + if (logger.isDebugEnabled()) { + logger.debug("Restart {} Processors: {}", runningComponents.size(), runningComponents); + } - if (asyncRequest.isCancelled()) { - return null; - } - asyncRequest.update(new Date(), "Restarting Processors", 80); + asyncRequest.update(new Date(), "Restarting Processors", 80); - // Step 14. Restart all components - final Set<AffectedComponentEntity> componentsToStart = getUpdatedEntities(runningComponents, user); - final Pause startComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); - logger.info("Restarting {} Processors", componentsToStart.size()); - componentLifecycle.scheduleComponents(exampleUri, user, groupId, componentsToStart, ScheduledState.RUNNING, startComponentsPause); + // Step 14. Restart all components + final Set<AffectedComponentEntity> componentsToStart = getUpdatedEntities(runningComponents, user); + final CancellableTimedPause startComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); + asyncRequest.setCancelCallback(startComponentsPause::cancel); + logger.info("Restarting {} Processors", componentsToStart.size()); + componentLifecycle.scheduleComponents(exampleUri, user, groupId, componentsToStart, ScheduledState.RUNNING, startComponentsPause); + } + } + asyncRequest.setCancelCallback(null); if (asyncRequest.isCancelled()) { return null; } @@ -1426,6 +1408,7 @@ public class VersionsResource extends ApplicationResource { return serviceFacade.getVersionControlInformation(groupId); } + /** * Extracts the response entity from the specified node response. *
http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java index 1309eee..3cecdeb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java @@ -99,4 +99,12 @@ public interface AsynchronousWebRequest<T> { * @return <code>true</code> if the request has been canceled, <code>false</code> otherwise */ boolean isCancelled(); + + /** + * Sets the cancel callback to the given runnable, so that if {@link #cancel()} is called, the given {@link Runnable} will be triggered. + * If <code>null</code> is passed, no operation will be triggered when the task is cancelled. + * + * @param runnable the callback + */ + void setCancelCallback(Runnable runnable); } http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java index 4810a32..8e2e221 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java @@ -34,6 +34,7 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest private volatile String failureReason; private volatile boolean cancelled; private volatile T results; + private volatile Runnable cancelCallback; public StandardAsynchronousWebRequest(final String requestId, final String processGroupId, final NiFiUser user, final String state) { this.id = requestId; @@ -57,6 +58,11 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest } @Override + public void setCancelCallback(final Runnable runnable) { + this.cancelCallback = runnable; + } + + @Override public void markComplete(final T results) { this.complete = true; this.results = results; @@ -130,6 +136,7 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest percentComplete = 100; state = "Canceled by user"; setFailureReason("Request cancelled by user"); + cancelCallback.run(); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 6077268..7d40473 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -118,6 +118,7 @@ import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedComponent; import org.apache.nifi.registry.flow.diff.FlowComparison; import org.apache.nifi.registry.flow.diff.FlowDifference; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedConnection; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedFunnel; @@ -2202,15 +2203,23 @@ public final class DtoFactory { private ComponentDifferenceDTO createComponentDifference(final FlowDifference difference) { VersionedComponent component = difference.getComponentA(); - if (component == null) { + if (component == null || difference.getComponentB() instanceof InstantiatedVersionedComponent) { component = difference.getComponentB(); } final ComponentDifferenceDTO dto = new ComponentDifferenceDTO(); - dto.setComponentId(component.getIdentifier()); dto.setComponentName(component.getName()); dto.setComponentType(component.getComponentType().name()); - dto.setProcessGroupId(dto.getProcessGroupId()); + + if (component instanceof InstantiatedVersionedComponent) { + final InstantiatedVersionedComponent instantiatedComponent = (InstantiatedVersionedComponent) component; + dto.setComponentId(instantiatedComponent.getInstanceId()); + dto.setProcessGroupId(instantiatedComponent.getInstanceGroupId()); + } else { + dto.setComponentId(component.getIdentifier()); + dto.setProcessGroupId(dto.getProcessGroupId()); + } + return dto; } http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java index 7cf61ea..9259bf4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java @@ -114,10 +114,12 @@ public interface ProcessGroupDAO { * @param versionControlInformation the new Version Control Information * @param componentIdSeed the seed value to use for generating ID's for new components * @param updateSettings whether or not to update the process group's name and position + * @param updateDescendantVersionedFlows if a child/descendant Process Group is under Version Control, specifies whether or not to + * update the contents of that Process Group * @return the process group */ ProcessGroup updateProcessGroupFlow(String groupId, VersionedFlowSnapshot proposedSnapshot, VersionControlInformationDTO versionControlInformation, String componentIdSeed, - boolean verifyNotModified, boolean updateSettings); + boolean verifyNotModified, boolean updateSettings, boolean updateDescendantVersionedFlows); /** * Applies the given Version Control Information to the Process Group http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java index e3c4725..bb7edb1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java @@ -29,6 +29,8 @@ import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.StandardVersionControlInformation; import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.flow.VersionedProcessGroup; +import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.web.ResourceNotFoundException; import org.apache.nifi.web.api.dto.ProcessGroupDTO; @@ -244,8 +246,12 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou final FlowRegistry flowRegistry = flowController.getFlowRegistryClient().getFlowRegistry(registryId); final String registryName = flowRegistry == null ? registryId : flowRegistry.getName(); + final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); + final VersionedProcessGroup flowSnapshot = mapper.mapProcessGroup(group, flowController.getFlowRegistryClient(), false); + final StandardVersionControlInformation vci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation) .registryName(registryName) + .flowSnapshot(flowSnapshot) .modified(false) .current(true) .build(); @@ -264,9 +270,9 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou @Override public ProcessGroup updateProcessGroupFlow(final String groupId, final VersionedFlowSnapshot proposedSnapshot, final VersionControlInformationDTO versionControlInformation, - final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings) { + final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings, final boolean updateDescendantVersionedFlows) { final ProcessGroup group = locateProcessGroup(flowController, groupId); - group.updateFlow(proposedSnapshot, componentIdSeed, verifyNotModified, updateSettings); + group.updateFlow(proposedSnapshot, componentIdSeed, verifyNotModified, updateSettings, updateDescendantVersionedFlows); final StandardVersionControlInformation svci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation) .flowSnapshot(proposedSnapshot.getFlowContents()) http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java index 1f83a6f..dea43f6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java @@ -43,7 +43,7 @@ public class CancellableTimedPause implements Pause { long sysTime = System.nanoTime(); final long maxWaitTime = System.nanoTime() + pauseNanos; - while (sysTime < maxWaitTime) { + while (sysTime < maxWaitTime && !cancelled) { try { TimeUnit.NANOSECONDS.sleep(pauseNanos); } catch (final InterruptedException ie) { http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java index 4469ea1..11bd1b8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java @@ -421,18 +421,24 @@ public final class SnippetUtils { } // get a list of all names of process groups so that we can rename as needed. - final List<String> groupNames = new ArrayList<>(); + final Set<String> groupNames = new HashSet<>(); for (final ProcessGroup childGroup : group.getProcessGroups()) { groupNames.add(childGroup.getName()); } if (snippetContents.getProcessGroups() != null) { for (final ProcessGroupDTO groupDTO : snippetContents.getProcessGroups()) { - String groupName = groupDTO.getName(); - while (groupNames.contains(groupName)) { - groupName = "Copy of " + groupName; + // If Version Control Information is present, then we don't want to rename the + // Process Group - we want it to remain the same as the one in Version Control. + // However, in order to disambiguate things, we generally do want to rename to + // 'Copy of...' so we do this only if there is no Version Control Information present. + if (groupDTO.getVersionControlInformation() == null) { + String groupName = groupDTO.getName(); + while (groupNames.contains(groupName)) { + groupName = "Copy of " + groupName; + } + groupDTO.setName(groupName); } - groupDTO.setName(groupName); groupNames.add(groupDTO.getName()); } }
