This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi-api.git

commit a63bc4e80276f52e5f5da024b5be6655d51818a9
Author: Mark Payne <[email protected]>
AuthorDate: Fri Jan 9 13:01:26 2026 -0500

    NIFI-15427: Added drainFlowFiles(FlowContext) method to Connector (#42)
    
    * NIFI-15427: Added drainFlowFiles(FlowContext) method to Connector
    
    * NIFI-15427: Added missing @Override annotation
    
    * NIFI-15427: Ensured proper error handling when draining FlowFiles
---
 .../apache/nifi/components/connector/AbstractConnector.java   | 11 +++++++++++
 .../java/org/apache/nifi/components/connector/Connector.java  |  9 +++++++++
 2 files changed, 20 insertions(+)

diff --git 
a/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java 
b/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java
index 8045bc6..d965699 100644
--- a/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java
+++ b/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java
@@ -156,6 +156,7 @@ public abstract class AbstractConnector implements 
Connector {
      * @param flowContext the FlowContext to use for drainage
      * @return a CompletableFuture that will be completed when drainage is 
complete
      */
+    @Override
     public CompletableFuture<Void> drainFlowFiles(final FlowContext 
flowContext) {
         final CompletableFuture<Void> result = new CompletableFuture<>();
         final QueueSize initialQueueSize = 
flowContext.getRootGroup().getQueueSize();
@@ -196,6 +197,11 @@ public abstract class AbstractConnector implements 
Connector {
 
                 result.completeExceptionally(new RuntimeException("Failed to 
start non-source processors while draining FlowFiles", e.getCause()));
             }
+        }).exceptionally(throwable -> {
+            if (!result.isDone()) {
+                result.completeExceptionally(new RuntimeException("Failed to 
stop source processors while draining FlowFiles", throwable));
+            }
+            return null;
         });
 
         startNonSourceFuture.thenRun(() -> {
@@ -253,6 +259,11 @@ public abstract class AbstractConnector implements 
Connector {
             if (!result.isDone()) {
                 result.complete(null);
             }
+        }).exceptionally(throwable -> {
+            if (!result.isDone()) {
+                result.completeExceptionally(new RuntimeException("Failed 
while draining FlowFiles", throwable));
+            }
+            return null;
         });
 
         return result;
diff --git a/src/main/java/org/apache/nifi/components/connector/Connector.java 
b/src/main/java/org/apache/nifi/components/connector/Connector.java
index ce88079..c0dd957 100644
--- a/src/main/java/org/apache/nifi/components/connector/Connector.java
+++ b/src/main/java/org/apache/nifi/components/connector/Connector.java
@@ -25,6 +25,7 @@ import org.apache.nifi.flow.VersionedExternalFlow;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * <p>
@@ -195,4 +196,12 @@ public interface Connector {
     List<AllowableValue> fetchAllowableValues(String stepName, String 
propertyName, FlowContext flowContext);
 
     List<AllowableValue> fetchAllowableValues(String stepName, String 
propertyName, FlowContext flowContext, String filter);
+
+    /**
+     * Drains any in-flight FlowFiles from the flow associated with the given 
Flow Context by processing the existing data
+     * but not accepting any new data.
+     * @param flowContext the flow context
+     * @return a Future that will be completed when the draining is complete
+     */
+    CompletableFuture<Void> drainFlowFiles(FlowContext flowContext);
 }

Reply via email to