This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit ce1b4150a04731d81ad52018c74301e4b4af9092
Author: Mark Payne <[email protected]>
AuthorDate: Fri Jan 9 13:02:05 2026 -0500

    NIFI-15427: Added abiliy to drop flowfiles / drain flowfiles from a C… 
(#10730)
    
    * NIFI-15427: Added abiliy to drop flowfiles / drain flowfiles from a 
Connector; added some system tests to verify existing behavior; fixed existing 
issue with IT
    
    * NIFI-15427: Fixed Exception message in case unable to purge FlowFiles due 
to Connector state
    
    * NIFI-15427: Ensured thread safety of state transitions when 
draining/purging FlowFiles
    
    * NIFI-15427: Addressed review feedback
---
 .../nifi/controller/queue/DropFlowFileStatus.java  |   4 +-
 .../nifi/components/connector/ConnectorNode.java   |  13 +++
 .../nifi/components/connector/ConnectorState.java  |   2 +
 .../nifi/controller/queue/DropFlowFileRequest.java |   3 +-
 .../nifi/components/connector/GhostConnector.java  |   6 +
 .../connector/StandardConnectorNode.java           |  49 +++++++++
 .../components/connector/BlockingConnector.java    |   5 +
 .../components/connector/SleepingConnector.java    |   6 +
 .../connector/StandardConnectorNodeIT.java         |  76 ++++++++++++-
 .../connector/TestStandardConnectorNode.java       | 122 +++++++++++++++++++++
 .../apache/nifi/controller/flow/NopConnector.java  |   5 +
 .../org.example.TestConnector/Another_Test_Step.md |   0
 .../org.example.TestConnector/Test_Step.md         |   0
 .../src/test/resources/colors.txt                  |   3 +
 .../java/org/apache/nifi/nar/DummyConnector.java   |   6 +
 .../tests/system/DataQueuingConnector.java         |  98 +++++++++++++++++
 .../org.apache.nifi.components.connector.Connector |   3 +-
 .../apache/nifi/tests/system/NiFiClientUtil.java   |  19 ++++
 .../org/apache/nifi/tests/system/NiFiSystemIT.java |   1 +
 .../connectors/ClusteredConnectorAssetsIT.java     |   2 -
 .../system/connectors/ClusteredConnectorIT.java    |  92 ++++++++++++++++
 .../tests/system/connectors/ConnectorCrudIT.java   |  10 ++
 .../org/apache/nifi/toolkit/client/FlowClient.java |   8 ++
 .../nifi/toolkit/client/impl/JerseyFlowClient.java |   9 ++
 24 files changed, 533 insertions(+), 9 deletions(-)

diff --git 
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
 
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
index 3233465e3c..13d25bb2ed 100644
--- 
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
+++ 
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
@@ -17,7 +17,7 @@
 
 package org.apache.nifi.controller.queue;
 
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Represents the status of a Drop FlowFile Request that has been issued to
@@ -81,5 +81,5 @@ public interface DropFlowFileStatus {
     /**
      * @return a Future that can be used to determine when the drop operation 
has completed
      */
-    Future<Void> getCompletionFuture();
+    CompletableFuture<Void> getCompletionFuture();
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
index b7206edbd3..2bd111c752 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
@@ -197,6 +197,19 @@ public interface ConnectorNode extends 
ComponentAuthorizable, VersionedComponent
      */
     Future<Void> stop(FlowEngine scheduler);
 
+    /**
+     * Allows the Connector to drain any in-flight data while not accepting 
any new data.
+     */
+    Future<Void> drainFlowFiles();
+
+    /**
+     * Purges all FlowFiles from the Connector, immediately dropping the data.
+     *
+     * @param requestor the user requesting the purge. This will be recorded 
in the associated provenance DROP events.
+     * @return a Future that will be completed when the purge operation is 
finished
+     */
+    Future<Void> purgeFlowFiles(String requestor);
+
     /**
      * Updates the configuration of one of the configuration steps. This 
method should only be invoked via the ConnectorRepository.
      * @param configurationStepName the name of the configuration step being 
set
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorState.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorState.java
index 08e44e0bc8..ad30ab832b 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorState.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorState.java
@@ -23,6 +23,8 @@ public enum ConnectorState {
     STOPPING,
     STOPPED,
     DISABLED,
+    DRAINING,
+    PURGING,
     PREPARING_FOR_UPDATE,
     UPDATING,
     UPDATE_FAILED,
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRequest.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRequest.java
index 2aa22d978c..10f7fc1e13 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRequest.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRequest.java
@@ -18,7 +18,6 @@
 package org.apache.nifi.controller.queue;
 
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
 
 public class DropFlowFileRequest implements DropFlowFileStatus {
     private final String identifier;
@@ -91,7 +90,7 @@ public class DropFlowFileRequest implements 
DropFlowFileStatus {
     }
 
     @Override
-    public Future<Void> getCompletionFuture() {
+    public CompletableFuture<Void> getCompletionFuture() {
         return completionFuture;
     }
 
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/GhostConnector.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/GhostConnector.java
index 4c7c5fa45e..73f9c98edc 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/GhostConnector.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/GhostConnector.java
@@ -26,6 +26,7 @@ import org.apache.nifi.flow.VersionedExternalFlow;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 public class GhostConnector implements Connector {
     private final String identifier;
@@ -119,6 +120,11 @@ public class GhostConnector implements Connector {
         return List.of();
     }
 
+    @Override
+    public CompletableFuture<Void> drainFlowFiles(final FlowContext 
flowContext) {
+        return CompletableFuture.completedFuture(null);
+    }
+
     @Override
     public String toString() {
         return "GhostConnector[id=" + identifier + ", type=" + 
canonicalClassName + "]";
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
index 14713ecee9..a35e191c83 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
@@ -36,6 +36,7 @@ import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.connectable.FlowFileActivity;
 import org.apache.nifi.connectable.FlowFileTransferCounts;
 import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.queue.DropFlowFileStatus;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.flow.Bundle;
 import org.apache.nifi.flow.VersionedConfigurationStep;
@@ -64,6 +65,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
@@ -442,6 +444,53 @@ public class StandardConnectorNode implements 
ConnectorNode {
         return stopCompleteFuture;
     }
 
+    @Override
+    public Future<Void> drainFlowFiles() {
+        requireStopped("drain FlowFiles", ConnectorState.DRAINING);
+
+        try (final NarCloseable ignored = 
NarCloseable.withComponentNarLoader(extensionManager, 
connectorDetails.getConnector().getClass(), getIdentifier())) {
+            final CompletableFuture<Void> future = 
connectorDetails.getConnector().drainFlowFiles(activeFlowContext);
+            final CompletableFuture<Void> stateUpdateFuture = 
future.whenComplete((result, failureCause) -> 
stateTransition.setCurrentState(ConnectorState.STOPPED));
+            return stateUpdateFuture;
+        } catch (final Throwable t) {
+            stateTransition.setCurrentState(ConnectorState.STOPPED);
+            throw t;
+        }
+    }
+
+    @Override
+    public Future<Void> purgeFlowFiles(final String requestor) {
+        requireStopped("purge FlowFiles", ConnectorState.PURGING);
+
+        try {
+            final String dropRequestId = UUID.randomUUID().toString();
+            final DropFlowFileStatus status = 
activeFlowContext.getManagedProcessGroup().dropAllFlowFiles(dropRequestId, 
requestor);
+            final CompletableFuture<Void> future = 
status.getCompletionFuture();
+            final CompletableFuture<Void> stateUpdateFuture = 
future.whenComplete((result, failureCause) -> 
stateTransition.setCurrentState(ConnectorState.STOPPED));
+            return stateUpdateFuture;
+        } catch (final Throwable t) {
+            stateTransition.setCurrentState(ConnectorState.STOPPED);
+            throw t;
+        }
+    }
+
+    private void requireStopped(final String action, final ConnectorState 
newState) {
+        final ConnectorState desiredState = getDesiredState();
+        if (desiredState != ConnectorState.STOPPED) {
+            throw new IllegalStateException("Cannot " + action + " for " + 
this + " because its desired state is currently " + desiredState + "; it must 
be STOPPED.");
+        }
+
+        boolean stateUpdated = false;
+        while (!stateUpdated) {
+            final ConnectorState currentState = getCurrentState();
+            if (currentState != ConnectorState.STOPPED) {
+                throw new IllegalStateException("Cannot " + action + " for " + 
this + " because its current state is currently " + currentState + "; it must 
be STOPPED.");
+            }
+
+            stateUpdated = 
stateTransition.trySetCurrentState(ConnectorState.STOPPED, newState);
+        }
+    }
+
     private void stopComponent(final FlowEngine scheduler, final 
CompletableFuture<Void> stopCompleteFuture) {
         try (final NarCloseable ignored = 
NarCloseable.withComponentNarLoader(extensionManager, 
connectorDetails.getConnector().getClass(), getIdentifier())) {
             connectorDetails.getConnector().stop(activeFlowContext);
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/BlockingConnector.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/BlockingConnector.java
index 2d163e5e80..da9836a102 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/BlockingConnector.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/BlockingConnector.java
@@ -25,6 +25,7 @@ import org.apache.nifi.flow.VersionedExternalFlow;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 
 public class BlockingConnector implements Connector {
@@ -124,4 +125,8 @@ public class BlockingConnector implements Connector {
         return List.of();
     }
 
+    @Override
+    public CompletableFuture<Void> drainFlowFiles(final FlowContext 
flowContext) {
+        return CompletableFuture.completedFuture(null);
+    }
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/SleepingConnector.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/SleepingConnector.java
index 6443523e43..f2dbbe65c7 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/SleepingConnector.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/SleepingConnector.java
@@ -26,6 +26,7 @@ import org.apache.nifi.flow.VersionedExternalFlow;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 public class SleepingConnector implements Connector {
     private final Duration sleepDuration;
@@ -117,4 +118,9 @@ public class SleepingConnector implements Connector {
     public List<AllowableValue> fetchAllowableValues(final String stepName, 
final String propertyName, final FlowContext workingContext) {
         return List.of();
     }
+
+    @Override
+    public CompletableFuture<Void> drainFlowFiles(final FlowContext 
flowContext) {
+        return CompletableFuture.completedFuture(null);
+    }
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
index abdbfd8f7f..d2f8ce2dfc 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
@@ -42,6 +42,8 @@ import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReloadComponent;
 import org.apache.nifi.controller.flow.StandardFlowManager;
 import org.apache.nifi.controller.flowanalysis.FlowAnalyzer;
+import org.apache.nifi.controller.queue.DropFlowFileRequest;
+import org.apache.nifi.controller.queue.DropFlowFileState;
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.queue.FlowFileQueueFactory;
 import org.apache.nifi.controller.queue.LoadBalanceCompression;
@@ -78,6 +80,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -88,6 +91,10 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Predicate;
 
 import static java.util.Objects.requireNonNull;
@@ -410,7 +417,8 @@ public class StandardConnectorNodeIT {
             .anyMatch(result -> result.getSubject() != null && 
result.getSubject().contains("First Primary Color"));
         assertTrue(hasColorError);
 
-        final ConnectorConfiguration validConfig = 
createFileAndColorsConfiguration(".", "red");
+        final File colorsFile = new File("src/test/resources/colors.txt");
+        final ConnectorConfiguration validConfig = 
createFileAndColorsConfiguration(colorsFile.getAbsolutePath(), "red");
         configure(connectorNode, validConfig);
 
         final ValidationState updatedValidationState = 
connectorNode.performValidation();
@@ -439,6 +447,58 @@ public class StandardConnectorNodeIT {
         assertEquals(1, validationState.getValidationErrors().size());
     }
 
+    @Test
+    public void testPurgeFlowFilesEmptiesQueues() throws FlowUpdateException, 
ExecutionException, InterruptedException, TimeoutException {
+        final ConnectorNode connectorNode = initializeDynamicFlowConnector();
+        final ProcessGroup rootGroup = 
connectorNode.getActiveFlowContext().getManagedProcessGroup();
+
+        final Connection connection = queueDataBySource(rootGroup, "Create 
FlowFile");
+        assertEquals(1, connection.getFlowFileQueue().size().getObjectCount());
+
+        final Future<Void> purgeFuture = 
connectorNode.purgeFlowFiles("test-user");
+        purgeFuture.get(10, TimeUnit.SECONDS);
+
+        assertTrue(connection.getFlowFileQueue().isEmpty());
+        assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState());
+    }
+
+    @Test
+    public void testPurgeFlowFilesMultipleQueues() throws FlowUpdateException, 
ExecutionException, InterruptedException, TimeoutException {
+        final ConnectorNode connectorNode = initializeDynamicFlowConnector();
+        final ProcessGroup rootGroup = 
connectorNode.getActiveFlowContext().getManagedProcessGroup();
+
+        final Connection connection1 = queueDataBySource(rootGroup, "Create 
FlowFile");
+        final Connection connection2 = queueDataByDestination(rootGroup, 
"Terminate FlowFile");
+        assertEquals(1, 
connection1.getFlowFileQueue().size().getObjectCount());
+        assertEquals(1, 
connection2.getFlowFileQueue().size().getObjectCount());
+
+        final Future<Void> purgeFuture = 
connectorNode.purgeFlowFiles("test-user");
+        purgeFuture.get(10, TimeUnit.SECONDS);
+
+        assertTrue(connection1.getFlowFileQueue().isEmpty());
+        assertTrue(connection2.getFlowFileQueue().isEmpty());
+        assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState());
+    }
+
+    @Test
+    public void testPurgeFlowFilesRequiresStoppedState() throws 
FlowUpdateException {
+        final ConnectorNode connectorNode = initializeDynamicFlowConnector();
+        final ProcessGroup rootGroup = 
connectorNode.getActiveFlowContext().getManagedProcessGroup();
+        queueDataBySource(rootGroup, "Create FlowFile");
+
+        connectorNode.enable();
+        assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState());
+
+        connectorNode.start(componentLifecycleThreadPool);
+        assertEquals(ConnectorState.RUNNING, connectorNode.getDesiredState());
+
+        final IllegalStateException exception = 
assertThrows(IllegalStateException.class, () -> 
connectorNode.purgeFlowFiles("test-user"));
+        assertTrue(exception.getMessage().contains("must be STOPPED"));
+
+        connectorNode.stop(componentLifecycleThreadPool);
+    }
+
+
     private List<String> getConfigurationStepNames(final ConnectorNode 
connectorNode) {
         return connectorNode.getConfigurationSteps().stream()
             .map(ConfigurationStep::getName)
@@ -484,6 +544,20 @@ public class StandardConnectorNodeIT {
         
when(flowFileQueue.getLoadBalanceStrategy()).thenReturn(LoadBalanceStrategy.DO_NOT_LOAD_BALANCE);
         
when(flowFileQueue.getLoadBalanceCompression()).thenReturn(LoadBalanceCompression.DO_NOT_COMPRESS);
 
+        // Mock dropFlowFiles to clear the list and return a completed status
+        when(flowFileQueue.dropFlowFiles(anyString(), 
anyString())).thenAnswer(invocation -> {
+            final String requestId = invocation.getArgument(0);
+            final int originalCount = flowFileList.size();
+            flowFileList.clear();
+
+            final DropFlowFileRequest dropRequest = new 
DropFlowFileRequest(requestId);
+            dropRequest.setOriginalSize(new QueueSize(originalCount, 
originalCount));
+            dropRequest.setCurrentSize(new QueueSize(0, 0));
+            dropRequest.setDroppedSize(new QueueSize(originalCount, 
originalCount));
+            dropRequest.setState(DropFlowFileState.COMPLETE);
+            return dropRequest;
+        });
+
         final FlowFileQueueFactory flowFileQueueFactory = 
(loadBalanceStrategy, partitioningAttribute, processGroup) -> flowFileQueue;
 
         final Connection connection = new 
StandardConnection.Builder(processScheduler)
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java
index a1490409da..bf26c1f7ed 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java
@@ -494,6 +494,85 @@ public class TestStandardConnectorNode {
         assertEquals("The property value is invalid", 
failedResult.getExplanation());
     }
 
+    @Test
+    @Timeout(value = 5, unit = TimeUnit.SECONDS)
+    public void testDrainFlowFilesTransitionsStateToDraining() throws 
FlowUpdateException {
+        final CompletableFuture<Void> drainCompletionFuture = new 
CompletableFuture<>();
+        final DrainBlockingConnector drainBlockingConnector = new 
DrainBlockingConnector(drainCompletionFuture);
+        final StandardConnectorNode connectorNode = 
createConnectorNode(drainBlockingConnector);
+
+        assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState());
+
+        final Future<Void> drainFuture = connectorNode.drainFlowFiles();
+
+        assertEquals(ConnectorState.DRAINING, connectorNode.getCurrentState());
+        assertFalse(drainFuture.isDone());
+
+        drainCompletionFuture.complete(null);
+
+        try {
+            drainFuture.get(2, TimeUnit.SECONDS);
+        } catch (final Exception e) {
+            throw new RuntimeException("Drain future failed to complete", e);
+        }
+
+        assertTrue(drainFuture.isDone());
+        assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState());
+    }
+
+    @Test
+    @Timeout(value = 5, unit = TimeUnit.SECONDS)
+    public void testDrainFlowFilesFutureDoesNotCompleteUntilDrainFinishes() 
throws FlowUpdateException, InterruptedException {
+        final CompletableFuture<Void> drainCompletionFuture = new 
CompletableFuture<>();
+        final DrainBlockingConnector drainBlockingConnector = new 
DrainBlockingConnector(drainCompletionFuture);
+        final StandardConnectorNode connectorNode = 
createConnectorNode(drainBlockingConnector);
+
+        assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState());
+
+        final Future<Void> drainFuture = connectorNode.drainFlowFiles();
+        assertEquals(ConnectorState.DRAINING, connectorNode.getCurrentState());
+
+        Thread.sleep(200);
+        assertFalse(drainFuture.isDone());
+        assertEquals(ConnectorState.DRAINING, connectorNode.getCurrentState());
+
+        drainCompletionFuture.complete(null);
+
+        try {
+            drainFuture.get(2, TimeUnit.SECONDS);
+        } catch (final Exception e) {
+            throw new RuntimeException("Drain future failed to complete", e);
+        }
+
+        assertTrue(drainFuture.isDone());
+        assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState());
+    }
+
+    @Test
+    @Timeout(value = 5, unit = TimeUnit.SECONDS)
+    public void testDrainFlowFilesStateTransitionsBackToStoppedOnCompletion() 
throws FlowUpdateException {
+        final CompletableFuture<Void> drainCompletionFuture = new 
CompletableFuture<>();
+        final DrainBlockingConnector drainBlockingConnector = new 
DrainBlockingConnector(drainCompletionFuture);
+        final StandardConnectorNode connectorNode = 
createConnectorNode(drainBlockingConnector);
+
+        assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState());
+        assertEquals(ConnectorState.STOPPED, connectorNode.getDesiredState());
+
+        final Future<Void> drainFuture = connectorNode.drainFlowFiles();
+        assertEquals(ConnectorState.DRAINING, connectorNode.getCurrentState());
+
+        drainCompletionFuture.complete(null);
+
+        try {
+            drainFuture.get(2, TimeUnit.SECONDS);
+        } catch (final Exception e) {
+            throw new RuntimeException("Drain future failed to complete", e);
+        }
+
+        assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState());
+        assertEquals(ConnectorState.STOPPED, connectorNode.getDesiredState());
+    }
+
     private StandardConnectorNode createConnectorNode() throws 
FlowUpdateException {
         final SleepingConnector sleepingConnector = new 
SleepingConnector(Duration.ofMillis(1));
         return createConnectorNode(sleepingConnector);
@@ -669,4 +748,47 @@ public class TestStandardConnectorNode {
         }
     }
 
+    /**
+     * Test connector that allows control over when drainFlowFiles completes 
via a CompletableFuture
+     */
+    private static class DrainBlockingConnector extends AbstractConnector {
+        private final CompletableFuture<Void> drainCompletionFuture;
+
+        public DrainBlockingConnector(final CompletableFuture<Void> 
drainCompletionFuture) {
+            this.drainCompletionFuture = drainCompletionFuture;
+        }
+
+        @Override
+        public VersionedExternalFlow getInitialFlow() {
+            return null;
+        }
+
+        @Override
+        public void prepareForUpdate(final FlowContext workingContext, final 
FlowContext activeContext) {
+        }
+
+        @Override
+        public List<ConfigurationStep> getConfigurationSteps() {
+            return List.of();
+        }
+
+        @Override
+        public void applyUpdate(final FlowContext workingContext, final 
FlowContext activeContext) {
+        }
+
+        @Override
+        protected void onStepConfigured(final String stepName, final 
FlowContext workingContext) {
+        }
+
+        @Override
+        public List<ConfigVerificationResult> verifyConfigurationStep(final 
String stepName, final Map<String, String> overrides, final FlowContext 
flowContext) {
+            return List.of();
+        }
+
+        @Override
+        public CompletableFuture<Void> drainFlowFiles(final FlowContext 
flowContext) {
+            return drainCompletionFuture;
+        }
+    }
+
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/flow/NopConnector.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/flow/NopConnector.java
index b4b7c7d9be..4701b06d68 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/flow/NopConnector.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/flow/NopConnector.java
@@ -31,6 +31,7 @@ import org.apache.nifi.flow.VersionedExternalFlow;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * A simple no-op Connector implementation for testing purposes.
@@ -159,4 +160,8 @@ public class NopConnector implements Connector {
         return List.of();
     }
 
+    @Override
+    public CompletableFuture<Void> drainFlowFiles(final FlowContext 
flowContext) {
+        return CompletableFuture.completedFuture(null);
+    }
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/step-documentation/org.example.TestConnector/Another_Test_Step.md
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/steps/org.example.TestConnector/Another_Test_Step.md
similarity index 100%
rename from 
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/step-documentation/org.example.TestConnector/Another_Test_Step.md
rename to 
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/steps/org.example.TestConnector/Another_Test_Step.md
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/step-documentation/org.example.TestConnector/Test_Step.md
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/steps/org.example.TestConnector/Test_Step.md
similarity index 100%
rename from 
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/step-documentation/org.example.TestConnector/Test_Step.md
rename to 
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/steps/org.example.TestConnector/Test_Step.md
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/colors.txt
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/colors.txt
new file mode 100644
index 0000000000..1fedccaeb9
--- /dev/null
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/colors.txt
@@ -0,0 +1,3 @@
+red
+green
+blue
\ No newline at end of file
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/test/java/org/apache/nifi/nar/DummyConnector.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/test/java/org/apache/nifi/nar/DummyConnector.java
index 61738f428a..ed2fdd6b07 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/test/java/org/apache/nifi/nar/DummyConnector.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/test/java/org/apache/nifi/nar/DummyConnector.java
@@ -31,6 +31,7 @@ import org.apache.nifi.flow.VersionedExternalFlow;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 @Tags({"test", "connector"})
 public class DummyConnector implements Connector {
@@ -111,4 +112,9 @@ public class DummyConnector implements Connector {
     public List<AllowableValue> fetchAllowableValues(final String stepName, 
final String propertyName, final FlowContext flowContext, final String filter) {
         return List.of();
     }
+
+    @Override
+    public CompletableFuture<Void> drainFlowFiles(final FlowContext 
flowContext) {
+        return CompletableFuture.completedFuture(null);
+    }
 }
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
new file mode 100644
index 0000000000..de6701159d
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.connectors.tests.system;
+
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.connector.AbstractConnector;
+import org.apache.nifi.components.connector.ConfigurationStep;
+import org.apache.nifi.components.connector.components.FlowContext;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.ConnectableComponentType;
+import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedExternalFlow;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class DataQueuingConnector extends AbstractConnector {
+    @Override
+    protected void onStepConfigured(final String stepName, final FlowContext 
workingContext) {
+
+    }
+
+    @Override
+    public VersionedExternalFlow getInitialFlow() {
+        final VersionedProcessor generate = new VersionedProcessor();
+        generate.setName("GenerateFlowFile");
+        
generate.setType("org.apache.nifi.processors.tests.system.GenerateFlowFile");
+        generate.setIdentifier("gen-1");
+        generate.setGroupIdentifier("1234");
+        generate.setProperties(Map.of("File Size", "1 KB"));
+
+        final VersionedProcessor terminate = new VersionedProcessor();
+        terminate.setName("TerminateFlowFile");
+        
terminate.setType("org.apache.nifi.processors.tests.system.TerminateFlowFile");
+        terminate.setIdentifier("term-1");
+        terminate.setGroupIdentifier("1234");
+        terminate.setScheduledState(ScheduledState.DISABLED);
+
+        final ConnectableComponent source = new ConnectableComponent();
+        source.setId(generate.getIdentifier());
+        source.setType(ConnectableComponentType.PROCESSOR);
+        source.setGroupId("1234");
+
+        final ConnectableComponent destination = new ConnectableComponent();
+        destination.setId(terminate.getIdentifier());
+        destination.setType(ConnectableComponentType.PROCESSOR);
+        destination.setGroupId("1234");
+
+        final VersionedConnection connection = new VersionedConnection();
+        connection.setSource(source);
+        connection.setDestination(destination);
+        connection.setGroupIdentifier("1234");
+        connection.setIdentifier("generate-to-terminate-1");
+
+        final VersionedProcessGroup rootGroup = new VersionedProcessGroup();
+        rootGroup.setName("Data Queuing Connector");
+        rootGroup.setIdentifier("1234");
+        rootGroup.setProcessors(Set.of(generate, terminate));
+        rootGroup.setConnections(Set.of(connection));
+
+        final VersionedExternalFlow flow = new VersionedExternalFlow();
+        flow.setFlowContents(rootGroup);
+        return flow;
+    }
+
+    @Override
+    public List<ConfigVerificationResult> verifyConfigurationStep(final String 
stepName, final Map<String, String> propertyValueOverrides, final FlowContext 
flowContext) {
+        return List.of();
+    }
+
+    @Override
+    public List<ConfigurationStep> getConfigurationSteps() {
+        return List.of();
+    }
+
+    @Override
+    public void applyUpdate(final FlowContext workingFlowContext, final 
FlowContext activeFlowContext) {
+    }
+}
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
index 85b4fb889a..c8e0d0ed64 100644
--- 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
@@ -15,5 +15,4 @@
 
 org.apache.nifi.connectors.tests.system.NopConnector
 org.apache.nifi.connectors.tests.system.AssetConnector
-
-
+org.apache.nifi.connectors.tests.system.DataQueuingConnector
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 7a16c754c9..51272e8b8e 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -80,6 +80,7 @@ import org.apache.nifi.web.api.entity.ConfigurationStepEntity;
 import org.apache.nifi.web.api.entity.ConnectionEntity;
 import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
 import org.apache.nifi.web.api.entity.ConnectorEntity;
+import org.apache.nifi.web.api.entity.ConnectorsEntity;
 import org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import org.apache.nifi.web.api.entity.ControllerServiceRunStatusEntity;
 import org.apache.nifi.web.api.entity.ControllerServicesEntity;
@@ -397,6 +398,24 @@ public class NiFiClientUtil {
         }
     }
 
+    public void stopConnectors() throws NiFiClientException, IOException, 
InterruptedException {
+        final ConnectorsEntity connectorsEntity = 
nifiClient.getFlowClient().getConnectors();
+        for (final ConnectorEntity connector : 
connectorsEntity.getConnectors()) {
+            getConnectorClient().stopConnector(connector);
+            waitForConnectorStopped(connector.getId());
+        }
+    }
+
+    public void stopConnector(final ConnectorEntity connectorEntity) throws 
NiFiClientException, IOException, InterruptedException {
+        stopConnector(connectorEntity.getId());
+    }
+
+    public void stopConnector(final String connectorId) throws 
NiFiClientException, IOException, InterruptedException {
+        final ConnectorEntity entity = 
getConnectorClient().getConnector(connectorId);
+        getConnectorClient().stopConnector(entity);
+        waitForConnectorStopped(connectorId);
+    }
+
     public void waitForConnectorStopped(final String connectorId) throws 
NiFiClientException, IOException, InterruptedException {
         waitForConnectorState(connectorId, ConnectorState.STOPPED);
     }
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index 74999daff1..b73814250f 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -261,6 +261,7 @@ public abstract class NiFiSystemIT implements 
NiFiInstanceProvider {
         getClientUtil().disableControllerLevelServices();
         getClientUtil().disableFlowAnalysisRules();
         getClientUtil().stopTransmitting("root");
+        getClientUtil().stopConnectors();
         getClientUtil().deleteAll("root");
         getClientUtil().deleteControllerLevelServices();
         getClientUtil().deleteReportingTasks();
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorAssetsIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorAssetsIT.java
index 9016c504ce..b2309891dd 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorAssetsIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorAssetsIT.java
@@ -138,5 +138,3 @@ public class ClusteredConnectorAssetsIT extends 
ConnectorAssetsIT {
         connectorClient.deleteConnector(connector);
     }
 }
-
-
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorIT.java
index 2727203cf8..b07646dd17 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorIT.java
@@ -17,20 +17,28 @@
 
 package org.apache.nifi.tests.system.connectors;
 
+import jakarta.ws.rs.NotFoundException;
 import org.apache.nifi.tests.system.NiFiInstanceFactory;
+import org.apache.nifi.toolkit.client.ConnectorClient;
 import org.apache.nifi.toolkit.client.NiFiClientException;
 import org.apache.nifi.web.api.dto.ConnectorConfigurationDTO;
 import org.apache.nifi.web.api.dto.ConnectorValueReferenceDTO;
 import org.apache.nifi.web.api.entity.ConnectorEntity;
+import org.apache.nifi.web.api.entity.NodeEntity;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
 
 public class ClusteredConnectorIT extends ConnectorCrudIT {
     private static final Logger logger = 
LoggerFactory.getLogger(ClusteredConnectorIT.class);
@@ -109,6 +117,90 @@ public class ClusteredConnectorIT extends ConnectorCrudIT {
         logger.info("Validated active and working configuration on Node 2");
     }
 
+    @Test
+    public void testDeleteConnectorOnConnect() throws NiFiClientException, 
IOException, InterruptedException {
+        // Create Connector
+        final ConnectorEntity connector = 
getClientUtil().createConnector("DataQueuingConnector");
+        assertNotNull(connector);
+
+        // Disconnect node 2
+        disconnectNode(2);
+
+        // Should not be able to delete connector
+        final ConnectorClient connectorClient = 
getNifiClient().getConnectorClient();
+        assertThrows(NiFiClientException.class, () -> 
connectorClient.deleteConnector(connector));
+
+        final NodeEntity node2Entity = getNodeEntity(2);
+        
getNifiClient().getControllerClient().deleteNode(node2Entity.getNode().getNodeId());
+
+        // Should now be able to delete connector
+        connectorClient.deleteConnector(connector);
+
+        // Should now be able to add node 2 back
+        getNiFiInstance().getNodeInstance(2).stop();
+        getNiFiInstance().getNodeInstance(2).start(true);
+        waitForAllNodesConnected();
+
+        switchClientToNode(2);
+
+        // We should get a 404
+        try {
+            
getNifiClient().getConnectorClient().getConnector(connector.getId());
+            fail("Expected NiFiClientException but it was not thrown");
+        } catch (final NiFiClientException e) {
+            assertInstanceOf(NotFoundException.class, e.getCause());
+        }
+    }
+
+
+    @Test
+    public void testDeleteConnectorOnConnectWithDataQueued() throws 
NiFiClientException, IOException, InterruptedException {
+        // Create Connector
+        final ConnectorEntity connector = 
getClientUtil().createConnector("DataQueuingConnector");
+        assertNotNull(connector);
+
+        getNifiClient().getConnectorClient().startConnector(connector);
+
+        Thread.sleep(1000L); // Wait 1 second to allow some data to queue
+
+        // Disconnect node 2
+        disconnectNode(2);
+        getNiFiInstance().getNodeInstance(2).stop();
+
+        // Should not be able to delete connector
+        final ConnectorClient connectorClient = 
getNifiClient().getConnectorClient();
+        assertThrows(NiFiClientException.class, () -> 
connectorClient.deleteConnector(connector));
+
+        // Remove node 2 from cluster.
+        final NodeEntity node2Entity = getNodeEntity(2);
+        
getNifiClient().getControllerClient().deleteNode(node2Entity.getNode().getNodeId());
+
+        // We cannot delete the connector directly because it has data queued. 
Stop Node 1, delete the flow.json.gz file, and restart Node 1.
+        getNiFiInstance().getNodeInstance(1).stop();
+        final File node1InstanceDir = 
getNiFiInstance().getNodeInstance(1).getInstanceDirectory();
+        final File node1ConfDir = new File(node1InstanceDir, "conf");
+        final File flowJson = new File(node1ConfDir, "flow.json.gz");
+        Files.delete(flowJson.toPath());
+
+        getNiFiInstance().getNodeInstance(1).start(true);
+        waitForCoordinatorElected();
+
+        // Should now be able to add node 2 back
+        getNiFiInstance().getNodeInstance(2).start(true);
+        waitForAllNodesConnected();
+
+        switchClientToNode(2);
+
+        // We should get a 404
+        try {
+            
getNifiClient().getConnectorClient().getConnector(connector.getId());
+            fail("Expected NiFiClientException but it was not thrown");
+        } catch (final NiFiClientException e) {
+            assertInstanceOf(NotFoundException.class, e.getCause());
+        }
+    }
+
+
     private void assertActiveConfigurationValue(final ConnectorEntity 
connector, final String propertyName, final String expectedValue) {
         final String actualValue = 
getConfigurationValue(connector.getComponent().getActiveConfiguration(), 
propertyName);
         assertEquals(expectedValue, actualValue, "Active configuration 
property '" + propertyName + "' did not match expected value");
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorCrudIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorCrudIT.java
index 085177dad4..78728935cc 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorCrudIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorCrudIT.java
@@ -148,4 +148,14 @@ public class ConnectorCrudIT extends NiFiSystemIT {
         getClientUtil().applyConnectorUpdate(connector);
         getClientUtil().waitForValidConnector(connector.getId());
     }
+
+    @Test
+    public void testDeleteConnectorNoDataQueued() throws NiFiClientException, 
IOException {
+        // Create Connector
+        final ConnectorEntity connector = 
getClientUtil().createConnector("DataQueuingConnector");
+        assertNotNull(connector);
+
+        // Delete
+        getNifiClient().getConnectorClient().deleteConnector(connector);
+    }
 }
diff --git 
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/FlowClient.java
 
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/FlowClient.java
index e483f1c91a..57fa3c74f5 100644
--- 
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/FlowClient.java
+++ 
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/FlowClient.java
@@ -20,6 +20,7 @@ import org.apache.nifi.flow.VersionedReportingTaskSnapshot;
 import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
 import org.apache.nifi.web.api.entity.ClusterSummaryEntity;
 import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
+import org.apache.nifi.web.api.entity.ConnectorsEntity;
 import org.apache.nifi.web.api.entity.ControllerServiceTypesEntity;
 import org.apache.nifi.web.api.entity.ControllerServicesEntity;
 import org.apache.nifi.web.api.entity.CurrentUserEntity;
@@ -229,4 +230,11 @@ public interface FlowClient {
      * @return list of flows
      */
     VersionedFlowsEntity getFlowRegistryFlows(String registryClientId, String 
branch, String bucket) throws NiFiClientException, IOException;
+
+    /**
+     * Retrieves all of the Connectors.
+     *
+     * @return the list of Connectors
+     */
+    ConnectorsEntity getConnectors() throws NiFiClientException, IOException;
 }
diff --git 
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyFlowClient.java
 
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyFlowClient.java
index 114c4e01c2..f0cb9450ca 100644
--- 
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyFlowClient.java
+++ 
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyFlowClient.java
@@ -32,6 +32,7 @@ import 
org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
 import org.apache.nifi.web.api.entity.ClusterSummaryEntity;
 import org.apache.nifi.web.api.entity.ComponentEntity;
 import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
+import org.apache.nifi.web.api.entity.ConnectorsEntity;
 import org.apache.nifi.web.api.entity.ControllerServiceTypesEntity;
 import org.apache.nifi.web.api.entity.ControllerServicesEntity;
 import org.apache.nifi.web.api.entity.CurrentUserEntity;
@@ -400,4 +401,12 @@ public class JerseyFlowClient extends AbstractJerseyClient 
implements FlowClient
             return getRequestBuilder(target).get(VersionedFlowsEntity.class);
         });
     }
+
+    @Override
+    public ConnectorsEntity getConnectors() throws NiFiClientException, 
IOException {
+        return executeAction("Error retrieving Connectors", () -> {
+            final WebTarget target = flowTarget.path("/connectors");
+            return getRequestBuilder(target).get(ConnectorsEntity.class);
+        });
+    }
 }

Reply via email to