jsferner commented on a change in pull request #4023: NIFI-6873: Added support
for replacing a process group via import
URL: https://github.com/apache/nifi/pull/4023#discussion_r375835725
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
##########
@@ -3894,6 +3889,252 @@ public Response createControllerService(
);
}
+ /**
+ * Initiates the request to replace the Process Group with the given ID
with the Process Group in the given import entity
+ *
+ * @param groupId The id of the process group to replace
+ * @param importEntity A request entity containing revision info and
the process group to replace with
+ * @return A ProcessGroupReplaceRequestEntity.
+ */
+ @POST
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{id}/replace-requests")
+ @ApiOperation(
+ value = "Initiate the Replace Request of a Process Group with the
given ID",
+ response = ProcessGroupReplaceRequestEntity.class,
+ notes = "This will initiate the action of replacing a process
group with the given process group. This can be a lengthy "
+ + "process, as it will stop any Processors and disable any
Controller Services necessary to perform the action and then restart them. As a
result, "
+ + "the endpoint will immediately return a
ProcessGroupReplaceRequestEntity, and the process of replacing the flow will
occur "
+ + "asynchronously in the background. The client may then
periodically poll the status of the request by issuing a GET request to "
+ + "/process-groups/replace-requests/{requestId}. Once the
request is completed, the client is expected to issue a DELETE request to "
+ + "/process-groups/replace-requests/{requestId}. " +
NON_GUARANTEED_ENDPOINT,
+ authorizations = {
+ @Authorization(value = "Read - /process-groups/{uuid}"),
+ @Authorization(value = "Write - /process-groups/{uuid}"),
+ @Authorization(value = "Read - /{component-type}/{uuid} -
For all encapsulated components"),
+ @Authorization(value = "Write - /{component-type}/{uuid} -
For all encapsulated components"),
+ @Authorization(value = "Write - if the template contains
any restricted components - /restricted-components"),
+ @Authorization(value = "Read - /parameter-contexts/{uuid}
- For any Parameter Context that is referenced by a Property that is changed,
added, or removed")
+ }
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete
the request because it was invalid. The request should not be retried without
modification."),
+ @ApiResponse(code = 401, message = "Client could not be
authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to
make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could
not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi
was not in the appropriate state to process it. Retrying the same request later
may be successful.")
+ })
+ public Response initiateReplaceProcessGroup(@ApiParam(value = "The process
group id.", required = true) @PathParam("id") final String groupId,
+ @ApiParam(value = "The process
group replace request entity", required = true) final ProcessGroupImportEntity
importEntity) {
+ // replacing a flow under version control is not permitted via import.
Versioned flows have additional requirements to allow
+ // them only to be replaced by a different version of the same flow.
+ if (serviceFacade.isAnyProcessGroupUnderVersionControl(groupId)) {
+ throw new IllegalStateException("Cannot replace a Process Group
via import while it or its descendants are under Version Control.");
+ }
+
+ final VersionedFlowSnapshot versionedFlowSnapshot =
importEntity.getVersionedFlowSnapshot();
+ if (versionedFlowSnapshot == null) {
+ throw new IllegalArgumentException("Versioned Flow Snapshot must
be supplied");
+ }
+
+ return initiateFlowUpdate(groupId, importEntity, true,
"replace-requests",
+ "/nifi-api/process-groups/" + groupId + "/replace",
importEntity::getVersionedFlowSnapshot);
+ }
+
+ /**
+ * Replace the Process Group with the given ID with the specified Process
Group.
+ *
+ * This is the endpoint used in a cluster update replication scenario.
+ *
+ * @param groupId The id of the process group to replace
+ * @param importEntity A request entity containing revision info and
the process group to replace with
+ * @return A ProcessGroupImportEntity.
+ */
+ @PUT
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{id}/replace")
+ @ApiOperation(
+ value = "Replace Process Group with the given ID with the
specified Process Group",
+ response = ProcessGroupImportEntity.class,
+ notes = "This endpoint is used for replication within a cluster,
when replacing a flow with a new flow. It expects that the flow being"
+ + "replaced is not under version control and that the
given snapshot will not modify any Processor that is currently running "
+ + "or any Controller Service that is enabled. "
+ + NON_GUARANTEED_ENDPOINT,
+ authorizations = {
+ @Authorization(value = "Read - /process-groups/{uuid}"),
+ @Authorization(value = "Write - /process-groups/{uuid}")
+ })
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete
the request because it was invalid. The request should not be retried without
modification."),
+ @ApiResponse(code = 401, message = "Client could not be
authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to
make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could
not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi
was not in the appropriate state to process it. Retrying the same request later
may be successful.")
+ })
+ public Response replaceProcessGroup(@ApiParam(value = "The process group
id.", required = true) @PathParam("id") final String groupId,
+ @ApiParam(value = "The process group
replace request entity.", required = true) final ProcessGroupImportEntity
importEntity) {
+
+ // Verify the request
+ final RevisionDTO revisionDto = importEntity.getProcessGroupRevision();
+ if (revisionDto == null) {
+ throw new IllegalArgumentException("Process Group Revision must be
specified.");
+ }
+
+ final VersionedFlowSnapshot requestFlowSnapshot =
importEntity.getVersionedFlowSnapshot();
+ if (requestFlowSnapshot == null) {
+ throw new IllegalArgumentException("Versioned Flow Snapshot must
be supplied.");
+ }
+
+ // Perform the request
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.PUT, importEntity);
+ } else if (isDisconnectedFromCluster()) {
+
verifyDisconnectedNodeModification(importEntity.isDisconnectedNodeAcknowledged());
+ }
+
+ final Revision requestRevision =
getRevision(importEntity.getProcessGroupRevision(), groupId);
+ return withWriteLock(
+ serviceFacade,
+ importEntity,
+ requestRevision,
+ lookup -> {
+ final ProcessGroupAuthorizable groupAuthorizable =
lookup.getProcessGroup(groupId);
+ final Authorizable processGroup =
groupAuthorizable.getAuthorizable();
+ processGroup.authorize(authorizer, RequestAction.READ,
NiFiUserUtils.getNiFiUser());
+ processGroup.authorize(authorizer, RequestAction.WRITE,
NiFiUserUtils.getNiFiUser());
+ },
+ () -> {
+ // We do not enforce that the Process Group is 'not dirty'
because at this point,
+ // the client has explicitly indicated the dataflow that
the Process Group should
+ // provide and provided the Revision to ensure that they
have the most up-to-date
+ // view of the Process Group.
+ serviceFacade.verifyCanUpdate(groupId,
requestFlowSnapshot, true, false);
+ },
+ (revision, entity) -> {
+ final ProcessGroupEntity updatedGroup =
+ performUpdateFlow(groupId, revision, importEntity,
entity.getVersionedFlowSnapshot(),
+ getIdGenerationSeed().orElse(null), false,
true);
+
+ // response to replication request is an entity with
revision info but no versioned flow snapshot
+ final ProcessGroupImportEntity responseEntity = new
ProcessGroupImportEntity();
+
responseEntity.setProcessGroupRevision(updatedGroup.getRevision());
+
+ return generateOkResponse(responseEntity).build();
+ });
+ }
+
+ /**
+ * Retrieve a request to replace a Process Group by request ID.
+ *
+ * @param replaceRequestId The ID of the replace request
+ * @return A ProcessGroupReplaceRequestEntity.
+ */
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("replace-requests/{id}")
+ @ApiOperation(
+ value = "Returns the Replace Request with the given ID",
+ response = ProcessGroupReplaceRequestEntity.class,
+ notes = "Returns the Replace Request with the given ID. Once a
Replace Request has been created by performing a POST to
/process-groups/{id}/replace-requests, "
+ + "that request can subsequently be retrieved via this
endpoint, and the request that is fetched will contain the updated state, such
as percent complete, the "
+ + "current state of the request, and any failures. "
+ + NON_GUARANTEED_ENDPOINT,
+ authorizations = {
+ @Authorization(value = "Only the user that submitted the
request can get it")
+ })
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete
the request because it was invalid. The request should not be retried without
modification."),
+ @ApiResponse(code = 401, message = "Client could not be
authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to
make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could
not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi
was not in the appropriate state to process it. Retrying the same request later
may be successful.")
+ })
+ public Response getReplaceProcessGroupRequest(
+ @ApiParam("The ID of the Replace Request") @PathParam("id") final
String replaceRequestId) {
+ return retrieveFlowUpdateRequest("replace-requests", replaceRequestId);
+ }
+
+ @DELETE
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("replace-requests/{id}")
+ @ApiOperation(
+ value = "Deletes the Replace Request with the given ID",
+ response = ProcessGroupReplaceRequestEntity.class,
+ notes = "Deletes the Replace Request with the given ID. After a
request is created via a POST to /process-groups/{id}/replace-requests, it is
expected "
+ + "that the client will properly clean up the request by
DELETE'ing it, once the Replace process has completed. If the request is
deleted before the request "
+ + "completes, then the Replace request will finish the
step that it is currently performing and then will cancel any subsequent steps.
"
+ + NON_GUARANTEED_ENDPOINT,
+ authorizations = {
+ @Authorization(value = "Only the user that submitted the
request can remove it")
+ })
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete
the request because it was invalid. The request should not be retried without
modification."),
+ @ApiResponse(code = 401, message = "Client could not be
authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to
make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could
not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi
was not in the appropriate state to process it. Retrying the same request later
may be successful.")
+ })
+ public Response deleteReplaceProcessGroupRequest(
+ @ApiParam(value = "Acknowledges that this node is disconnected to
allow for mutable requests to proceed.", required = false)
+ @QueryParam(DISCONNECTED_NODE_ACKNOWLEDGED)
@DefaultValue("false") final Boolean disconnectedNodeAcknowledged,
+ @ApiParam("The ID of the Update Request") @PathParam("id") final
String replaceRequestId) {
+ return deleteFlowUpdateRequest("replace-requests", replaceRequestId,
disconnectedNodeAcknowledged.booleanValue());
+ }
+
+ /**
+ * Perform actual flow update of the specified flow. This is used for the
initial flow update and replication updates.
+ */
+ @Override
+ protected ProcessGroupEntity performUpdateFlow(final String groupId, final
Revision revision, final ProcessGroupImportEntity requestEntity,
+ final VersionedFlowSnapshot
flowSnapshot, final String idGenerationSeed,
+ final boolean
verifyNotModified, final boolean updateDescendantVersionedFlows) {
+ logger.info("Replacing Process Group with ID {} with imported Process
Group with ID {}", groupId, flowSnapshot.getFlowContents().getIdentifier());
+
+ // Step 10-11. Update Process Group to the new flow (including name)
and update variable registry with any Variables that were added or removed
Review comment:
agreed
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services