http://git-wip-us.apache.org/repos/asf/nifi/blob/f6cc5b6c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java index f61b399..3684f04 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java @@ -17,12 +17,37 @@ package org.apache.nifi.web.api; -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiParam; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; -import io.swagger.annotations.Authorization; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedHashMap; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.Authorizer; import org.apache.nifi.authorization.ProcessGroupAuthorizable; @@ -34,7 +59,9 @@ import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.service.ControllerServiceState; +import org.apache.nifi.registry.bucket.Bucket; import org.apache.nifi.registry.flow.FlowRegistryUtils; +import org.apache.nifi.registry.flow.VersionedFlow; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; import org.apache.nifi.registry.flow.VersionedProcessGroup; @@ -69,35 +96,12 @@ import org.apache.nifi.web.util.Pause; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.MultivaluedHashMap; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.Response.Status; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; -import java.util.stream.Collectors; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import io.swagger.annotations.Authorization; @Path("/versions") @Api(value = "/versions", description = "Endpoint for managing version control for a flow") @@ -125,9 +129,12 @@ public class VersionsResource extends ApplicationResource { @Consumes(MediaType.WILDCARD) @Produces(MediaType.APPLICATION_JSON) @Path("process-groups/{id}") - @ApiOperation(value = "Gets the Version Control information for a process group", response = VersionControlInformationEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = { - @Authorization(value = "Read - /process-groups/{uuid}") - }) + @ApiOperation(value = "Gets the Version Control information for a process group", + response = VersionControlInformationEntity.class, + notes = NON_GUARANTEED_ENDPOINT, + authorizations = { + @Authorization(value = "Read - /process-groups/{uuid}") + }) @ApiResponses(value = { @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), @ApiResponse(code = 401, message = "Client could not be authenticated."), @@ -164,7 +171,9 @@ public class VersionsResource extends ApplicationResource { @Produces(MediaType.APPLICATION_JSON) @Path("start-requests") @ApiOperation( - value = "Creates a request so that a Process Group can be placed under Version Control or have its Version Control configuration changed", + value = "Creates a request so that a Process Group can be placed under Version Control or have its Version Control configuration changed. Creating this request will " + + "prevent any other threads from simultaneously saving local changes to Version Control. It will not, however, actually save the local flow to the Flow Registry. A " + + "POST to /versions/process-groups/{id} should be used to initiate saving of the local flow to the Flow Registry.", response = VersionControlInformationEntity.class, notes = NON_GUARANTEED_ENDPOINT) @ApiResponses(value = { @@ -305,7 +314,8 @@ public class VersionsResource extends ApplicationResource { @Produces(MediaType.APPLICATION_JSON) @Path("start-requests/{id}") @ApiOperation( - value = "Deletes the request with the given ID", + value = "Deletes the Version Control Request with the given ID. This will allow other threads to save flows to the Flow Registry. See also the documentation " + + "for POSTing to /versions/start-requests for information regarding why this is done.", notes = NON_GUARANTEED_ENDPOINT) @ApiResponses(value = { @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), @@ -349,7 +359,8 @@ public class VersionsResource extends ApplicationResource { @Produces(MediaType.APPLICATION_JSON) @Path("process-groups/{id}") @ApiOperation( - value = "Begins version controlling the Process Group with the given ID", + value = "Begins version controlling the Process Group with the given ID or commits changes to the Versioned Flow, " + + "depending on if the provided VersionControlInformation includes a flowId", response = VersionControlInformationEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = { @@ -365,7 +376,7 @@ public class VersionsResource extends ApplicationResource { @ApiResponse(code = 404, message = "The specified resource could not be found."), @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") }) - public Response startVersionControl( + public Response saveToFlowRegistry( @ApiParam("The process group id.") @PathParam("id") final String groupId, @ApiParam(value = "The versioned flow details.", required = true) final StartVersionControlRequestEntity requestEntity) throws IOException { @@ -402,29 +413,7 @@ public class VersionsResource extends ApplicationResource { final URI requestUri; try { final URI originalUri = getAbsolutePath(); - final URI createRequestUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), - originalUri.getPort(), "/nifi-api/versions/start-requests", null, originalUri.getFragment()); - - final NodeResponse clusterResponse; - try { - if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { - clusterResponse = getRequestReplicator().replicate(HttpMethod.POST, createRequestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse(); - } else { - clusterResponse = getRequestReplicator().forwardToCoordinator( - getClusterCoordinatorNode(), HttpMethod.POST, createRequestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse(); - } - } catch (final InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted while updating Version Control Information for Process Group with ID " + groupId + ".", ie); - } - - if (clusterResponse.getStatus() != Status.OK.getStatusCode()) { - final String errorResponse = getResponseEntity(clusterResponse, String.class); - throw new IllegalStateException( - "Failed to create a Version Control Request across all nodes in the cluster. Received response code " + clusterResponse.getStatus() + " with content: " + errorResponse); - } - - final String requestId = getResponseEntity(clusterResponse, String.class); + final String requestId = lockVersionControl(originalUri, groupId); requestUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), originalUri.getPort(), "/nifi-api/versions/start-requests/" + requestId, null, originalUri.getFragment()); @@ -439,54 +428,12 @@ public class VersionsResource extends ApplicationResource { // Finally, we can delete the Request. try { final VersionControlComponentMappingEntity mappingEntity = serviceFacade.registerFlowWithFlowRegistry(groupId, requestEntity); - - final Map<String, String> headers = new HashMap<>(); - headers.put("content-type", MediaType.APPLICATION_JSON); - - final NodeResponse clusterResponse; - try { - if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { - clusterResponse = getRequestReplicator().replicate(HttpMethod.PUT, requestUri, mappingEntity, headers).awaitMergedResponse(); - } else { - clusterResponse = getRequestReplicator().forwardToCoordinator( - getClusterCoordinatorNode(), HttpMethod.PUT, requestUri, mappingEntity, headers).awaitMergedResponse(); - } - } catch (final InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted while updating Version Control Information for Process Group with ID " + groupId + ".", ie); - } - - if (clusterResponse.getStatus() != Status.OK.getStatusCode()) { - final String message = "Failed to update Version Control Information for Process Group with ID " + groupId + "."; - final Throwable cause = clusterResponse.getThrowable(); - if (cause == null) { - throw new IllegalStateException(message); - } else { - throw new IllegalStateException(message, cause); - } - } + replicateVersionControlMapping(mappingEntity, requestEntity, requestUri, groupId); final VersionControlInformationEntity responseEntity = serviceFacade.getVersionControlInformation(groupId); return generateOkResponse(responseEntity).build(); } finally { - final NodeResponse clusterResponse; - try { - if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { - clusterResponse = getRequestReplicator().replicate(HttpMethod.DELETE, requestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse(); - } else { - clusterResponse = getRequestReplicator().forwardToCoordinator( - getClusterCoordinatorNode(), HttpMethod.DELETE, requestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse(); - } - } catch (final InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new RuntimeException("After starting Version Control on Process Group with ID " + groupId + ", interrupted while waiting for deletion of Version Control Request. " - + "Users may be unable to Version Control other Process Groups until the request lock times out.", ie); - } - - if (clusterResponse.getStatus() != Status.OK.getStatusCode()) { - logger.error("After starting Version Control on Process Group with ID " + groupId + ", failed to delete Version Control Request. " - + "Users may be unable to Version Control other Process Groups until the request lock times out. Response status code was " + clusterResponse.getStatus()); - } + unlockVersionControl(requestUri, groupId); } } @@ -560,13 +507,115 @@ public class VersionsResource extends ApplicationResource { }); } + private void unlockVersionControl(final URI requestUri, final String groupId) { + final NodeResponse clusterResponse; + try { + if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { + clusterResponse = getRequestReplicator().replicate(HttpMethod.DELETE, requestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse(); + } else { + clusterResponse = getRequestReplicator().forwardToCoordinator( + getClusterCoordinatorNode(), HttpMethod.DELETE, requestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse(); + } + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("After starting Version Control on Process Group with ID " + groupId + ", interrupted while waiting for deletion of Version Control Request. " + + "Users may be unable to Version Control other Process Groups until the request lock times out.", ie); + } + + if (clusterResponse.getStatus() != Status.OK.getStatusCode()) { + logger.error("After starting Version Control on Process Group with ID " + groupId + ", failed to delete Version Control Request. " + + "Users may be unable to Version Control other Process Groups until the request lock times out. Response status code was " + clusterResponse.getStatus()); + } + } + + private String lockVersionControl(final URI originalUri, final String groupId) throws URISyntaxException { + final URI createRequestUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), + originalUri.getPort(), "/nifi-api/versions/start-requests", null, originalUri.getFragment()); + + final NodeResponse clusterResponse; + try { + if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { + clusterResponse = getRequestReplicator().replicate(HttpMethod.POST, createRequestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse(); + } else { + clusterResponse = getRequestReplicator().forwardToCoordinator( + getClusterCoordinatorNode(), HttpMethod.POST, createRequestUri, new MultivaluedHashMap<>(), Collections.emptyMap()).awaitMergedResponse(); + } + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while updating Version Control Information for Process Group with ID " + groupId + ".", ie); + } + + if (clusterResponse.getStatus() != Status.OK.getStatusCode()) { + final String errorResponse = getResponseEntity(clusterResponse, String.class); + throw new IllegalStateException( + "Failed to create a Version Control Request across all nodes in the cluster. Received response code " + clusterResponse.getStatus() + " with content: " + errorResponse); + } + + final String requestId = getResponseEntity(clusterResponse, String.class); + return requestId; + } + + private void replicateVersionControlMapping(final VersionControlComponentMappingEntity mappingEntity, final StartVersionControlRequestEntity requestEntity, final URI requestUri, + final String groupId) { + final Map<String, String> headers = new HashMap<>(); + headers.put("content-type", MediaType.APPLICATION_JSON); + + final NodeResponse clusterResponse; + try { + if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { + clusterResponse = getRequestReplicator().replicate(HttpMethod.PUT, requestUri, mappingEntity, headers).awaitMergedResponse(); + } else { + clusterResponse = getRequestReplicator().forwardToCoordinator( + getClusterCoordinatorNode(), HttpMethod.PUT, requestUri, mappingEntity, headers).awaitMergedResponse(); + } + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + + if (requestEntity.getVersionedFlow().getFlowId() == null) { + // We had to create the flow for this snapshot. Since we failed to replicate the Version Control Info, remove the + // flow from the Flow Registry (use best effort; if we can't remove it, just log and move on). + final VersionControlInformationDTO vci = mappingEntity.getVersionControlInformation(); + try { + serviceFacade.deleteVersionedFlow(vci.getRegistryId(), vci.getBucketId(), vci.getFlowId()); + } catch (final Exception e) { + logger.error("Created Versioned Flow with ID {} in bucket with ID {} but failed to replicate the Version Control Information to cluster. " + + "Attempted to delete the newly created (empty) flow from the Flow Registry but failed", vci.getFlowId(), vci.getBucketId(), e); + } + } + + throw new RuntimeException("Interrupted while updating Version Control Information for Process Group with ID " + groupId + ".", ie); + } + + if (clusterResponse.getStatus() != Status.OK.getStatusCode()) { + if (requestEntity.getVersionedFlow().getFlowId() == null) { + // We had to create the flow for this snapshot. Since we failed to replicate the Version Control Info, remove the + // flow from the Flow Registry (use best effort; if we can't remove it, just log and move on). + final VersionControlInformationDTO vci = mappingEntity.getVersionControlInformation(); + try { + serviceFacade.deleteVersionedFlow(vci.getRegistryId(), vci.getBucketId(), vci.getFlowId()); + } catch (final Exception e) { + logger.error("Created Versioned Flow with ID {} in bucket with ID {} but failed to replicate the Version Control Information to cluster. " + + "Attempted to delete the newly created (empty) flow from the Flow Registry but failed", vci.getFlowId(), vci.getBucketId(), e); + } + } + + final String message = "Failed to update Version Control Information for Process Group with ID " + groupId + "."; + final Throwable cause = clusterResponse.getThrowable(); + if (cause == null) { + throw new IllegalStateException(message); + } else { + throw new IllegalStateException(message, cause); + } + } + } + @DELETE @Consumes(MediaType.WILDCARD) @Produces(MediaType.APPLICATION_JSON) @Path("process-groups/{id}") @ApiOperation( - value = "Stops version controlling the Process Group with the given ID", + value = "Stops version controlling the Process Group with the given ID. The Process Group will no longer track to any Versioned Flow.", response = VersionControlInformationEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = { @@ -626,7 +675,8 @@ public class VersionsResource extends ApplicationResource { @Produces(MediaType.APPLICATION_JSON) @Path("process-groups/{id}") @ApiOperation( - value = "For a Process Group that is already under Version Control, this will update the version of the flow to a different version", + value = "For a Process Group that is already under Version Control, this will update the version of the flow to a different version. This endpoint expects " + + "that the given snapshot will not modify any Processor that is currently running or any Controller Service that is enabled.", response = VersionControlInformationEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = { @@ -689,14 +739,17 @@ public class VersionsResource extends ApplicationResource { serviceFacade.verifyCanUpdate(groupId, flowSnapshot, true, false); }, (rev, entity) -> { + final Bucket bucket = flowSnapshot.getBucket(); + final VersionedFlow flow = flowSnapshot.getFlow(); + // Update the Process Group to match the proposed flow snapshot final VersionControlInformationDTO versionControlInfoDto = new VersionControlInformationDTO(); versionControlInfoDto.setBucketId(snapshotMetadata.getBucketIdentifier()); - versionControlInfoDto.setBucketName(snapshotMetadata.getBucketName()); + versionControlInfoDto.setBucketName(bucket.getName()); versionControlInfoDto.setCurrent(true); versionControlInfoDto.setFlowId(snapshotMetadata.getFlowIdentifier()); - versionControlInfoDto.setFlowName(snapshotMetadata.getFlowName()); - versionControlInfoDto.setFlowDescription(snapshotMetadata.getFlowDescription()); + versionControlInfoDto.setFlowName(flow.getName()); + versionControlInfoDto.setFlowDescription(flow.getDescription()); versionControlInfoDto.setGroupId(groupId); versionControlInfoDto.setModified(false); versionControlInfoDto.setVersion(snapshotMetadata.getVersion()); @@ -720,7 +773,9 @@ public class VersionsResource extends ApplicationResource { @Produces(MediaType.APPLICATION_JSON) @Path("update-requests/{id}") @ApiOperation( - value = "Returns the Update Request with the given ID", + value = "Returns the Update Request with the given ID. Once an Update Request has been created by performing a POST to /versions/update-requests/process-groups/{id}, " + + "that request can subsequently be retrieved via this endpoint, and the request that is fetched will contain the updated state, such as percent complete, the " + + "current state of the request, and any failures.", response = VersionedFlowUpdateRequestEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = { @@ -741,7 +796,9 @@ public class VersionsResource extends ApplicationResource { @Produces(MediaType.APPLICATION_JSON) @Path("revert-requests/{id}") @ApiOperation( - value = "Returns the Revert Request with the given ID", + value = "Returns the Revert Request with the given ID. Once a Revert Request has been created by performing a POST to /versions/revert-requests/process-groups/{id}, " + + "that request can subsequently be retrieved via this endpoint, and the request that is fetched will contain the updated state, such as percent complete, the " + + "current state of the request, and any failures.", response = VersionedFlowUpdateRequestEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = { @@ -795,7 +852,9 @@ public class VersionsResource extends ApplicationResource { @Produces(MediaType.APPLICATION_JSON) @Path("update-requests/{id}") @ApiOperation( - value = "Deletes the Update Request with the given ID", + value = "Deletes the Update Request with the given ID. After a request is created via a POST to /versions/update-requests/process-groups/{id}, it is expected " + + "that the client will properly clean up the request by DELETE'ing it, once the Update process has completed. If the request is deleted before the request " + + "completes, then the Update request will finish the step that it is currently performing and then will cancel any subsequent steps.", response = VersionedFlowUpdateRequestEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = { @@ -816,7 +875,9 @@ public class VersionsResource extends ApplicationResource { @Produces(MediaType.APPLICATION_JSON) @Path("revert-requests/{id}") @ApiOperation( - value = "Deletes the Revert Request with the given ID", + value = "Deletes the Revert Request with the given ID. After a request is created via a POST to /versions/revert-requests/process-groups/{id}, it is expected " + + "that the client will properly clean up the request by DELETE'ing it, once the Revert process has completed. If the request is deleted before the request " + + "completes, then the Revert request will finish the step that it is currently performing and then will cancel any subsequent steps.", response = VersionedFlowUpdateRequestEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = { @@ -881,7 +942,12 @@ public class VersionsResource extends ApplicationResource { @Path("update-requests/process-groups/{id}") @ApiOperation( value = "For a Process Group that is already under Version Control, this will initiate the action of changing " - + "from a specific version of the flow in the Flow Registry to a different version of the flow.", + + "from a specific version of the flow in the Flow Registry to a different version of the flow. This can be a lengthy " + + "process, as it will stop any Processors and disable any Controller Services necessary to perform the action and then restart them. As a result, " + + "the endpoint will immediately return a VersionedFlowUpdateRequestEntity, and the process of updating the flow will occur " + + "asynchronously in the background. The client may then periodically poll the status of the request by issuing a GET request to " + + "/versions/update-requests/{requestId}. Once the request is completed, the client is expected to issue a DELETE request to " + + "/versions/update-requests/{requestId}.", response = VersionedFlowUpdateRequestEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = { @@ -1058,8 +1124,13 @@ public class VersionsResource extends ApplicationResource { @Path("revert-requests/process-groups/{id}") @ApiOperation( value = "For a Process Group that is already under Version Control, this will initiate the action of reverting " - + "any changes that have been made to the Process Group since it was last synchronized with the Flow Registry. This will result in the " - + "flow matching the Versioned Flow that exists in the Flow Registry.", + + "any local changes that have been made to the Process Group since it was last synchronized with the Flow Registry. This will result in the " + + "flow matching the Versioned Flow that exists in the Flow Registry. This can be a lengthy " + + "process, as it will stop any Processors and disable any Controller Services necessary to perform the action and then restart them. As a result, " + + "the endpoint will immediately return a VersionedFlowUpdateRequestEntity, and the process of updating the flow will occur " + + "asynchronously in the background. The client may then periodically poll the status of the request by issuing a GET request to " + + "/versions/revert-requests/{requestId}. Once the request is completed, the client is expected to issue a DELETE request to " + + "/versions/revert-requests/{requestId}.", response = VersionedFlowUpdateRequestEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = { @@ -1174,34 +1245,6 @@ public class VersionsResource extends ApplicationResource { throw new IllegalArgumentException("The Version Control Information provided does not match the flow that the Process Group is currently synchronized with."); } - // If the information passed in is correct, but there have been no changes, there is nothing to do - just register the request, mark it complete, and return. - if (currentVersion.getModified() == Boolean.FALSE) { - final String requestId = UUID.randomUUID().toString(); - final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user, "Complete"); - requestManager.submitRequest("revert-requests", requestId, request, task -> { - }); - - // There is nothing to do. Generate the response and send it back to the user. - final VersionedFlowUpdateRequestDTO updateRequestDto = new VersionedFlowUpdateRequestDTO(); - updateRequestDto.setComplete(true); - updateRequestDto.setFailureReason(null); - updateRequestDto.setLastUpdated(new Date()); - updateRequestDto.setProcessGroupId(groupId); - updateRequestDto.setRequestId(requestId); - updateRequestDto.setVersionControlInformation(currentVersion); - updateRequestDto.setUri(generateResourceUri("versions", "revert-requests", requestId)); - updateRequestDto.setPercentCompleted(100); - updateRequestDto.setState(request.getState()); - - final VersionedFlowUpdateRequestEntity updateRequestEntity = new VersionedFlowUpdateRequestEntity(); - updateRequestEntity.setProcessGroupRevision(revisionDto); - updateRequestEntity.setRequest(updateRequestDto); - - request.markComplete(currentVersionEntity); - return generateOkResponse(updateRequestEntity).build(); - } - - // Create an asynchronous request that will occur in the background, because this request may // result in stopping components, which can take an indeterminate amount of time. final String requestId = UUID.randomUUID().toString(); @@ -1331,7 +1374,25 @@ public class VersionsResource extends ApplicationResource { // Step 11-12. Update Process Group to the new flow and update variable registry with any Variables that were added or removed final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision(); final Revision revision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), groupId); - final VersionControlInformationDTO vci = requestEntity.getVersionControlInformation(); + final VersionControlInformationDTO requestVci = requestEntity.getVersionControlInformation(); + + final Bucket bucket = flowSnapshot.getBucket(); + final VersionedFlow flow = flowSnapshot.getFlow(); + + final VersionedFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata(); + final VersionControlInformationDTO vci = new VersionControlInformationDTO(); + vci.setBucketId(metadata.getBucketIdentifier()); + vci.setBucketName(bucket.getName()); + vci.setCurrent(flowSnapshot.isLatest()); + vci.setFlowDescription(flow.getDescription()); + vci.setFlowId(flow.getIdentifier()); + vci.setFlowName(flow.getName()); + vci.setGroupId(groupId); + vci.setModified(false); + vci.setRegistryId(requestVci.getRegistryId()); + vci.setRegistryName(serviceFacade.getFlowRegistryName(requestVci.getRegistryId())); + vci.setVersion(metadata.getVersion()); + serviceFacade.updateProcessGroupContents(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified, false); }
http://git-wip-us.apache.org/repos/asf/nifi/blob/f6cc5b6c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index bd603be..1a12dcf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -16,6 +16,33 @@ */ package org.apache.nifi.web.api.dto; +import java.text.Collator; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TimeZone; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import javax.ws.rs.WebApplicationException; + import org.apache.commons.lang3.ClassUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; @@ -188,32 +215,6 @@ import org.apache.nifi.web.api.entity.VariableEntity; import org.apache.nifi.web.controller.ControllerFacade; import org.apache.nifi.web.revision.RevisionManager; -import javax.ws.rs.WebApplicationException; -import java.text.Collator; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TimeZone; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; - public final class DtoFactory { @SuppressWarnings("rawtypes") @@ -2230,8 +2231,8 @@ public final class DtoFactory { dto.setFlowName(versionControlInfo.getFlowName()); dto.setFlowDescription(versionControlInfo.getFlowDescription()); dto.setVersion(versionControlInfo.getVersion()); - dto.setCurrent(versionControlInfo.getCurrent().orElse(true)); - dto.setModified(versionControlInfo.getModified().orElse(false)); + dto.setCurrent(versionControlInfo.isCurrent()); + dto.setModified(versionControlInfo.isModified()); return dto; }
