This is an automated email from the ASF dual-hosted git repository.
mcgilman pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/NIFI-15258 by this push:
new 8890b4230e NIFI-15427: Added ability to drop flowfiles / drain
flowfiles from a C… (#10730)
8890b4230e is described below
commit 8890b4230e0d10aec8eec6e0d8f48ef7057449ba
Author: Mark Payne <[email protected]>
AuthorDate: Fri Jan 9 13:02:05 2026 -0500
NIFI-15427: Added ability to drop flowfiles / drain flowfiles from a C…
(#10730)
* NIFI-15427: Added ability to drop flowfiles / drain flowfiles from a
Connector; added some system tests to verify existing behavior; fixed existing
issue with IT
* NIFI-15427: Fixed Exception message in case unable to purge FlowFiles due
to Connector state
* NIFI-15427: Ensured thread safety of state transitions when
draining/purging FlowFiles
* NIFI-15427: Addressed review feedback
---
.../nifi/controller/queue/DropFlowFileStatus.java | 4 +-
.../nifi/components/connector/ConnectorNode.java | 13 +++
.../nifi/components/connector/ConnectorState.java | 2 +
.../nifi/controller/queue/DropFlowFileRequest.java | 3 +-
.../nifi/components/connector/GhostConnector.java | 6 +
.../connector/StandardConnectorNode.java | 49 +++++++++
.../components/connector/BlockingConnector.java | 5 +
.../components/connector/SleepingConnector.java | 6 +
.../connector/StandardConnectorNodeIT.java | 76 ++++++++++++-
.../connector/TestStandardConnectorNode.java | 122 +++++++++++++++++++++
.../apache/nifi/controller/flow/NopConnector.java | 5 +
.../org.example.TestConnector/Another_Test_Step.md | 0
.../org.example.TestConnector/Test_Step.md | 0
.../src/test/resources/colors.txt | 3 +
.../java/org/apache/nifi/nar/DummyConnector.java | 6 +
.../tests/system/DataQueuingConnector.java | 98 +++++++++++++++++
.../org.apache.nifi.components.connector.Connector | 3 +-
.../apache/nifi/tests/system/NiFiClientUtil.java | 19 ++++
.../org/apache/nifi/tests/system/NiFiSystemIT.java | 3 +-
.../connectors/ClusteredConnectorAssetsIT.java | 2 -
.../system/connectors/ClusteredConnectorIT.java | 92 ++++++++++++++++
.../tests/system/connectors/ConnectorCrudIT.java | 10 ++
.../org/apache/nifi/toolkit/client/FlowClient.java | 8 ++
.../nifi/toolkit/client/impl/JerseyFlowClient.java | 9 ++
24 files changed, 534 insertions(+), 10 deletions(-)
diff --git
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
index 3233465e3c..13d25bb2ed 100644
---
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
+++
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
@@ -17,7 +17,7 @@
package org.apache.nifi.controller.queue;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
/**
* Represents the status of a Drop FlowFile Request that has been issued to
@@ -81,5 +81,5 @@ public interface DropFlowFileStatus {
/**
* @return a Future that can be used to determine when the drop operation
has completed
*/
- Future<Void> getCompletionFuture();
+ CompletableFuture<Void> getCompletionFuture();
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
index b7206edbd3..2bd111c752 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
@@ -197,6 +197,19 @@ public interface ConnectorNode extends
ComponentAuthorizable, VersionedComponent
*/
Future<Void> stop(FlowEngine scheduler);
+ /**
+ * Allows the Connector to drain any in-flight data while not accepting
any new data.
+ */
+ Future<Void> drainFlowFiles();
+
+ /**
+ * Purges all FlowFiles from the Connector, immediately dropping the data.
+ *
+ * @param requestor the user requesting the purge. This will be recorded
in the associated provenance DROP events.
+ * @return a Future that will be completed when the purge operation is
finished
+ */
+ Future<Void> purgeFlowFiles(String requestor);
+
/**
* Updates the configuration of one of the configuration steps. This
method should only be invoked via the ConnectorRepository.
* @param configurationStepName the name of the configuration step being
set
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorState.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorState.java
index 08e44e0bc8..ad30ab832b 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorState.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorState.java
@@ -23,6 +23,8 @@ public enum ConnectorState {
STOPPING,
STOPPED,
DISABLED,
+ DRAINING,
+ PURGING,
PREPARING_FOR_UPDATE,
UPDATING,
UPDATE_FAILED,
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRequest.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRequest.java
index 2aa22d978c..10f7fc1e13 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRequest.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRequest.java
@@ -18,7 +18,6 @@
package org.apache.nifi.controller.queue;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
public class DropFlowFileRequest implements DropFlowFileStatus {
private final String identifier;
@@ -91,7 +90,7 @@ public class DropFlowFileRequest implements
DropFlowFileStatus {
}
@Override
- public Future<Void> getCompletionFuture() {
+ public CompletableFuture<Void> getCompletionFuture() {
return completionFuture;
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/GhostConnector.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/GhostConnector.java
index 4c7c5fa45e..73f9c98edc 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/GhostConnector.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/GhostConnector.java
@@ -26,6 +26,7 @@ import org.apache.nifi.flow.VersionedExternalFlow;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
public class GhostConnector implements Connector {
private final String identifier;
@@ -119,6 +120,11 @@ public class GhostConnector implements Connector {
return List.of();
}
+ @Override
+ public CompletableFuture<Void> drainFlowFiles(final FlowContext
flowContext) {
+ return CompletableFuture.completedFuture(null);
+ }
+
@Override
public String toString() {
return "GhostConnector[id=" + identifier + ", type=" +
canonicalClassName + "]";
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
index 14713ecee9..a35e191c83 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
@@ -36,6 +36,7 @@ import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.connectable.FlowFileActivity;
import org.apache.nifi.connectable.FlowFileTransferCounts;
import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.VersionedConfigurationStep;
@@ -64,6 +65,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
@@ -442,6 +444,53 @@ public class StandardConnectorNode implements
ConnectorNode {
return stopCompleteFuture;
}
+ @Override
+ public Future<Void> drainFlowFiles() {
+ requireStopped("drain FlowFiles", ConnectorState.DRAINING);
+
+ try (final NarCloseable ignored =
NarCloseable.withComponentNarLoader(extensionManager,
connectorDetails.getConnector().getClass(), getIdentifier())) {
+ final CompletableFuture<Void> future =
connectorDetails.getConnector().drainFlowFiles(activeFlowContext);
+ final CompletableFuture<Void> stateUpdateFuture =
future.whenComplete((result, failureCause) ->
stateTransition.setCurrentState(ConnectorState.STOPPED));
+ return stateUpdateFuture;
+ } catch (final Throwable t) {
+ stateTransition.setCurrentState(ConnectorState.STOPPED);
+ throw t;
+ }
+ }
+
+ @Override
+ public Future<Void> purgeFlowFiles(final String requestor) {
+ requireStopped("purge FlowFiles", ConnectorState.PURGING);
+
+ try {
+ final String dropRequestId = UUID.randomUUID().toString();
+ final DropFlowFileStatus status =
activeFlowContext.getManagedProcessGroup().dropAllFlowFiles(dropRequestId,
requestor);
+ final CompletableFuture<Void> future =
status.getCompletionFuture();
+ final CompletableFuture<Void> stateUpdateFuture =
future.whenComplete((result, failureCause) ->
stateTransition.setCurrentState(ConnectorState.STOPPED));
+ return stateUpdateFuture;
+ } catch (final Throwable t) {
+ stateTransition.setCurrentState(ConnectorState.STOPPED);
+ throw t;
+ }
+ }
+
+ private void requireStopped(final String action, final ConnectorState
newState) {
+ final ConnectorState desiredState = getDesiredState();
+ if (desiredState != ConnectorState.STOPPED) {
+ throw new IllegalStateException("Cannot " + action + " for " +
this + " because its desired state is currently " + desiredState + "; it must
be STOPPED.");
+ }
+
+ boolean stateUpdated = false;
+ while (!stateUpdated) {
+ final ConnectorState currentState = getCurrentState();
+ if (currentState != ConnectorState.STOPPED) {
+ throw new IllegalStateException("Cannot " + action + " for " +
this + " because its current state is currently " + currentState + "; it must
be STOPPED.");
+ }
+
+ stateUpdated =
stateTransition.trySetCurrentState(ConnectorState.STOPPED, newState);
+ }
+ }
+
private void stopComponent(final FlowEngine scheduler, final
CompletableFuture<Void> stopCompleteFuture) {
try (final NarCloseable ignored =
NarCloseable.withComponentNarLoader(extensionManager,
connectorDetails.getConnector().getClass(), getIdentifier())) {
connectorDetails.getConnector().stop(activeFlowContext);
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/BlockingConnector.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/BlockingConnector.java
index 2d163e5e80..da9836a102 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/BlockingConnector.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/BlockingConnector.java
@@ -25,6 +25,7 @@ import org.apache.nifi.flow.VersionedExternalFlow;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
public class BlockingConnector implements Connector {
@@ -124,4 +125,8 @@ public class BlockingConnector implements Connector {
return List.of();
}
+ @Override
+ public CompletableFuture<Void> drainFlowFiles(final FlowContext
flowContext) {
+ return CompletableFuture.completedFuture(null);
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/SleepingConnector.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/SleepingConnector.java
index 6443523e43..f2dbbe65c7 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/SleepingConnector.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/SleepingConnector.java
@@ -26,6 +26,7 @@ import org.apache.nifi.flow.VersionedExternalFlow;
import java.time.Duration;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
public class SleepingConnector implements Connector {
private final Duration sleepDuration;
@@ -117,4 +118,9 @@ public class SleepingConnector implements Connector {
public List<AllowableValue> fetchAllowableValues(final String stepName,
final String propertyName, final FlowContext workingContext) {
return List.of();
}
+
+ @Override
+ public CompletableFuture<Void> drainFlowFiles(final FlowContext
flowContext) {
+ return CompletableFuture.completedFuture(null);
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
index abdbfd8f7f..d2f8ce2dfc 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
@@ -42,6 +42,8 @@ import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReloadComponent;
import org.apache.nifi.controller.flow.StandardFlowManager;
import org.apache.nifi.controller.flowanalysis.FlowAnalyzer;
+import org.apache.nifi.controller.queue.DropFlowFileRequest;
+import org.apache.nifi.controller.queue.DropFlowFileState;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueueFactory;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
@@ -78,6 +80,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -88,6 +91,10 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import static java.util.Objects.requireNonNull;
@@ -410,7 +417,8 @@ public class StandardConnectorNodeIT {
.anyMatch(result -> result.getSubject() != null &&
result.getSubject().contains("First Primary Color"));
assertTrue(hasColorError);
- final ConnectorConfiguration validConfig =
createFileAndColorsConfiguration(".", "red");
+ final File colorsFile = new File("src/test/resources/colors.txt");
+ final ConnectorConfiguration validConfig =
createFileAndColorsConfiguration(colorsFile.getAbsolutePath(), "red");
configure(connectorNode, validConfig);
final ValidationState updatedValidationState =
connectorNode.performValidation();
@@ -439,6 +447,58 @@ public class StandardConnectorNodeIT {
assertEquals(1, validationState.getValidationErrors().size());
}
+ @Test
+ public void testPurgeFlowFilesEmptiesQueues() throws FlowUpdateException,
ExecutionException, InterruptedException, TimeoutException {
+ final ConnectorNode connectorNode = initializeDynamicFlowConnector();
+ final ProcessGroup rootGroup =
connectorNode.getActiveFlowContext().getManagedProcessGroup();
+
+ final Connection connection = queueDataBySource(rootGroup, "Create
FlowFile");
+ assertEquals(1, connection.getFlowFileQueue().size().getObjectCount());
+
+ final Future<Void> purgeFuture =
connectorNode.purgeFlowFiles("test-user");
+ purgeFuture.get(10, TimeUnit.SECONDS);
+
+ assertTrue(connection.getFlowFileQueue().isEmpty());
+ assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState());
+ }
+
+ @Test
+ public void testPurgeFlowFilesMultipleQueues() throws FlowUpdateException,
ExecutionException, InterruptedException, TimeoutException {
+ final ConnectorNode connectorNode = initializeDynamicFlowConnector();
+ final ProcessGroup rootGroup =
connectorNode.getActiveFlowContext().getManagedProcessGroup();
+
+ final Connection connection1 = queueDataBySource(rootGroup, "Create
FlowFile");
+ final Connection connection2 = queueDataByDestination(rootGroup,
"Terminate FlowFile");
+ assertEquals(1,
connection1.getFlowFileQueue().size().getObjectCount());
+ assertEquals(1,
connection2.getFlowFileQueue().size().getObjectCount());
+
+ final Future<Void> purgeFuture =
connectorNode.purgeFlowFiles("test-user");
+ purgeFuture.get(10, TimeUnit.SECONDS);
+
+ assertTrue(connection1.getFlowFileQueue().isEmpty());
+ assertTrue(connection2.getFlowFileQueue().isEmpty());
+ assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState());
+ }
+
+ @Test
+ public void testPurgeFlowFilesRequiresStoppedState() throws
FlowUpdateException {
+ final ConnectorNode connectorNode = initializeDynamicFlowConnector();
+ final ProcessGroup rootGroup =
connectorNode.getActiveFlowContext().getManagedProcessGroup();
+ queueDataBySource(rootGroup, "Create FlowFile");
+
+ connectorNode.enable();
+ assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState());
+
+ connectorNode.start(componentLifecycleThreadPool);
+ assertEquals(ConnectorState.RUNNING, connectorNode.getDesiredState());
+
+ final IllegalStateException exception =
assertThrows(IllegalStateException.class, () ->
connectorNode.purgeFlowFiles("test-user"));
+ assertTrue(exception.getMessage().contains("must be STOPPED"));
+
+ connectorNode.stop(componentLifecycleThreadPool);
+ }
+
+
private List<String> getConfigurationStepNames(final ConnectorNode
connectorNode) {
return connectorNode.getConfigurationSteps().stream()
.map(ConfigurationStep::getName)
@@ -484,6 +544,20 @@ public class StandardConnectorNodeIT {
when(flowFileQueue.getLoadBalanceStrategy()).thenReturn(LoadBalanceStrategy.DO_NOT_LOAD_BALANCE);
when(flowFileQueue.getLoadBalanceCompression()).thenReturn(LoadBalanceCompression.DO_NOT_COMPRESS);
+ // Mock dropFlowFiles to clear the list and return a completed status
+ when(flowFileQueue.dropFlowFiles(anyString(),
anyString())).thenAnswer(invocation -> {
+ final String requestId = invocation.getArgument(0);
+ final int originalCount = flowFileList.size();
+ flowFileList.clear();
+
+ final DropFlowFileRequest dropRequest = new
DropFlowFileRequest(requestId);
+ dropRequest.setOriginalSize(new QueueSize(originalCount,
originalCount));
+ dropRequest.setCurrentSize(new QueueSize(0, 0));
+ dropRequest.setDroppedSize(new QueueSize(originalCount,
originalCount));
+ dropRequest.setState(DropFlowFileState.COMPLETE);
+ return dropRequest;
+ });
+
final FlowFileQueueFactory flowFileQueueFactory =
(loadBalanceStrategy, partitioningAttribute, processGroup) -> flowFileQueue;
final Connection connection = new
StandardConnection.Builder(processScheduler)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java
index a1490409da..bf26c1f7ed 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java
@@ -494,6 +494,85 @@ public class TestStandardConnectorNode {
assertEquals("The property value is invalid",
failedResult.getExplanation());
}
+ @Test
+ @Timeout(value = 5, unit = TimeUnit.SECONDS)
+ public void testDrainFlowFilesTransitionsStateToDraining() throws
FlowUpdateException {
+ final CompletableFuture<Void> drainCompletionFuture = new
CompletableFuture<>();
+ final DrainBlockingConnector drainBlockingConnector = new
DrainBlockingConnector(drainCompletionFuture);
+ final StandardConnectorNode connectorNode =
createConnectorNode(drainBlockingConnector);
+
+ assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState());
+
+ final Future<Void> drainFuture = connectorNode.drainFlowFiles();
+
+ assertEquals(ConnectorState.DRAINING, connectorNode.getCurrentState());
+ assertFalse(drainFuture.isDone());
+
+ drainCompletionFuture.complete(null);
+
+ try {
+ drainFuture.get(2, TimeUnit.SECONDS);
+ } catch (final Exception e) {
+ throw new RuntimeException("Drain future failed to complete", e);
+ }
+
+ assertTrue(drainFuture.isDone());
+ assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState());
+ }
+
+ @Test
+ @Timeout(value = 5, unit = TimeUnit.SECONDS)
+ public void testDrainFlowFilesFutureDoesNotCompleteUntilDrainFinishes()
throws FlowUpdateException, InterruptedException {
+ final CompletableFuture<Void> drainCompletionFuture = new
CompletableFuture<>();
+ final DrainBlockingConnector drainBlockingConnector = new
DrainBlockingConnector(drainCompletionFuture);
+ final StandardConnectorNode connectorNode =
createConnectorNode(drainBlockingConnector);
+
+ assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState());
+
+ final Future<Void> drainFuture = connectorNode.drainFlowFiles();
+ assertEquals(ConnectorState.DRAINING, connectorNode.getCurrentState());
+
+ Thread.sleep(200);
+ assertFalse(drainFuture.isDone());
+ assertEquals(ConnectorState.DRAINING, connectorNode.getCurrentState());
+
+ drainCompletionFuture.complete(null);
+
+ try {
+ drainFuture.get(2, TimeUnit.SECONDS);
+ } catch (final Exception e) {
+ throw new RuntimeException("Drain future failed to complete", e);
+ }
+
+ assertTrue(drainFuture.isDone());
+ assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState());
+ }
+
+ @Test
+ @Timeout(value = 5, unit = TimeUnit.SECONDS)
+ public void testDrainFlowFilesStateTransitionsBackToStoppedOnCompletion()
throws FlowUpdateException {
+ final CompletableFuture<Void> drainCompletionFuture = new
CompletableFuture<>();
+ final DrainBlockingConnector drainBlockingConnector = new
DrainBlockingConnector(drainCompletionFuture);
+ final StandardConnectorNode connectorNode =
createConnectorNode(drainBlockingConnector);
+
+ assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState());
+ assertEquals(ConnectorState.STOPPED, connectorNode.getDesiredState());
+
+ final Future<Void> drainFuture = connectorNode.drainFlowFiles();
+ assertEquals(ConnectorState.DRAINING, connectorNode.getCurrentState());
+
+ drainCompletionFuture.complete(null);
+
+ try {
+ drainFuture.get(2, TimeUnit.SECONDS);
+ } catch (final Exception e) {
+ throw new RuntimeException("Drain future failed to complete", e);
+ }
+
+ assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState());
+ assertEquals(ConnectorState.STOPPED, connectorNode.getDesiredState());
+ }
+
private StandardConnectorNode createConnectorNode() throws
FlowUpdateException {
final SleepingConnector sleepingConnector = new
SleepingConnector(Duration.ofMillis(1));
return createConnectorNode(sleepingConnector);
@@ -669,4 +748,47 @@ public class TestStandardConnectorNode {
}
}
+ /**
+ * Test connector that allows control over when drainFlowFiles completes
via a CompletableFuture
+ */
+ private static class DrainBlockingConnector extends AbstractConnector {
+ private final CompletableFuture<Void> drainCompletionFuture;
+
+ public DrainBlockingConnector(final CompletableFuture<Void>
drainCompletionFuture) {
+ this.drainCompletionFuture = drainCompletionFuture;
+ }
+
+ @Override
+ public VersionedExternalFlow getInitialFlow() {
+ return null;
+ }
+
+ @Override
+ public void prepareForUpdate(final FlowContext workingContext, final
FlowContext activeContext) {
+ }
+
+ @Override
+ public List<ConfigurationStep> getConfigurationSteps() {
+ return List.of();
+ }
+
+ @Override
+ public void applyUpdate(final FlowContext workingContext, final
FlowContext activeContext) {
+ }
+
+ @Override
+ protected void onStepConfigured(final String stepName, final
FlowContext workingContext) {
+ }
+
+ @Override
+ public List<ConfigVerificationResult> verifyConfigurationStep(final
String stepName, final Map<String, String> overrides, final FlowContext
flowContext) {
+ return List.of();
+ }
+
+ @Override
+ public CompletableFuture<Void> drainFlowFiles(final FlowContext
flowContext) {
+ return drainCompletionFuture;
+ }
+ }
+
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/flow/NopConnector.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/flow/NopConnector.java
index b4b7c7d9be..4701b06d68 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/flow/NopConnector.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/flow/NopConnector.java
@@ -31,6 +31,7 @@ import org.apache.nifi.flow.VersionedExternalFlow;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
/**
* A simple no-op Connector implementation for testing purposes.
@@ -159,4 +160,8 @@ public class NopConnector implements Connector {
return List.of();
}
+ @Override
+ public CompletableFuture<Void> drainFlowFiles(final FlowContext
flowContext) {
+ return CompletableFuture.completedFuture(null);
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/step-documentation/org.example.TestConnector/Another_Test_Step.md
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/steps/org.example.TestConnector/Another_Test_Step.md
similarity index 100%
rename from
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/step-documentation/org.example.TestConnector/Another_Test_Step.md
rename to
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/steps/org.example.TestConnector/Another_Test_Step.md
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/step-documentation/org.example.TestConnector/Test_Step.md
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/steps/org.example.TestConnector/Test_Step.md
similarity index 100%
rename from
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/step-documentation/org.example.TestConnector/Test_Step.md
rename to
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/steps/org.example.TestConnector/Test_Step.md
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/colors.txt
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/colors.txt
new file mode 100644
index 0000000000..1fedccaeb9
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/colors.txt
@@ -0,0 +1,3 @@
+red
+green
+blue
\ No newline at end of file
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/test/java/org/apache/nifi/nar/DummyConnector.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/test/java/org/apache/nifi/nar/DummyConnector.java
index 61738f428a..ed2fdd6b07 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/test/java/org/apache/nifi/nar/DummyConnector.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/test/java/org/apache/nifi/nar/DummyConnector.java
@@ -31,6 +31,7 @@ import org.apache.nifi.flow.VersionedExternalFlow;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
@Tags({"test", "connector"})
public class DummyConnector implements Connector {
@@ -111,4 +112,9 @@ public class DummyConnector implements Connector {
public List<AllowableValue> fetchAllowableValues(final String stepName,
final String propertyName, final FlowContext flowContext, final String filter) {
return List.of();
}
+
+ @Override
+ public CompletableFuture<Void> drainFlowFiles(final FlowContext
flowContext) {
+ return CompletableFuture.completedFuture(null);
+ }
}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
new file mode 100644
index 0000000000..de6701159d
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.connectors.tests.system;
+
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.connector.AbstractConnector;
+import org.apache.nifi.components.connector.ConfigurationStep;
+import org.apache.nifi.components.connector.components.FlowContext;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.ConnectableComponentType;
+import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedExternalFlow;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class DataQueuingConnector extends AbstractConnector {
+ @Override
+ protected void onStepConfigured(final String stepName, final FlowContext
workingContext) {
+
+ }
+
+ @Override
+ public VersionedExternalFlow getInitialFlow() {
+ final VersionedProcessor generate = new VersionedProcessor();
+ generate.setName("GenerateFlowFile");
+
generate.setType("org.apache.nifi.processors.tests.system.GenerateFlowFile");
+ generate.setIdentifier("gen-1");
+ generate.setGroupIdentifier("1234");
+ generate.setProperties(Map.of("File Size", "1 KB"));
+
+ final VersionedProcessor terminate = new VersionedProcessor();
+ terminate.setName("TerminateFlowFile");
+
terminate.setType("org.apache.nifi.processors.tests.system.TerminateFlowFile");
+ terminate.setIdentifier("term-1");
+ terminate.setGroupIdentifier("1234");
+ terminate.setScheduledState(ScheduledState.DISABLED);
+
+ final ConnectableComponent source = new ConnectableComponent();
+ source.setId(generate.getIdentifier());
+ source.setType(ConnectableComponentType.PROCESSOR);
+ source.setGroupId("1234");
+
+ final ConnectableComponent destination = new ConnectableComponent();
+ destination.setId(terminate.getIdentifier());
+ destination.setType(ConnectableComponentType.PROCESSOR);
+ destination.setGroupId("1234");
+
+ final VersionedConnection connection = new VersionedConnection();
+ connection.setSource(source);
+ connection.setDestination(destination);
+ connection.setGroupIdentifier("1234");
+ connection.setIdentifier("generate-to-terminate-1");
+
+ final VersionedProcessGroup rootGroup = new VersionedProcessGroup();
+ rootGroup.setName("Data Queuing Connector");
+ rootGroup.setIdentifier("1234");
+ rootGroup.setProcessors(Set.of(generate, terminate));
+ rootGroup.setConnections(Set.of(connection));
+
+ final VersionedExternalFlow flow = new VersionedExternalFlow();
+ flow.setFlowContents(rootGroup);
+ return flow;
+ }
+
+ @Override
+ public List<ConfigVerificationResult> verifyConfigurationStep(final String
stepName, final Map<String, String> propertyValueOverrides, final FlowContext
flowContext) {
+ return List.of();
+ }
+
+ @Override
+ public List<ConfigurationStep> getConfigurationSteps() {
+ return List.of();
+ }
+
+ @Override
+ public void applyUpdate(final FlowContext workingFlowContext, final
FlowContext activeFlowContext) {
+ }
+}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
index 85b4fb889a..c8e0d0ed64 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
@@ -15,5 +15,4 @@
org.apache.nifi.connectors.tests.system.NopConnector
org.apache.nifi.connectors.tests.system.AssetConnector
-
-
+org.apache.nifi.connectors.tests.system.DataQueuingConnector
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 7a16c754c9..51272e8b8e 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -80,6 +80,7 @@ import org.apache.nifi.web.api.entity.ConfigurationStepEntity;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
import org.apache.nifi.web.api.entity.ConnectorEntity;
+import org.apache.nifi.web.api.entity.ConnectorsEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ControllerServiceRunStatusEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
@@ -397,6 +398,24 @@ public class NiFiClientUtil {
}
}
+ public void stopConnectors() throws NiFiClientException, IOException,
InterruptedException {
+ final ConnectorsEntity connectorsEntity =
nifiClient.getFlowClient().getConnectors();
+ for (final ConnectorEntity connector :
connectorsEntity.getConnectors()) {
+ getConnectorClient().stopConnector(connector);
+ waitForConnectorStopped(connector.getId());
+ }
+ }
+
+ public void stopConnector(final ConnectorEntity connectorEntity) throws
NiFiClientException, IOException, InterruptedException {
+ stopConnector(connectorEntity.getId());
+ }
+
+ public void stopConnector(final String connectorId) throws
NiFiClientException, IOException, InterruptedException {
+ final ConnectorEntity entity =
getConnectorClient().getConnector(connectorId);
+ getConnectorClient().stopConnector(entity);
+ waitForConnectorStopped(connectorId);
+ }
+
public void waitForConnectorStopped(final String connectorId) throws
NiFiClientException, IOException, InterruptedException {
waitForConnectorState(connectorId, ConnectorState.STOPPED);
}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index 146731b573..b73814250f 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -44,7 +44,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.SSLContext;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
@@ -62,6 +61,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import javax.net.ssl.SSLContext;
@ExtendWith(TroubleshootingTestWatcher.class)
@Timeout(value = 5, unit = TimeUnit.MINUTES)
@@ -261,6 +261,7 @@ public abstract class NiFiSystemIT implements
NiFiInstanceProvider {
getClientUtil().disableControllerLevelServices();
getClientUtil().disableFlowAnalysisRules();
getClientUtil().stopTransmitting("root");
+ getClientUtil().stopConnectors();
getClientUtil().deleteAll("root");
getClientUtil().deleteControllerLevelServices();
getClientUtil().deleteReportingTasks();
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorAssetsIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorAssetsIT.java
index 9016c504ce..b2309891dd 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorAssetsIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorAssetsIT.java
@@ -138,5 +138,3 @@ public class ClusteredConnectorAssetsIT extends
ConnectorAssetsIT {
connectorClient.deleteConnector(connector);
}
}
-
-
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorIT.java
index 2727203cf8..b07646dd17 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorIT.java
@@ -17,20 +17,28 @@
package org.apache.nifi.tests.system.connectors;
+import jakarta.ws.rs.NotFoundException;
import org.apache.nifi.tests.system.NiFiInstanceFactory;
+import org.apache.nifi.toolkit.client.ConnectorClient;
import org.apache.nifi.toolkit.client.NiFiClientException;
import org.apache.nifi.web.api.dto.ConnectorConfigurationDTO;
import org.apache.nifi.web.api.dto.ConnectorValueReferenceDTO;
import org.apache.nifi.web.api.entity.ConnectorEntity;
+import org.apache.nifi.web.api.entity.NodeEntity;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
public class ClusteredConnectorIT extends ConnectorCrudIT {
private static final Logger logger =
LoggerFactory.getLogger(ClusteredConnectorIT.class);
@@ -109,6 +117,90 @@ public class ClusteredConnectorIT extends ConnectorCrudIT {
logger.info("Validated active and working configuration on Node 2");
}
+ @Test
+ public void testDeleteConnectorOnConnect() throws NiFiClientException,
IOException, InterruptedException {
+ // Create Connector
+ final ConnectorEntity connector =
getClientUtil().createConnector("DataQueuingConnector");
+ assertNotNull(connector);
+
+ // Disconnect node 2
+ disconnectNode(2);
+
+ // Should not be able to delete connector
+ final ConnectorClient connectorClient =
getNifiClient().getConnectorClient();
+ assertThrows(NiFiClientException.class, () ->
connectorClient.deleteConnector(connector));
+
+ final NodeEntity node2Entity = getNodeEntity(2);
+
getNifiClient().getControllerClient().deleteNode(node2Entity.getNode().getNodeId());
+
+ // Should now be able to delete connector
+ connectorClient.deleteConnector(connector);
+
+ // Should now be able to add node 2 back
+ getNiFiInstance().getNodeInstance(2).stop();
+ getNiFiInstance().getNodeInstance(2).start(true);
+ waitForAllNodesConnected();
+
+ switchClientToNode(2);
+
+ // We should get a 404
+ try {
+
getNifiClient().getConnectorClient().getConnector(connector.getId());
+ fail("Expected NiFiClientException but it was not thrown");
+ } catch (final NiFiClientException e) {
+ assertInstanceOf(NotFoundException.class, e.getCause());
+ }
+ }
+
+
+ @Test
+ public void testDeleteConnectorOnConnectWithDataQueued() throws
NiFiClientException, IOException, InterruptedException {
+ // Create Connector
+ final ConnectorEntity connector =
getClientUtil().createConnector("DataQueuingConnector");
+ assertNotNull(connector);
+
+ getNifiClient().getConnectorClient().startConnector(connector);
+
+ Thread.sleep(1000L); // Wait 1 second to allow some data to queue
+
+ // Disconnect node 2
+ disconnectNode(2);
+ getNiFiInstance().getNodeInstance(2).stop();
+
+ // Should not be able to delete connector
+ final ConnectorClient connectorClient =
getNifiClient().getConnectorClient();
+ assertThrows(NiFiClientException.class, () ->
connectorClient.deleteConnector(connector));
+
+ // Remove node 2 from cluster.
+ final NodeEntity node2Entity = getNodeEntity(2);
+
getNifiClient().getControllerClient().deleteNode(node2Entity.getNode().getNodeId());
+
+ // We cannot delete the connector directly because it has data queued. Stop Node 1, delete the flow.json.gz file, and restart Node 1.
+ getNiFiInstance().getNodeInstance(1).stop();
+ final File node1InstanceDir =
getNiFiInstance().getNodeInstance(1).getInstanceDirectory();
+ final File node1ConfDir = new File(node1InstanceDir, "conf");
+ final File flowJson = new File(node1ConfDir, "flow.json.gz");
+ Files.delete(flowJson.toPath());
+
+ getNiFiInstance().getNodeInstance(1).start(true);
+ waitForCoordinatorElected();
+
+ // Should now be able to add node 2 back
+ getNiFiInstance().getNodeInstance(2).start(true);
+ waitForAllNodesConnected();
+
+ switchClientToNode(2);
+
+ // We should get a 404
+ try {
+
getNifiClient().getConnectorClient().getConnector(connector.getId());
+ fail("Expected NiFiClientException but it was not thrown");
+ } catch (final NiFiClientException e) {
+ assertInstanceOf(NotFoundException.class, e.getCause());
+ }
+ }
+
+
private void assertActiveConfigurationValue(final ConnectorEntity
connector, final String propertyName, final String expectedValue) {
final String actualValue =
getConfigurationValue(connector.getComponent().getActiveConfiguration(),
propertyName);
assertEquals(expectedValue, actualValue, "Active configuration
property '" + propertyName + "' did not match expected value");
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorCrudIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorCrudIT.java
index 085177dad4..78728935cc 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorCrudIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorCrudIT.java
@@ -148,4 +148,14 @@ public class ConnectorCrudIT extends NiFiSystemIT {
getClientUtil().applyConnectorUpdate(connector);
getClientUtil().waitForValidConnector(connector.getId());
}
+
+ @Test
+ public void testDeleteConnectorNoDataQueued() throws NiFiClientException,
IOException {
+ // Create Connector
+ final ConnectorEntity connector =
getClientUtil().createConnector("DataQueuingConnector");
+ assertNotNull(connector);
+
+ // Delete
+ getNifiClient().getConnectorClient().deleteConnector(connector);
+ }
}
diff --git
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/FlowClient.java
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/FlowClient.java
index e483f1c91a..57fa3c74f5 100644
---
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/FlowClient.java
+++
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/FlowClient.java
@@ -20,6 +20,7 @@ import org.apache.nifi.flow.VersionedReportingTaskSnapshot;
import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
import org.apache.nifi.web.api.entity.ClusterSummaryEntity;
import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
+import org.apache.nifi.web.api.entity.ConnectorsEntity;
import org.apache.nifi.web.api.entity.ControllerServiceTypesEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.CurrentUserEntity;
@@ -229,4 +230,11 @@ public interface FlowClient {
* @return list of flows
*/
VersionedFlowsEntity getFlowRegistryFlows(String registryClientId, String
branch, String bucket) throws NiFiClientException, IOException;
+
+ /**
+ * Retrieves all of the Connectors.
+ *
+ * @return the list of Connectors
+ */
+ ConnectorsEntity getConnectors() throws NiFiClientException, IOException;
}
diff --git
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyFlowClient.java
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyFlowClient.java
index 114c4e01c2..f0cb9450ca 100644
---
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyFlowClient.java
+++
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyFlowClient.java
@@ -32,6 +32,7 @@ import
org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
import org.apache.nifi.web.api.entity.ClusterSummaryEntity;
import org.apache.nifi.web.api.entity.ComponentEntity;
import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
+import org.apache.nifi.web.api.entity.ConnectorsEntity;
import org.apache.nifi.web.api.entity.ControllerServiceTypesEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.CurrentUserEntity;
@@ -400,4 +401,12 @@ public class JerseyFlowClient extends AbstractJerseyClient
implements FlowClient
return getRequestBuilder(target).get(VersionedFlowsEntity.class);
});
}
+
+ @Override
+ public ConnectorsEntity getConnectors() throws NiFiClientException,
IOException {
+ return executeAction("Error retrieving Connectors", () -> {
+ final WebTarget target = flowTarget.path("/connectors");
+ return getRequestBuilder(target).get(ConnectorsEntity.class);
+ });
+ }
}