bbende commented on code in PR #10833:
URL: https://github.com/apache/nifi/pull/10833#discussion_r2759453610
##########
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java:
##########
@@ -792,6 +797,201 @@ public Response cancelDrain(
);
}
+
+ @POST
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/{id}/purge-requests")
+ @Operation(
+ summary = "Creates a request to purge the FlowFiles for this
connector",
+ responses = {
+ @ApiResponse(
+ responseCode = "202", description = "The request
has been accepted. A HTTP response header will contain the URI where the
response can be polled.",
+ content = @Content(schema = @Schema(implementation
= DropRequestEntity.class))
+ ),
+ @ApiResponse(responseCode = "400", description = "NiFi was
unable to complete the request because it was invalid. The request should not
be retried without modification."),
+ @ApiResponse(responseCode = "401", description = "Client
could not be authenticated."),
+ @ApiResponse(responseCode = "403", description = "Client
is not authorized to make this request."),
+ @ApiResponse(responseCode = "404", description = "The
specified resource could not be found."),
+ @ApiResponse(responseCode = "409", description = "The
request was valid but NiFi was not in the appropriate state to process it.")
+ },
+ description = "This will create a request to purge all FlowFiles
from the connector. The connector must be in a STOPPED state before purging can
begin. "
+ + "This is an asynchronous operation. The client should
poll the returned URI to get the status of the purge request.",
+ security = {
+ @SecurityRequirement(name = "Write - /connectors/{uuid}")
+ }
+ )
+ public Response createPurgeRequest(
+ @Parameter(
+ description = "The connector id.",
+ required = true
+ )
+ @PathParam("id") final String id) {
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.POST);
+ }
+
+ final ConnectorEntity requestConnectorEntity = new ConnectorEntity();
+ requestConnectorEntity.setId(id);
+
+ return withWriteLock(
+ serviceFacade,
+ requestConnectorEntity,
+ lookup -> {
+ final Authorizable connector = lookup.getConnector(id);
+ connector.authorize(authorizer, RequestAction.WRITE,
NiFiUserUtils.getNiFiUser());
+ },
+ () -> serviceFacade.verifyPurgeConnectorFlowFiles(id),
+ (connectorEntity) -> performAsyncPurge(connectorEntity, id,
NiFiUserUtils.getNiFiUser())
+ );
+ }
+
+
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/{id}/purge-requests/{purge-request-id}")
+ @Operation(
+ summary = "Gets the current status of a purge request for the
specified connector",
+ responses = {
+ @ApiResponse(responseCode = "200", content =
@Content(schema = @Schema(implementation = DropRequestEntity.class))),
+ @ApiResponse(responseCode = "400", description = "NiFi was
unable to complete the request because it was invalid. The request should not
be retried without modification."),
+ @ApiResponse(responseCode = "401", description = "Client
could not be authenticated."),
+ @ApiResponse(responseCode = "403", description = "Client
is not authorized to make this request."),
+ @ApiResponse(responseCode = "404", description = "The
specified resource could not be found."),
+ @ApiResponse(responseCode = "409", description = "The
request was valid but NiFi was not in the appropriate state to process it.")
+ },
+ security = {
+ @SecurityRequirement(name = "Only the user that submitted
the request can get it")
+ }
+ )
+ public Response getPurgeRequest(
+ @Parameter(
+ description = "The connector id.",
+ required = true
+ )
+ @PathParam("id") final String connectorId,
+ @Parameter(
+ description = "The purge request id.",
+ required = true
+ )
+ @PathParam("purge-request-id") final String purgeRequestId) {
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.GET);
+ }
+
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+ final AsynchronousWebRequest<ConnectorEntity, Void> asyncRequest =
purgeRequestManager.getRequest(PURGE_REQUEST_TYPE, purgeRequestId, user);
+ final DropRequestEntity purgeRequestEntity =
createPurgeRequestEntity(asyncRequest, connectorId, purgeRequestId);
+ return generateOkResponse(purgeRequestEntity).build();
+ }
+
+
+ @DELETE
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/{id}/purge-requests/{purge-request-id}")
+ @Operation(
+ summary = "Cancels and/or removes a request to purge the FlowFiles
for this connector",
+ responses = {
+ @ApiResponse(responseCode = "200", content =
@Content(schema = @Schema(implementation = DropRequestEntity.class))),
+ @ApiResponse(responseCode = "400", description = "NiFi was
unable to complete the request because it was invalid. The request should not
be retried without modification."),
+ @ApiResponse(responseCode = "401", description = "Client
could not be authenticated."),
+ @ApiResponse(responseCode = "403", description = "Client
is not authorized to make this request."),
+ @ApiResponse(responseCode = "404", description = "The
specified resource could not be found."),
+ @ApiResponse(responseCode = "409", description = "The
request was valid but NiFi was not in the appropriate state to process it.")
+ },
+ security = {
+ @SecurityRequirement(name = "Only the user that submitted
the request can remove it")
+ }
+ )
+ public Response removePurgeRequest(
+ @Parameter(
+ description = "The connector id.",
+ required = true
+ )
+ @PathParam("id") final String connectorId,
+ @Parameter(
+ description = "The purge request id.",
+ required = true
+ )
+ @PathParam("purge-request-id") final String purgeRequestId) {
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.DELETE);
+ }
+
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
Review Comment:
Should there be some kind of authZ check for WRITE on the Connector?
##########
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java:
##########
@@ -792,6 +797,201 @@ public Response cancelDrain(
);
}
+
+ @POST
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/{id}/purge-requests")
+ @Operation(
+ summary = "Creates a request to purge the FlowFiles for this
connector",
+ responses = {
+ @ApiResponse(
+ responseCode = "202", description = "The request
has been accepted. A HTTP response header will contain the URI where the
response can be polled.",
+ content = @Content(schema = @Schema(implementation
= DropRequestEntity.class))
+ ),
+ @ApiResponse(responseCode = "400", description = "NiFi was
unable to complete the request because it was invalid. The request should not
be retried without modification."),
+ @ApiResponse(responseCode = "401", description = "Client
could not be authenticated."),
+ @ApiResponse(responseCode = "403", description = "Client
is not authorized to make this request."),
+ @ApiResponse(responseCode = "404", description = "The
specified resource could not be found."),
+ @ApiResponse(responseCode = "409", description = "The
request was valid but NiFi was not in the appropriate state to process it.")
+ },
+ description = "This will create a request to purge all FlowFiles
from the connector. The connector must be in a STOPPED state before purging can
begin. "
+ + "This is an asynchronous operation. The client should
poll the returned URI to get the status of the purge request.",
+ security = {
+ @SecurityRequirement(name = "Write - /connectors/{uuid}")
+ }
+ )
+ public Response createPurgeRequest(
+ @Parameter(
+ description = "The connector id.",
+ required = true
+ )
+ @PathParam("id") final String id) {
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.POST);
+ }
+
+ final ConnectorEntity requestConnectorEntity = new ConnectorEntity();
+ requestConnectorEntity.setId(id);
+
+ return withWriteLock(
+ serviceFacade,
+ requestConnectorEntity,
+ lookup -> {
+ final Authorizable connector = lookup.getConnector(id);
+ connector.authorize(authorizer, RequestAction.WRITE,
NiFiUserUtils.getNiFiUser());
+ },
+ () -> serviceFacade.verifyPurgeConnectorFlowFiles(id),
+ (connectorEntity) -> performAsyncPurge(connectorEntity, id,
NiFiUserUtils.getNiFiUser())
+ );
+ }
+
+
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/{id}/purge-requests/{purge-request-id}")
+ @Operation(
+ summary = "Gets the current status of a purge request for the
specified connector",
+ responses = {
+ @ApiResponse(responseCode = "200", content =
@Content(schema = @Schema(implementation = DropRequestEntity.class))),
+ @ApiResponse(responseCode = "400", description = "NiFi was
unable to complete the request because it was invalid. The request should not
be retried without modification."),
+ @ApiResponse(responseCode = "401", description = "Client
could not be authenticated."),
+ @ApiResponse(responseCode = "403", description = "Client
is not authorized to make this request."),
+ @ApiResponse(responseCode = "404", description = "The
specified resource could not be found."),
+ @ApiResponse(responseCode = "409", description = "The
request was valid but NiFi was not in the appropriate state to process it.")
+ },
+ security = {
+ @SecurityRequirement(name = "Only the user that submitted
the request can get it")
+ }
+ )
+ public Response getPurgeRequest(
+ @Parameter(
+ description = "The connector id.",
+ required = true
+ )
+ @PathParam("id") final String connectorId,
+ @Parameter(
+ description = "The purge request id.",
+ required = true
+ )
+ @PathParam("purge-request-id") final String purgeRequestId) {
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.GET);
+ }
+
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+ final AsynchronousWebRequest<ConnectorEntity, Void> asyncRequest =
purgeRequestManager.getRequest(PURGE_REQUEST_TYPE, purgeRequestId, user);
Review Comment:
Should there be some kind of authZ check here for READ on the Connector?
##########
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java:
##########
@@ -792,6 +797,201 @@ public Response cancelDrain(
);
}
+
+ @POST
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/{id}/purge-requests")
+ @Operation(
+ summary = "Creates a request to purge the FlowFiles for this
connector",
+ responses = {
+ @ApiResponse(
+ responseCode = "202", description = "The request
has been accepted. A HTTP response header will contain the URI where the
response can be polled.",
+ content = @Content(schema = @Schema(implementation
= DropRequestEntity.class))
+ ),
+ @ApiResponse(responseCode = "400", description = "NiFi was
unable to complete the request because it was invalid. The request should not
be retried without modification."),
+ @ApiResponse(responseCode = "401", description = "Client
could not be authenticated."),
+ @ApiResponse(responseCode = "403", description = "Client
is not authorized to make this request."),
+ @ApiResponse(responseCode = "404", description = "The
specified resource could not be found."),
+ @ApiResponse(responseCode = "409", description = "The
request was valid but NiFi was not in the appropriate state to process it.")
+ },
+ description = "This will create a request to purge all FlowFiles
from the connector. The connector must be in a STOPPED state before purging can
begin. "
+ + "This is an asynchronous operation. The client should
poll the returned URI to get the status of the purge request.",
+ security = {
+ @SecurityRequirement(name = "Write - /connectors/{uuid}")
+ }
+ )
+ public Response createPurgeRequest(
+ @Parameter(
+ description = "The connector id.",
+ required = true
+ )
+ @PathParam("id") final String id) {
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.POST);
+ }
+
+ final ConnectorEntity requestConnectorEntity = new ConnectorEntity();
+ requestConnectorEntity.setId(id);
+
+ return withWriteLock(
+ serviceFacade,
+ requestConnectorEntity,
+ lookup -> {
+ final Authorizable connector = lookup.getConnector(id);
+ connector.authorize(authorizer, RequestAction.WRITE,
NiFiUserUtils.getNiFiUser());
+ },
+ () -> serviceFacade.verifyPurgeConnectorFlowFiles(id),
+ (connectorEntity) -> performAsyncPurge(connectorEntity, id,
NiFiUserUtils.getNiFiUser())
+ );
+ }
+
+
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/{id}/purge-requests/{purge-request-id}")
+ @Operation(
+ summary = "Gets the current status of a purge request for the
specified connector",
+ responses = {
+ @ApiResponse(responseCode = "200", content =
@Content(schema = @Schema(implementation = DropRequestEntity.class))),
+ @ApiResponse(responseCode = "400", description = "NiFi was
unable to complete the request because it was invalid. The request should not
be retried without modification."),
+ @ApiResponse(responseCode = "401", description = "Client
could not be authenticated."),
+ @ApiResponse(responseCode = "403", description = "Client
is not authorized to make this request."),
+ @ApiResponse(responseCode = "404", description = "The
specified resource could not be found."),
+ @ApiResponse(responseCode = "409", description = "The
request was valid but NiFi was not in the appropriate state to process it.")
+ },
+ security = {
+ @SecurityRequirement(name = "Only the user that submitted
the request can get it")
+ }
+ )
+ public Response getPurgeRequest(
+ @Parameter(
+ description = "The connector id.",
+ required = true
+ )
+ @PathParam("id") final String connectorId,
+ @Parameter(
+ description = "The purge request id.",
+ required = true
+ )
+ @PathParam("purge-request-id") final String purgeRequestId) {
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.GET);
+ }
+
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+ final AsynchronousWebRequest<ConnectorEntity, Void> asyncRequest =
purgeRequestManager.getRequest(PURGE_REQUEST_TYPE, purgeRequestId, user);
+ final DropRequestEntity purgeRequestEntity =
createPurgeRequestEntity(asyncRequest, connectorId, purgeRequestId);
+ return generateOkResponse(purgeRequestEntity).build();
+ }
+
+
+ @DELETE
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/{id}/purge-requests/{purge-request-id}")
+ @Operation(
+ summary = "Cancels and/or removes a request to purge the FlowFiles
for this connector",
+ responses = {
+ @ApiResponse(responseCode = "200", content =
@Content(schema = @Schema(implementation = DropRequestEntity.class))),
+ @ApiResponse(responseCode = "400", description = "NiFi was
unable to complete the request because it was invalid. The request should not
be retried without modification."),
+ @ApiResponse(responseCode = "401", description = "Client
could not be authenticated."),
+ @ApiResponse(responseCode = "403", description = "Client
is not authorized to make this request."),
+ @ApiResponse(responseCode = "404", description = "The
specified resource could not be found."),
+ @ApiResponse(responseCode = "409", description = "The
request was valid but NiFi was not in the appropriate state to process it.")
+ },
+ security = {
+ @SecurityRequirement(name = "Only the user that submitted
the request can remove it")
+ }
+ )
+ public Response removePurgeRequest(
+ @Parameter(
+ description = "The connector id.",
+ required = true
+ )
+ @PathParam("id") final String connectorId,
+ @Parameter(
+ description = "The purge request id.",
+ required = true
+ )
+ @PathParam("purge-request-id") final String purgeRequestId) {
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.DELETE);
+ }
+
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ final boolean twoPhaseRequest = isTwoPhaseRequest(httpServletRequest);
+ final boolean executionPhase = isExecutionPhase(httpServletRequest);
+
+ if (!twoPhaseRequest || executionPhase) {
+ final AsynchronousWebRequest<ConnectorEntity, Void> asyncRequest =
purgeRequestManager.removeRequest(PURGE_REQUEST_TYPE, purgeRequestId, user);
+
+ if (!asyncRequest.isComplete()) {
+ asyncRequest.cancel();
+ }
+
+ final DropRequestEntity purgeRequestEntity =
createPurgeRequestEntity(asyncRequest, connectorId, purgeRequestId);
+ return generateOkResponse(purgeRequestEntity).build();
+ }
+
+ if (isValidationPhase(httpServletRequest)) {
+ purgeRequestManager.getRequest(PURGE_REQUEST_TYPE, purgeRequestId,
user);
+ return generateContinueResponse().build();
+ } else if (isCancellationPhase(httpServletRequest)) {
+ return generateOkResponse().build();
+ } else {
+ throw new IllegalStateException("This request does not appear to
be part of the two phase commit.");
+ }
+ }
+
+ private Response performAsyncPurge(final ConnectorEntity connectorEntity,
final String connectorId, final NiFiUser user) {
+ final String requestId = generateUuid();
+ logger.debug("Generated Purge Request with ID {} for Connector {}",
requestId, connectorId);
+
+ final List<UpdateStep> updateSteps = Collections.singletonList(new
StandardUpdateStep("Purge FlowFiles"));
+
+ final AsynchronousWebRequest<ConnectorEntity, Void> request =
+ new StandardAsynchronousWebRequest<>(requestId,
connectorEntity, connectorId, user, updateSteps);
+
+ final Consumer<AsynchronousWebRequest<ConnectorEntity, Void>>
updateTask = asyncRequest -> {
+ try {
+ serviceFacade.purgeConnectorFlowFiles(connectorId,
user.getIdentity());
+ asyncRequest.markStepComplete(null);
+ } catch (final Exception e) {
+ logger.error("Failed to purge FlowFiles for Connector {}",
connectorId, e);
+ asyncRequest.fail("Failed to purge FlowFiles due to " + e);
+ }
+ };
+
+ purgeRequestManager.submitRequest(PURGE_REQUEST_TYPE, requestId,
request, updateTask);
+
+ final DropRequestEntity purgeRequestEntity =
createPurgeRequestEntity(request, connectorId, requestId);
+ final URI location =
URI.create(purgeRequestEntity.getDropRequest().getUri());
+ return
Response.status(Response.Status.ACCEPTED).location(location).entity(purgeRequestEntity).build();
+ }
+
+ private DropRequestEntity createPurgeRequestEntity(final
AsynchronousWebRequest<ConnectorEntity, Void> asyncRequest,
+ final String
connectorId, final String requestId) {
+ final DropRequestDTO dto = new DropRequestDTO();
+ dto.setId(requestId);
+ dto.setUri(generateResourceUri("connectors", connectorId,
"purge-requests", requestId));
+ dto.setSubmissionTime(asyncRequest.getLastUpdated());
+ dto.setLastUpdated(asyncRequest.getLastUpdated());
+ dto.setPercentCompleted(asyncRequest.getPercentComplete());
+ dto.setFinished(asyncRequest.isComplete());
+ dto.setFailureReason(asyncRequest.getFailureReason());
+ dto.setState(asyncRequest.getState());
Review Comment:
I assume this is intentional, but I wanted to mention that when a
DropRequestDTO is returned for connections, a few other fields are normally
set as well, such as the dropped count and size. Maybe we don't plan to return
that level of info for Connectors.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]