markap14 commented on a change in pull request #4023: NIFI-6873: Added support 
for replacing a process group via import
URL: https://github.com/apache/nifi/pull/4023#discussion_r374333221
 
 

 ##########
 File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
 ##########
 @@ -3894,6 +3889,252 @@ public Response createControllerService(
         );
     }
 
+    /**
+     * Initiates the request to replace the Process Group with the given ID 
with the Process Group in the given import entity
+     *
+     * @param groupId          The id of the process group to replace
+     * @param importEntity     A request entity containing revision info and 
the process group to replace with
+     * @return A ProcessGroupReplaceRequestEntity.
+     */
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/replace-requests")
+    @ApiOperation(
+            value = "Initiate the Replace Request of a Process Group with the 
given ID",
+            response = ProcessGroupReplaceRequestEntity.class,
+            notes = "This will initiate the action of replacing a process 
group with the given process group. This can be a lengthy "
+                    + "process, as it will stop any Processors and disable any 
Controller Services necessary to perform the action and then restart them. As a 
result, "
+                    + "the endpoint will immediately return a 
ProcessGroupReplaceRequestEntity, and the process of replacing the flow will 
occur "
+                    + "asynchronously in the background. The client may then 
periodically poll the status of the request by issuing a GET request to "
+                    + "/process-groups/replace-requests/{requestId}. Once the 
request is completed, the client is expected to issue a DELETE request to "
+                    + "/process-groups/replace-requests/{requestId}. " + 
NON_GUARANTEED_ENDPOINT,
+            authorizations = {
+                    @Authorization(value = "Read - /process-groups/{uuid}"),
+                    @Authorization(value = "Write - /process-groups/{uuid}"),
+                    @Authorization(value = "Read - /{component-type}/{uuid} - 
For all encapsulated components"),
+                    @Authorization(value = "Write - /{component-type}/{uuid} - 
For all encapsulated components"),
+                    @Authorization(value = "Write - if the template contains 
any restricted components - /restricted-components"),
+                    @Authorization(value = "Read - /parameter-contexts/{uuid} 
- For any Parameter Context that is referenced by a Property that is changed, 
added, or removed")
+            }
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete 
the request because it was invalid. The request should not be retried without 
modification."),
+            @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to 
make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could 
not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi 
was not in the appropriate state to process it. Retrying the same request later 
may be successful.")
+    })
+    public Response initiateReplaceProcessGroup(@ApiParam(value = "The process 
group id.", required = true) @PathParam("id") final String groupId,
+                                                @ApiParam(value = "The process 
group replace request entity", required = true) final ProcessGroupImportEntity 
importEntity) {
+        // replacing a flow under version control is not permitted via import. 
Versioned flows have additional requirements to allow
+        // them only to be replaced by a different version of the same flow.
+        if (serviceFacade.isAnyProcessGroupUnderVersionControl(groupId)) {
+            throw new IllegalStateException("Cannot replace a Process Group 
via import while it or its descendants are under Version Control.");
+        }
+
+        final VersionedFlowSnapshot versionedFlowSnapshot = 
importEntity.getVersionedFlowSnapshot();
+        if (versionedFlowSnapshot == null) {
+            throw new IllegalArgumentException("Versioned Flow Snapshot must 
be supplied");
+        }
+
+        return initiateFlowUpdate(groupId, importEntity, true, 
"replace-requests",
+                "/nifi-api/process-groups/" + groupId + "/replace", 
importEntity::getVersionedFlowSnapshot);
+    }
+
+    /**
+     * Replace the Process Group with the given ID with the specified Process 
Group.
+     *
+     * This is the endpoint used in a cluster update replication scenario.
+     *
+     * @param groupId          The id of the process group to replace
+     * @param importEntity     A request entity containing revision info and 
the process group to replace with
+     * @return A ProcessGroupImportEntity.
+     */
+    @PUT
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/replace")
+    @ApiOperation(
+            value = "Replace Process Group with the given ID with the 
specified Process Group",
+            response = ProcessGroupImportEntity.class,
+            notes = "This endpoint is used for replication within a cluster, 
when replacing a flow with a new flow. It expects that the flow being"
+                    + "replaced is not under version control and that the 
given snapshot will not modify any Processor that is currently running "
+                    + "or any Controller Service that is enabled. "
+                    + NON_GUARANTEED_ENDPOINT,
+            authorizations = {
+                    @Authorization(value = "Read - /process-groups/{uuid}"),
+                    @Authorization(value = "Write - /process-groups/{uuid}")
+            })
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete 
the request because it was invalid. The request should not be retried without 
modification."),
+            @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to 
make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could 
not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi 
was not in the appropriate state to process it. Retrying the same request later 
may be successful.")
+    })
+    public Response replaceProcessGroup(@ApiParam(value = "The process group 
id.", required = true) @PathParam("id") final String groupId,
+                                        @ApiParam(value = "The process group 
replace request entity.", required = true) final ProcessGroupImportEntity 
importEntity) {
+
+        // Verify the request
+        final RevisionDTO revisionDto = importEntity.getProcessGroupRevision();
+        if (revisionDto == null) {
+            throw new IllegalArgumentException("Process Group Revision must be 
specified.");
+        }
+
+        final VersionedFlowSnapshot requestFlowSnapshot = 
importEntity.getVersionedFlowSnapshot();
+        if (requestFlowSnapshot == null) {
+            throw new IllegalArgumentException("Versioned Flow Snapshot must 
be supplied.");
+        }
+
+        // Perform the request
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.PUT, importEntity);
+        } else if (isDisconnectedFromCluster()) {
+            
verifyDisconnectedNodeModification(importEntity.isDisconnectedNodeAcknowledged());
+        }
+
+        final Revision requestRevision = 
getRevision(importEntity.getProcessGroupRevision(), groupId);
+        return withWriteLock(
+                serviceFacade,
+                importEntity,
+                requestRevision,
+                lookup -> {
+                    final ProcessGroupAuthorizable groupAuthorizable = 
lookup.getProcessGroup(groupId);
+                    final Authorizable processGroup = 
groupAuthorizable.getAuthorizable();
+                    processGroup.authorize(authorizer, RequestAction.READ, 
NiFiUserUtils.getNiFiUser());
+                    processGroup.authorize(authorizer, RequestAction.WRITE, 
NiFiUserUtils.getNiFiUser());
+                },
+                () -> {
+                    // We do not enforce that the Process Group is 'not dirty' 
because at this point,
+                    // the client has explicitly indicated the dataflow that 
the Process Group should
+                    // provide and provided the Revision to ensure that they 
have the most up-to-date
+                    // view of the Process Group.
+                    serviceFacade.verifyCanUpdate(groupId, 
requestFlowSnapshot, true, false);
+                },
+                (revision, entity) -> {
+                    final ProcessGroupEntity updatedGroup =
+                            performUpdateFlow(groupId, revision, importEntity, 
entity.getVersionedFlowSnapshot(),
+                                    getIdGenerationSeed().orElse(null), false, 
true);
+
+                    // response to replication request is an entity with 
revision info but no versioned flow snapshot
+                    final ProcessGroupImportEntity responseEntity = new 
ProcessGroupImportEntity();
+                    
responseEntity.setProcessGroupRevision(updatedGroup.getRevision());
+
+                    return generateOkResponse(responseEntity).build();
+                });
+    }
+
+    /**
+     * Retrieve a request to replace a Process Group by request ID.
+     *
+     * @param replaceRequestId  The ID of the replace request
+     * @return A ProcessGroupReplaceRequestEntity.
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("replace-requests/{id}")
+    @ApiOperation(
+            value = "Returns the Replace Request with the given ID",
+            response = ProcessGroupReplaceRequestEntity.class,
+            notes = "Returns the Replace Request with the given ID. Once a 
Replace Request has been created by performing a POST to 
/process-groups/{id}/replace-requests, "
+                    + "that request can subsequently be retrieved via this 
endpoint, and the request that is fetched will contain the updated state, such 
as percent complete, the "
+                    + "current state of the request, and any failures. "
+                    + NON_GUARANTEED_ENDPOINT,
+            authorizations = {
+                    @Authorization(value = "Only the user that submitted the 
request can get it")
+            })
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete 
the request because it was invalid. The request should not be retried without 
modification."),
+            @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to 
make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could 
not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi 
was not in the appropriate state to process it. Retrying the same request later 
may be successful.")
+    })
+    public Response getReplaceProcessGroupRequest(
+            @ApiParam("The ID of the Replace Request") @PathParam("id") final 
String replaceRequestId) {
+        return retrieveFlowUpdateRequest("replace-requests", replaceRequestId);
+    }
+
+    @DELETE
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("replace-requests/{id}")
+    @ApiOperation(
+            value = "Deletes the Replace Request with the given ID",
+            response = ProcessGroupReplaceRequestEntity.class,
+            notes = "Deletes the Replace Request with the given ID. After a 
request is created via a POST to /process-groups/{id}/replace-requests, it is 
expected "
+                    + "that the client will properly clean up the request by 
DELETE'ing it, once the Replace process has completed. If the request is 
deleted before the request "
+                    + "completes, then the Replace request will finish the 
step that it is currently performing and then will cancel any subsequent steps. 
"
+                    + NON_GUARANTEED_ENDPOINT,
+            authorizations = {
+                    @Authorization(value = "Only the user that submitted the 
request can remove it")
+            })
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete 
the request because it was invalid. The request should not be retried without 
modification."),
+            @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to 
make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could 
not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi 
was not in the appropriate state to process it. Retrying the same request later 
may be successful.")
+    })
+    public Response deleteReplaceProcessGroupRequest(
+            @ApiParam(value = "Acknowledges that this node is disconnected to 
allow for mutable requests to proceed.", required = false)
+                @QueryParam(DISCONNECTED_NODE_ACKNOWLEDGED) 
@DefaultValue("false") final Boolean disconnectedNodeAcknowledged,
+            @ApiParam("The ID of the Update Request") @PathParam("id") final 
String replaceRequestId) {
+        return deleteFlowUpdateRequest("replace-requests", replaceRequestId, 
disconnectedNodeAcknowledged.booleanValue());
+    }
+
+    /**
+     * Perform actual flow update of the specified flow. This is used for the 
initial flow update and replication updates.
+     */
+    @Override
+    protected ProcessGroupEntity performUpdateFlow(final String groupId, final 
Revision revision, final ProcessGroupImportEntity requestEntity,
+                                                   final VersionedFlowSnapshot 
flowSnapshot, final String idGenerationSeed,
+                                                   final boolean 
verifyNotModified, final boolean updateDescendantVersionedFlows) {
+        logger.info("Replacing Process Group with ID {} with imported Process 
Group with ID {}", groupId, flowSnapshot.getFlowContents().getIdentifier());
+
+        // Step 10-11. Update Process Group to the new flow (including name) 
and update variable registry with any Variables that were added or removed
 
 Review comment:
   We can probably remove the "Step 10-11" part of this comment; I'm not sure 
that it makes sense anymore in this context :)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to