This is an automated email from the ASF dual-hosted git repository.
mcgilman pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi-api.git
The following commit(s) were added to refs/heads/NIFI-15258 by this push:
new 8fda9f1 NIFI-15427: Added drainFlowFiles(FlowContext) method to Connector (#42)
8fda9f1 is described below
commit 8fda9f1a7829db374668258a11616a0444e5dd7a
Author: Mark Payne <[email protected]>
AuthorDate: Fri Jan 9 13:01:26 2026 -0500
NIFI-15427: Added drainFlowFiles(FlowContext) method to Connector (#42)
* NIFI-15427: Added drainFlowFiles(FlowContext) method to Connector
* NIFI-15427: Added missing @Override annotation
* NIFI-15427: Ensured proper error handling when draining FlowFiles
---
.../apache/nifi/components/connector/AbstractConnector.java | 11 +++++++++++
.../java/org/apache/nifi/components/connector/Connector.java | 9 +++++++++
2 files changed, 20 insertions(+)
diff --git a/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java b/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java
index 8045bc6..d965699 100644
--- a/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java
+++ b/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java
@@ -156,6 +156,7 @@ public abstract class AbstractConnector implements Connector {
* @param flowContext the FlowContext to use for drainage
* @return a CompletableFuture that will be completed when drainage is complete
*/
+ @Override
public CompletableFuture<Void> drainFlowFiles(final FlowContext flowContext) {
final CompletableFuture<Void> result = new CompletableFuture<>();
final QueueSize initialQueueSize = flowContext.getRootGroup().getQueueSize();
@@ -196,6 +197,11 @@ public abstract class AbstractConnector implements Connector {
result.completeExceptionally(new RuntimeException("Failed to start non-source processors while draining FlowFiles", e.getCause()));
}
+ }).exceptionally(throwable -> {
+ if (!result.isDone()) {
+ result.completeExceptionally(new RuntimeException("Failed to stop source processors while draining FlowFiles", throwable));
+ }
+ return null;
});
startNonSourceFuture.thenRun(() -> {
@@ -253,6 +259,11 @@ public abstract class AbstractConnector implements Connector {
if (!result.isDone()) {
result.complete(null);
}
+ }).exceptionally(throwable -> {
+ if (!result.isDone()) {
+ result.completeExceptionally(new RuntimeException("Failed while draining FlowFiles", throwable));
+ }
+ return null;
});
return result;
diff --git a/src/main/java/org/apache/nifi/components/connector/Connector.java b/src/main/java/org/apache/nifi/components/connector/Connector.java
index ce88079..c0dd957 100644
--- a/src/main/java/org/apache/nifi/components/connector/Connector.java
+++ b/src/main/java/org/apache/nifi/components/connector/Connector.java
@@ -25,6 +25,7 @@ import org.apache.nifi.flow.VersionedExternalFlow;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
/**
* <p>
@@ -195,4 +196,12 @@ public interface Connector {
List<AllowableValue> fetchAllowableValues(String stepName, String propertyName, FlowContext flowContext);
List<AllowableValue> fetchAllowableValues(String stepName, String propertyName, FlowContext flowContext, String filter);
+
+ /**
+ * Drains any in-flight FlowFiles from the flow associated with the given Flow Context by processing the existing data
+ * but not accepting any new data.
+ * @param flowContext the flow context
+ * @return a Future that will be completed when the draining is complete
+ */
+ CompletableFuture<Void> drainFlowFiles(FlowContext flowContext);
}