This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/NIFI-15258 by this push:
new 4e3292bbb1 NIFI-15511: Added endpoints for purging all FlowFiles for a
given Con… (#10833)
4e3292bbb1 is described below
commit 4e3292bbb1ae2c9674e702c2785f5fbbe4b6c3d0
Author: Mark Payne <[email protected]>
AuthorDate: Tue Feb 3 10:17:49 2026 -0500
NIFI-15511: Added endpoints for purging all FlowFiles for a given Con…
(#10833)
* NIFI-15511: Added endpoints for purging all FlowFiles for a given
Connector; added method to ConnectorClient to call these endpoints; updated
system tests to use these endpoints when tearing down flows; some bug fixes
* NIFI-15511: Added WRITE permission requirement for retrieving and
deleting a Connector Purge request
---
.../apache/nifi/groups/StandardProcessGroup.java | 12 ++
.../nifi/components/connector/ConnectorNode.java | 6 +
.../connector/StandardConnectorNode.java | 15 +-
.../org/apache/nifi/web/NiFiServiceFacade.java | 16 ++
.../apache/nifi/web/StandardNiFiServiceFacade.java | 10 +
.../org/apache/nifi/web/api/ConnectorResource.java | 211 +++++++++++++++++++++
.../java/org/apache/nifi/web/dao/ConnectorDAO.java | 4 +
.../nifi/web/dao/impl/StandardConnectorDAO.java | 20 ++
.../apache/nifi/tests/system/NiFiClientUtil.java | 28 +++
.../nifi/toolkit/client/ConnectorClient.java | 33 ++++
.../toolkit/client/impl/JerseyConnectorClient.java | 54 ++++++
11 files changed, 408 insertions(+), 1 deletion(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 528842281e..cb62602085 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -1591,6 +1591,7 @@ public final class StandardProcessGroup implements
ProcessGroup {
aggregateDropFlowFileStatus.setState(null);
AtomicBoolean processedAtLeastOne = new AtomicBoolean(false);
+ final List<CompletableFuture<Void>> completionFutures = new
ArrayList<>();
connections.stream()
.map(Connection::getFlowFileQueue)
@@ -1598,10 +1599,21 @@ public final class StandardProcessGroup implements
ProcessGroup {
.forEach(additionalDropFlowFileStatus -> {
aggregate(aggregateDropFlowFileStatus,
additionalDropFlowFileStatus);
processedAtLeastOne.set(true);
+
completionFutures.add(additionalDropFlowFileStatus.getCompletionFuture());
});
if (processedAtLeastOne.get()) {
resultDropFlowFileStatus = aggregateDropFlowFileStatus;
+
+ // When all individual drop requests complete, mark the aggregate
as complete
+ CompletableFuture.allOf(completionFutures.toArray(new
CompletableFuture[0]))
+ .whenComplete((result, throwable) -> {
+ if (throwable != null) {
+
aggregateDropFlowFileStatus.setState(DropFlowFileState.FAILURE,
throwable.getMessage());
+ } else {
+
aggregateDropFlowFileStatus.setState(DropFlowFileState.COMPLETE);
+ }
+ });
} else {
resultDropFlowFileStatus = null;
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
index 13cadb3c49..6cafcbbaf1 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
@@ -211,6 +211,12 @@ public interface ConnectorNode extends
ComponentAuthorizable, VersionedComponent
*/
void verifyCancelDrainFlowFiles() throws IllegalStateException;
+ /**
+ * Verifies that the Connector can have its FlowFiles purged.
+ * @throws IllegalStateException if not in a state where FlowFiles can be
purged
+ */
+ void verifyCanPurgeFlowFiles() throws IllegalStateException;
+
/**
* Purges all FlowFiles from the Connector, immediately dropping the data.
*
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
index e614db7070..75525d16c3 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
@@ -494,6 +494,19 @@ public class StandardConnectorNode implements
ConnectorNode {
}
}
+ @Override
+ public void verifyCanPurgeFlowFiles() throws IllegalStateException {
+ final ConnectorState desiredState = getDesiredState();
+ if (desiredState != ConnectorState.STOPPED) {
+ throw new IllegalStateException("Cannot purge FlowFiles for " +
this + " because its desired state is currently " + desiredState + "; it must
be STOPPED.");
+ }
+
+ final ConnectorState currentState = getCurrentState();
+ if (currentState != ConnectorState.STOPPED) {
+ throw new IllegalStateException("Cannot purge FlowFiles for " +
this + " because its current state is " + currentState + "; it must be
STOPPED.");
+ }
+ }
+
@Override
public Future<Void> purgeFlowFiles(final String requestor) {
requireStopped("purge FlowFiles", ConnectorState.PURGING);
@@ -520,7 +533,7 @@ public class StandardConnectorNode implements ConnectorNode
{
while (!stateUpdated) {
final ConnectorState currentState = getCurrentState();
if (currentState != ConnectorState.STOPPED) {
- throw new IllegalStateException("Cannot " + action + " for " +
this + " because its current state is currently " + currentState + "; it must
be STOPPED.");
+ throw new IllegalStateException("Cannot " + action + " for " +
this + " because its current state is " + currentState + "; it must be
STOPPED.");
}
stateUpdated =
stateTransition.trySetCurrentState(ConnectorState.STOPPED, newState);
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index d270c0344f..c350b2b983 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -251,6 +251,22 @@ public interface NiFiServiceFacade {
Optional<Asset> getConnectorAsset(String assetId);
+ /**
+ * Verifies that the connector is in a state where FlowFiles can be purged.
+ *
+ * @param connectorId the connector ID
+ * @throws IllegalStateException if the connector is not in a state where
FlowFiles can be purged
+ */
+ void verifyPurgeConnectorFlowFiles(String connectorId);
+
+ /**
+ * Purges all FlowFiles from the connector.
+ *
+ * @param connectorId the connector ID
+ * @param requestor the identity of the user requesting the purge (used
for provenance events)
+ */
+ void purgeConnectorFlowFiles(String connectorId, String requestor);
+
// ----------------------------------------
// Synchronization methods
// ----------------------------------------
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index f08f8297b5..654644d552 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -3859,6 +3859,16 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
return connectorDAO.getAsset(assetId);
}
+ @Override
+ public void verifyPurgeConnectorFlowFiles(final String connectorId) {
+ connectorDAO.verifyPurgeFlowFiles(connectorId);
+ }
+
+ @Override
+ public void purgeConnectorFlowFiles(final String connectorId, final String
requestor) {
+ connectorDAO.purgeFlowFiles(connectorId, requestor);
+ }
+
@Override
public ReportingTaskEntity updateReportingTask(final Revision revision,
final ReportingTaskDTO reportingTaskDTO) {
// get the component, ensure we have access to it, and perform the
update request
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
index 02f115b966..00a44a0575 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
@@ -77,6 +77,7 @@ import org.apache.nifi.web.api.dto.BundleDTO;
import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
import org.apache.nifi.web.api.dto.ConfigurationStepConfigurationDTO;
import org.apache.nifi.web.api.dto.ConnectorDTO;
+import org.apache.nifi.web.api.dto.DropRequestDTO;
import org.apache.nifi.web.api.dto.VerifyConnectorConfigStepRequestDTO;
import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
import org.apache.nifi.web.api.entity.AssetEntity;
@@ -88,6 +89,7 @@ import
org.apache.nifi.web.api.entity.ConnectorPropertyAllowableValuesEntity;
import org.apache.nifi.web.api.entity.ConnectorRunStatusEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
+import org.apache.nifi.web.api.entity.DropRequestEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.SearchResultsEntity;
@@ -123,6 +125,7 @@ public class ConnectorResource extends ApplicationResource {
private static final Logger logger =
LoggerFactory.getLogger(ConnectorResource.class);
private static final String VERIFICATION_REQUEST_TYPE =
"verification-request";
+ private static final String PURGE_REQUEST_TYPE = "purge-request";
private static final String FILENAME_HEADER = "Filename";
private static final String CONTENT_TYPE_HEADER = "Content-Type";
private static final String UPLOAD_CONTENT_TYPE =
"application/octet-stream";
@@ -137,6 +140,8 @@ public class ConnectorResource extends ApplicationResource {
private final RequestManager<VerifyConnectorConfigStepRequestEntity,
List<ConfigVerificationResultDTO>> configVerificationRequestManager =
new AsyncRequestManager<>(100, 1L, "Connector Configuration Step
Verification");
+ private final RequestManager<ConnectorEntity, Void> purgeRequestManager =
new AsyncRequestManager<>(100, 1L, "Connector FlowFile Purge");
+
@Context
private ServletContext servletContext;
@@ -792,6 +797,212 @@ public class ConnectorResource extends
ApplicationResource {
);
}
+
+ @POST
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/{id}/purge-requests")
+ @Operation(
+ summary = "Creates a request to purge the FlowFiles for this
connector",
+ responses = {
+ @ApiResponse(
+ responseCode = "202", description = "The request
has been accepted. A HTTP response header will contain the URI where the
response can be polled.",
+ content = @Content(schema = @Schema(implementation
= DropRequestEntity.class))
+ ),
+ @ApiResponse(responseCode = "400", description = "NiFi was
unable to complete the request because it was invalid. The request should not
be retried without modification."),
+ @ApiResponse(responseCode = "401", description = "Client
could not be authenticated."),
+ @ApiResponse(responseCode = "403", description = "Client
is not authorized to make this request."),
+ @ApiResponse(responseCode = "404", description = "The
specified resource could not be found."),
+ @ApiResponse(responseCode = "409", description = "The
request was valid but NiFi was not in the appropriate state to process it.")
+ },
+ description = "This will create a request to purge all FlowFiles
from the connector. The connector must be in a STOPPED state before purging can
begin. "
+ + "This is an asynchronous operation. The client should
poll the returned URI to get the status of the purge request.",
+ security = {
+ @SecurityRequirement(name = "Write - /connectors/{uuid}")
+ }
+ )
+ public Response createPurgeRequest(
+ @Parameter(
+ description = "The connector id.",
+ required = true
+ )
+ @PathParam("id") final String id) {
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.POST);
+ }
+
+ final ConnectorEntity requestConnectorEntity = new ConnectorEntity();
+ requestConnectorEntity.setId(id);
+
+ return withWriteLock(
+ serviceFacade,
+ requestConnectorEntity,
+ lookup -> {
+ final Authorizable connector = lookup.getConnector(id);
+ connector.authorize(authorizer, RequestAction.WRITE,
NiFiUserUtils.getNiFiUser());
+ },
+ () -> serviceFacade.verifyPurgeConnectorFlowFiles(id),
+ (connectorEntity) -> performAsyncPurge(connectorEntity, id,
NiFiUserUtils.getNiFiUser())
+ );
+ }
+
+
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/{id}/purge-requests/{purge-request-id}")
+ @Operation(
+ summary = "Gets the current status of a purge request for the
specified connector",
+ responses = {
+ @ApiResponse(responseCode = "200", content =
@Content(schema = @Schema(implementation = DropRequestEntity.class))),
+ @ApiResponse(responseCode = "400", description = "NiFi was
unable to complete the request because it was invalid. The request should not
be retried without modification."),
+ @ApiResponse(responseCode = "401", description = "Client
could not be authenticated."),
+ @ApiResponse(responseCode = "403", description = "Client
is not authorized to make this request."),
+ @ApiResponse(responseCode = "404", description = "The
specified resource could not be found."),
+ @ApiResponse(responseCode = "409", description = "The
request was valid but NiFi was not in the appropriate state to process it.")
+ },
+ security = {
+ @SecurityRequirement(name = "Only the user that submitted
the request can get it")
+ }
+ )
+ public Response getPurgeRequest(
+ @Parameter(
+ description = "The connector id.",
+ required = true
+ )
+ @PathParam("id") final String connectorId,
+ @Parameter(
+ description = "The purge request id.",
+ required = true
+ )
+ @PathParam("purge-request-id") final String purgeRequestId) {
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.GET);
+ }
+
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ serviceFacade.authorizeAccess(lookup -> {
+ final Authorizable connector = lookup.getConnector(connectorId);
+ connector.authorize(authorizer, RequestAction.WRITE, user);
+ });
+
+ final AsynchronousWebRequest<ConnectorEntity, Void> asyncRequest =
purgeRequestManager.getRequest(PURGE_REQUEST_TYPE, purgeRequestId, user);
+ final DropRequestEntity purgeRequestEntity =
createPurgeRequestEntity(asyncRequest, connectorId, purgeRequestId);
+ return generateOkResponse(purgeRequestEntity).build();
+ }
+
+
+ @DELETE
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/{id}/purge-requests/{purge-request-id}")
+ @Operation(
+ summary = "Cancels and/or removes a request to purge the FlowFiles
for this connector",
+ responses = {
+ @ApiResponse(responseCode = "200", content =
@Content(schema = @Schema(implementation = DropRequestEntity.class))),
+ @ApiResponse(responseCode = "400", description = "NiFi was
unable to complete the request because it was invalid. The request should not
be retried without modification."),
+ @ApiResponse(responseCode = "401", description = "Client
could not be authenticated."),
+ @ApiResponse(responseCode = "403", description = "Client
is not authorized to make this request."),
+ @ApiResponse(responseCode = "404", description = "The
specified resource could not be found."),
+ @ApiResponse(responseCode = "409", description = "The
request was valid but NiFi was not in the appropriate state to process it.")
+ },
+ security = {
+ @SecurityRequirement(name = "Only the user that submitted
the request can remove it")
+ }
+ )
+ public Response removePurgeRequest(
+ @Parameter(
+ description = "The connector id.",
+ required = true
+ )
+ @PathParam("id") final String connectorId,
+ @Parameter(
+ description = "The purge request id.",
+ required = true
+ )
+ @PathParam("purge-request-id") final String purgeRequestId) {
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.DELETE);
+ }
+
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+ // Make sure user has write access to the connector
+ serviceFacade.authorizeAccess(lookup -> {
+ final Authorizable connector = lookup.getConnector(connectorId);
+ connector.authorize(authorizer, RequestAction.WRITE, user);
+ });
+
+ final boolean twoPhaseRequest = isTwoPhaseRequest(httpServletRequest);
+ final boolean executionPhase = isExecutionPhase(httpServletRequest);
+
+ if (!twoPhaseRequest || executionPhase) {
+ final AsynchronousWebRequest<ConnectorEntity, Void> asyncRequest =
purgeRequestManager.removeRequest(PURGE_REQUEST_TYPE, purgeRequestId, user);
+
+ if (!asyncRequest.isComplete()) {
+ asyncRequest.cancel();
+ }
+
+ final DropRequestEntity purgeRequestEntity =
createPurgeRequestEntity(asyncRequest, connectorId, purgeRequestId);
+ return generateOkResponse(purgeRequestEntity).build();
+ }
+
+ if (isValidationPhase(httpServletRequest)) {
+ purgeRequestManager.getRequest(PURGE_REQUEST_TYPE, purgeRequestId,
user);
+ return generateContinueResponse().build();
+ } else if (isCancellationPhase(httpServletRequest)) {
+ return generateOkResponse().build();
+ } else {
+ throw new IllegalStateException("This request does not appear to
be part of the two phase commit.");
+ }
+ }
+
+ private Response performAsyncPurge(final ConnectorEntity connectorEntity,
final String connectorId, final NiFiUser user) {
+ final String requestId = generateUuid();
+ logger.debug("Generated Purge Request with ID {} for Connector {}",
requestId, connectorId);
+
+ final List<UpdateStep> updateSteps = Collections.singletonList(new
StandardUpdateStep("Purge FlowFiles"));
+
+ final AsynchronousWebRequest<ConnectorEntity, Void> request =
+ new StandardAsynchronousWebRequest<>(requestId,
connectorEntity, connectorId, user, updateSteps);
+
+ final Consumer<AsynchronousWebRequest<ConnectorEntity, Void>>
updateTask = asyncRequest -> {
+ try {
+ serviceFacade.purgeConnectorFlowFiles(connectorId,
user.getIdentity());
+ asyncRequest.markStepComplete(null);
+ } catch (final Exception e) {
+ logger.error("Failed to purge FlowFiles for Connector {}",
connectorId, e);
+ asyncRequest.fail("Failed to purge FlowFiles due to " + e);
+ }
+ };
+
+ purgeRequestManager.submitRequest(PURGE_REQUEST_TYPE, requestId,
request, updateTask);
+
+ final DropRequestEntity purgeRequestEntity =
createPurgeRequestEntity(request, connectorId, requestId);
+ final URI location =
URI.create(purgeRequestEntity.getDropRequest().getUri());
+ return
Response.status(Response.Status.ACCEPTED).location(location).entity(purgeRequestEntity).build();
+ }
+
+ private DropRequestEntity createPurgeRequestEntity(final
AsynchronousWebRequest<ConnectorEntity, Void> asyncRequest,
+ final String
connectorId, final String requestId) {
+ final DropRequestDTO dto = new DropRequestDTO();
+ dto.setId(requestId);
+ dto.setUri(generateResourceUri("connectors", connectorId,
"purge-requests", requestId));
+ dto.setSubmissionTime(asyncRequest.getLastUpdated());
+ dto.setLastUpdated(asyncRequest.getLastUpdated());
+ dto.setPercentCompleted(asyncRequest.getPercentComplete());
+ dto.setFinished(asyncRequest.isComplete());
+ dto.setFailureReason(asyncRequest.getFailureReason());
+ dto.setState(asyncRequest.getState());
+
+ final DropRequestEntity entity = new DropRequestEntity();
+ entity.setDropRequest(dto);
+ return entity;
+ }
+
/**
* Gets the configuration step names for the specified connector.
*
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java
index c681c17e9a..cb2e7046ce 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java
@@ -54,6 +54,10 @@ public interface ConnectorDAO {
void verifyCancelDrainFlowFile(String id);
+ void verifyPurgeFlowFiles(String id);
+
+ void purgeFlowFiles(String id, String requestor);
+
void updateConnectorConfigurationStep(String id, String
configurationStepName, ConfigurationStepConfigurationDTO
configurationStepConfiguration);
void applyConnectorUpdate(String id, ConnectorUpdateContext updateContext);
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
index 409dac9623..c9fd333180 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
@@ -53,6 +53,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@Repository
@@ -150,6 +151,25 @@ public class StandardConnectorDAO implements ConnectorDAO {
connector.verifyCancelDrainFlowFiles();
}
+ @Override
+ public void verifyPurgeFlowFiles(final String id) {
+ final ConnectorNode connector = getConnector(id);
+ connector.verifyCanPurgeFlowFiles();
+ }
+
+ @Override
+ public void purgeFlowFiles(final String id, final String requestor) {
+ final ConnectorNode connector = getConnector(id);
+ try {
+ connector.purgeFlowFiles(requestor).get();
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException("Thread was interrupted while
purging FlowFiles for Connector " + id, e);
+ } catch (final ExecutionException e) {
+ throw new IllegalStateException("Failed to purge FlowFiles for
Connector " + id, e.getCause());
+ }
+ }
+
@Override
public void updateConnectorConfigurationStep(final String id, final String
configurationStepName, final ConfigurationStepConfigurationDTO
configurationStepDto) {
final ConnectorNode connector = getConnector(id);
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 311843db5d..e0116ff9f7 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -1472,11 +1472,39 @@ public class NiFiClientUtil {
public void deleteConnectors() throws NiFiClientException, IOException {
final ConnectorsEntity connectors =
nifiClient.getFlowClient().getConnectors();
for (final ConnectorEntity connector : connectors.getConnectors()) {
+ purgeConnectorFlowFiles(connector.getId());
connector.setDisconnectedNodeAcknowledged(true);
nifiClient.getConnectorClient().deleteConnector(connector);
}
}
+ public DropRequestEntity purgeConnectorFlowFiles(final String connectorId)
throws NiFiClientException, IOException {
+ final ConnectorClient connectorClient = getConnectorClient();
+ final long maxTimestamp = System.currentTimeMillis() +
TimeUnit.MINUTES.toMillis(1L);
+
+ DropRequestEntity requestEntity =
connectorClient.createPurgeRequest(connectorId);
+ try {
+ while (requestEntity.getDropRequest().getPercentCompleted() < 100)
{
+ if (System.currentTimeMillis() > maxTimestamp) {
+ throw new IOException("Timed out waiting for Connector " +
connectorId + " to purge FlowFiles");
+ }
+
+ try {
+ Thread.sleep(50L);
+ } catch (final InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ return null;
+ }
+
+ requestEntity = connectorClient.getPurgeRequest(connectorId,
requestEntity.getDropRequest().getId());
+ }
+ } finally {
+ requestEntity = connectorClient.deletePurgeRequest(connectorId,
requestEntity.getDropRequest().getId());
+ }
+
+ return requestEntity;
+ }
+
public void waitForControllerServiceRunStatus(final String id, final
String requestedRunStatus) throws NiFiClientException, IOException {
final long maxTimestamp = System.currentTimeMillis() +
TimeUnit.MINUTES.toMillis(2L);
logger.info("Waiting for Controller Service {} to have a Run Status of
{}", id, requestedRunStatus);
diff --git
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
index 76dfefaeaa..3379548955 100644
---
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
+++
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
@@ -22,6 +22,7 @@ import org.apache.nifi.web.api.entity.ConfigurationStepEntity;
import org.apache.nifi.web.api.entity.ConfigurationStepNamesEntity;
import org.apache.nifi.web.api.entity.ConnectorEntity;
import org.apache.nifi.web.api.entity.ConnectorPropertyAllowableValuesEntity;
+import org.apache.nifi.web.api.entity.DropRequestEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.VerifyConnectorConfigStepRequestEntity;
@@ -333,6 +334,38 @@ public interface ConnectorClient {
*/
Path getAssetContent(String connectorId, String assetId, File
outputDirectory) throws NiFiClientException, IOException;
+ /**
+ * Creates a request to purge all FlowFiles for the given connector.
+ *
+ * @param connectorId the connector ID
+ * @return the drop request entity containing the purge request status
+ * @throws NiFiClientException if an error occurs during the request
+ * @throws IOException if an I/O error occurs
+ */
+ DropRequestEntity createPurgeRequest(String connectorId) throws
NiFiClientException, IOException;
+
+ /**
+ * Gets the status of a purge request for the given connector.
+ *
+ * @param connectorId the connector ID
+ * @param purgeRequestId the purge request ID
+ * @return the drop request entity containing the purge request status
+ * @throws NiFiClientException if an error occurs during the request
+ * @throws IOException if an I/O error occurs
+ */
+ DropRequestEntity getPurgeRequest(String connectorId, String
purgeRequestId) throws NiFiClientException, IOException;
+
+ /**
+ * Deletes (cancels) a purge request for the given connector.
+ *
+ * @param connectorId the connector ID
+ * @param purgeRequestId the purge request ID
+ * @return the drop request entity containing the final purge request
status
+ * @throws NiFiClientException if an error occurs during the request
+ * @throws IOException if an I/O error occurs
+ */
+ DropRequestEntity deletePurgeRequest(String connectorId, String
purgeRequestId) throws NiFiClientException, IOException;
+
/**
* Indicates that mutable requests should indicate that the client has
* acknowledged that the node is disconnected.
diff --git
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
index 4f9b8d8aaa..06476292da 100644
---
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
+++
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
@@ -32,6 +32,7 @@ import
org.apache.nifi.web.api.entity.ConfigurationStepNamesEntity;
import org.apache.nifi.web.api.entity.ConnectorEntity;
import org.apache.nifi.web.api.entity.ConnectorPropertyAllowableValuesEntity;
import org.apache.nifi.web.api.entity.ConnectorRunStatusEntity;
+import org.apache.nifi.web.api.entity.DropRequestEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.VerifyConnectorConfigStepRequestEntity;
@@ -539,4 +540,57 @@ public class JerseyConnectorClient extends
AbstractJerseyClient implements Conne
}
});
}
+
+ @Override
+ public DropRequestEntity createPurgeRequest(final String connectorId)
throws NiFiClientException, IOException {
+ if (StringUtils.isBlank(connectorId)) {
+ throw new IllegalArgumentException("Connector id cannot be null or
blank");
+ }
+
+ return executeAction("Error creating purge request for Connector " +
connectorId, () -> {
+ final WebTarget target = connectorsTarget
+ .path("{id}/purge-requests")
+ .resolveTemplate("id", connectorId);
+
+ return getRequestBuilder(target).post(null,
DropRequestEntity.class);
+ });
+ }
+
+ @Override
+ public DropRequestEntity getPurgeRequest(final String connectorId, final
String purgeRequestId) throws NiFiClientException, IOException {
+ if (StringUtils.isBlank(connectorId)) {
+ throw new IllegalArgumentException("Connector id cannot be null or
blank");
+ }
+ if (StringUtils.isBlank(purgeRequestId)) {
+ throw new IllegalArgumentException("Purge request id cannot be
null or blank");
+ }
+
+ return executeAction("Error getting purge request for Connector " +
connectorId, () -> {
+ final WebTarget target = connectorsTarget
+ .path("{id}/purge-requests/{purgeRequestId}")
+ .resolveTemplate("id", connectorId)
+ .resolveTemplate("purgeRequestId", purgeRequestId);
+
+ return getRequestBuilder(target).get(DropRequestEntity.class);
+ });
+ }
+
+ @Override
+ public DropRequestEntity deletePurgeRequest(final String connectorId,
final String purgeRequestId) throws NiFiClientException, IOException {
+ if (StringUtils.isBlank(connectorId)) {
+ throw new IllegalArgumentException("Connector id cannot be null or
blank");
+ }
+ if (StringUtils.isBlank(purgeRequestId)) {
+ throw new IllegalArgumentException("Purge request id cannot be
null or blank");
+ }
+
+ return executeAction("Error deleting purge request for Connector " +
connectorId, () -> {
+ final WebTarget target = connectorsTarget
+ .path("{id}/purge-requests/{purgeRequestId}")
+ .resolveTemplate("id", connectorId)
+ .resolveTemplate("purgeRequestId", purgeRequestId);
+
+ return getRequestBuilder(target).delete(DropRequestEntity.class);
+ });
+ }
}