This is an automated email from the ASF dual-hosted git repository. markap14 pushed a commit to branch NIFI-15258 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit ce1b4150a04731d81ad52018c74301e4b4af9092 Author: Mark Payne <[email protected]> AuthorDate: Fri Jan 9 13:02:05 2026 -0500 NIFI-15427: Added ability to drop flowfiles / drain flowfiles from a C… (#10730) * NIFI-15427: Added ability to drop flowfiles / drain flowfiles from a Connector; added some system tests to verify existing behavior; fixed existing issue with IT * NIFI-15427: Fixed Exception message in case unable to purge FlowFiles due to Connector state * NIFI-15427: Ensured thread safety of state transitions when draining/purging FlowFiles * NIFI-15427: Addressed review feedback --- .../nifi/controller/queue/DropFlowFileStatus.java | 4 +- .../nifi/components/connector/ConnectorNode.java | 13 +++ .../nifi/components/connector/ConnectorState.java | 2 + .../nifi/controller/queue/DropFlowFileRequest.java | 3 +- .../nifi/components/connector/GhostConnector.java | 6 + .../connector/StandardConnectorNode.java | 49 +++++++++ .../components/connector/BlockingConnector.java | 5 + .../components/connector/SleepingConnector.java | 6 + .../connector/StandardConnectorNodeIT.java | 76 ++++++++++++- .../connector/TestStandardConnectorNode.java | 122 +++++++++++++++++++++ .../apache/nifi/controller/flow/NopConnector.java | 5 + .../org.example.TestConnector/Another_Test_Step.md | 0 .../org.example.TestConnector/Test_Step.md | 0 .../src/test/resources/colors.txt | 3 + .../java/org/apache/nifi/nar/DummyConnector.java | 6 + .../tests/system/DataQueuingConnector.java | 98 +++++++++++++++++ .../org.apache.nifi.components.connector.Connector | 3 +- .../apache/nifi/tests/system/NiFiClientUtil.java | 19 ++++ .../org/apache/nifi/tests/system/NiFiSystemIT.java | 1 + .../connectors/ClusteredConnectorAssetsIT.java | 2 - .../system/connectors/ClusteredConnectorIT.java | 92 ++++++++++++++++ .../tests/system/connectors/ConnectorCrudIT.java | 10 ++ .../org/apache/nifi/toolkit/client/FlowClient.java | 8 ++ .../nifi/toolkit/client/impl/JerseyFlowClient.java | 9 ++ 24 files changed, 533 
insertions(+), 9 deletions(-) diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java index 3233465e3c..13d25bb2ed 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java @@ -17,7 +17,7 @@ package org.apache.nifi.controller.queue; -import java.util.concurrent.Future; +import java.util.concurrent.CompletableFuture; /** * Represents the status of a Drop FlowFile Request that has been issued to @@ -81,5 +81,5 @@ public interface DropFlowFileStatus { /** * @return a Future that can be used to determine when the drop operation has completed */ - Future<Void> getCompletionFuture(); + CompletableFuture<Void> getCompletionFuture(); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java index b7206edbd3..2bd111c752 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java @@ -197,6 +197,19 @@ public interface ConnectorNode extends ComponentAuthorizable, VersionedComponent */ Future<Void> stop(FlowEngine scheduler); + /** + * Allows the Connector to drain any in-flight data while not accepting any new data. + */ + Future<Void> drainFlowFiles(); + + /** + * Purges all FlowFiles from the Connector, immediately dropping the data. + * + * @param requestor the user requesting the purge. This will be recorded in the associated provenance DROP events. 
+ * @return a Future that will be completed when the purge operation is finished + */ + Future<Void> purgeFlowFiles(String requestor); + /** * Updates the configuration of one of the configuration steps. This method should only be invoked via the ConnectorRepository. * @param configurationStepName the name of the configuration step being set diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorState.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorState.java index 08e44e0bc8..ad30ab832b 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorState.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorState.java @@ -23,6 +23,8 @@ public enum ConnectorState { STOPPING, STOPPED, DISABLED, + DRAINING, + PURGING, PREPARING_FOR_UPDATE, UPDATING, UPDATE_FAILED, diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRequest.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRequest.java index 2aa22d978c..10f7fc1e13 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRequest.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRequest.java @@ -18,7 +18,6 @@ package org.apache.nifi.controller.queue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; public class DropFlowFileRequest implements DropFlowFileStatus { private final String identifier; @@ -91,7 +90,7 @@ public class DropFlowFileRequest implements DropFlowFileStatus 
{ } @Override - public Future<Void> getCompletionFuture() { + public CompletableFuture<Void> getCompletionFuture() { return completionFuture; } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/GhostConnector.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/GhostConnector.java index 4c7c5fa45e..73f9c98edc 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/GhostConnector.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/GhostConnector.java @@ -26,6 +26,7 @@ import org.apache.nifi.flow.VersionedExternalFlow; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; public class GhostConnector implements Connector { private final String identifier; @@ -119,6 +120,11 @@ public class GhostConnector implements Connector { return List.of(); } + @Override + public CompletableFuture<Void> drainFlowFiles(final FlowContext flowContext) { + return CompletableFuture.completedFuture(null); + } + @Override public String toString() { return "GhostConnector[id=" + identifier + ", type=" + canonicalClassName + "]"; diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java index 14713ecee9..a35e191c83 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java @@ -36,6 +36,7 @@ import 
org.apache.nifi.components.validation.ValidationStatus; import org.apache.nifi.connectable.FlowFileActivity; import org.apache.nifi.connectable.FlowFileTransferCounts; import org.apache.nifi.controller.flow.FlowManager; +import org.apache.nifi.controller.queue.DropFlowFileStatus; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.flow.Bundle; import org.apache.nifi.flow.VersionedConfigurationStep; @@ -64,6 +65,7 @@ import java.util.Objects; import java.util.Optional; import java.util.OptionalLong; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -442,6 +444,53 @@ public class StandardConnectorNode implements ConnectorNode { return stopCompleteFuture; } + @Override + public Future<Void> drainFlowFiles() { + requireStopped("drain FlowFiles", ConnectorState.DRAINING); + + try (final NarCloseable ignored = NarCloseable.withComponentNarLoader(extensionManager, connectorDetails.getConnector().getClass(), getIdentifier())) { + final CompletableFuture<Void> future = connectorDetails.getConnector().drainFlowFiles(activeFlowContext); + final CompletableFuture<Void> stateUpdateFuture = future.whenComplete((result, failureCause) -> stateTransition.setCurrentState(ConnectorState.STOPPED)); + return stateUpdateFuture; + } catch (final Throwable t) { + stateTransition.setCurrentState(ConnectorState.STOPPED); + throw t; + } + } + + @Override + public Future<Void> purgeFlowFiles(final String requestor) { + requireStopped("purge FlowFiles", ConnectorState.PURGING); + + try { + final String dropRequestId = UUID.randomUUID().toString(); + final DropFlowFileStatus status = activeFlowContext.getManagedProcessGroup().dropAllFlowFiles(dropRequestId, requestor); + final CompletableFuture<Void> future = status.getCompletionFuture(); + final CompletableFuture<Void> stateUpdateFuture = future.whenComplete((result, failureCause) -> 
stateTransition.setCurrentState(ConnectorState.STOPPED)); + return stateUpdateFuture; + } catch (final Throwable t) { + stateTransition.setCurrentState(ConnectorState.STOPPED); + throw t; + } + } + + private void requireStopped(final String action, final ConnectorState newState) { + final ConnectorState desiredState = getDesiredState(); + if (desiredState != ConnectorState.STOPPED) { + throw new IllegalStateException("Cannot " + action + " for " + this + " because its desired state is currently " + desiredState + "; it must be STOPPED."); + } + + boolean stateUpdated = false; + while (!stateUpdated) { + final ConnectorState currentState = getCurrentState(); + if (currentState != ConnectorState.STOPPED) { + throw new IllegalStateException("Cannot " + action + " for " + this + " because its current state is currently " + currentState + "; it must be STOPPED."); + } + + stateUpdated = stateTransition.trySetCurrentState(ConnectorState.STOPPED, newState); + } + } + private void stopComponent(final FlowEngine scheduler, final CompletableFuture<Void> stopCompleteFuture) { try (final NarCloseable ignored = NarCloseable.withComponentNarLoader(extensionManager, connectorDetails.getConnector().getClass(), getIdentifier())) { connectorDetails.getConnector().stop(activeFlowContext); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/BlockingConnector.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/BlockingConnector.java index 2d163e5e80..da9836a102 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/BlockingConnector.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/BlockingConnector.java @@ -25,6 +25,7 @@ import org.apache.nifi.flow.VersionedExternalFlow; import java.util.List; import java.util.Map; +import 
java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; public class BlockingConnector implements Connector { @@ -124,4 +125,8 @@ public class BlockingConnector implements Connector { return List.of(); } + @Override + public CompletableFuture<Void> drainFlowFiles(final FlowContext flowContext) { + return CompletableFuture.completedFuture(null); + } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/SleepingConnector.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/SleepingConnector.java index 6443523e43..f2dbbe65c7 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/SleepingConnector.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/SleepingConnector.java @@ -26,6 +26,7 @@ import org.apache.nifi.flow.VersionedExternalFlow; import java.time.Duration; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; public class SleepingConnector implements Connector { private final Duration sleepDuration; @@ -117,4 +118,9 @@ public class SleepingConnector implements Connector { public List<AllowableValue> fetchAllowableValues(final String stepName, final String propertyName, final FlowContext workingContext) { return List.of(); } + + @Override + public CompletableFuture<Void> drainFlowFiles(final FlowContext flowContext) { + return CompletableFuture.completedFuture(null); + } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java index abdbfd8f7f..d2f8ce2dfc 100644 --- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java @@ -42,6 +42,8 @@ import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReloadComponent; import org.apache.nifi.controller.flow.StandardFlowManager; import org.apache.nifi.controller.flowanalysis.FlowAnalyzer; +import org.apache.nifi.controller.queue.DropFlowFileRequest; +import org.apache.nifi.controller.queue.DropFlowFileState; import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.FlowFileQueueFactory; import org.apache.nifi.controller.queue.LoadBalanceCompression; @@ -78,6 +80,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -88,6 +91,10 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Predicate; import static java.util.Objects.requireNonNull; @@ -410,7 +417,8 @@ public class StandardConnectorNodeIT { .anyMatch(result -> result.getSubject() != null && result.getSubject().contains("First Primary Color")); assertTrue(hasColorError); - final ConnectorConfiguration validConfig = createFileAndColorsConfiguration(".", "red"); + final File colorsFile = new File("src/test/resources/colors.txt"); + final ConnectorConfiguration validConfig = createFileAndColorsConfiguration(colorsFile.getAbsolutePath(), "red"); configure(connectorNode, validConfig); final ValidationState 
updatedValidationState = connectorNode.performValidation(); @@ -439,6 +447,58 @@ public class StandardConnectorNodeIT { assertEquals(1, validationState.getValidationErrors().size()); } + @Test + public void testPurgeFlowFilesEmptiesQueues() throws FlowUpdateException, ExecutionException, InterruptedException, TimeoutException { + final ConnectorNode connectorNode = initializeDynamicFlowConnector(); + final ProcessGroup rootGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup(); + + final Connection connection = queueDataBySource(rootGroup, "Create FlowFile"); + assertEquals(1, connection.getFlowFileQueue().size().getObjectCount()); + + final Future<Void> purgeFuture = connectorNode.purgeFlowFiles("test-user"); + purgeFuture.get(10, TimeUnit.SECONDS); + + assertTrue(connection.getFlowFileQueue().isEmpty()); + assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState()); + } + + @Test + public void testPurgeFlowFilesMultipleQueues() throws FlowUpdateException, ExecutionException, InterruptedException, TimeoutException { + final ConnectorNode connectorNode = initializeDynamicFlowConnector(); + final ProcessGroup rootGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup(); + + final Connection connection1 = queueDataBySource(rootGroup, "Create FlowFile"); + final Connection connection2 = queueDataByDestination(rootGroup, "Terminate FlowFile"); + assertEquals(1, connection1.getFlowFileQueue().size().getObjectCount()); + assertEquals(1, connection2.getFlowFileQueue().size().getObjectCount()); + + final Future<Void> purgeFuture = connectorNode.purgeFlowFiles("test-user"); + purgeFuture.get(10, TimeUnit.SECONDS); + + assertTrue(connection1.getFlowFileQueue().isEmpty()); + assertTrue(connection2.getFlowFileQueue().isEmpty()); + assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState()); + } + + @Test + public void testPurgeFlowFilesRequiresStoppedState() throws FlowUpdateException { + final ConnectorNode connectorNode = 
initializeDynamicFlowConnector(); + final ProcessGroup rootGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup(); + queueDataBySource(rootGroup, "Create FlowFile"); + + connectorNode.enable(); + assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState()); + + connectorNode.start(componentLifecycleThreadPool); + assertEquals(ConnectorState.RUNNING, connectorNode.getDesiredState()); + + final IllegalStateException exception = assertThrows(IllegalStateException.class, () -> connectorNode.purgeFlowFiles("test-user")); + assertTrue(exception.getMessage().contains("must be STOPPED")); + + connectorNode.stop(componentLifecycleThreadPool); + } + + private List<String> getConfigurationStepNames(final ConnectorNode connectorNode) { return connectorNode.getConfigurationSteps().stream() .map(ConfigurationStep::getName) @@ -484,6 +544,20 @@ public class StandardConnectorNodeIT { when(flowFileQueue.getLoadBalanceStrategy()).thenReturn(LoadBalanceStrategy.DO_NOT_LOAD_BALANCE); when(flowFileQueue.getLoadBalanceCompression()).thenReturn(LoadBalanceCompression.DO_NOT_COMPRESS); + // Mock dropFlowFiles to clear the list and return a completed status + when(flowFileQueue.dropFlowFiles(anyString(), anyString())).thenAnswer(invocation -> { + final String requestId = invocation.getArgument(0); + final int originalCount = flowFileList.size(); + flowFileList.clear(); + + final DropFlowFileRequest dropRequest = new DropFlowFileRequest(requestId); + dropRequest.setOriginalSize(new QueueSize(originalCount, originalCount)); + dropRequest.setCurrentSize(new QueueSize(0, 0)); + dropRequest.setDroppedSize(new QueueSize(originalCount, originalCount)); + dropRequest.setState(DropFlowFileState.COMPLETE); + return dropRequest; + }); + final FlowFileQueueFactory flowFileQueueFactory = (loadBalanceStrategy, partitioningAttribute, processGroup) -> flowFileQueue; final Connection connection = new StandardConnection.Builder(processScheduler) diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java index a1490409da..bf26c1f7ed 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java @@ -494,6 +494,85 @@ public class TestStandardConnectorNode { assertEquals("The property value is invalid", failedResult.getExplanation()); } + @Test + @Timeout(value = 5, unit = TimeUnit.SECONDS) + public void testDrainFlowFilesTransitionsStateToDraining() throws FlowUpdateException { + final CompletableFuture<Void> drainCompletionFuture = new CompletableFuture<>(); + final DrainBlockingConnector drainBlockingConnector = new DrainBlockingConnector(drainCompletionFuture); + final StandardConnectorNode connectorNode = createConnectorNode(drainBlockingConnector); + + assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState()); + + final Future<Void> drainFuture = connectorNode.drainFlowFiles(); + + assertEquals(ConnectorState.DRAINING, connectorNode.getCurrentState()); + assertFalse(drainFuture.isDone()); + + drainCompletionFuture.complete(null); + + try { + drainFuture.get(2, TimeUnit.SECONDS); + } catch (final Exception e) { + throw new RuntimeException("Drain future failed to complete", e); + } + + assertTrue(drainFuture.isDone()); + assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState()); + } + + @Test + @Timeout(value = 5, unit = TimeUnit.SECONDS) + public void testDrainFlowFilesFutureDoesNotCompleteUntilDrainFinishes() throws FlowUpdateException, InterruptedException { + final CompletableFuture<Void> drainCompletionFuture = new 
CompletableFuture<>(); + final DrainBlockingConnector drainBlockingConnector = new DrainBlockingConnector(drainCompletionFuture); + final StandardConnectorNode connectorNode = createConnectorNode(drainBlockingConnector); + + assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState()); + + final Future<Void> drainFuture = connectorNode.drainFlowFiles(); + assertEquals(ConnectorState.DRAINING, connectorNode.getCurrentState()); + + Thread.sleep(200); + assertFalse(drainFuture.isDone()); + assertEquals(ConnectorState.DRAINING, connectorNode.getCurrentState()); + + drainCompletionFuture.complete(null); + + try { + drainFuture.get(2, TimeUnit.SECONDS); + } catch (final Exception e) { + throw new RuntimeException("Drain future failed to complete", e); + } + + assertTrue(drainFuture.isDone()); + assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState()); + } + + @Test + @Timeout(value = 5, unit = TimeUnit.SECONDS) + public void testDrainFlowFilesStateTransitionsBackToStoppedOnCompletion() throws FlowUpdateException { + final CompletableFuture<Void> drainCompletionFuture = new CompletableFuture<>(); + final DrainBlockingConnector drainBlockingConnector = new DrainBlockingConnector(drainCompletionFuture); + final StandardConnectorNode connectorNode = createConnectorNode(drainBlockingConnector); + + assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState()); + assertEquals(ConnectorState.STOPPED, connectorNode.getDesiredState()); + + final Future<Void> drainFuture = connectorNode.drainFlowFiles(); + assertEquals(ConnectorState.DRAINING, connectorNode.getCurrentState()); + + drainCompletionFuture.complete(null); + + try { + drainFuture.get(2, TimeUnit.SECONDS); + } catch (final Exception e) { + throw new RuntimeException("Drain future failed to complete", e); + } + + assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState()); + assertEquals(ConnectorState.STOPPED, connectorNode.getDesiredState()); + } + private StandardConnectorNode 
createConnectorNode() throws FlowUpdateException { final SleepingConnector sleepingConnector = new SleepingConnector(Duration.ofMillis(1)); return createConnectorNode(sleepingConnector); @@ -669,4 +748,47 @@ public class TestStandardConnectorNode { } } + /** + * Test connector that allows control over when drainFlowFiles completes via a CompletableFuture + */ + private static class DrainBlockingConnector extends AbstractConnector { + private final CompletableFuture<Void> drainCompletionFuture; + + public DrainBlockingConnector(final CompletableFuture<Void> drainCompletionFuture) { + this.drainCompletionFuture = drainCompletionFuture; + } + + @Override + public VersionedExternalFlow getInitialFlow() { + return null; + } + + @Override + public void prepareForUpdate(final FlowContext workingContext, final FlowContext activeContext) { + } + + @Override + public List<ConfigurationStep> getConfigurationSteps() { + return List.of(); + } + + @Override + public void applyUpdate(final FlowContext workingContext, final FlowContext activeContext) { + } + + @Override + protected void onStepConfigured(final String stepName, final FlowContext workingContext) { + } + + @Override + public List<ConfigVerificationResult> verifyConfigurationStep(final String stepName, final Map<String, String> overrides, final FlowContext flowContext) { + return List.of(); + } + + @Override + public CompletableFuture<Void> drainFlowFiles(final FlowContext flowContext) { + return drainCompletionFuture; + } + } + } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/flow/NopConnector.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/flow/NopConnector.java index b4b7c7d9be..4701b06d68 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/flow/NopConnector.java +++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/flow/NopConnector.java @@ -31,6 +31,7 @@ import org.apache.nifi.flow.VersionedExternalFlow; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; /** * A simple no-op Connector implementation for testing purposes. @@ -159,4 +160,8 @@ public class NopConnector implements Connector { return List.of(); } + @Override + public CompletableFuture<Void> drainFlowFiles(final FlowContext flowContext) { + return CompletableFuture.completedFuture(null); + } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/step-documentation/org.example.TestConnector/Another_Test_Step.md b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/steps/org.example.TestConnector/Another_Test_Step.md similarity index 100% rename from nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/step-documentation/org.example.TestConnector/Another_Test_Step.md rename to nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/steps/org.example.TestConnector/Another_Test_Step.md diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/step-documentation/org.example.TestConnector/Test_Step.md b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/steps/org.example.TestConnector/Test_Step.md similarity index 100% rename from 
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/step-documentation/org.example.TestConnector/Test_Step.md rename to nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/steps/org.example.TestConnector/Test_Step.md diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/colors.txt b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/colors.txt new file mode 100644 index 0000000000..1fedccaeb9 --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/colors.txt @@ -0,0 +1,3 @@ +red +green +blue \ No newline at end of file diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/test/java/org/apache/nifi/nar/DummyConnector.java b/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/test/java/org/apache/nifi/nar/DummyConnector.java index 61738f428a..ed2fdd6b07 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/test/java/org/apache/nifi/nar/DummyConnector.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/test/java/org/apache/nifi/nar/DummyConnector.java @@ -31,6 +31,7 @@ import org.apache.nifi.flow.VersionedExternalFlow; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; @Tags({"test", "connector"}) public class DummyConnector implements Connector { @@ -111,4 +112,9 @@ public class DummyConnector implements Connector { public List<AllowableValue> fetchAllowableValues(final String stepName, final String propertyName, final FlowContext flowContext, final String filter) { return List.of(); } + + @Override + public CompletableFuture<Void> drainFlowFiles(final FlowContext flowContext) { + return CompletableFuture.completedFuture(null); + } } diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java new file mode 100644 index 0000000000..de6701159d --- /dev/null +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.connectors.tests.system; + +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.connector.AbstractConnector; +import org.apache.nifi.components.connector.ConfigurationStep; +import org.apache.nifi.components.connector.components.FlowContext; +import org.apache.nifi.flow.ConnectableComponent; +import org.apache.nifi.flow.ConnectableComponentType; +import org.apache.nifi.flow.ScheduledState; +import org.apache.nifi.flow.VersionedConnection; +import org.apache.nifi.flow.VersionedExternalFlow; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flow.VersionedProcessor; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class DataQueuingConnector extends AbstractConnector { + @Override + protected void onStepConfigured(final String stepName, final FlowContext workingContext) { + + } + + @Override + public VersionedExternalFlow getInitialFlow() { + final VersionedProcessor generate = new VersionedProcessor(); + generate.setName("GenerateFlowFile"); + generate.setType("org.apache.nifi.processors.tests.system.GenerateFlowFile"); + generate.setIdentifier("gen-1"); + generate.setGroupIdentifier("1234"); + generate.setProperties(Map.of("File Size", "1 KB")); + + final VersionedProcessor terminate = new VersionedProcessor(); + terminate.setName("TerminateFlowFile"); + terminate.setType("org.apache.nifi.processors.tests.system.TerminateFlowFile"); + terminate.setIdentifier("term-1"); + terminate.setGroupIdentifier("1234"); + terminate.setScheduledState(ScheduledState.DISABLED); + + final ConnectableComponent source = new ConnectableComponent(); + source.setId(generate.getIdentifier()); + source.setType(ConnectableComponentType.PROCESSOR); + source.setGroupId("1234"); + + final ConnectableComponent destination = new ConnectableComponent(); + destination.setId(terminate.getIdentifier()); + destination.setType(ConnectableComponentType.PROCESSOR); + 
destination.setGroupId("1234"); + + final VersionedConnection connection = new VersionedConnection(); + connection.setSource(source); + connection.setDestination(destination); + connection.setGroupIdentifier("1234"); + connection.setIdentifier("generate-to-terminate-1"); + + final VersionedProcessGroup rootGroup = new VersionedProcessGroup(); + rootGroup.setName("Data Queuing Connector"); + rootGroup.setIdentifier("1234"); + rootGroup.setProcessors(Set.of(generate, terminate)); + rootGroup.setConnections(Set.of(connection)); + + final VersionedExternalFlow flow = new VersionedExternalFlow(); + flow.setFlowContents(rootGroup); + return flow; + } + + @Override + public List<ConfigVerificationResult> verifyConfigurationStep(final String stepName, final Map<String, String> propertyValueOverrides, final FlowContext flowContext) { + return List.of(); + } + + @Override + public List<ConfigurationStep> getConfigurationSteps() { + return List.of(); + } + + @Override + public void applyUpdate(final FlowContext workingFlowContext, final FlowContext activeFlowContext) { + } +} diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector index 85b4fb889a..c8e0d0ed64 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector @@ -15,5 +15,4 @@ org.apache.nifi.connectors.tests.system.NopConnector org.apache.nifi.connectors.tests.system.AssetConnector - - +org.apache.nifi.connectors.tests.system.DataQueuingConnector 
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java index 7a16c754c9..51272e8b8e 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java @@ -80,6 +80,7 @@ import org.apache.nifi.web.api.entity.ConfigurationStepEntity; import org.apache.nifi.web.api.entity.ConnectionEntity; import org.apache.nifi.web.api.entity.ConnectionStatusEntity; import org.apache.nifi.web.api.entity.ConnectorEntity; +import org.apache.nifi.web.api.entity.ConnectorsEntity; import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ControllerServiceRunStatusEntity; import org.apache.nifi.web.api.entity.ControllerServicesEntity; @@ -397,6 +398,24 @@ public class NiFiClientUtil { } } + public void stopConnectors() throws NiFiClientException, IOException, InterruptedException { + final ConnectorsEntity connectorsEntity = nifiClient.getFlowClient().getConnectors(); + for (final ConnectorEntity connector : connectorsEntity.getConnectors()) { + getConnectorClient().stopConnector(connector); + waitForConnectorStopped(connector.getId()); + } + } + + public void stopConnector(final ConnectorEntity connectorEntity) throws NiFiClientException, IOException, InterruptedException { + stopConnector(connectorEntity.getId()); + } + + public void stopConnector(final String connectorId) throws NiFiClientException, IOException, InterruptedException { + final ConnectorEntity entity = getConnectorClient().getConnector(connectorId); + getConnectorClient().stopConnector(entity); + waitForConnectorStopped(connectorId); + } + public void waitForConnectorStopped(final String connectorId) throws NiFiClientException, IOException, 
InterruptedException { waitForConnectorState(connectorId, ConnectorState.STOPPED); } diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java index 74999daff1..b73814250f 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java @@ -261,6 +261,7 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider { getClientUtil().disableControllerLevelServices(); getClientUtil().disableFlowAnalysisRules(); getClientUtil().stopTransmitting("root"); + getClientUtil().stopConnectors(); getClientUtil().deleteAll("root"); getClientUtil().deleteControllerLevelServices(); getClientUtil().deleteReportingTasks(); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorAssetsIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorAssetsIT.java index 9016c504ce..b2309891dd 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorAssetsIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorAssetsIT.java @@ -138,5 +138,3 @@ public class ClusteredConnectorAssetsIT extends ConnectorAssetsIT { connectorClient.deleteConnector(connector); } } - - diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorIT.java index 2727203cf8..b07646dd17 100644 --- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorIT.java @@ -17,20 +17,28 @@ package org.apache.nifi.tests.system.connectors; +import jakarta.ws.rs.NotFoundException; import org.apache.nifi.tests.system.NiFiInstanceFactory; +import org.apache.nifi.toolkit.client.ConnectorClient; import org.apache.nifi.toolkit.client.NiFiClientException; import org.apache.nifi.web.api.dto.ConnectorConfigurationDTO; import org.apache.nifi.web.api.dto.ConnectorValueReferenceDTO; import org.apache.nifi.web.api.entity.ConnectorEntity; +import org.apache.nifi.web.api.entity.NodeEntity; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.IOException; +import java.nio.file.Files; import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; public class ClusteredConnectorIT extends ConnectorCrudIT { private static final Logger logger = LoggerFactory.getLogger(ClusteredConnectorIT.class); @@ -109,6 +117,90 @@ public class ClusteredConnectorIT extends ConnectorCrudIT { logger.info("Validated active and working configuration on Node 2"); } + @Test + public void testDeleteConnectorOnConnect() throws NiFiClientException, IOException, InterruptedException { + // Create Connector + final ConnectorEntity connector = getClientUtil().createConnector("DataQueuingConnector"); + assertNotNull(connector); + + // Disconnect node 2 + disconnectNode(2); + + // Should not be able to delete connector + final ConnectorClient connectorClient = getNifiClient().getConnectorClient(); + 
assertThrows(NiFiClientException.class, () -> connectorClient.deleteConnector(connector)); + + final NodeEntity node2Entity = getNodeEntity(2); + getNifiClient().getControllerClient().deleteNode(node2Entity.getNode().getNodeId()); + + // Should now be able to delete connector + connectorClient.deleteConnector(connector); + + // Should now be able to add node 2 back + getNiFiInstance().getNodeInstance(2).stop(); + getNiFiInstance().getNodeInstance(2).start(true); + waitForAllNodesConnected(); + + switchClientToNode(2); + + // We should get a 404 + try { + getNifiClient().getConnectorClient().getConnector(connector.getId()); + fail("Expected NiFiClientException but it was not thrown"); + } catch (final NiFiClientException e) { + assertInstanceOf(NotFoundException.class, e.getCause()); + } + } + + + @Test + public void testDeleteConnectorOnConnectWithDataQueued() throws NiFiClientException, IOException, InterruptedException { + // Create Connector + final ConnectorEntity connector = getClientUtil().createConnector("DataQueuingConnector"); + assertNotNull(connector); + + getNifiClient().getConnectorClient().startConnector(connector); + + Thread.sleep(1000L); // Wait 1 second to allow some data to queue + + // Disconnect node 2 + disconnectNode(2); + getNiFiInstance().getNodeInstance(2).stop(); + + // Should not be able to delete connector + final ConnectorClient connectorClient = getNifiClient().getConnectorClient(); + assertThrows(NiFiClientException.class, () -> connectorClient.deleteConnector(connector)); + + // Remove node 2 from cluster. + final NodeEntity node2Entity = getNodeEntity(2); + getNifiClient().getControllerClient().deleteNode(node2Entity.getNode().getNodeId()); + + // We cannot delete the connector directly because it has data queued. Stop Node 1, delete the flow.json.gz file, and restart Node 1. 
+ getNiFiInstance().getNodeInstance(1).stop(); + final File node1InstanceDir = getNiFiInstance().getNodeInstance(1).getInstanceDirectory(); + final File node1ConfDir = new File(node1InstanceDir, "conf"); + final File flowJson = new File(node1ConfDir, "flow.json.gz"); + Files.delete(flowJson.toPath()); + + getNiFiInstance().getNodeInstance(1).start(true); + waitForCoordinatorElected(); + + // Should now be able to add node 2 back + getNiFiInstance().getNodeInstance(2).start(true); + waitForAllNodesConnected(); + + switchClientToNode(2); + + // We should get a 404 + try { + getNifiClient().getConnectorClient().getConnector(connector.getId()); + fail("Expected NiFiClientException but it was not thrown"); + } catch (final NiFiClientException e) { + assertInstanceOf(NotFoundException.class, e.getCause()); + } + } + + private void assertActiveConfigurationValue(final ConnectorEntity connector, final String propertyName, final String expectedValue) { final String actualValue = getConfigurationValue(connector.getComponent().getActiveConfiguration(), propertyName); assertEquals(expectedValue, actualValue, "Active configuration property '" + propertyName + "' did not match expected value"); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorCrudIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorCrudIT.java index 085177dad4..78728935cc 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorCrudIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorCrudIT.java @@ -148,4 +148,14 @@ public class ConnectorCrudIT extends NiFiSystemIT { getClientUtil().applyConnectorUpdate(connector); getClientUtil().waitForValidConnector(connector.getId()); } + + @Test + public void testDeleteConnectorNoDataQueued() throws NiFiClientException, 
IOException { + // Create Connector + final ConnectorEntity connector = getClientUtil().createConnector("DataQueuingConnector"); + assertNotNull(connector); + + // Delete + getNifiClient().getConnectorClient().deleteConnector(connector); + } } diff --git a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/FlowClient.java b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/FlowClient.java index e483f1c91a..57fa3c74f5 100644 --- a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/FlowClient.java +++ b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/FlowClient.java @@ -20,6 +20,7 @@ import org.apache.nifi.flow.VersionedReportingTaskSnapshot; import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity; import org.apache.nifi.web.api.entity.ClusterSummaryEntity; import org.apache.nifi.web.api.entity.ConnectionStatusEntity; +import org.apache.nifi.web.api.entity.ConnectorsEntity; import org.apache.nifi.web.api.entity.ControllerServiceTypesEntity; import org.apache.nifi.web.api.entity.ControllerServicesEntity; import org.apache.nifi.web.api.entity.CurrentUserEntity; @@ -229,4 +230,11 @@ public interface FlowClient { * @return list of flows */ VersionedFlowsEntity getFlowRegistryFlows(String registryClientId, String branch, String bucket) throws NiFiClientException, IOException; + + /** + * Retrieves all of the Connectors. 
+ * + * @return the list of Connectors + */ + ConnectorsEntity getConnectors() throws NiFiClientException, IOException; } diff --git a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyFlowClient.java b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyFlowClient.java index 114c4e01c2..f0cb9450ca 100644 --- a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyFlowClient.java +++ b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyFlowClient.java @@ -32,6 +32,7 @@ import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity; import org.apache.nifi.web.api.entity.ClusterSummaryEntity; import org.apache.nifi.web.api.entity.ComponentEntity; import org.apache.nifi.web.api.entity.ConnectionStatusEntity; +import org.apache.nifi.web.api.entity.ConnectorsEntity; import org.apache.nifi.web.api.entity.ControllerServiceTypesEntity; import org.apache.nifi.web.api.entity.ControllerServicesEntity; import org.apache.nifi.web.api.entity.CurrentUserEntity; @@ -400,4 +401,12 @@ public class JerseyFlowClient extends AbstractJerseyClient implements FlowClient return getRequestBuilder(target).get(VersionedFlowsEntity.class); }); } + + @Override + public ConnectorsEntity getConnectors() throws NiFiClientException, IOException { + return executeAction("Error retrieving Connectors", () -> { + final WebTarget target = flowTarget.path("/connectors"); + return getRequestBuilder(target).get(ConnectorsEntity.class); + }); + } }
