This is an automated email from the ASF dual-hosted git repository. markap14 pushed a commit to branch NIFI-15258 in repository https://gitbox.apache.org/repos/asf/nifi-api.git
commit a63bc4e80276f52e5f5da024b5be6655d51818a9 Author: Mark Payne <[email protected]> AuthorDate: Fri Jan 9 13:01:26 2026 -0500 NIFI-15427: Added drainFlowFiles(FlowContext) method to Connector (#42) * NIFI-15427: Added drainFlowFiles(FlowContext) method to Connector * NIFI-15427: Added missing @Override annotation * NIFI-15427: Ensured proper error handling when draining FlowFiles --- .../apache/nifi/components/connector/AbstractConnector.java | 11 +++++++++++ .../java/org/apache/nifi/components/connector/Connector.java | 9 +++++++++ 2 files changed, 20 insertions(+) diff --git a/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java b/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java index 8045bc6..d965699 100644 --- a/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java +++ b/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java @@ -156,6 +156,7 @@ public abstract class AbstractConnector implements Connector { * @param flowContext the FlowContext to use for drainage * @return a CompletableFuture that will be completed when drainage is complete */ + @Override public CompletableFuture<Void> drainFlowFiles(final FlowContext flowContext) { final CompletableFuture<Void> result = new CompletableFuture<>(); final QueueSize initialQueueSize = flowContext.getRootGroup().getQueueSize(); @@ -196,6 +197,11 @@ public abstract class AbstractConnector implements Connector { result.completeExceptionally(new RuntimeException("Failed to start non-source processors while draining FlowFiles", e.getCause())); } + }).exceptionally(throwable -> { + if (!result.isDone()) { + result.completeExceptionally(new RuntimeException("Failed to stop source processors while draining FlowFiles", throwable)); + } + return null; }); startNonSourceFuture.thenRun(() -> { @@ -253,6 +259,11 @@ public abstract class AbstractConnector implements Connector { if (!result.isDone()) { result.complete(null); } + }).exceptionally(throwable -> { + if (!result.isDone()) { + result.completeExceptionally(new RuntimeException("Failed while draining FlowFiles", throwable)); + } + return null; }); return result; diff --git a/src/main/java/org/apache/nifi/components/connector/Connector.java b/src/main/java/org/apache/nifi/components/connector/Connector.java index ce88079..c0dd957 100644 --- a/src/main/java/org/apache/nifi/components/connector/Connector.java +++ b/src/main/java/org/apache/nifi/components/connector/Connector.java @@ -25,6 +25,7 @@ import org.apache.nifi.flow.VersionedExternalFlow; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; /** * <p> @@ -195,4 +196,12 @@ public interface Connector { List<AllowableValue> fetchAllowableValues(String stepName, String propertyName, FlowContext flowContext); List<AllowableValue> fetchAllowableValues(String stepName, String propertyName, FlowContext flowContext, String filter); + + /** + * Drains any in-flight FlowFiles from the flow associated with the given Flow Context by processing the existing data + * but not accepting any new data. + * @param flowContext the flow context + * @return a Future that will be completed when the draining is complete + */ + CompletableFuture<Void> drainFlowFiles(FlowContext flowContext); }
