This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 135d3f0d70a7b49c6189db084f400cb0ef748d27
Author: Mark Payne <[email protected]>
AuthorDate: Wed Jan 21 14:20:07 2026 -0500

    NIFI-15461: Added ability to initiate drainage of Connector's FlowFiles and cancel it; added tests to verify; some bug fixes (#10767)
---
 .../http/StandardHttpResponseMapper.java           |   2 +
 .../endpoints/ConnectorStatusEndpointMerger.java   |  86 ++++++++
 .../ClusteredConnectorRequestReplicator.java       |  22 +-
 .../cluster/manager/ConnectorEntityMerger.java     |  29 ++-
 .../apache/nifi/cluster/manager/StatusMerger.java  |  15 ++
 .../cluster/manager/ConnectorEntityMergerTest.java |  30 +++
 .../nifi/components/connector/ConnectorNode.java   |  12 ++
 .../connector/StandardConnectorNode.java           |  83 +++++++-
 .../connector/StandardConnectorRepository.java     |  22 +-
 .../components/connector/StandardFlowContext.java  |   5 +
 .../connector/TestStandardConnectorNode.java       | 118 +++++++++++
 .../org/apache/nifi/web/NiFiServiceFacade.java     |   8 +
 .../apache/nifi/web/StandardNiFiServiceFacade.java |  57 ++++++
 .../org/apache/nifi/web/api/ConnectorResource.java | 149 ++++++++++++++
 .../java/org/apache/nifi/web/dao/ConnectorDAO.java |   6 +
 .../nifi/web/dao/impl/StandardConnectorDAO.java    |  18 ++
 .../apache/nifi/web/api/TestConnectorResource.java |  62 +++++-
 .../tests/system/DataQueuingConnector.java         |   3 +-
 ...nnector.java => GatedDataQueuingConnector.java} |  73 ++++++-
 .../processors/tests/system/TerminateFlowFile.java |  28 +++
 .../org.apache.nifi.components.connector.Connector |   1 +
 .../apache/nifi/tests/system/NiFiClientUtil.java   |  24 +++
 .../org/apache/nifi/tests/system/NiFiSystemIT.java |  23 +++
 .../connectors/ClusteredConnectorDrainIT.java      | 228 +++++++++++++++++++++
 .../tests/system/connectors/ConnectorDrainIT.java  | 178 ++++++++++++++++
 .../nifi/toolkit/client/ConnectorClient.java       |  44 ++++
 .../toolkit/client/impl/JerseyConnectorClient.java |  69 +++++++
 27 files changed, 1357 insertions(+), 38 deletions(-)
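
For reviewers who want to try the change out: the patch adds two REST endpoints on ConnectorResource, POST /nifi-api/connectors/{id}/drain to begin draining a stopped connector (the request body is a ConnectorEntity carrying the current revision) and DELETE /nifi-api/connectors/{id}/drain?version=...&clientId=... to cancel an in-progress drain. The following is a minimal client sketch, not part of the patch; it assumes an unsecured node at http://localhost:8080 and uses placeholder connector id and revision values.

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class ConnectorDrainExample {
        public static void main(final String[] args) throws Exception {
            final String baseUrl = "http://localhost:8080/nifi-api";           // assumed unsecured local node
            final String connectorId = "00000000-0000-0000-0000-000000000000"; // placeholder connector id
            final HttpClient client = HttpClient.newHttpClient();

            // Initiate draining: POST the connector id and current revision as a ConnectorEntity.
            final String drainBody = "{\"id\":\"" + connectorId + "\",\"revision\":{\"clientId\":\"example-client\",\"version\":1}}";
            final HttpRequest drain = HttpRequest.newBuilder(URI.create(baseUrl + "/connectors/" + connectorId + "/drain"))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(drainBody))
                    .build();
            System.out.println(client.send(drain, HttpResponse.BodyHandlers.ofString()).body());

            // Cancel the drain: DELETE the same path, passing the (incremented) revision as query parameters.
            final HttpRequest cancel = HttpRequest.newBuilder(
                    URI.create(baseUrl + "/connectors/" + connectorId + "/drain?version=2&clientId=example-client"))
                    .DELETE()
                    .build();
            System.out.println(client.send(cancel, HttpResponse.BodyHandlers.ofString()).body());
        }
    }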

diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
index 0eefb8b763..efa017ae5f 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
@@ -30,6 +30,7 @@ import org.apache.nifi.cluster.coordination.http.endpoints.ConnectorEndpointMerg
 import org.apache.nifi.cluster.coordination.http.endpoints.ConnectorFlowEndpointMerger;
 import org.apache.nifi.cluster.coordination.http.endpoints.ConnectorPropertyGroupEndpointMerger;
 import org.apache.nifi.cluster.coordination.http.endpoints.ConnectorPropertyGroupNamesEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ConnectorStatusEndpointMerger;
 import org.apache.nifi.cluster.coordination.http.endpoints.ConnectorsEndpointMerger;
 import org.apache.nifi.cluster.coordination.http.endpoints.ControllerBulletinsEndpointMerger;
 import org.apache.nifi.cluster.coordination.http.endpoints.ControllerConfigurationEndpointMerger;
@@ -146,6 +147,7 @@ public class StandardHttpResponseMapper implements HttpResponseMapper {
         endpointMergers.add(new ProcessorRunStatusDetailsEndpointMerger());
         endpointMergers.add(new ConnectorEndpointMerger());
         endpointMergers.add(new ConnectorsEndpointMerger());
+        endpointMergers.add(new ConnectorStatusEndpointMerger());
         endpointMergers.add(new ConnectorFlowEndpointMerger());
         endpointMergers.add(new ConnectorPropertyGroupEndpointMerger());
         endpointMergers.add(new ConnectorPropertyGroupNamesEndpointMerger());
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConnectorStatusEndpointMerger.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConnectorStatusEndpointMerger.java
new file mode 100644
index 0000000000..bff04b3fe7
--- /dev/null
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConnectorStatusEndpointMerger.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import org.apache.nifi.cluster.manager.ComponentEntityStatusMerger;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.StatusMerger;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
+import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+public class ConnectorStatusEndpointMerger extends AbstractSingleEntityEndpoint<ProcessGroupStatusEntity> implements ComponentEntityStatusMerger<ProcessGroupStatusDTO> {
+    public static final Pattern CONNECTOR_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/connectors/[a-f0-9\-]{36}/status");
+
+    @Override
+    public boolean canHandle(final URI uri, final String method) {
+        return "GET".equalsIgnoreCase(method) && CONNECTOR_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    @Override
+    protected Class<ProcessGroupStatusEntity> getEntityClass() {
+        return ProcessGroupStatusEntity.class;
+    }
+
+    @Override
+    protected void mergeResponses(final ProcessGroupStatusEntity clientEntity, final Map<NodeIdentifier, ProcessGroupStatusEntity> entityMap,
+                                  final Set<NodeResponse> successfulResponses, final Set<NodeResponse> problematicResponses) {
+        final ProcessGroupStatusDTO mergedProcessGroupStatus = clientEntity.getProcessGroupStatus();
+        mergedProcessGroupStatus.setNodeSnapshots(new ArrayList<>());
+
+        final NodeIdentifier selectedNodeId = entityMap.entrySet().stream()
+                .filter(e -> e.getValue() == clientEntity)
+                .map(Map.Entry::getKey)
+                .findFirst()
+                .orElse(null);
+
+        final NodeProcessGroupStatusSnapshotDTO selectedNodeSnapshot = new NodeProcessGroupStatusSnapshotDTO();
+        selectedNodeSnapshot.setStatusSnapshot(mergedProcessGroupStatus.getAggregateSnapshot().clone());
+        selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress());
+        selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort());
+        selectedNodeSnapshot.setNodeId(selectedNodeId.getId());
+
+        mergedProcessGroupStatus.getNodeSnapshots().add(selectedNodeSnapshot);
+
+        for (final Map.Entry<NodeIdentifier, ProcessGroupStatusEntity> entry : entityMap.entrySet()) {
+            final NodeIdentifier nodeId = entry.getKey();
+            final ProcessGroupStatusEntity nodeProcessGroupStatusEntity = entry.getValue();
+            final ProcessGroupStatusDTO nodeProcessGroupStatus = nodeProcessGroupStatusEntity.getProcessGroupStatus();
+            if (nodeProcessGroupStatus == mergedProcessGroupStatus) {
+                continue;
+            }
+
+            mergeStatus(mergedProcessGroupStatus, clientEntity.getCanRead(), nodeProcessGroupStatus, nodeProcessGroupStatusEntity.getCanRead(), nodeId);
+        }
+    }
+
+    @Override
+    public void mergeStatus(final ProcessGroupStatusDTO clientStatus, final boolean clientStatusReadablePermission,
+                            final ProcessGroupStatusDTO status, final boolean statusReadablePermission,
+                            final NodeIdentifier statusNodeIdentifier) {
+        StatusMerger.merge(clientStatus, clientStatusReadablePermission, status, statusReadablePermission,
+                statusNodeIdentifier.getId(), statusNodeIdentifier.getApiAddress(), statusNodeIdentifier.getApiPort());
+    }
+}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ClusteredConnectorRequestReplicator.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ClusteredConnectorRequestReplicator.java
index 8fdc58dd35..a3db59cf4f 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ClusteredConnectorRequestReplicator.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ClusteredConnectorRequestReplicator.java
@@ -24,11 +24,13 @@ import jakarta.ws.rs.core.Response.StatusType;
 import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.authorization.user.StandardNiFiUser;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.components.connector.ConnectorRequestReplicator;
 import org.apache.nifi.components.connector.ConnectorState;
 import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.web.api.entity.ConnectorEntity;
+import org.apache.nifi.web.api.entity.Entity;
 
 import java.io.IOException;
 import java.net.URI;
@@ -55,14 +57,26 @@ public class ClusteredConnectorRequestReplicator implements ConnectorRequestRepl
     public ConnectorState getState(final String connectorId) throws IOException {
         final RequestReplicator requestReplicator = getRequestReplicator();
         final NiFiUser nodeUser = getNodeUser();
-        final AsyncClusterResponse asyncResponse = requestReplicator.replicate(nodeUser, GET, URI.create(replicationScheme + "://localhost/nifi-api/connectors/" + connectorId), Map.of(), Map.of());
+        final URI uri = URI.create(replicationScheme + "://localhost/nifi-api/connectors/" + connectorId);
+        final AsyncClusterResponse asyncResponse = requestReplicator.replicate(nodeUser, GET, uri, Map.of(), Map.of());
+
         try {
-            final Response response = asyncResponse.awaitMergedResponse().getClientResponse();
+            final NodeResponse mergedNodeResponse = asyncResponse.awaitMergedResponse();
+            final Response response = mergedNodeResponse.getClientResponse();
             verifyResponse(response.getStatusInfo(), connectorId);
 
-            final ConnectorEntity connectorEntity = response.readEntity(ConnectorEntity.class);
-            final String stateName = connectorEntity.getComponent().getState();
+            // Use the merged/updated entity if available, otherwise fall back to reading from the raw response.
+            // The updatedEntity contains the properly merged state from all nodes, while readEntity() would
+            // only return the state from whichever single node was selected as the "client response".
+            final ConnectorEntity connectorEntity;
+            final Entity updatedEntity = mergedNodeResponse.getUpdatedEntity();
+            if (updatedEntity instanceof ConnectorEntity mergedConnectorEntity) {
+                connectorEntity = mergedConnectorEntity;
+            } else {
+                connectorEntity = response.readEntity(ConnectorEntity.class);
+            }
 
+            final String stateName = connectorEntity.getComponent().getState();
             try {
                 return ConnectorState.valueOf(stateName);
             } catch (final IllegalArgumentException e) {
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectorEntityMerger.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectorEntityMerger.java
index 5f880dfc3f..7ef7fdf76b 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectorEntityMerger.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectorEntityMerger.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.cluster.manager;
 
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.components.connector.ConnectorState;
 import org.apache.nifi.web.api.dto.ConfigurationStepConfigurationDTO;
 import org.apache.nifi.web.api.dto.ConnectorConfigurationDTO;
 import org.apache.nifi.web.api.dto.ConnectorDTO;
@@ -26,7 +27,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class ConnectorEntityMerger {
-
     /**
      * Merges the ConnectorEntity responses.
      *
@@ -43,6 +43,16 @@ public class ConnectorEntityMerger {
         }
 
         mergeDtos(clientDto, dtoMap);
+
+        mergeStatus(clientEntity, entityMap);
+    }
+
+    private static void mergeStatus(final ConnectorEntity clientEntity, final Map<NodeIdentifier, ConnectorEntity> entityMap) {
+        for (final ConnectorEntity nodeEntity : entityMap.values()) {
+            if (nodeEntity != clientEntity && nodeEntity != null) {
+                StatusMerger.merge(clientEntity.getStatus(), nodeEntity.getStatus());
+            }
+        }
+    }
+
     private static void mergeDtos(final ConnectorDTO clientDto, final Map<NodeIdentifier, ConnectorDTO> dtoMap) {
@@ -89,13 +99,16 @@ public class ConnectorEntityMerger {
         if (state == null) {
             return 0;
         }
-        return switch (state) {
-            case "UPDATE_FAILED" -> 6;
-            case "PREPARING_FOR_UPDATE" -> 5;
-            case "UPDATING" -> 4;
-            case "UPDATED" -> 3;
-            case "STARTING", "STOPPING" -> 2;
-            default -> 1; // RUNNING, STOPPED, DISABLED
+
+        final ConnectorState connectorState = ConnectorState.valueOf(state);
+        return switch (connectorState) {
+            case UPDATE_FAILED -> 7;
+            case PREPARING_FOR_UPDATE -> 6;
+            case UPDATING -> 5;
+            case DRAINING, PURGING -> 4;
+            case UPDATED -> 3;
+            case STARTING, STOPPING -> 2;
+            default -> 1; // RUNNING, STOPPED
         };
     }
 
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
index f238dc1049..5b403e95f8 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
@@ -43,6 +43,7 @@ import org.apache.nifi.web.api.dto.diagnostics.JVMSystemDiagnosticsSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
 import org.apache.nifi.web.api.dto.status.ConnectionStatusPredictionsSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.ConnectorStatusDTO;
 import org.apache.nifi.web.api.dto.status.ControllerServiceStatusDTO;
 import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
 import org.apache.nifi.web.api.dto.status.FlowAnalysisRuleStatusDTO;
@@ -1084,4 +1085,18 @@ public class StatusMerger {
             target.setValidationStatus(ValidationStatus.INVALID.name());
         }
     }
+
+    public static void merge(final ConnectorStatusDTO target, final ConnectorStatusDTO toMerge) {
+        if (target == null || toMerge == null) {
+            return;
+        }
+
+        target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount());
+
+        if (ValidationStatus.VALIDATING.name().equalsIgnoreCase(toMerge.getValidationStatus())) {
+            target.setValidationStatus(ValidationStatus.VALIDATING.name());
+        } else if (ValidationStatus.INVALID.name().equalsIgnoreCase(toMerge.getValidationStatus())) {
+            target.setValidationStatus(ValidationStatus.INVALID.name());
+        }
+    }
 }
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectorEntityMergerTest.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectorEntityMergerTest.java
index da0ed151f2..21f351b7a9 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectorEntityMergerTest.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectorEntityMergerTest.java
@@ -25,6 +25,7 @@ import org.apache.nifi.web.api.dto.ConnectorPropertyDescriptorDTO;
 import org.apache.nifi.web.api.dto.PermissionsDTO;
 import org.apache.nifi.web.api.dto.PropertyGroupConfigurationDTO;
 import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.dto.status.ConnectorStatusDTO;
 import org.apache.nifi.web.api.entity.AllowableValueEntity;
 import org.apache.nifi.web.api.entity.ConnectorEntity;
 import org.junit.jupiter.api.Test;
@@ -130,6 +131,22 @@ class ConnectorEntityMergerTest {
         assertEquals(2, config.getConfigurationStepConfigurations().size());
     }
 
+    @Test
+    void testMergeConnectorStatusValidationStatusPriority() {
+        final ConnectorEntity clientEntity = createConnectorEntityWithStatus("connector1", "RUNNING", 1, "VALID");
+        final ConnectorEntity node1Entity = createConnectorEntityWithStatus("connector1", "RUNNING", 1, "VALIDATING");
+        final ConnectorEntity node2Entity = createConnectorEntityWithStatus("connector1", "RUNNING", 1, "VALID");
+
+        final Map<NodeIdentifier, ConnectorEntity> entityMap = new HashMap<>();
+        entityMap.put(getNodeIdentifier("client", 8000), clientEntity);
+        entityMap.put(getNodeIdentifier("node1", 8001), node1Entity);
+        entityMap.put(getNodeIdentifier("node2", 8002), node2Entity);
+
+        ConnectorEntityMerger.merge(clientEntity, entityMap);
+
+        assertEquals("VALIDATING", clientEntity.getStatus().getValidationStatus());
+    }
+
     private NodeIdentifier getNodeIdentifier(final String id, final int port) {
         return new NodeIdentifier(id, "localhost", port, "localhost", port + 1, "localhost", port + 2, port + 3, true);
     }
@@ -153,6 +170,19 @@ class ConnectorEntityMergerTest {
         return entity;
     }
 
+    private ConnectorEntity createConnectorEntityWithStatus(final String id, final String state, final int activeThreadCount, final String validationStatus) {
+        final ConnectorEntity entity = createConnectorEntity(id, state);
+
+        final ConnectorStatusDTO status = new ConnectorStatusDTO();
+        status.setId(id);
+        status.setRunStatus(state);
+        status.setActiveThreadCount(activeThreadCount);
+        status.setValidationStatus(validationStatus);
+        entity.setStatus(status);
+
+        return entity;
+    }
+
     private ConnectorEntity createConnectorEntityWithConfig(final String id, final String state,
                                                             final Map<String, List<String>> propertyAllowableValues) {
         final ConnectorEntity entity = createConnectorEntity(id, state);
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
index bb3bdf6e2d..8a39f4b66a 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
@@ -199,6 +199,18 @@ public interface ConnectorNode extends ComponentAuthorizable, VersionedComponent
      */
     Future<Void> drainFlowFiles();
 
+    /**
+     * Cancels the draining of FlowFiles that is currently in progress.
+     * @throws IllegalStateException if the Connector is not currently draining FlowFiles
+     */
+    void cancelDrainFlowFiles();
+
+    /**
+     * Verifies that the Connector can cancel draining FlowFiles.
+     * @throws IllegalStateException if not in a state where draining FlowFiles can be cancelled
+     */
+    void verifyCancelDrainFlowFiles() throws IllegalStateException;
+
     /**
      * Purges all FlowFiles from the Connector, immediately dropping the data.
      *
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
index afb6e37bd9..471fbe10ae 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
@@ -41,9 +41,13 @@ import org.apache.nifi.controller.queue.DropFlowFileStatus;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ScheduledState;
 import org.apache.nifi.flow.VersionedConfigurationStep;
 import org.apache.nifi.flow.VersionedConnectorValueReference;
+import org.apache.nifi.flow.VersionedControllerService;
 import org.apache.nifi.flow.VersionedExternalFlow;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.nar.ExtensionManager;
@@ -95,6 +99,7 @@ public class StandardConnectorNode implements ConnectorNode {
     private final ConnectorValidationTrigger validationTrigger;
     private final boolean extensionMissing;
     private volatile boolean triggerValidation = true;
+    private final AtomicReference<CompletableFuture<Void>> drainFutureRef = new AtomicReference<>();
 
     private volatile FrameworkFlowContext workingFlowContext;
 
@@ -398,6 +403,7 @@ public class StandardConnectorNode implements ConnectorNode {
 
     @Override
     public Future<Void> stop(final FlowEngine scheduler) {
+        logger.info("Stopping {}", this);
+        final CompletableFuture<Void> stopCompleteFuture = new CompletableFuture<>();
 
         stateTransition.setDesiredState(ConnectorState.STOPPED);
@@ -406,7 +412,7 @@ public class StandardConnectorNode implements ConnectorNode {
         while (!stateUpdated) {
             final ConnectorState currentState = getCurrentState();
             if (currentState == ConnectorState.STOPPED) {
-                logger.debug("{} is already {}; will not attempt to stop", this, currentState);
+                logger.info("{} is already stopped.", this);
                 stopCompleteFuture.complete(null);
                 return stopCompleteFuture;
             }
@@ -430,8 +436,24 @@ public class StandardConnectorNode implements ConnectorNode {
         requireStopped("drain FlowFiles", ConnectorState.DRAINING);
 
         try (final NarCloseable ignored = NarCloseable.withComponentNarLoader(extensionManager, connectorDetails.getConnector().getClass(), getIdentifier())) {
-            final CompletableFuture<Void> future = connectorDetails.getConnector().drainFlowFiles(activeFlowContext);
-            final CompletableFuture<Void> stateUpdateFuture = future.whenComplete((result, failureCause) -> stateTransition.setCurrentState(ConnectorState.STOPPED));
+            getComponentLog().info("Draining FlowFiles from {}", this);
+            final CompletableFuture<Void> drainFuture = connectorDetails.getConnector().drainFlowFiles(activeFlowContext);
+            drainFutureRef.set(drainFuture);
+
+            final CompletableFuture<Void> stateUpdateFuture = drainFuture.whenComplete((result, failureCause) -> {
+                drainFutureRef.set(null);
+                logger.info("Successfully drained FlowFiles from {}; ensuring all components are stopped.", this);
+
+                try {
+                    connectorDetails.getConnector().stop(activeFlowContext);
+                } catch (final Exception e) {
+                    logger.warn("Failed to stop {} after draining FlowFiles", this, e);
+                }
+
+                stateTransition.setCurrentState(ConnectorState.STOPPED);
+                logger.info("All components of {} are now stopped after draining FlowFiles.", this);
+            });
+
             return stateUpdateFuture;
         } catch (final Throwable t) {
             stateTransition.setCurrentState(ConnectorState.STOPPED);
@@ -439,6 +461,30 @@ public class StandardConnectorNode implements ConnectorNode {
         }
     }
 
+    @Override
+    public void cancelDrainFlowFiles() {
+        final Future<Void> future = this.drainFutureRef.getAndSet(null);
+        if (future == null) {
+            logger.debug("No active drain to cancel for {}; drain may have already completed", this);
+            return;
+        }
+
+        future.cancel(true);
+        logger.info("Cancelled draining of FlowFiles for {}", this);
+    }
+
+    @Override
+    public void verifyCancelDrainFlowFiles() throws IllegalStateException {
+        final ConnectorState state = getCurrentState();
+
+        // Allow if we're currently draining or if we're stopped; if stopped the cancel drain action will be a no-op
+        // but we don't want to throw an IllegalStateException in that case because doing so would mean that if one
+        // node in the cluster is stopped while another is draining we cannot cancel the drain.
+        if (state != ConnectorState.DRAINING && state != ConnectorState.STOPPED) {
+            throw new IllegalStateException("Cannot cancel draining of FlowFiles for " + this + " because its current state is " + state + "; it must be DRAINING.");
+        }
+    }
+
     @Override
     public Future<Void> purgeFlowFiles(final String requestor) {
         requireStopped("purge FlowFiles", ConnectorState.PURGING);
@@ -610,6 +656,9 @@ public class StandardConnectorNode implements ConnectorNode {
             logger.info("{} has no initial flow to load", this);
         } else {
             logger.info("Loading initial flow for {}", this);
+            // Update all RUNNING components to ENABLED before applying the initial flow so that components
+            // are not started before being configured.
+            stopComponents(initialFlow.getFlowContents());
             initializationContext.updateFlow(activeFlowContext, initialFlow);
         }
 
@@ -617,6 +666,24 @@ public class StandardConnectorNode implements ConnectorNode {
         recreateWorkingFlowContext();
     }
 
+    private void stopComponents(final VersionedProcessGroup group) {
+        for (final VersionedProcessor processor : group.getProcessors()) {
+            if (processor.getScheduledState() == ScheduledState.RUNNING) {
+                processor.setScheduledState(ScheduledState.ENABLED);
+            }
+        }
+
+        for (final VersionedControllerService service : group.getControllerServices()) {
+            if (service.getScheduledState() == ScheduledState.RUNNING) {
+                service.setScheduledState(ScheduledState.ENABLED);
+            }
+        }
+
+        for (final VersionedProcessGroup childGroup : group.getProcessGroups()) {
+            stopComponents(childGroup);
+        }
+    }
+
     private void recreateWorkingFlowContext() {
         destroyWorkingContext();
         workingFlowContext = flowContextFactory.createWorkingFlowContext(identifier,
@@ -879,6 +946,7 @@ public class StandardConnectorNode implements ConnectorNode {
         actions.add(createDiscardWorkingConfigAction());
         actions.add(createPurgeFlowFilesAction(stopped, dataQueued));
         actions.add(createDrainFlowFilesAction(stopped, dataQueued));
+        actions.add(createCancelDrainFlowFilesAction(currentState == ConnectorState.DRAINING));
         actions.add(createApplyUpdatesAction(currentState));
         actions.add(createDeleteAction(stopped, dataQueued));
 
@@ -987,6 +1055,15 @@ public class StandardConnectorNode implements ConnectorNode {
         return new StandardConnectorAction(actionName, description, allowed, reason);
     }
 
+    private ConnectorAction createCancelDrainFlowFilesAction(final boolean draining) {
+        if (draining) {
+            return new StandardConnectorAction("CANCEL_DRAIN_FLOWFILES", "Cancel the ongoing drain of FlowFiles", true, null);
+        }
+
+        return new StandardConnectorAction("CANCEL_DRAIN_FLOWFILES", "Cancel the ongoing drain of FlowFiles", false,
+            "Connector is not currently draining FlowFiles");
+    }
+
     private ConnectorAction createApplyUpdatesAction(final ConnectorState currentState) {
         final boolean allowed;
         final String reason;
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
index d47cb4ed2e..40b39694fd 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -107,11 +106,14 @@ public class StandardConnectorRepository implements ConnectorRepository {
     @Override
     public void applyUpdate(final ConnectorNode connector, final ConnectorUpdateContext context) throws FlowUpdateException {
         final ConnectorState initialDesiredState = connector.getDesiredState();
+        logger.info("Applying update to Connector {}", connector);
 
         // Transition the connector's state to PREPARING_FOR_UPDATE before starting the background process.
         // This allows us to ensure that if we poll and see the state in the same state it was in before that
         // we know the update has already completed (successfully or otherwise).
+        logger.debug("Transitioning {} to PREPARING_FOR_UPDATE state before applying update", connector);
         connector.transitionStateForUpdating();
+        logger.debug("{} is now in PREPARING_FOR_UPDATE state", connector);
 
         // Update connector in a background thread. This will handle transitioning the Connector state appropriately
         // so that it's clear when the update has completed.
@@ -121,28 +123,24 @@ public class StandardConnectorRepository implements ConnectorRepository {
     private void updateConnector(final ConnectorNode connector, final ConnectorState initialDesiredState, final ConnectorUpdateContext context) {
         try {
             // Perform whatever preparation is necessary for the update. Default implementation is to stop the connector.
+            logger.debug("Preparing {} for update", connector);
             connector.prepareForUpdate();
 
             // Wait for Connector State to become UPDATING
+            logger.debug("Waiting for {} to transition to UPDATING state", connector);
             waitForState(connector, Set.of(ConnectorState.UPDATING), Set.of(ConnectorState.PREPARING_FOR_UPDATE));
 
             // Apply the update to the connector.
+            logger.info("{} has now completed preparations for update; applying update now", connector);
             connector.applyUpdate();
+            logger.info("{} has successfully applied update", connector);
 
             // Now that the update has been applied, save the flow so that the updated configuration is persisted.
             context.saveFlow();
 
-            // Wait for Connector State to become UPDATED, or to revert to the initial desired state because, depending upon timing,
-            // other nodes may have already seen the transition to UPDATED and moved the connector back to the initial desired state.
-            final Set<ConnectorState> desirableStates = new HashSet<>();
-            desirableStates.add(initialDesiredState);
-            desirableStates.add(ConnectorState.UPDATED);
-            if (initialDesiredState == ConnectorState.RUNNING) {
-                desirableStates.add(ConnectorState.STARTING);
-            } else if (initialDesiredState == ConnectorState.STOPPED) {
-                desirableStates.add(ConnectorState.STOPPING);
-            }
-            waitForState(connector, desirableStates, Set.of(ConnectorState.UPDATING));
+            // Wait for all nodes to complete the update.
+            waitForState(connector, Set.of(ConnectorState.UPDATED), Set.of(ConnectorState.UPDATING));
+            logger.info("{} has successfully completed update on all nodes", connector);
 
             // If the initial desired state was RUNNING, start the connector again. Otherwise, stop it.
             // We don't simply leave it be as the prepareForUpdate / update may have changed the state of some components.
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardFlowContext.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardFlowContext.java
index 16ebbc3e39..70ff4e079d 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardFlowContext.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardFlowContext.java
@@ -28,6 +28,7 @@ import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.logging.ComponentLog;
 
 import java.util.Collections;
+import java.util.HashMap;
 
 public class StandardFlowContext implements FrameworkFlowContext {
     private final ProcessGroup managedProcessGroup;
@@ -76,6 +77,10 @@ public class StandardFlowContext implements FrameworkFlowContext {
 
     @Override
     public void updateFlow(final VersionedExternalFlow versionedExternalFlow, final AssetManager assetManager) throws FlowUpdateException {
+        if (versionedExternalFlow.getParameterContexts() == null) {
+            versionedExternalFlow.setParameterContexts(new HashMap<>());
+        }
+
         final String parameterContextName = managedProcessGroup.getParameterContext().getName();
         updateParameterContext(versionedExternalFlow.getFlowContents(), parameterContextName);
 
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java
index 4eba662d26..d306ac45aa 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java
@@ -52,6 +52,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -509,6 +510,33 @@ public class TestStandardConnectorNode {
         assertEquals(ConnectorState.STOPPED, connectorNode.getDesiredState());
     }
 
+    @Test
+    @Timeout(value = 10, unit = TimeUnit.SECONDS)
+    public void testCancelDrainFlowFilesInterruptsConnector() throws Exception {
+        final BusyLoopDrainConnector busyLoopConnector = new BusyLoopDrainConnector();
+        final StandardConnectorNode connectorNode = createConnectorNode(busyLoopConnector);
+
+        assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState());
+
+        connectorNode.drainFlowFiles();
+
+        assertTrue(busyLoopConnector.awaitDrainStarted(2, TimeUnit.SECONDS));
+        assertEquals(ConnectorState.DRAINING, connectorNode.getCurrentState());
+
+        Thread.sleep(1000);
+
+        assertEquals(ConnectorState.DRAINING, connectorNode.getCurrentState());
+        assertFalse(busyLoopConnector.wasInterrupted());
+        assertFalse(busyLoopConnector.wasStopCalled());
+
+        connectorNode.cancelDrainFlowFiles();
+
+        assertTrue(busyLoopConnector.awaitDrainCompleted(2, TimeUnit.SECONDS));
+        assertTrue(busyLoopConnector.wasInterrupted());
+        assertTrue(busyLoopConnector.wasStopCalled());
+        assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState());
+    }
+
     private StandardConnectorNode createConnectorNode() throws FlowUpdateException {
         final SleepingConnector sleepingConnector = new SleepingConnector(Duration.ofMillis(1));
         return createConnectorNode(sleepingConnector);
@@ -727,4 +755,94 @@ public class TestStandardConnectorNode {
         }
     }
 
+    /**
+     * Test connector that uses a busy loop for draining that never completes naturally,
+     * but can be interrupted via cancellation.
+     */
+    private static class BusyLoopDrainConnector extends AbstractConnector {
+        private volatile boolean interrupted = false;
+        private volatile boolean stopCalled = false;
+        private final AtomicReference<Thread> drainThreadRef = new AtomicReference<>();
+        private final CountDownLatch drainStartedLatch = new CountDownLatch(1);
+        private final CountDownLatch drainCompletedLatch = new CountDownLatch(1);
+
+        @Override
+        public VersionedExternalFlow getInitialFlow() {
+            return null;
+        }
+
+        @Override
+        public void prepareForUpdate(final FlowContext workingContext, final FlowContext activeContext) {
+        }
+
+        @Override
+        public List<ConfigurationStep> getConfigurationSteps() {
+            return List.of();
+        }
+
+        @Override
+        public void applyUpdate(final FlowContext workingContext, final FlowContext activeContext) {
+        }
+
+        @Override
+        protected void onStepConfigured(final String stepName, final FlowContext workingContext) {
+        }
+
+        @Override
+        public List<ConfigVerificationResult> verifyConfigurationStep(final String stepName, final Map<String, String> overrides, final FlowContext flowContext) {
+            return List.of();
+        }
+
+        @Override
+        public void stop(final FlowContext activeContext) {
+            stopCalled = true;
+        }
+
+        @Override
+        public CompletableFuture<Void> drainFlowFiles(final FlowContext flowContext) {
+            final CompletableFuture<Void> future = new CompletableFuture<>();
+
+            final Thread drainThread = new Thread(() -> {
+                drainStartedLatch.countDown();
+                try {
+                    while (!Thread.currentThread().isInterrupted()) {
+                        // Busy loop that never completes naturally
+                    }
+                } finally {
+                    interrupted = Thread.currentThread().isInterrupted();
+                    drainCompletedLatch.countDown();
+                }
+            });
+
+            drainThreadRef.set(drainThread);
+
+            future.whenComplete((result, throwable) -> {
+                final Thread thread = drainThreadRef.get();
+                if (thread != null) {
+                    thread.interrupt();
+                }
+            });
+
+            drainThread.start();
+
+            return future;
+        }
+
+        public boolean awaitDrainStarted(final long timeout, final TimeUnit unit) throws InterruptedException {
+            return drainStartedLatch.await(timeout, unit);
+        }
+
+        public boolean awaitDrainCompleted(final long timeout, final TimeUnit unit) throws InterruptedException {
+            return drainCompletedLatch.await(timeout, unit);
+        }
+
+        public boolean wasInterrupted() {
+            return interrupted;
+        }
+
+        public boolean wasStopCalled() {
+            return stopCalled;
+        }
+    }
+
 }
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 967c0e1f29..d270c0344f 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -210,6 +210,14 @@ public interface NiFiServiceFacade {
 
     ConnectorEntity scheduleConnector(Revision revision, String id, ScheduledState state);
 
+    void verifyDrainConnector(String id);
+
+    ConnectorEntity drainConnector(Revision revision, String id);
+
+    void verifyCancelConnectorDrain(String id);
+
+    ConnectorEntity cancelConnectorDrain(Revision revision, String id);
+
     ConfigurationStepNamesEntity getConnectorConfigurationSteps(String id);
 
     ConfigurationStepEntity getConnectorConfigurationStep(String id, String configurationStepName);
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 1826a1efd4..bee52d0446 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -81,6 +81,7 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.components.connector.Connector;
 import org.apache.nifi.components.connector.ConnectorNode;
+import org.apache.nifi.components.connector.ConnectorState;
 import org.apache.nifi.components.connector.ConnectorUpdateContext;
 import org.apache.nifi.components.connector.Secret;
 import org.apache.nifi.components.connector.secrets.AuthorizableSecret;
@@ -3609,6 +3610,62 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         return entityFactory.createConnectorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions);
     }
 
+    @Override
+    public void verifyDrainConnector(final String id) {
+        final ConnectorNode connector = connectorDAO.getConnector(id);
+        final ConnectorState currentState = connector.getCurrentState();
+        if (currentState != ConnectorState.STOPPED) {
+            throw new IllegalStateException("Cannot drain FlowFiles for Connector " + id + " because it is not currently stopped. Current state: " + currentState);
+        }
+    }
+
+    @Override
+    public ConnectorEntity drainConnector(final Revision revision, final String id) {
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();
+        final RevisionClaim claim = new StandardRevisionClaim(revision);
+
+        final RevisionUpdate<ConnectorDTO> snapshot = revisionManager.updateRevision(claim, user, () -> {
+            connectorDAO.drainFlowFiles(id);
+            controllerFacade.save();
+
+            final ConnectorNode node = connectorDAO.getConnector(id);
+            final ConnectorDTO dto = dtoFactory.createConnectorDto(node);
+            final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
+            return new StandardRevisionUpdate<>(dto, lastMod);
+        });
+
+        final ConnectorNode node = connectorDAO.getConnector(snapshot.getComponent().getId());
+        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(node);
+        final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(node));
+        return entityFactory.createConnectorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions);
+    }
+
+    @Override
+    public void verifyCancelConnectorDrain(final String id) {
+        connectorDAO.verifyCancelDrainFlowFile(id);
+    }
+
+    @Override
+    public ConnectorEntity cancelConnectorDrain(final Revision revision, final String id) {
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();
+        final RevisionClaim claim = new StandardRevisionClaim(revision);
+
+        final RevisionUpdate<ConnectorDTO> snapshot = revisionManager.updateRevision(claim, user, () -> {
+            connectorDAO.cancelDrainFlowFiles(id);
+            controllerFacade.save();
+
+            final ConnectorNode node = connectorDAO.getConnector(id);
+            final ConnectorDTO dto = dtoFactory.createConnectorDto(node);
+            final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
+            return new StandardRevisionUpdate<>(dto, lastMod);
+        });
+
+        final ConnectorNode node = connectorDAO.getConnector(snapshot.getComponent().getId());
+        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(node);
+        final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(node));
+        return entityFactory.createConnectorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions);
+    }
+
     @Override
     public ConfigurationStepNamesEntity getConnectorConfigurationSteps(final String id) {
         final ConnectorNode node = connectorDAO.getConnector(id);
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
index cb5146afdb..f7cd6c50f0 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
@@ -622,6 +622,155 @@ public class ConnectorResource extends ApplicationResource {
         );
     }
 
+    /**
+     * Initiates the draining of FlowFiles for the specified connector.
+     *
+     * @param id The id of the connector to drain.
+     * @param requestConnectorEntity A connectorEntity containing the revision.
+     * @return A connectorEntity.
+     */
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/{id}/drain")
+    @Operation(
+            summary = "Initiates draining of FlowFiles for a connector",
+            responses = {
+                    @ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = ConnectorEntity.class))),
+                    @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                    @ApiResponse(responseCode = "401", description = "Client could not be authenticated."),
+                    @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."),
+                    @ApiResponse(responseCode = "404", description = "The specified resource could not be found."),
+                    @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.")
+            },
+            description = "This will initiate draining of FlowFiles for a stopped connector. Draining allows the connector to process " +
+                    "data that is currently in the flow but does not ingest any additional data. The connector must be in a STOPPED state " +
+                    "before draining can begin. Once initiated, the connector will transition to a DRAINING state. Use the DELETE method " +
+                    "on this endpoint to cancel an ongoing drain operation.",
+            security = {
+                    @SecurityRequirement(name = "Write - /connectors/{uuid} or /operation/connectors/{uuid}")
+            }
+    )
+    public Response initiateDrain(
+            @Parameter(
+                    description = "The connector id.",
+                    required = true
+            )
+            @PathParam("id") final String id,
+            @Parameter(
+                    description = "The connector entity with revision.",
+                    required = true
+            ) final ConnectorEntity requestConnectorEntity) {
+
+        if (requestConnectorEntity == null || requestConnectorEntity.getRevision() == null) {
+            throw new IllegalArgumentException("Connector entity with revision must be specified.");
+        }
+
+        if (requestConnectorEntity.getId() != null && !id.equals(requestConnectorEntity.getId())) {
+            throw new IllegalArgumentException(String.format("The connector id (%s) in the request body does not equal the "
+                    + "connector id of the requested resource (%s).", requestConnectorEntity.getId(), id));
+        }
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.POST, requestConnectorEntity);
+        } else if (isDisconnectedFromCluster()) {
+            verifyDisconnectedNodeModification(requestConnectorEntity.isDisconnectedNodeAcknowledged());
+        }
+
+        final Revision requestRevision = getRevision(requestConnectorEntity, id);
+        return withWriteLock(
+                serviceFacade,
+                requestConnectorEntity,
+                requestRevision,
+                lookup -> {
+                    final NiFiUser user = NiFiUserUtils.getNiFiUser();
+                    final Authorizable connector = lookup.getConnector(id);
+                    OperationAuthorizable.authorizeOperation(connector, authorizer, user);
+                },
+                () -> serviceFacade.verifyDrainConnector(id),
+                (revision, connectorEntity) -> {
+                    final ConnectorEntity entity = serviceFacade.drainConnector(revision, id);
+                    populateRemainingConnectorEntityContent(entity);
+
+                    return generateOkResponse(entity).build();
+                }
+        );
+    }
+
+    /**
+     * Cancels the draining of FlowFiles for the specified connector.
+     *
+     * @param version The revision is used to verify the client is working with the latest version of the flow.
+     * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+     * @param id The id of the connector to cancel draining for.
+     * @return A connectorEntity.
+     */
+    @DELETE
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("/{id}/drain")
+    @Operation(
+            summary = "Cancels the draining of FlowFiles for a connector",
+            responses = {
+                    @ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = ConnectorEntity.class))),
+                    @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                    @ApiResponse(responseCode = "401", description = "Client could not be authenticated."),
+                    @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."),
+                    @ApiResponse(responseCode = "404", description = "The specified resource could not be found."),
+                    @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.")
+            },
+            security = {
+                    @SecurityRequirement(name = "Write - /connectors/{uuid} or /operation/connectors/{uuid}")
+            }
+    )
+    public Response cancelDrain(
+            @Parameter(
+                    description = "The revision is used to verify the client is working with the latest version of the flow."
+            )
+            @QueryParam(VERSION) final LongParameter version,
+            @Parameter(
+                    description = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response."
+            )
+            @QueryParam(CLIENT_ID) final ClientIdParameter clientId,
+            @Parameter(
+                    description = "Acknowledges that this node is disconnected to allow for mutable requests to proceed."
+            )
+            @QueryParam(DISCONNECTED_NODE_ACKNOWLEDGED) @DefaultValue("false") final Boolean disconnectedNodeAcknowledged,
+            @Parameter(
+                    description = "The connector id.",
+                    required = true
+            )
+            @PathParam("id") final String id) {
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.DELETE);
+        } else if (isDisconnectedFromCluster()) {
+            verifyDisconnectedNodeModification(disconnectedNodeAcknowledged);
+        }
+
+        final ConnectorEntity requestConnectorEntity = new ConnectorEntity();
+        requestConnectorEntity.setId(id);
+
+        final Revision requestRevision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id);
+        return withWriteLock(
+                serviceFacade,
+                requestConnectorEntity,
+                requestRevision,
+                lookup -> {
+                    final NiFiUser user = NiFiUserUtils.getNiFiUser();
+                    final Authorizable connector = lookup.getConnector(id);
+                    OperationAuthorizable.authorizeOperation(connector, 
authorizer, user);
+                },
+                () -> serviceFacade.verifyCancelConnectorDrain(id),
+                (revision, connectorEntity) -> {
+                    final ConnectorEntity entity = 
serviceFacade.cancelConnectorDrain(revision, id);
+                    populateRemainingConnectorEntityContent(entity);
+
+                    return generateOkResponse(entity).build();
+                }
+        );
+    }
+
     /**
      * Gets the configuration step names for the specified connector.
      *
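
A minimal sketch of invoking this cancel-drain endpoint directly with a standard JAX-RS client, for readers who want to exercise it outside the toolkit (the ConnectorClient additions later in this commit wrap the same request). The base URL, connector id, client id, and revision version are placeholders; the resource is assumed to be mounted under /nifi-api/connectors like the other connector endpoints, and the Client would need a JSON provider registered to deserialize ConnectorEntity.

    import jakarta.ws.rs.client.Client;
    import jakarta.ws.rs.client.ClientBuilder;
    import jakarta.ws.rs.core.MediaType;
    import org.apache.nifi.web.api.entity.ConnectorEntity;

    public class CancelDrainSketch {
        public static void main(final String[] args) {
            final Client client = ClientBuilder.newClient();
            try {
                // DELETE /nifi-api/connectors/{id}/drain?version=...&clientId=...
                final ConnectorEntity entity = client.target("https://localhost:8443/nifi-api")
                        .path("connectors/{id}/drain")
                        .resolveTemplate("id", "my-connector-id")      // placeholder connector id
                        .queryParam("version", 3)                      // current revision version
                        .queryParam("clientId", "my-client-id")        // placeholder client id
                        .request(MediaType.APPLICATION_JSON)
                        .delete(ConnectorEntity.class);
                // The returned entity reflects the connector's state after the cancel request.
                System.out.println(entity.getComponent().getState());
            } finally {
                client.close();
            }
        }
    }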
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java
index a987dde126..d094f18ca9 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java
@@ -45,6 +45,12 @@ public interface ConnectorDAO {
 
     void stopConnector(String id);
 
+    void drainFlowFiles(String id);
+
+    void cancelDrainFlowFiles(String id);
+
+    void verifyCancelDrainFlowFile(String id);
+
     void updateConnectorConfigurationStep(String id, String 
configurationStepName, ConfigurationStepConfigurationDTO 
configurationStepConfiguration);
 
     void applyConnectorUpdate(String id, ConnectorUpdateContext updateContext);
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
index e55e6ceee0..19b58dd207 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
@@ -127,6 +127,24 @@ public class StandardConnectorDAO implements ConnectorDAO {
         getConnectorRepository().stopConnector(connector);
     }
 
+    @Override
+    public void drainFlowFiles(final String id) {
+        final ConnectorNode connector = getConnector(id);
+        connector.drainFlowFiles();
+    }
+
+    @Override
+    public void cancelDrainFlowFiles(final String id) {
+        final ConnectorNode connector = getConnector(id);
+        connector.cancelDrainFlowFiles();
+    }
+
+    @Override
+    public void verifyCancelDrainFlowFile(final String id) {
+        final ConnectorNode connector = getConnector(id);
+        connector.verifyCancelDrainFlowFiles();
+    }
+
     @Override
     public void updateConnectorConfigurationStep(final String id, final String 
configurationStepName, final ConfigurationStepConfigurationDTO 
configurationStepDto) {
         final ConnectorNode connector = getConnector(id);
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestConnectorResource.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestConnectorResource.java
index 17678723c7..60702b8b23 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestConnectorResource.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestConnectorResource.java
@@ -412,12 +412,72 @@ public class TestConnectorResource {
         
doThrow(AccessDeniedException.class).when(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
 
         assertThrows(AccessDeniedException.class, () ->
-                
connectorResource.getControllerServicesFromConnectorProcessGroup(CONNECTOR_ID, 
PROCESS_GROUP_ID, true, false, true));
+            
connectorResource.getControllerServicesFromConnectorProcessGroup(CONNECTOR_ID, 
PROCESS_GROUP_ID, true, false, true));
 
         verify(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
         verify(serviceFacade, 
never()).getConnectorControllerServices(anyString(), anyString(), eq(true), 
eq(false), eq(true));
     }
 
+    @Test
+    public void testInitiateDrain() {
+        final ConnectorEntity requestEntity = createConnectorEntity();
+        final ConnectorEntity responseEntity = createConnectorEntity();
+        responseEntity.getComponent().setState("DRAINING");
+
+        when(serviceFacade.drainConnector(any(Revision.class), 
eq(CONNECTOR_ID))).thenReturn(responseEntity);
+
+        try (Response response = connectorResource.initiateDrain(CONNECTOR_ID, 
requestEntity)) {
+            assertEquals(200, response.getStatus());
+            assertEquals(responseEntity, response.getEntity());
+        }
+
+        verify(serviceFacade).verifyDrainConnector(CONNECTOR_ID);
+        verify(serviceFacade).drainConnector(any(Revision.class), 
eq(CONNECTOR_ID));
+    }
+
+    @Test
+    public void testInitiateDrainNotAuthorized() {
+        final ConnectorEntity requestEntity = createConnectorEntity();
+
+        
doThrow(AccessDeniedException.class).when(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));
+
+        assertThrows(AccessDeniedException.class, () ->
+            connectorResource.initiateDrain(CONNECTOR_ID, requestEntity));
+
+        verify(serviceFacade, never()).verifyDrainConnector(anyString());
+        verify(serviceFacade, never()).drainConnector(any(Revision.class), 
anyString());
+    }
+
+    @Test
+    public void testInitiateDrainWithNullEntity() {
+        assertThrows(IllegalArgumentException.class, () ->
+            connectorResource.initiateDrain(CONNECTOR_ID, null));
+
+        verify(serviceFacade, never()).drainConnector(any(Revision.class), 
anyString());
+    }
+
+    @Test
+    public void testInitiateDrainWithNullRevision() {
+        final ConnectorEntity requestEntity = createConnectorEntity();
+        requestEntity.setRevision(null);
+
+        assertThrows(IllegalArgumentException.class, () ->
+            connectorResource.initiateDrain(CONNECTOR_ID, requestEntity));
+
+        verify(serviceFacade, never()).drainConnector(any(Revision.class), 
anyString());
+    }
+
+    @Test
+    public void testInitiateDrainWithMismatchedId() {
+        final ConnectorEntity requestEntity = createConnectorEntity();
+        requestEntity.setId("different-id");
+
+        assertThrows(IllegalArgumentException.class, () ->
+            connectorResource.initiateDrain(CONNECTOR_ID, requestEntity));
+
+        verify(serviceFacade, never()).drainConnector(any(Revision.class), 
anyString());
+    }
+
     private ConnectorEntity createConnectorEntity() {
         final ConnectorEntity entity = new ConnectorEntity();
 
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
index 920c61480a..987b7e812d 100644
--- 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
@@ -50,7 +50,8 @@ public class DataQueuingConnector extends AbstractConnector {
         bundle.setVersion("2.8.0-SNAPSHOT");
 
         final VersionedProcessor generate = createVersionedProcessor("gen-1", 
"1234", "GenerateFlowFile",
-            "org.apache.nifi.processors.tests.system.GenerateFlowFile", 
bundle, Map.of("File Size", "1 KB"), ScheduledState.RUNNING);
+            "org.apache.nifi.processors.tests.system.GenerateFlowFile", 
bundle, Map.of("File Size", "1 KB"), ScheduledState.ENABLED);
+        generate.setSchedulingPeriod("100 millis");
 
         final VersionedProcessor terminate = 
createVersionedProcessor("term-1", "1234", "TerminateFlowFile",
             "org.apache.nifi.processors.tests.system.TerminateFlowFile", 
bundle, Collections.emptyMap(), ScheduledState.DISABLED);
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/GatedDataQueuingConnector.java
similarity index 64%
copy from 
nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
copy to 
nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/GatedDataQueuingConnector.java
index 920c61480a..857e77234f 100644
--- 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/GatedDataQueuingConnector.java
@@ -20,6 +20,10 @@ package org.apache.nifi.connectors.tests.system;
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.connector.AbstractConnector;
 import org.apache.nifi.components.connector.ConfigurationStep;
+import org.apache.nifi.components.connector.ConnectorPropertyDescriptor;
+import org.apache.nifi.components.connector.ConnectorPropertyGroup;
+import org.apache.nifi.components.connector.FlowUpdateException;
+import org.apache.nifi.components.connector.PropertyType;
 import org.apache.nifi.components.connector.components.FlowContext;
 import org.apache.nifi.flow.Bundle;
 import org.apache.nifi.flow.ConnectableComponent;
@@ -30,16 +34,47 @@ import org.apache.nifi.flow.VersionedConnection;
 import org.apache.nifi.flow.VersionedExternalFlow;
 import org.apache.nifi.flow.VersionedProcessGroup;
 import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.processor.util.StandardValidators;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-public class DataQueuingConnector extends AbstractConnector {
+/**
+ * A connector that generates FlowFiles and uses a gate file to control when the TerminateFlowFile
+ * processor will process them. If the gate file does not exist, FlowFiles queue up; once the gate
+ * file exists, the TerminateFlowFile processor processes the queued FlowFiles.
+ * <p>
+ * The connector uses a configuration step to specify the gate file path. A test must configure the
+ * connector with a gate file path and apply the update before starting the connector.
+ */
+public class GatedDataQueuingConnector extends AbstractConnector {
+
+    private static final String TERMINATE_PROCESSOR_ID = "term-1";
+
+    static final ConnectorPropertyDescriptor GATE_FILE_PATH = new 
ConnectorPropertyDescriptor.Builder()
+            .name("Gate File Path")
+            .description("The path to the gate file. When this file exists, 
the TerminateFlowFile processor " +
+                    "will process FlowFiles. When it does not exist, FlowFiles 
will queue up.")
+            .required(true)
+            .type(PropertyType.STRING)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    private static final ConnectorPropertyGroup PROPERTY_GROUP = new 
ConnectorPropertyGroup.Builder()
+            .name("Gate Configuration")
+            .addProperty(GATE_FILE_PATH)
+            .build();
+
+    private static final ConfigurationStep CONFIG_STEP = new 
ConfigurationStep.Builder()
+            .name("Gate Configuration")
+            .propertyGroups(List.of(PROPERTY_GROUP))
+            .build();
+
     @Override
     protected void onStepConfigured(final String stepName, final FlowContext 
workingContext) {
-
     }
 
     @Override
@@ -47,13 +82,16 @@ public class DataQueuingConnector extends AbstractConnector 
{
         final Bundle bundle = new Bundle();
         bundle.setGroup("org.apache.nifi");
         bundle.setArtifact("nifi-system-test-extensions-nar");
-        bundle.setVersion("2.8.0-SNAPSHOT");
+        bundle.setVersion("2.8.0-SNAPSHOT");
 
         final VersionedProcessor generate = createVersionedProcessor("gen-1", 
"1234", "GenerateFlowFile",
-            "org.apache.nifi.processors.tests.system.GenerateFlowFile", 
bundle, Map.of("File Size", "1 KB"), ScheduledState.RUNNING);
+            "org.apache.nifi.processors.tests.system.GenerateFlowFile", bundle,
+            Map.of("File Size", "1 KB"), ScheduledState.ENABLED);
+        generate.setSchedulingPeriod("100 millis");
 
-        final VersionedProcessor terminate = 
createVersionedProcessor("term-1", "1234", "TerminateFlowFile",
-            "org.apache.nifi.processors.tests.system.TerminateFlowFile", 
bundle, Collections.emptyMap(), ScheduledState.DISABLED);
+        final VersionedProcessor terminate = 
createVersionedProcessor(TERMINATE_PROCESSOR_ID, "1234", "TerminateFlowFile",
+            "org.apache.nifi.processors.tests.system.TerminateFlowFile", 
bundle,
+            Collections.emptyMap(), ScheduledState.ENABLED);
 
         final ConnectableComponent source = new ConnectableComponent();
         source.setId(generate.getIdentifier());
@@ -80,7 +118,7 @@ public class DataQueuingConnector extends AbstractConnector {
         connection.setzIndex(1L);
 
         final VersionedProcessGroup rootGroup = new VersionedProcessGroup();
-        rootGroup.setName("Data Queuing Connector");
+        rootGroup.setName("Gated Data Queuing Connector");
         rootGroup.setIdentifier("1234");
         rootGroup.setProcessors(Set.of(generate, terminate));
         rootGroup.setConnections(Set.of(connection));
@@ -98,11 +136,28 @@ public class DataQueuingConnector extends 
AbstractConnector {
 
     @Override
     public List<ConfigurationStep> getConfigurationSteps() {
-        return List.of();
+        return List.of(CONFIG_STEP);
     }
 
     @Override
-    public void applyUpdate(final FlowContext workingFlowContext, final 
FlowContext activeFlowContext) {
+    public void applyUpdate(final FlowContext workingFlowContext, final 
FlowContext activeFlowContext) throws FlowUpdateException {
+        final String gateFilePath = 
workingFlowContext.getConfigurationContext().getProperty(CONFIG_STEP, 
GATE_FILE_PATH).getValue();
+        if (gateFilePath == null) {
+            return;
+        }
+
+        final VersionedExternalFlow flow = getInitialFlow();
+        final VersionedProcessGroup rootGroup = flow.getFlowContents();
+
+        for (final VersionedProcessor processor : rootGroup.getProcessors()) {
+            if (TERMINATE_PROCESSOR_ID.equals(processor.getIdentifier())) {
+                final Map<String, String> properties = new 
HashMap<>(processor.getProperties());
+                properties.put("Gate File", gateFilePath);
+                processor.setProperties(properties);
+            }
+        }
+
+        getInitializationContext().updateFlow(activeFlowContext, flow);
     }
 
     private VersionedProcessor createVersionedProcessor(final String 
identifier, final String groupIdentifier, final String name,
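
To make the gate usage concrete, here is a condensed sketch of how a system test drives this connector, using the NiFiClientUtil and NiFiSystemIT helpers added later in this commit; the complete flows are in ConnectorDrainIT and ClusteredConnectorDrainIT below, and the gate file name is arbitrary.

    package org.apache.nifi.tests.system.connectors;

    import org.apache.nifi.tests.system.NiFiSystemIT;
    import org.apache.nifi.web.api.entity.ConnectorEntity;
    import org.junit.jupiter.api.Test;

    import java.io.File;
    import java.util.Map;

    import static org.junit.jupiter.api.Assertions.assertTrue;

    public class GatedConnectorUsageSketchIT extends NiFiSystemIT {

        @Test
        public void drainCompletesOnceGateFileExists() throws Exception {
            final File gateFile = new File(getNiFiInstance().getInstanceDirectory(), "gate-file.txt");
            gateFile.deleteOnExit();

            final ConnectorEntity connector = getClientUtil().createConnector("GatedDataQueuingConnector");
            getClientUtil().configureConnector(connector, "Gate Configuration",
                Map.of("Gate File Path", gateFile.getAbsolutePath()));
            getClientUtil().applyConnectorUpdate(connector);

            getClientUtil().startConnector(connector.getId());           // FlowFiles queue while the gate file is absent
            waitForConnectorMinQueueCount(connector.getId(), 5);
            getClientUtil().stopConnector(connector.getId());
            getClientUtil().waitForConnectorStopped(connector.getId());

            getClientUtil().drainConnector(connector.getId());
            getClientUtil().waitForConnectorDraining(connector.getId());

            assertTrue(gateFile.createNewFile());                        // opening the gate lets the drain finish
            getClientUtil().waitForConnectorStopped(connector.getId());  // connector returns to STOPPED once empty
        }
    }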
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/TerminateFlowFile.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/TerminateFlowFile.java
index 734766fa37..9ab6fd71b1 100644
--- 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/TerminateFlowFile.java
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/TerminateFlowFile.java
@@ -16,15 +16,43 @@
  */
 package org.apache.nifi.processors.tests.system;
 
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.File;
+import java.util.List;
 
 public class TerminateFlowFile extends AbstractProcessor {
+
+    public static final PropertyDescriptor GATE_FILE = new 
PropertyDescriptor.Builder()
+            .name("Gate File")
+            .description("An optional file path. If specified, the processor 
will only process FlowFiles when this file exists. " +
+                    "If the file does not exist, the processor will yield and 
return without processing any data.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return List.of(GATE_FILE);
+    }
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final String gateFilePath = context.getProperty(GATE_FILE).getValue();
+        if (gateFilePath != null) {
+            final File gateFile = new File(gateFilePath);
+            if (!gateFile.exists()) {
+                context.yield();
+                return;
+            }
+        }
+
         FlowFile flowFile = session.get();
         if (flowFile == null) {
             return;
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
index 34fe7c90ea..3885e4c079 100644
--- 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
@@ -16,5 +16,6 @@
 org.apache.nifi.connectors.tests.system.NopConnector
 org.apache.nifi.connectors.tests.system.AssetConnector
 org.apache.nifi.connectors.tests.system.DataQueuingConnector
+org.apache.nifi.connectors.tests.system.GatedDataQueuingConnector
 org.apache.nifi.connectors.tests.system.NestedProcessGroupConnector
 org.apache.nifi.connectors.tests.system.CalculateConnector
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 51272e8b8e..1a3b91c366 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -406,6 +406,16 @@ public class NiFiClientUtil {
         }
     }
 
+    public void startConnector(final ConnectorEntity connectorEntity) throws 
NiFiClientException, IOException, InterruptedException {
+        startConnector(connectorEntity.getId());
+    }
+
+    public void startConnector(final String connectorId) throws 
NiFiClientException, IOException, InterruptedException {
+        final ConnectorEntity entity = 
getConnectorClient().getConnector(connectorId);
+        getConnectorClient().startConnector(entity);
+        waitForConnectorState(connectorId, ConnectorState.RUNNING);
+    }
+
     public void stopConnector(final ConnectorEntity connectorEntity) throws 
NiFiClientException, IOException, InterruptedException {
         stopConnector(connectorEntity.getId());
     }
@@ -437,6 +447,20 @@ public class NiFiClientUtil {
         }
     }
 
+    public ConnectorEntity drainConnector(final String connectorId) throws 
NiFiClientException, IOException {
+        final ConnectorEntity entity = 
getConnectorClient().getConnector(connectorId);
+        return getConnectorClient().drainConnector(entity);
+    }
+
+    public ConnectorEntity cancelDrain(final String connectorId) throws 
NiFiClientException, IOException {
+        final ConnectorEntity entity = 
getConnectorClient().getConnector(connectorId);
+        return getConnectorClient().cancelDrain(entity);
+    }
+
+    public void waitForConnectorDraining(final String connectorId) throws 
NiFiClientException, IOException, InterruptedException {
+        waitForConnectorState(connectorId, ConnectorState.DRAINING);
+    }
+
     public ParameterProviderEntity createParameterProvider(final String 
simpleTypeName) throws NiFiClientException, IOException {
         return 
createParameterProvider(NiFiSystemIT.TEST_PARAM_PROVIDERS_PACKAGE + "." + 
simpleTypeName, NiFiSystemIT.NIFI_GROUP_ID, 
NiFiSystemIT.TEST_EXTENSIONS_ARTIFACT_ID, nifiVersion);
     }
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index b73814250f..93614725e7 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -468,6 +468,29 @@ public abstract class NiFiSystemIT implements 
NiFiInstanceProvider {
         logger.info("Queue Count for Connection {} is now {}", connectionId, 
queueSize);
     }
 
+    protected void waitForConnectorMinQueueCount(final String connectorId, 
final int minQueueSize) throws InterruptedException {
+        logger.info("Waiting for Queue Count of at least {} in Connector {}", 
minQueueSize, connectorId);
+
+        waitFor(() -> {
+            try {
+                final ProcessGroupStatusEntity statusEntity = 
getNifiClient().getConnectorClient().getStatus(connectorId, true);
+                final int currentSize = 
statusEntity.getProcessGroupStatus().getAggregateSnapshot().getFlowFilesQueued();
+                logEverySecond("Current Queue Size for Connector {} = {}, 
Waiting for at least {}", connectorId, currentSize, minQueueSize);
+                return currentSize >= minQueueSize;
+            } catch (final Exception e) {
+                logger.error("Failed to get connector queue count", e);
+                return false;
+            }
+        });
+
+        logger.info("Queue Count for Connector {} is now at least {}", 
connectorId, minQueueSize);
+    }
+
+    protected int getConnectorQueuedFlowFileCount(final String connectorId) 
throws NiFiClientException, IOException {
+        final ProcessGroupStatusEntity statusEntity = 
getNifiClient().getConnectorClient().getStatus(connectorId, true);
+        return 
statusEntity.getProcessGroupStatus().getAggregateSnapshot().getFlowFilesQueued();
+    }
+
     private void waitForQueueCountToMatch(final String connectionId, final 
Predicate<Integer> test, final String queueSizeDescription) throws 
InterruptedException {
         waitFor(() -> {
             final ConnectionStatusEntity statusEntity = 
getConnectionStatus(connectionId);
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorDrainIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorDrainIT.java
new file mode 100644
index 0000000000..9ee3fbd09b
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorDrainIT.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.connectors;
+
+import org.apache.nifi.components.connector.ConnectorState;
+import org.apache.nifi.tests.system.NiFiInstanceFactory;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectorEntity;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Cluster-specific tests for connector draining functionality.
+ * These tests verify that draining works correctly when nodes complete 
draining at different times.
+ */
+public class ClusteredConnectorDrainIT extends NiFiSystemIT {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(ClusteredConnectorDrainIT.class);
+
+    @Override
+    public NiFiInstanceFactory getInstanceFactory() {
+        return createTwoNodeInstanceFactory();
+    }
+
+    /**
+     * Tests that when draining in a cluster:
+     * 1. Create gate file on Node 1 only - Node 1 finishes draining 
(STOPPED), Node 2 still DRAINING
+     * 2. Aggregate state should be DRAINING (since at least one node is still 
draining)
+     * 3. Create gate file on Node 2 - both nodes finish draining
+     * 4. Aggregate state should be STOPPED
+     */
+    @Test
+    public void testDrainWithNodeCompletingAtDifferentTimes() throws 
NiFiClientException, IOException, InterruptedException {
+        final File node1InstanceDir = 
getNiFiInstance().getNodeInstance(1).getInstanceDirectory();
+        final File node2InstanceDir = 
getNiFiInstance().getNodeInstance(2).getInstanceDirectory();
+        final File node1GateFile = new File(node1InstanceDir, "gate-file.txt");
+        final File node2GateFile = new File(node2InstanceDir, "gate-file.txt");
+        node1GateFile.deleteOnExit();
+        node2GateFile.deleteOnExit();
+        deleteIfExists(node1GateFile);
+        deleteIfExists(node2GateFile);
+
+        logger.info("Creating GatedDataQueuingConnector");
+        final ConnectorEntity connector = 
getClientUtil().createConnector("GatedDataQueuingConnector");
+        assertNotNull(connector);
+        final String connectorId = connector.getId();
+
+        final String gateFilePath = "./gate-file.txt";
+        logger.info("Configuring connector {} with gate file path: {}", 
connectorId, gateFilePath);
+        getClientUtil().configureConnector(connector, "Gate Configuration", 
Map.of("Gate File Path", gateFilePath));
+        getClientUtil().applyConnectorUpdate(connector);
+
+        logger.info("Starting connector {}", connectorId);
+        getClientUtil().startConnector(connectorId);
+
+        logger.info("Waiting for at least 5 FlowFiles to queue");
+        waitForConnectorMinQueueCount(connectorId, 5);
+
+        logger.info("Stopping connector {}", connectorId);
+        getClientUtil().stopConnector(connectorId);
+        getClientUtil().waitForConnectorStopped(connectorId);
+
+        final int queuedCountBeforeDrain = 
getConnectorQueuedFlowFileCount(connectorId);
+        logger.info("Queued FlowFile count before drain: {}", 
queuedCountBeforeDrain);
+        assertTrue(queuedCountBeforeDrain > 0);
+
+        logger.info("Initiating drain for connector {}", connectorId);
+        getClientUtil().drainConnector(connectorId);
+
+        logger.info("Waiting for aggregate connector state to be DRAINING");
+        getClientUtil().waitForConnectorDraining(connectorId);
+
+        logger.info("Creating gate file for Node 1 only: {}", 
node1GateFile.getAbsolutePath());
+        assertTrue(node1GateFile.createNewFile());
+
+        logger.info("Waiting for Node 1 to finish draining and become 
STOPPED");
+        waitForNodeConnectorState(1, connectorId, ConnectorState.STOPPED);
+        waitForNodeConnectorState(2, connectorId, ConnectorState.DRAINING);
+
+        logger.info("Verifying aggregate state is still DRAINING (Node 2 still 
draining)");
+        final ConnectorEntity aggregateConnector = 
getNifiClient().getConnectorClient().getConnector(connectorId);
+        assertEquals(ConnectorState.DRAINING.name(), 
aggregateConnector.getComponent().getState());
+
+        logger.info("Creating gate file for Node 2: {}", 
node2GateFile.getAbsolutePath());
+        assertTrue(node2GateFile.createNewFile());
+
+        logger.info("Waiting for aggregate state to become STOPPED");
+        waitFor(() -> {
+            try {
+                final ConnectorEntity entity = 
getNifiClient().getConnectorClient().getConnector(connectorId);
+                return 
ConnectorState.STOPPED.name().equals(entity.getComponent().getState());
+            } catch (final Exception e) {
+                return false;
+            }
+        });
+
+        final ConnectorEntity finalConnector = 
getNifiClient().getConnectorClient().getConnector(connectorId);
+        logger.info("Final aggregate connector state: {}", 
finalConnector.getComponent().getState());
+        assertEquals(ConnectorState.STOPPED.name(), 
finalConnector.getComponent().getState());
+
+        final int finalQueuedCount = 
getConnectorQueuedFlowFileCount(connectorId);
+        logger.info("Final queued FlowFile count: {}", finalQueuedCount);
+        assertEquals(0, finalQueuedCount);
+
+        logger.info("testDrainWithNodeCompletingAtDifferentTimes completed 
successfully");
+    }
+
+    /**
+     * Tests that when canceling drain in a cluster where one node has already 
finished:
+     * 1. Create gate file on Node 1 only - Node 1 finishes draining 
(STOPPED), Node 2 still DRAINING
+     * 2. Aggregate state should be DRAINING
+     * 3. Cancel drain - Node 2 should stop draining
+     * 4. Aggregate state should be STOPPED
+     * 5. Data should still be queued (from Node 2)
+     */
+    @Test
+    public void testCancelDrainWithOneNodeAlreadyComplete() throws 
NiFiClientException, IOException, InterruptedException {
+        final File node1InstanceDir = 
getNiFiInstance().getNodeInstance(1).getInstanceDirectory();
+        final File node2InstanceDir = 
getNiFiInstance().getNodeInstance(2).getInstanceDirectory();
+        final File node1GateFile = new File(node1InstanceDir, 
"gate-file-cancel.txt");
+        final File node2GateFile = new File(node2InstanceDir, 
"gate-file-cancel.txt");
+        node1GateFile.deleteOnExit();
+        node2GateFile.deleteOnExit();
+        deleteIfExists(node1GateFile);
+        deleteIfExists(node2GateFile);
+
+        logger.info("Creating GatedDataQueuingConnector");
+        final ConnectorEntity connector = 
getClientUtil().createConnector("GatedDataQueuingConnector");
+        assertNotNull(connector);
+        final String connectorId = connector.getId();
+
+        final String gateFilePath = "./gate-file-cancel.txt";
+        logger.info("Configuring connector {} with gate file path: {}", 
connectorId, gateFilePath);
+        getClientUtil().configureConnector(connector, "Gate Configuration", 
Map.of("Gate File Path", gateFilePath));
+        getClientUtil().applyConnectorUpdate(connector);
+
+        logger.info("Starting connector {}", connectorId);
+        getClientUtil().startConnector(connectorId);
+
+        logger.info("Waiting for at least 5 FlowFiles to queue");
+        waitForConnectorMinQueueCount(connectorId, 5);
+
+        logger.info("Stopping connector {}", connectorId);
+        getClientUtil().stopConnector(connectorId);
+        getClientUtil().waitForConnectorStopped(connectorId);
+
+        final int queuedCountBeforeDrain = 
getConnectorQueuedFlowFileCount(connectorId);
+        logger.info("Queued FlowFile count before drain: {}", 
queuedCountBeforeDrain);
+        assertTrue(queuedCountBeforeDrain > 0);
+
+        logger.info("Initiating drain for connector {}", connectorId);
+        getClientUtil().drainConnector(connectorId);
+
+        logger.info("Waiting for aggregate connector state to be DRAINING");
+        getClientUtil().waitForConnectorDraining(connectorId);
+
+        logger.info("Creating gate file for Node 1 only: {}", 
node1GateFile.getAbsolutePath());
+        assertTrue(node1GateFile.createNewFile());
+
+        logger.info("Waiting for Node 1 to finish draining and become 
STOPPED");
+        waitForNodeConnectorState(1, connectorId, ConnectorState.STOPPED);
+
+        logger.info("Verifying aggregate state is still DRAINING (Node 2 still 
draining)");
+        final ConnectorEntity aggregateBeforeCancel = 
getNifiClient().getConnectorClient().getConnector(connectorId);
+        assertEquals(ConnectorState.DRAINING.name(), 
aggregateBeforeCancel.getComponent().getState());
+
+        logger.info("Canceling drain for connector {}", connectorId);
+        getClientUtil().cancelDrain(connectorId);
+
+        logger.info("Waiting for aggregate state to become STOPPED after 
cancel");
+        getClientUtil().waitForConnectorStopped(connectorId);
+
+        final ConnectorEntity finalConnector = 
getNifiClient().getConnectorClient().getConnector(connectorId);
+        logger.info("Final aggregate connector state: {}", 
finalConnector.getComponent().getState());
+        assertEquals(ConnectorState.STOPPED.name(), 
finalConnector.getComponent().getState());
+
+        final int finalQueuedCount = 
getConnectorQueuedFlowFileCount(connectorId);
+        logger.info("Final queued FlowFile count after cancel (should still 
have data from Node 2): {}", finalQueuedCount);
+        assertTrue(finalQueuedCount > 0);
+
+        logger.info("testCancelDrainWithOneNodeAlreadyComplete completed 
successfully");
+    }
+
+    private void waitForNodeConnectorState(final int nodeIndex, final String 
connectorId, final ConnectorState expectedState) throws InterruptedException {
+        logger.info("Waiting for Node {} connector {} to reach state {}", 
nodeIndex, connectorId, expectedState);
+        waitFor(() -> {
+            try {
+                switchClientToNode(nodeIndex);
+                final ConnectorEntity entity = 
getNifiClient().getConnectorClient(DO_NOT_REPLICATE).getConnector(connectorId);
+                return 
expectedState.name().equals(entity.getComponent().getState());
+            } catch (final Exception e) {
+                return false;
+            }
+        });
+        logger.info("Node {} connector {} reached state {}", nodeIndex, 
connectorId, expectedState);
+    }
+
+    private void deleteIfExists(final File file) {
+        if (file.exists()) {
+            assertTrue(file.delete());
+        }
+    }
+}
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorDrainIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorDrainIT.java
new file mode 100644
index 0000000000..eef6c4bf97
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorDrainIT.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.connectors;
+
+import org.apache.nifi.components.connector.ConnectorState;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectorEntity;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConnectorDrainIT extends NiFiSystemIT {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(ConnectorDrainIT.class);
+
+    @Test
+    public void testDrainFlowFiles() throws NiFiClientException, IOException, 
InterruptedException {
+        final File gateFile = new 
File(getNiFiInstance().getInstanceDirectory(), "gate-file.txt");
+        gateFile.deleteOnExit();
+
+        if (gateFile.exists()) {
+            assertTrue(gateFile.delete());
+        }
+
+        logger.info("Creating GatedDataQueuingConnector");
+        final ConnectorEntity connector = 
getClientUtil().createConnector("GatedDataQueuingConnector");
+        assertNotNull(connector);
+        final String connectorId = connector.getId();
+
+        logger.info("Configuring connector {} with gate file path: {}", 
connectorId, gateFile.getAbsolutePath());
+        getClientUtil().configureConnector(connector, "Gate Configuration", 
Map.of("Gate File Path", gateFile.getAbsolutePath()));
+        getClientUtil().applyConnectorUpdate(connector);
+
+        logger.info("Starting connector {}", connectorId);
+        getClientUtil().startConnector(connectorId);
+
+        logger.info("Waiting for at least 5 FlowFiles to queue");
+        waitForConnectorMinQueueCount(connectorId, 5);
+
+        logger.info("Stopping connector {}", connectorId);
+        getClientUtil().stopConnector(connectorId);
+        getClientUtil().waitForConnectorStopped(connectorId);
+
+        final int queuedCountBeforeDrain = 
getConnectorQueuedFlowFileCount(connectorId);
+        logger.info("Queued FlowFile count before drain: {}", 
queuedCountBeforeDrain);
+        assertTrue(queuedCountBeforeDrain > 0);
+
+        logger.info("Initiating drain for connector {}", connectorId);
+        getClientUtil().drainConnector(connectorId);
+
+        logger.info("Waiting for connector to enter DRAINING state");
+        getClientUtil().waitForConnectorDraining(connectorId);
+
+        logger.info("Sleeping for 2 seconds to verify connector remains in 
DRAINING state");
+        Thread.sleep(2000L);
+
+        ConnectorEntity drainingConnector = 
getNifiClient().getConnectorClient().getConnector(connectorId);
+        logger.info("Connector state after 2 seconds: {}", 
drainingConnector.getComponent().getState());
+        assertEquals(ConnectorState.DRAINING.name(), 
drainingConnector.getComponent().getState());
+
+        final int queuedWhileDraining = 
getConnectorQueuedFlowFileCount(connectorId);
+        logger.info("Queued FlowFile count while draining (gate file absent): 
{}", queuedWhileDraining);
+        assertTrue(queuedWhileDraining > 0);
+
+        logger.info("Creating gate file to allow draining to complete: {}", 
gateFile.getAbsolutePath());
+        assertTrue(gateFile.createNewFile());
+
+        logger.info("Waiting for connector to complete draining and return to 
STOPPED state");
+        waitFor(() -> {
+            try {
+                final ConnectorEntity entity = 
getNifiClient().getConnectorClient().getConnector(connectorId);
+                return 
ConnectorState.STOPPED.name().equals(entity.getComponent().getState());
+            } catch (final Exception e) {
+                return false;
+            }
+        });
+
+        final ConnectorEntity finalConnector = 
getNifiClient().getConnectorClient().getConnector(connectorId);
+        logger.info("Final connector state: {}", 
finalConnector.getComponent().getState());
+        assertEquals(ConnectorState.STOPPED.name(), 
finalConnector.getComponent().getState());
+
+        final int finalQueuedCount = 
getConnectorQueuedFlowFileCount(connectorId);
+        logger.info("Final queued FlowFile count: {}", finalQueuedCount);
+        assertEquals(0, finalQueuedCount);
+
+        logger.info("testDrainFlowFiles completed successfully");
+    }
+
+    @Test
+    public void testCancelDrainFlowFiles() throws NiFiClientException, 
IOException, InterruptedException {
+        final File gateFile = new 
File(getNiFiInstance().getInstanceDirectory(), "gate-file-cancel.txt");
+        gateFile.deleteOnExit();
+
+        if (gateFile.exists()) {
+            assertTrue(gateFile.delete());
+        }
+
+        logger.info("Creating GatedDataQueuingConnector");
+        final ConnectorEntity connector = 
getClientUtil().createConnector("GatedDataQueuingConnector");
+        assertNotNull(connector);
+        final String connectorId = connector.getId();
+
+        logger.info("Configuring connector {} with gate file path: {}", 
connectorId, gateFile.getAbsolutePath());
+        getClientUtil().configureConnector(connector, "Gate Configuration", 
Map.of("Gate File Path", gateFile.getAbsolutePath()));
+        getClientUtil().applyConnectorUpdate(connector);
+
+        logger.info("Starting connector {}", connectorId);
+        getClientUtil().startConnector(connectorId);
+
+        logger.info("Waiting for at least 5 FlowFiles to queue");
+        waitForConnectorMinQueueCount(connectorId, 5);
+
+        logger.info("Stopping connector {}", connectorId);
+        getClientUtil().stopConnector(connectorId);
+        getClientUtil().waitForConnectorStopped(connectorId);
+
+        final int queuedCountBeforeDrain = 
getConnectorQueuedFlowFileCount(connectorId);
+        logger.info("Queued FlowFile count before drain: {}", 
queuedCountBeforeDrain);
+        assertTrue(queuedCountBeforeDrain > 0);
+
+        logger.info("Initiating drain for connector {}", connectorId);
+        getClientUtil().drainConnector(connectorId);
+
+        logger.info("Waiting for connector to enter DRAINING state");
+        getClientUtil().waitForConnectorDraining(connectorId);
+
+        logger.info("Sleeping for 2 seconds to verify connector remains in 
DRAINING state");
+        Thread.sleep(2000L);
+
+        ConnectorEntity drainingConnector = 
getNifiClient().getConnectorClient().getConnector(connectorId);
+        logger.info("Connector state after 2 seconds: {}", 
drainingConnector.getComponent().getState());
+        assertEquals(ConnectorState.DRAINING.name(), 
drainingConnector.getComponent().getState());
+
+        final int queuedCountWhileDraining = 
getConnectorQueuedFlowFileCount(connectorId);
+        logger.info("Queued FlowFile count while draining: {}", 
queuedCountWhileDraining);
+        assertTrue(queuedCountWhileDraining > 0);
+
+        logger.info("Canceling drain for connector {}", connectorId);
+        getClientUtil().cancelDrain(connectorId);
+
+        logger.info("Waiting for connector to return to STOPPED state after 
cancel");
+        getClientUtil().waitForConnectorStopped(connectorId);
+
+        final ConnectorEntity finalConnector = 
getNifiClient().getConnectorClient().getConnector(connectorId);
+        logger.info("Final connector state: {}", 
finalConnector.getComponent().getState());
+        assertEquals(ConnectorState.STOPPED.name(), 
finalConnector.getComponent().getState());
+
+        final int queuedCountAfterCancel = 
getConnectorQueuedFlowFileCount(connectorId);
+        logger.info("Queued FlowFile count after cancel (should still have 
data): {}", queuedCountAfterCancel);
+        assertTrue(queuedCountAfterCancel > 0);
+
+        logger.info("testCancelDrainFlowFiles completed successfully");
+    }
+}
diff --git 
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
 
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
index 1e95222225..76dfefaeaa 100644
--- 
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
+++ 
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
@@ -131,6 +131,50 @@ public interface ConnectorClient {
      */
     ConnectorEntity stopConnector(ConnectorEntity connectorEntity) throws 
NiFiClientException, IOException;
 
+    /**
+     * Initiates draining of FlowFiles for a connector.
+     *
+     * @param connectorId the connector ID
+     * @param clientId the client ID
+     * @param version the revision version
+     * @return the updated connector entity
+     * @throws NiFiClientException if an error occurs during the request
+     * @throws IOException if an I/O error occurs
+     */
+    ConnectorEntity drainConnector(String connectorId, String clientId, long 
version) throws NiFiClientException, IOException;
+
+    /**
+     * Initiates draining of FlowFiles for a connector using the information 
from the entity.
+     *
+     * @param connectorEntity the connector entity to drain
+     * @return the updated connector entity
+     * @throws NiFiClientException if an error occurs during the request
+     * @throws IOException if an I/O error occurs
+     */
+    ConnectorEntity drainConnector(ConnectorEntity connectorEntity) throws 
NiFiClientException, IOException;
+
+    /**
+     * Cancels an ongoing drain operation for a connector.
+     *
+     * @param connectorId the connector ID
+     * @param clientId the client ID
+     * @param version the revision version
+     * @return the updated connector entity
+     * @throws NiFiClientException if an error occurs during the request
+     * @throws IOException if an I/O error occurs
+     */
+    ConnectorEntity cancelDrain(String connectorId, String clientId, long 
version) throws NiFiClientException, IOException;
+
+    /**
+     * Cancels an ongoing drain operation for a connector using the 
information from the entity.
+     *
+     * @param connectorEntity the connector entity to cancel draining for
+     * @return the updated connector entity
+     * @throws NiFiClientException if an error occurs during the request
+     * @throws IOException if an I/O error occurs
+     */
+    ConnectorEntity cancelDrain(ConnectorEntity connectorEntity) throws 
NiFiClientException, IOException;
+
     /**
      * Gets the configuration step names for a connector.
      *
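
A short sketch of calling the new drain methods from toolkit-based code. It assumes a NiFiClient has already been built and configured elsewhere (as the system tests in this commit obtain via getNifiClient()); the connector id is a placeholder.

    import java.io.IOException;
    import org.apache.nifi.toolkit.client.ConnectorClient;
    import org.apache.nifi.toolkit.client.NiFiClient;
    import org.apache.nifi.toolkit.client.NiFiClientException;
    import org.apache.nifi.web.api.entity.ConnectorEntity;

    public class ConnectorDrainSketch {

        // Drain a connector using its current revision, then cancel the drain.
        public static ConnectorEntity drainThenCancel(final NiFiClient client) throws NiFiClientException, IOException {
            final ConnectorClient connectorClient = client.getConnectorClient();
            final ConnectorEntity connector = connectorClient.getConnector("my-connector-id"); // placeholder id

            // Initiate draining; the returned entity carries the updated revision.
            final ConnectorEntity draining = connectorClient.drainConnector(connector);

            // Abandon the drain; the connector returns to STOPPED with any remaining FlowFiles left queued.
            return connectorClient.cancelDrain(draining);
        }
    }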
diff --git 
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
 
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
index b5ad5878d5..4f9b8d8aaa 100644
--- 
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
+++ 
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
@@ -164,6 +164,75 @@ public class JerseyConnectorClient extends 
AbstractJerseyClient implements Conne
                 connectorEntity.getRevision().getVersion(), 
connectorEntity.isDisconnectedNodeAcknowledged());
     }
 
+    @Override
+    public ConnectorEntity drainConnector(final String connectorId, final 
String clientId, final long version) throws NiFiClientException, IOException {
+        return drainConnector(connectorId, clientId, version, false);
+    }
+
+    @Override
+    public ConnectorEntity drainConnector(final ConnectorEntity 
connectorEntity) throws NiFiClientException, IOException {
+        return drainConnector(connectorEntity.getId(), 
connectorEntity.getRevision().getClientId(),
+                connectorEntity.getRevision().getVersion(), 
connectorEntity.isDisconnectedNodeAcknowledged());
+    }
+
+    private ConnectorEntity drainConnector(final String connectorId, final 
String clientId, final long version,
+            final Boolean disconnectedNodeAcknowledged) throws 
NiFiClientException, IOException {
+        if (StringUtils.isBlank(connectorId)) {
+            throw new IllegalArgumentException("Connector ID cannot be null or 
blank");
+        }
+
+        return executeAction("Error initiating connector drain", () -> {
+            final WebTarget target = connectorTarget
+                    .path("/drain")
+                    .resolveTemplate("id", connectorId);
+
+            final ConnectorEntity requestEntity = new ConnectorEntity();
+            requestEntity.setId(connectorId);
+            
requestEntity.setDisconnectedNodeAcknowledged(disconnectedNodeAcknowledged);
+
+            final RevisionDTO revisionDto = new RevisionDTO();
+            revisionDto.setClientId(clientId);
+            revisionDto.setVersion(version);
+            requestEntity.setRevision(revisionDto);
+
+            return getRequestBuilder(target).post(
+                    Entity.entity(requestEntity, 
MediaType.APPLICATION_JSON_TYPE),
+                    ConnectorEntity.class);
+        });
+    }
+
+    @Override
+    public ConnectorEntity cancelDrain(final String connectorId, final String 
clientId, final long version) throws NiFiClientException, IOException {
+        return cancelDrain(connectorId, clientId, version, false);
+    }
+
+    @Override
+    public ConnectorEntity cancelDrain(final ConnectorEntity connectorEntity) 
throws NiFiClientException, IOException {
+        return cancelDrain(connectorEntity.getId(), 
connectorEntity.getRevision().getClientId(),
+                connectorEntity.getRevision().getVersion(), 
connectorEntity.isDisconnectedNodeAcknowledged());
+    }
+
+    private ConnectorEntity cancelDrain(final String connectorId, final String 
clientId, final long version,
+            final Boolean disconnectedNodeAcknowledged) throws 
NiFiClientException, IOException {
+        if (StringUtils.isBlank(connectorId)) {
+            throw new IllegalArgumentException("Connector ID cannot be null or 
blank");
+        }
+
+        return executeAction("Error canceling connector drain", () -> {
+            WebTarget target = connectorTarget
+                    .path("/drain")
+                    .queryParam("version", version)
+                    .queryParam("clientId", clientId)
+                    .resolveTemplate("id", connectorId);
+
+            if (disconnectedNodeAcknowledged == Boolean.TRUE) {
+                target = target.queryParam("disconnectedNodeAcknowledged", 
"true");
+            }
+
+            return getRequestBuilder(target).delete(ConnectorEntity.class);
+        });
+    }
+
     private ConnectorEntity updateConnectorRunStatus(final String connectorId, 
final String desiredState, final String clientId,
             final long version, final Boolean disconnectedNodeAcknowledged) 
throws NiFiClientException, IOException {
         if (StringUtils.isBlank(connectorId)) {
