bbende commented on code in PR #10833:
URL: https://github.com/apache/nifi/pull/10833#discussion_r2759453610


##########
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java:
##########
@@ -792,6 +797,201 @@ public Response cancelDrain(
         );
     }
 
+
+    @POST
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/{id}/purge-requests")
+    @Operation(
+            summary = "Creates a request to purge the FlowFiles for this 
connector",
+            responses = {
+                    @ApiResponse(
+                            responseCode = "202", description = "The request 
has been accepted. A HTTP response header will contain the URI where the 
response can be polled.",
+                            content = @Content(schema = @Schema(implementation 
= DropRequestEntity.class))
+                    ),
+                    @ApiResponse(responseCode = "400", description = "NiFi was 
unable to complete the request because it was invalid. The request should not 
be retried without modification."),
+                    @ApiResponse(responseCode = "401", description = "Client 
could not be authenticated."),
+                    @ApiResponse(responseCode = "403", description = "Client 
is not authorized to make this request."),
+                    @ApiResponse(responseCode = "404", description = "The 
specified resource could not be found."),
+                    @ApiResponse(responseCode = "409", description = "The 
request was valid but NiFi was not in the appropriate state to process it.")
+            },
+            description = "This will create a request to purge all FlowFiles 
from the connector. The connector must be in a STOPPED state before purging can 
begin. "
+                    + "This is an asynchronous operation. The client should 
poll the returned URI to get the status of the purge request.",
+            security = {
+                    @SecurityRequirement(name = "Write - /connectors/{uuid}")
+            }
+    )
+    public Response createPurgeRequest(
+            @Parameter(
+                    description = "The connector id.",
+                    required = true
+            )
+            @PathParam("id") final String id) {
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.POST);
+        }
+
+        final ConnectorEntity requestConnectorEntity = new ConnectorEntity();
+        requestConnectorEntity.setId(id);
+
+        return withWriteLock(
+                serviceFacade,
+                requestConnectorEntity,
+                lookup -> {
+                    final Authorizable connector = lookup.getConnector(id);
+                    connector.authorize(authorizer, RequestAction.WRITE, 
NiFiUserUtils.getNiFiUser());
+                },
+                () -> serviceFacade.verifyPurgeConnectorFlowFiles(id),
+                (connectorEntity) -> performAsyncPurge(connectorEntity, id, 
NiFiUserUtils.getNiFiUser())
+        );
+    }
+
+
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/{id}/purge-requests/{purge-request-id}")
+    @Operation(
+            summary = "Gets the current status of a purge request for the 
specified connector",
+            responses = {
+                    @ApiResponse(responseCode = "200", content = 
@Content(schema = @Schema(implementation = DropRequestEntity.class))),
+                    @ApiResponse(responseCode = "400", description = "NiFi was 
unable to complete the request because it was invalid. The request should not 
be retried without modification."),
+                    @ApiResponse(responseCode = "401", description = "Client 
could not be authenticated."),
+                    @ApiResponse(responseCode = "403", description = "Client 
is not authorized to make this request."),
+                    @ApiResponse(responseCode = "404", description = "The 
specified resource could not be found."),
+                    @ApiResponse(responseCode = "409", description = "The 
request was valid but NiFi was not in the appropriate state to process it.")
+            },
+            security = {
+                    @SecurityRequirement(name = "Only the user that submitted 
the request can get it")
+            }
+    )
+    public Response getPurgeRequest(
+            @Parameter(
+                    description = "The connector id.",
+                    required = true
+            )
+            @PathParam("id") final String connectorId,
+            @Parameter(
+                    description = "The purge request id.",
+                    required = true
+            )
+            @PathParam("purge-request-id") final String purgeRequestId) {
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.GET);
+        }
+
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+        final AsynchronousWebRequest<ConnectorEntity, Void> asyncRequest = 
purgeRequestManager.getRequest(PURGE_REQUEST_TYPE, purgeRequestId, user);
+        final DropRequestEntity purgeRequestEntity = 
createPurgeRequestEntity(asyncRequest, connectorId, purgeRequestId);
+        return generateOkResponse(purgeRequestEntity).build();
+    }
+
+
+    @DELETE
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/{id}/purge-requests/{purge-request-id}")
+    @Operation(
+            summary = "Cancels and/or removes a request to purge the FlowFiles 
for this connector",
+            responses = {
+                    @ApiResponse(responseCode = "200", content = 
@Content(schema = @Schema(implementation = DropRequestEntity.class))),
+                    @ApiResponse(responseCode = "400", description = "NiFi was 
unable to complete the request because it was invalid. The request should not 
be retried without modification."),
+                    @ApiResponse(responseCode = "401", description = "Client 
could not be authenticated."),
+                    @ApiResponse(responseCode = "403", description = "Client 
is not authorized to make this request."),
+                    @ApiResponse(responseCode = "404", description = "The 
specified resource could not be found."),
+                    @ApiResponse(responseCode = "409", description = "The 
request was valid but NiFi was not in the appropriate state to process it.")
+            },
+            security = {
+                    @SecurityRequirement(name = "Only the user that submitted 
the request can remove it")
+            }
+    )
+    public Response removePurgeRequest(
+            @Parameter(
+                    description = "The connector id.",
+                    required = true
+            )
+            @PathParam("id") final String connectorId,
+            @Parameter(
+                    description = "The purge request id.",
+                    required = true
+            )
+            @PathParam("purge-request-id") final String purgeRequestId) {
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.DELETE);
+        }
+
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();

Review Comment:
   Should there be some kind of authZ check for WRITE on the Connector?



##########
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java:
##########
@@ -792,6 +797,201 @@ public Response cancelDrain(
         );
     }
 
+
+    @POST
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/{id}/purge-requests")
+    @Operation(
+            summary = "Creates a request to purge the FlowFiles for this 
connector",
+            responses = {
+                    @ApiResponse(
+                            responseCode = "202", description = "The request 
has been accepted. A HTTP response header will contain the URI where the 
response can be polled.",
+                            content = @Content(schema = @Schema(implementation 
= DropRequestEntity.class))
+                    ),
+                    @ApiResponse(responseCode = "400", description = "NiFi was 
unable to complete the request because it was invalid. The request should not 
be retried without modification."),
+                    @ApiResponse(responseCode = "401", description = "Client 
could not be authenticated."),
+                    @ApiResponse(responseCode = "403", description = "Client 
is not authorized to make this request."),
+                    @ApiResponse(responseCode = "404", description = "The 
specified resource could not be found."),
+                    @ApiResponse(responseCode = "409", description = "The 
request was valid but NiFi was not in the appropriate state to process it.")
+            },
+            description = "This will create a request to purge all FlowFiles 
from the connector. The connector must be in a STOPPED state before purging can 
begin. "
+                    + "This is an asynchronous operation. The client should 
poll the returned URI to get the status of the purge request.",
+            security = {
+                    @SecurityRequirement(name = "Write - /connectors/{uuid}")
+            }
+    )
+    public Response createPurgeRequest(
+            @Parameter(
+                    description = "The connector id.",
+                    required = true
+            )
+            @PathParam("id") final String id) {
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.POST);
+        }
+
+        final ConnectorEntity requestConnectorEntity = new ConnectorEntity();
+        requestConnectorEntity.setId(id);
+
+        return withWriteLock(
+                serviceFacade,
+                requestConnectorEntity,
+                lookup -> {
+                    final Authorizable connector = lookup.getConnector(id);
+                    connector.authorize(authorizer, RequestAction.WRITE, 
NiFiUserUtils.getNiFiUser());
+                },
+                () -> serviceFacade.verifyPurgeConnectorFlowFiles(id),
+                (connectorEntity) -> performAsyncPurge(connectorEntity, id, 
NiFiUserUtils.getNiFiUser())
+        );
+    }
+
+
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/{id}/purge-requests/{purge-request-id}")
+    @Operation(
+            summary = "Gets the current status of a purge request for the 
specified connector",
+            responses = {
+                    @ApiResponse(responseCode = "200", content = 
@Content(schema = @Schema(implementation = DropRequestEntity.class))),
+                    @ApiResponse(responseCode = "400", description = "NiFi was 
unable to complete the request because it was invalid. The request should not 
be retried without modification."),
+                    @ApiResponse(responseCode = "401", description = "Client 
could not be authenticated."),
+                    @ApiResponse(responseCode = "403", description = "Client 
is not authorized to make this request."),
+                    @ApiResponse(responseCode = "404", description = "The 
specified resource could not be found."),
+                    @ApiResponse(responseCode = "409", description = "The 
request was valid but NiFi was not in the appropriate state to process it.")
+            },
+            security = {
+                    @SecurityRequirement(name = "Only the user that submitted 
the request can get it")
+            }
+    )
+    public Response getPurgeRequest(
+            @Parameter(
+                    description = "The connector id.",
+                    required = true
+            )
+            @PathParam("id") final String connectorId,
+            @Parameter(
+                    description = "The purge request id.",
+                    required = true
+            )
+            @PathParam("purge-request-id") final String purgeRequestId) {
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.GET);
+        }
+
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+        final AsynchronousWebRequest<ConnectorEntity, Void> asyncRequest = 
purgeRequestManager.getRequest(PURGE_REQUEST_TYPE, purgeRequestId, user);

Review Comment:
   Should there be some kind of authZ check here for READ on the Connector?



##########
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java:
##########
@@ -792,6 +797,201 @@ public Response cancelDrain(
         );
     }
 
+
+    @POST
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/{id}/purge-requests")
+    @Operation(
+            summary = "Creates a request to purge the FlowFiles for this 
connector",
+            responses = {
+                    @ApiResponse(
+                            responseCode = "202", description = "The request 
has been accepted. A HTTP response header will contain the URI where the 
response can be polled.",
+                            content = @Content(schema = @Schema(implementation 
= DropRequestEntity.class))
+                    ),
+                    @ApiResponse(responseCode = "400", description = "NiFi was 
unable to complete the request because it was invalid. The request should not 
be retried without modification."),
+                    @ApiResponse(responseCode = "401", description = "Client 
could not be authenticated."),
+                    @ApiResponse(responseCode = "403", description = "Client 
is not authorized to make this request."),
+                    @ApiResponse(responseCode = "404", description = "The 
specified resource could not be found."),
+                    @ApiResponse(responseCode = "409", description = "The 
request was valid but NiFi was not in the appropriate state to process it.")
+            },
+            description = "This will create a request to purge all FlowFiles 
from the connector. The connector must be in a STOPPED state before purging can 
begin. "
+                    + "This is an asynchronous operation. The client should 
poll the returned URI to get the status of the purge request.",
+            security = {
+                    @SecurityRequirement(name = "Write - /connectors/{uuid}")
+            }
+    )
+    public Response createPurgeRequest(
+            @Parameter(
+                    description = "The connector id.",
+                    required = true
+            )
+            @PathParam("id") final String id) {
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.POST);
+        }
+
+        final ConnectorEntity requestConnectorEntity = new ConnectorEntity();
+        requestConnectorEntity.setId(id);
+
+        return withWriteLock(
+                serviceFacade,
+                requestConnectorEntity,
+                lookup -> {
+                    final Authorizable connector = lookup.getConnector(id);
+                    connector.authorize(authorizer, RequestAction.WRITE, 
NiFiUserUtils.getNiFiUser());
+                },
+                () -> serviceFacade.verifyPurgeConnectorFlowFiles(id),
+                (connectorEntity) -> performAsyncPurge(connectorEntity, id, 
NiFiUserUtils.getNiFiUser())
+        );
+    }
+
+
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/{id}/purge-requests/{purge-request-id}")
+    @Operation(
+            summary = "Gets the current status of a purge request for the 
specified connector",
+            responses = {
+                    @ApiResponse(responseCode = "200", content = 
@Content(schema = @Schema(implementation = DropRequestEntity.class))),
+                    @ApiResponse(responseCode = "400", description = "NiFi was 
unable to complete the request because it was invalid. The request should not 
be retried without modification."),
+                    @ApiResponse(responseCode = "401", description = "Client 
could not be authenticated."),
+                    @ApiResponse(responseCode = "403", description = "Client 
is not authorized to make this request."),
+                    @ApiResponse(responseCode = "404", description = "The 
specified resource could not be found."),
+                    @ApiResponse(responseCode = "409", description = "The 
request was valid but NiFi was not in the appropriate state to process it.")
+            },
+            security = {
+                    @SecurityRequirement(name = "Only the user that submitted 
the request can get it")
+            }
+    )
+    public Response getPurgeRequest(
+            @Parameter(
+                    description = "The connector id.",
+                    required = true
+            )
+            @PathParam("id") final String connectorId,
+            @Parameter(
+                    description = "The purge request id.",
+                    required = true
+            )
+            @PathParam("purge-request-id") final String purgeRequestId) {
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.GET);
+        }
+
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+        final AsynchronousWebRequest<ConnectorEntity, Void> asyncRequest = 
purgeRequestManager.getRequest(PURGE_REQUEST_TYPE, purgeRequestId, user);
+        final DropRequestEntity purgeRequestEntity = 
createPurgeRequestEntity(asyncRequest, connectorId, purgeRequestId);
+        return generateOkResponse(purgeRequestEntity).build();
+    }
+
+
+    @DELETE
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/{id}/purge-requests/{purge-request-id}")
+    @Operation(
+            summary = "Cancels and/or removes a request to purge the FlowFiles 
for this connector",
+            responses = {
+                    @ApiResponse(responseCode = "200", content = 
@Content(schema = @Schema(implementation = DropRequestEntity.class))),
+                    @ApiResponse(responseCode = "400", description = "NiFi was 
unable to complete the request because it was invalid. The request should not 
be retried without modification."),
+                    @ApiResponse(responseCode = "401", description = "Client 
could not be authenticated."),
+                    @ApiResponse(responseCode = "403", description = "Client 
is not authorized to make this request."),
+                    @ApiResponse(responseCode = "404", description = "The 
specified resource could not be found."),
+                    @ApiResponse(responseCode = "409", description = "The 
request was valid but NiFi was not in the appropriate state to process it.")
+            },
+            security = {
+                    @SecurityRequirement(name = "Only the user that submitted 
the request can remove it")
+            }
+    )
+    public Response removePurgeRequest(
+            @Parameter(
+                    description = "The connector id.",
+                    required = true
+            )
+            @PathParam("id") final String connectorId,
+            @Parameter(
+                    description = "The purge request id.",
+                    required = true
+            )
+            @PathParam("purge-request-id") final String purgeRequestId) {
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.DELETE);
+        }
+
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();
+        final boolean twoPhaseRequest = isTwoPhaseRequest(httpServletRequest);
+        final boolean executionPhase = isExecutionPhase(httpServletRequest);
+
+        if (!twoPhaseRequest || executionPhase) {
+            final AsynchronousWebRequest<ConnectorEntity, Void> asyncRequest = 
purgeRequestManager.removeRequest(PURGE_REQUEST_TYPE, purgeRequestId, user);
+
+            if (!asyncRequest.isComplete()) {
+                asyncRequest.cancel();
+            }
+
+            final DropRequestEntity purgeRequestEntity = 
createPurgeRequestEntity(asyncRequest, connectorId, purgeRequestId);
+            return generateOkResponse(purgeRequestEntity).build();
+        }
+
+        if (isValidationPhase(httpServletRequest)) {
+            purgeRequestManager.getRequest(PURGE_REQUEST_TYPE, purgeRequestId, 
user);
+            return generateContinueResponse().build();
+        } else if (isCancellationPhase(httpServletRequest)) {
+            return generateOkResponse().build();
+        } else {
+            throw new IllegalStateException("This request does not appear to 
be part of the two phase commit.");
+        }
+    }
+
+    private Response performAsyncPurge(final ConnectorEntity connectorEntity, 
final String connectorId, final NiFiUser user) {
+        final String requestId = generateUuid();
+        logger.debug("Generated Purge Request with ID {} for Connector {}", 
requestId, connectorId);
+
+        final List<UpdateStep> updateSteps = Collections.singletonList(new 
StandardUpdateStep("Purge FlowFiles"));
+
+        final AsynchronousWebRequest<ConnectorEntity, Void> request =
+                new StandardAsynchronousWebRequest<>(requestId, 
connectorEntity, connectorId, user, updateSteps);
+
+        final Consumer<AsynchronousWebRequest<ConnectorEntity, Void>> 
updateTask = asyncRequest -> {
+            try {
+                serviceFacade.purgeConnectorFlowFiles(connectorId, 
user.getIdentity());
+                asyncRequest.markStepComplete(null);
+            } catch (final Exception e) {
+                logger.error("Failed to purge FlowFiles for Connector {}", 
connectorId, e);
+                asyncRequest.fail("Failed to purge FlowFiles due to " + e);
+            }
+        };
+
+        purgeRequestManager.submitRequest(PURGE_REQUEST_TYPE, requestId, 
request, updateTask);
+
+        final DropRequestEntity purgeRequestEntity = 
createPurgeRequestEntity(request, connectorId, requestId);
+        final URI location = 
URI.create(purgeRequestEntity.getDropRequest().getUri());
+        return 
Response.status(Response.Status.ACCEPTED).location(location).entity(purgeRequestEntity).build();
+    }
+
+    private DropRequestEntity createPurgeRequestEntity(final 
AsynchronousWebRequest<ConnectorEntity, Void> asyncRequest,
+                                                       final String 
connectorId, final String requestId) {
+        final DropRequestDTO dto = new DropRequestDTO();
+        dto.setId(requestId);
+        dto.setUri(generateResourceUri("connectors", connectorId, 
"purge-requests", requestId));
+        dto.setSubmissionTime(asyncRequest.getLastUpdated());
+        dto.setLastUpdated(asyncRequest.getLastUpdated());
+        dto.setPercentCompleted(asyncRequest.getPercentComplete());
+        dto.setFinished(asyncRequest.isComplete());
+        dto.setFailureReason(asyncRequest.getFailureReason());
+        dto.setState(asyncRequest.getState());

Review Comment:
   I assume this is intentional, but just mentioning how the normal usage of 
return DropRequestDTO for connections will set a few other fields like the 
dropped count and size, but maybe we don't plan to return that level of info 
for Connectors



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to