This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/NIFI-15258 by this push:
     new 4e3292bbb1 NIFI-15511: Added endpoints for purging all FlowFiles for a 
given Connector (#10833)
4e3292bbb1 is described below

commit 4e3292bbb1ae2c9674e702c2785f5fbbe4b6c3d0
Author: Mark Payne <[email protected]>
AuthorDate: Tue Feb 3 10:17:49 2026 -0500

    NIFI-15511: Added endpoints for purging all FlowFiles for a given Connector
(#10833)
    
    * NIFI-15511: Added endpoints for purging all FlowFiles for a given 
Connector; added methods to ConnectorClient to call these endpoints; updated
system tests to use these endpoints when tearing down flows; some bug fixes
    
    * NIFI-15511: Added WRITE permission requirement for retrieving and 
deleting a Connector Purge request
---
 .../apache/nifi/groups/StandardProcessGroup.java   |  12 ++
 .../nifi/components/connector/ConnectorNode.java   |   6 +
 .../connector/StandardConnectorNode.java           |  15 +-
 .../org/apache/nifi/web/NiFiServiceFacade.java     |  16 ++
 .../apache/nifi/web/StandardNiFiServiceFacade.java |  10 +
 .../org/apache/nifi/web/api/ConnectorResource.java | 211 +++++++++++++++++++++
 .../java/org/apache/nifi/web/dao/ConnectorDAO.java |   4 +
 .../nifi/web/dao/impl/StandardConnectorDAO.java    |  20 ++
 .../apache/nifi/tests/system/NiFiClientUtil.java   |  28 +++
 .../nifi/toolkit/client/ConnectorClient.java       |  33 ++++
 .../toolkit/client/impl/JerseyConnectorClient.java |  54 ++++++
 11 files changed, 408 insertions(+), 1 deletion(-)

diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 528842281e..cb62602085 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -1591,6 +1591,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         aggregateDropFlowFileStatus.setState(null);
 
         AtomicBoolean processedAtLeastOne = new AtomicBoolean(false);
+        final List<CompletableFuture<Void>> completionFutures = new 
ArrayList<>();
 
         connections.stream()
             .map(Connection::getFlowFileQueue)
@@ -1598,10 +1599,21 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             .forEach(additionalDropFlowFileStatus -> {
                 aggregate(aggregateDropFlowFileStatus, 
additionalDropFlowFileStatus);
                 processedAtLeastOne.set(true);
+                
completionFutures.add(additionalDropFlowFileStatus.getCompletionFuture());
             });
 
         if (processedAtLeastOne.get()) {
             resultDropFlowFileStatus = aggregateDropFlowFileStatus;
+
+            // When all individual drop requests complete, mark the aggregate 
as complete
+            CompletableFuture.allOf(completionFutures.toArray(new 
CompletableFuture[0]))
+                .whenComplete((result, throwable) -> {
+                    if (throwable != null) {
+                        
aggregateDropFlowFileStatus.setState(DropFlowFileState.FAILURE, 
throwable.getMessage());
+                    } else {
+                        
aggregateDropFlowFileStatus.setState(DropFlowFileState.COMPLETE);
+                    }
+                });
         } else {
             resultDropFlowFileStatus = null;
         }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
index 13cadb3c49..6cafcbbaf1 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
@@ -211,6 +211,12 @@ public interface ConnectorNode extends 
ComponentAuthorizable, VersionedComponent
      */
     void verifyCancelDrainFlowFiles() throws IllegalStateException;
 
+    /**
+     * Verifies that the Connector can have its FlowFiles purged.
+     * @throws IllegalStateException if not in a state where FlowFiles can be 
purged
+     */
+    void verifyCanPurgeFlowFiles() throws IllegalStateException;
+
     /**
      * Purges all FlowFiles from the Connector, immediately dropping the data.
      *
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
index e614db7070..75525d16c3 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
@@ -494,6 +494,19 @@ public class StandardConnectorNode implements 
ConnectorNode {
         }
     }
 
+    @Override
+    public void verifyCanPurgeFlowFiles() throws IllegalStateException {
+        final ConnectorState desiredState = getDesiredState();
+        if (desiredState != ConnectorState.STOPPED) {
+            throw new IllegalStateException("Cannot purge FlowFiles for " + 
this + " because its desired state is currently " + desiredState + "; it must 
be STOPPED.");
+        }
+
+        final ConnectorState currentState = getCurrentState();
+        if (currentState != ConnectorState.STOPPED) {
+            throw new IllegalStateException("Cannot purge FlowFiles for " + 
this + " because its current state is " + currentState + "; it must be 
STOPPED.");
+        }
+    }
+
     @Override
     public Future<Void> purgeFlowFiles(final String requestor) {
         requireStopped("purge FlowFiles", ConnectorState.PURGING);
@@ -520,7 +533,7 @@ public class StandardConnectorNode implements ConnectorNode 
{
         while (!stateUpdated) {
             final ConnectorState currentState = getCurrentState();
             if (currentState != ConnectorState.STOPPED) {
-                throw new IllegalStateException("Cannot " + action + " for " + 
this + " because its current state is currently " + currentState + "; it must 
be STOPPED.");
+                throw new IllegalStateException("Cannot " + action + " for " + 
this + " because its current state is " + currentState + "; it must be 
STOPPED.");
             }
 
             stateUpdated = 
stateTransition.trySetCurrentState(ConnectorState.STOPPED, newState);
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index d270c0344f..c350b2b983 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -251,6 +251,22 @@ public interface NiFiServiceFacade {
 
     Optional<Asset> getConnectorAsset(String assetId);
 
+    /**
+     * Verifies that the connector is in a state where FlowFiles can be purged.
+     *
+     * @param connectorId the connector ID
+     * @throws IllegalStateException if the connector is not in a state where 
FlowFiles can be purged
+     */
+    void verifyPurgeConnectorFlowFiles(String connectorId);
+
+    /**
+     * Purges all FlowFiles from the connector.
+     *
+     * @param connectorId the connector ID
+     * @param requestor the identity of the user requesting the purge (used 
for provenance events)
+     */
+    void purgeConnectorFlowFiles(String connectorId, String requestor);
+
     // ----------------------------------------
     // Synchronization methods
     // ----------------------------------------
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index f08f8297b5..654644d552 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -3859,6 +3859,16 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
         return connectorDAO.getAsset(assetId);
     }
 
+    @Override
+    public void verifyPurgeConnectorFlowFiles(final String connectorId) {
+        connectorDAO.verifyPurgeFlowFiles(connectorId);
+    }
+
+    @Override
+    public void purgeConnectorFlowFiles(final String connectorId, final String 
requestor) {
+        connectorDAO.purgeFlowFiles(connectorId, requestor);
+    }
+
     @Override
     public ReportingTaskEntity updateReportingTask(final Revision revision, 
final ReportingTaskDTO reportingTaskDTO) {
         // get the component, ensure we have access to it, and perform the 
update request
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
index 02f115b966..00a44a0575 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
@@ -77,6 +77,7 @@ import org.apache.nifi.web.api.dto.BundleDTO;
 import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
 import org.apache.nifi.web.api.dto.ConfigurationStepConfigurationDTO;
 import org.apache.nifi.web.api.dto.ConnectorDTO;
+import org.apache.nifi.web.api.dto.DropRequestDTO;
 import org.apache.nifi.web.api.dto.VerifyConnectorConfigStepRequestDTO;
 import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
 import org.apache.nifi.web.api.entity.AssetEntity;
@@ -88,6 +89,7 @@ import 
org.apache.nifi.web.api.entity.ConnectorPropertyAllowableValuesEntity;
 import org.apache.nifi.web.api.entity.ConnectorRunStatusEntity;
 import org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import org.apache.nifi.web.api.entity.ControllerServicesEntity;
+import org.apache.nifi.web.api.entity.DropRequestEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
 import org.apache.nifi.web.api.entity.SearchResultsEntity;
@@ -123,6 +125,7 @@ public class ConnectorResource extends ApplicationResource {
 
     private static final Logger logger = 
LoggerFactory.getLogger(ConnectorResource.class);
     private static final String VERIFICATION_REQUEST_TYPE = 
"verification-request";
+    private static final String PURGE_REQUEST_TYPE = "purge-request";
     private static final String FILENAME_HEADER = "Filename";
     private static final String CONTENT_TYPE_HEADER = "Content-Type";
     private static final String UPLOAD_CONTENT_TYPE = 
"application/octet-stream";
@@ -137,6 +140,8 @@ public class ConnectorResource extends ApplicationResource {
     private final RequestManager<VerifyConnectorConfigStepRequestEntity, 
List<ConfigVerificationResultDTO>> configVerificationRequestManager =
             new AsyncRequestManager<>(100, 1L, "Connector Configuration Step 
Verification");
 
+    private final RequestManager<ConnectorEntity, Void> purgeRequestManager = 
new AsyncRequestManager<>(100, 1L, "Connector FlowFile Purge");
+
     @Context
     private ServletContext servletContext;
 
@@ -792,6 +797,212 @@ public class ConnectorResource extends 
ApplicationResource {
         );
     }
 
+
+    @POST
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/{id}/purge-requests")
+    @Operation(
+            summary = "Creates a request to purge the FlowFiles for this 
connector",
+            responses = {
+                    @ApiResponse(
+                            responseCode = "202", description = "The request 
has been accepted. A HTTP response header will contain the URI where the 
response can be polled.",
+                            content = @Content(schema = @Schema(implementation 
= DropRequestEntity.class))
+                    ),
+                    @ApiResponse(responseCode = "400", description = "NiFi was 
unable to complete the request because it was invalid. The request should not 
be retried without modification."),
+                    @ApiResponse(responseCode = "401", description = "Client 
could not be authenticated."),
+                    @ApiResponse(responseCode = "403", description = "Client 
is not authorized to make this request."),
+                    @ApiResponse(responseCode = "404", description = "The 
specified resource could not be found."),
+                    @ApiResponse(responseCode = "409", description = "The 
request was valid but NiFi was not in the appropriate state to process it.")
+            },
+            description = "This will create a request to purge all FlowFiles 
from the connector. The connector must be in a STOPPED state before purging can 
begin. "
+                    + "This is an asynchronous operation. The client should 
poll the returned URI to get the status of the purge request.",
+            security = {
+                    @SecurityRequirement(name = "Write - /connectors/{uuid}")
+            }
+    )
+    public Response createPurgeRequest(
+            @Parameter(
+                    description = "The connector id.",
+                    required = true
+            )
+            @PathParam("id") final String id) {
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.POST);
+        }
+
+        final ConnectorEntity requestConnectorEntity = new ConnectorEntity();
+        requestConnectorEntity.setId(id);
+
+        return withWriteLock(
+                serviceFacade,
+                requestConnectorEntity,
+                lookup -> {
+                    final Authorizable connector = lookup.getConnector(id);
+                    connector.authorize(authorizer, RequestAction.WRITE, 
NiFiUserUtils.getNiFiUser());
+                },
+                () -> serviceFacade.verifyPurgeConnectorFlowFiles(id),
+                (connectorEntity) -> performAsyncPurge(connectorEntity, id, 
NiFiUserUtils.getNiFiUser())
+        );
+    }
+
+
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/{id}/purge-requests/{purge-request-id}")
+    @Operation(
+            summary = "Gets the current status of a purge request for the 
specified connector",
+            responses = {
+                    @ApiResponse(responseCode = "200", content = 
@Content(schema = @Schema(implementation = DropRequestEntity.class))),
+                    @ApiResponse(responseCode = "400", description = "NiFi was 
unable to complete the request because it was invalid. The request should not 
be retried without modification."),
+                    @ApiResponse(responseCode = "401", description = "Client 
could not be authenticated."),
+                    @ApiResponse(responseCode = "403", description = "Client 
is not authorized to make this request."),
+                    @ApiResponse(responseCode = "404", description = "The 
specified resource could not be found."),
+                    @ApiResponse(responseCode = "409", description = "The 
request was valid but NiFi was not in the appropriate state to process it.")
+            },
+            security = {
+                    @SecurityRequirement(name = "Only the user that submitted 
the request can get it")
+            }
+    )
+    public Response getPurgeRequest(
+            @Parameter(
+                    description = "The connector id.",
+                    required = true
+            )
+            @PathParam("id") final String connectorId,
+            @Parameter(
+                    description = "The purge request id.",
+                    required = true
+            )
+            @PathParam("purge-request-id") final String purgeRequestId) {
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.GET);
+        }
+
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();
+        serviceFacade.authorizeAccess(lookup -> {
+            final Authorizable connector = lookup.getConnector(connectorId);
+            connector.authorize(authorizer, RequestAction.WRITE, user);
+        });
+
+        final AsynchronousWebRequest<ConnectorEntity, Void> asyncRequest = 
purgeRequestManager.getRequest(PURGE_REQUEST_TYPE, purgeRequestId, user);
+        final DropRequestEntity purgeRequestEntity = 
createPurgeRequestEntity(asyncRequest, connectorId, purgeRequestId);
+        return generateOkResponse(purgeRequestEntity).build();
+    }
+
+
+    @DELETE
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/{id}/purge-requests/{purge-request-id}")
+    @Operation(
+            summary = "Cancels and/or removes a request to purge the FlowFiles 
for this connector",
+            responses = {
+                    @ApiResponse(responseCode = "200", content = 
@Content(schema = @Schema(implementation = DropRequestEntity.class))),
+                    @ApiResponse(responseCode = "400", description = "NiFi was 
unable to complete the request because it was invalid. The request should not 
be retried without modification."),
+                    @ApiResponse(responseCode = "401", description = "Client 
could not be authenticated."),
+                    @ApiResponse(responseCode = "403", description = "Client 
is not authorized to make this request."),
+                    @ApiResponse(responseCode = "404", description = "The 
specified resource could not be found."),
+                    @ApiResponse(responseCode = "409", description = "The 
request was valid but NiFi was not in the appropriate state to process it.")
+            },
+            security = {
+                    @SecurityRequirement(name = "Only the user that submitted 
the request can remove it")
+            }
+    )
+    public Response removePurgeRequest(
+            @Parameter(
+                    description = "The connector id.",
+                    required = true
+            )
+            @PathParam("id") final String connectorId,
+            @Parameter(
+                    description = "The purge request id.",
+                    required = true
+            )
+            @PathParam("purge-request-id") final String purgeRequestId) {
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.DELETE);
+        }
+
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+        // Make sure user has write access to the connector
+        serviceFacade.authorizeAccess(lookup -> {
+            final Authorizable connector = lookup.getConnector(connectorId);
+            connector.authorize(authorizer, RequestAction.WRITE, user);
+        });
+
+        final boolean twoPhaseRequest = isTwoPhaseRequest(httpServletRequest);
+        final boolean executionPhase = isExecutionPhase(httpServletRequest);
+
+        if (!twoPhaseRequest || executionPhase) {
+            final AsynchronousWebRequest<ConnectorEntity, Void> asyncRequest = 
purgeRequestManager.removeRequest(PURGE_REQUEST_TYPE, purgeRequestId, user);
+
+            if (!asyncRequest.isComplete()) {
+                asyncRequest.cancel();
+            }
+
+            final DropRequestEntity purgeRequestEntity = 
createPurgeRequestEntity(asyncRequest, connectorId, purgeRequestId);
+            return generateOkResponse(purgeRequestEntity).build();
+        }
+
+        if (isValidationPhase(httpServletRequest)) {
+            purgeRequestManager.getRequest(PURGE_REQUEST_TYPE, purgeRequestId, 
user);
+            return generateContinueResponse().build();
+        } else if (isCancellationPhase(httpServletRequest)) {
+            return generateOkResponse().build();
+        } else {
+            throw new IllegalStateException("This request does not appear to 
be part of the two phase commit.");
+        }
+    }
+
+    private Response performAsyncPurge(final ConnectorEntity connectorEntity, 
final String connectorId, final NiFiUser user) {
+        final String requestId = generateUuid();
+        logger.debug("Generated Purge Request with ID {} for Connector {}", 
requestId, connectorId);
+
+        final List<UpdateStep> updateSteps = Collections.singletonList(new 
StandardUpdateStep("Purge FlowFiles"));
+
+        final AsynchronousWebRequest<ConnectorEntity, Void> request =
+                new StandardAsynchronousWebRequest<>(requestId, 
connectorEntity, connectorId, user, updateSteps);
+
+        final Consumer<AsynchronousWebRequest<ConnectorEntity, Void>> 
updateTask = asyncRequest -> {
+            try {
+                serviceFacade.purgeConnectorFlowFiles(connectorId, 
user.getIdentity());
+                asyncRequest.markStepComplete(null);
+            } catch (final Exception e) {
+                logger.error("Failed to purge FlowFiles for Connector {}", 
connectorId, e);
+                asyncRequest.fail("Failed to purge FlowFiles due to " + e);
+            }
+        };
+
+        purgeRequestManager.submitRequest(PURGE_REQUEST_TYPE, requestId, 
request, updateTask);
+
+        final DropRequestEntity purgeRequestEntity = 
createPurgeRequestEntity(request, connectorId, requestId);
+        final URI location = 
URI.create(purgeRequestEntity.getDropRequest().getUri());
+        return 
Response.status(Response.Status.ACCEPTED).location(location).entity(purgeRequestEntity).build();
+    }
+
+    private DropRequestEntity createPurgeRequestEntity(final 
AsynchronousWebRequest<ConnectorEntity, Void> asyncRequest,
+                                                       final String 
connectorId, final String requestId) {
+        final DropRequestDTO dto = new DropRequestDTO();
+        dto.setId(requestId);
+        dto.setUri(generateResourceUri("connectors", connectorId, 
"purge-requests", requestId));
+        dto.setSubmissionTime(asyncRequest.getLastUpdated());
+        dto.setLastUpdated(asyncRequest.getLastUpdated());
+        dto.setPercentCompleted(asyncRequest.getPercentComplete());
+        dto.setFinished(asyncRequest.isComplete());
+        dto.setFailureReason(asyncRequest.getFailureReason());
+        dto.setState(asyncRequest.getState());
+
+        final DropRequestEntity entity = new DropRequestEntity();
+        entity.setDropRequest(dto);
+        return entity;
+    }
+
     /**
      * Gets the configuration step names for the specified connector.
      *
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java
index c681c17e9a..cb2e7046ce 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java
@@ -54,6 +54,10 @@ public interface ConnectorDAO {
 
     void verifyCancelDrainFlowFile(String id);
 
+    void verifyPurgeFlowFiles(String id);
+
+    void purgeFlowFiles(String id, String requestor);
+
     void updateConnectorConfigurationStep(String id, String 
configurationStepName, ConfigurationStepConfigurationDTO 
configurationStepConfiguration);
 
     void applyConnectorUpdate(String id, ConnectorUpdateContext updateContext);
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
index 409dac9623..c9fd333180 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
@@ -53,6 +53,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 @Repository
@@ -150,6 +151,25 @@ public class StandardConnectorDAO implements ConnectorDAO {
         connector.verifyCancelDrainFlowFiles();
     }
 
+    @Override
+    public void verifyPurgeFlowFiles(final String id) {
+        final ConnectorNode connector = getConnector(id);
+        connector.verifyCanPurgeFlowFiles();
+    }
+
+    @Override
+    public void purgeFlowFiles(final String id, final String requestor) {
+        final ConnectorNode connector = getConnector(id);
+        try {
+            connector.purgeFlowFiles(requestor).get();
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException("Thread was interrupted while 
purging FlowFiles for Connector " + id, e);
+        } catch (final ExecutionException e) {
+            throw new IllegalStateException("Failed to purge FlowFiles for 
Connector " + id, e.getCause());
+        }
+    }
+
     @Override
     public void updateConnectorConfigurationStep(final String id, final String 
configurationStepName, final ConfigurationStepConfigurationDTO 
configurationStepDto) {
         final ConnectorNode connector = getConnector(id);
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 311843db5d..e0116ff9f7 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -1472,11 +1472,39 @@ public class NiFiClientUtil {
     public void deleteConnectors() throws NiFiClientException, IOException {
         final ConnectorsEntity connectors = 
nifiClient.getFlowClient().getConnectors();
         for (final ConnectorEntity connector : connectors.getConnectors()) {
+            purgeConnectorFlowFiles(connector.getId());
             connector.setDisconnectedNodeAcknowledged(true);
             nifiClient.getConnectorClient().deleteConnector(connector);
         }
     }
 
+    public DropRequestEntity purgeConnectorFlowFiles(final String connectorId) 
throws NiFiClientException, IOException {
+        final ConnectorClient connectorClient = getConnectorClient();
+        final long maxTimestamp = System.currentTimeMillis() + 
TimeUnit.MINUTES.toMillis(1L);
+
+        DropRequestEntity requestEntity = 
connectorClient.createPurgeRequest(connectorId);
+        try {
+            while (requestEntity.getDropRequest().getPercentCompleted() < 100) 
{
+                if (System.currentTimeMillis() > maxTimestamp) {
+                    throw new IOException("Timed out waiting for Connector " + 
connectorId + " to purge FlowFiles");
+                }
+
+                try {
+                    Thread.sleep(50L);
+                } catch (final InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    return null;
+                }
+
+                requestEntity = connectorClient.getPurgeRequest(connectorId, 
requestEntity.getDropRequest().getId());
+            }
+        } finally {
+            requestEntity = connectorClient.deletePurgeRequest(connectorId, 
requestEntity.getDropRequest().getId());
+        }
+
+        return requestEntity;
+    }
+
     public void waitForControllerServiceRunStatus(final String id, final 
String requestedRunStatus) throws NiFiClientException, IOException {
         final long maxTimestamp = System.currentTimeMillis() + 
TimeUnit.MINUTES.toMillis(2L);
         logger.info("Waiting for Controller Service {} to have a Run Status of 
{}", id, requestedRunStatus);
diff --git 
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
 
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
index 76dfefaeaa..3379548955 100644
--- 
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
+++ 
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
@@ -22,6 +22,7 @@ import org.apache.nifi.web.api.entity.ConfigurationStepEntity;
 import org.apache.nifi.web.api.entity.ConfigurationStepNamesEntity;
 import org.apache.nifi.web.api.entity.ConnectorEntity;
 import org.apache.nifi.web.api.entity.ConnectorPropertyAllowableValuesEntity;
+import org.apache.nifi.web.api.entity.DropRequestEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
 import org.apache.nifi.web.api.entity.VerifyConnectorConfigStepRequestEntity;
@@ -333,6 +334,38 @@ public interface ConnectorClient {
      */
     Path getAssetContent(String connectorId, String assetId, File 
outputDirectory) throws NiFiClientException, IOException;
 
+    /**
+     * Creates a request to purge all FlowFiles for the given connector.
+     *
+     * @param connectorId the connector ID
+     * @return the drop request entity containing the purge request status
+     * @throws NiFiClientException if an error occurs during the request
+     * @throws IOException if an I/O error occurs
+     */
+    DropRequestEntity createPurgeRequest(String connectorId) throws 
NiFiClientException, IOException;
+
+    /**
+     * Gets the status of a purge request for the given connector.
+     *
+     * @param connectorId the connector ID
+     * @param purgeRequestId the purge request ID
+     * @return the drop request entity containing the purge request status
+     * @throws NiFiClientException if an error occurs during the request
+     * @throws IOException if an I/O error occurs
+     */
+    DropRequestEntity getPurgeRequest(String connectorId, String 
purgeRequestId) throws NiFiClientException, IOException;
+
+    /**
+     * Deletes (cancels) a purge request for the given connector.
+     *
+     * @param connectorId the connector ID
+     * @param purgeRequestId the purge request ID
+     * @return the drop request entity containing the final purge request 
status
+     * @throws NiFiClientException if an error occurs during the request
+     * @throws IOException if an I/O error occurs
+     */
+    DropRequestEntity deletePurgeRequest(String connectorId, String 
purgeRequestId) throws NiFiClientException, IOException;
+
     /**
      * Indicates that mutable requests should indicate that the client has
      * acknowledged that the node is disconnected.
diff --git 
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
 
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
index 4f9b8d8aaa..06476292da 100644
--- 
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
+++ 
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
@@ -32,6 +32,7 @@ import 
org.apache.nifi.web.api.entity.ConfigurationStepNamesEntity;
 import org.apache.nifi.web.api.entity.ConnectorEntity;
 import org.apache.nifi.web.api.entity.ConnectorPropertyAllowableValuesEntity;
 import org.apache.nifi.web.api.entity.ConnectorRunStatusEntity;
+import org.apache.nifi.web.api.entity.DropRequestEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
 import org.apache.nifi.web.api.entity.VerifyConnectorConfigStepRequestEntity;
@@ -539,4 +540,57 @@ public class JerseyConnectorClient extends 
AbstractJerseyClient implements Conne
             }
         });
     }
+
+    @Override
+    public DropRequestEntity createPurgeRequest(final String connectorId) 
throws NiFiClientException, IOException {
+        if (StringUtils.isBlank(connectorId)) {
+            throw new IllegalArgumentException("Connector id cannot be null or 
blank");
+        }
+
+        return executeAction("Error creating purge request for Connector " + 
connectorId, () -> {
+            final WebTarget target = connectorsTarget
+                .path("{id}/purge-requests")
+                .resolveTemplate("id", connectorId);
+
+            return getRequestBuilder(target).post(null, 
DropRequestEntity.class);
+        });
+    }
+
+    @Override
+    public DropRequestEntity getPurgeRequest(final String connectorId, final 
String purgeRequestId) throws NiFiClientException, IOException {
+        if (StringUtils.isBlank(connectorId)) {
+            throw new IllegalArgumentException("Connector id cannot be null or 
blank");
+        }
+        if (StringUtils.isBlank(purgeRequestId)) {
+            throw new IllegalArgumentException("Purge request id cannot be 
null or blank");
+        }
+
+        return executeAction("Error getting purge request for Connector " + 
connectorId, () -> {
+            final WebTarget target = connectorsTarget
+                .path("{id}/purge-requests/{purgeRequestId}")
+                .resolveTemplate("id", connectorId)
+                .resolveTemplate("purgeRequestId", purgeRequestId);
+
+            return getRequestBuilder(target).get(DropRequestEntity.class);
+        });
+    }
+
+    @Override
+    public DropRequestEntity deletePurgeRequest(final String connectorId, 
final String purgeRequestId) throws NiFiClientException, IOException {
+        if (StringUtils.isBlank(connectorId)) {
+            throw new IllegalArgumentException("Connector id cannot be null or 
blank");
+        }
+        if (StringUtils.isBlank(purgeRequestId)) {
+            throw new IllegalArgumentException("Purge request id cannot be 
null or blank");
+        }
+
+        return executeAction("Error deleting purge request for Connector " + 
connectorId, () -> {
+            final WebTarget target = connectorsTarget
+                .path("{id}/purge-requests/{purgeRequestId}")
+                .resolveTemplate("id", connectorId)
+                .resolveTemplate("purgeRequestId", purgeRequestId);
+
+            return getRequestBuilder(target).delete(DropRequestEntity.class);
+        });
+    }
 }


Reply via email to