This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi-api.git

commit 8541f0f8d3b1beda69d7cfeabd6d17c5cc18517b
Author: Mark Payne <[email protected]>
AuthorDate: Tue Feb 3 10:22:32 2026 -0500

    NIFI-15514: Bug fix - ensure that when AbstractConnector is started, it includes all components, not just processors and controller services; code cleanup (#57)
---
 .../components/connector/AbstractConnector.java    | 161 +++++++++++----------
 1 file changed, 81 insertions(+), 80 deletions(-)

diff --git 
a/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java 
b/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java
index d837f7b..7689858 100644
--- a/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java
+++ b/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java
@@ -87,14 +87,7 @@ public abstract class AbstractConnector implements Connector 
{
     @Override
     public void start(final FlowContext context) throws FlowUpdateException {
         final ProcessGroupLifecycle lifecycle = 
context.getRootGroup().getLifecycle();
-
-        try {
-            
lifecycle.enableControllerServices(ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY,
 ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get();
-        } catch (final Exception e) {
-            throw new FlowUpdateException("Failed to enable Controller 
Services", e);
-        }
-
-        lifecycle.startProcessors();
+        
lifecycle.start(ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY);
     }
 
     @Override
@@ -129,7 +122,7 @@ public abstract class AbstractConnector implements 
Connector {
                 }
             });
 
-        return stopProcessorsFuture.thenRun(() -> {
+        return stopProcessorsFuture.thenRunAsync(() -> {
             try {
                 
lifecycle.disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get();
             } catch (final Exception e) {
@@ -171,32 +164,8 @@ public abstract class AbstractConnector implements 
Connector {
 
         final CompletableFuture<Void> stopProcessorsFuture = 
stopSourceProcessors(flowContext);
 
-        final CompletableFuture<Void> startNonSourceFuture = 
stopProcessorsFuture.thenRun(() -> {
-            if (result.isDone()) {
-                return;
-            }
-
-            final CompletableFuture<Void> enableServices = 
flowContext.getRootGroup().getLifecycle().enableControllerServices(
-                
ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY,
-                ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS);
-
-            try {
-                // Wait for all referenced services to be enabled.
-                enableServices.get();
-
-                if (!result.isDone()) {
-                    getLogger().info("Starting all non-source processors to 
facilitate drainage of FlowFiles");
-                    startNonSourceProcessors(flowContext).get();
-                }
-            } catch (final Exception e) {
-                try {
-                    
flowContext.getRootGroup().getLifecycle().disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get();
-                } catch (final Exception e1) {
-                    e.addSuppressed(e1);
-                }
-
-                result.completeExceptionally(new RuntimeException("Failed to 
start non-source processors while draining FlowFiles", e.getCause()));
-            }
+        final CompletableFuture<Void> startNonSourceFuture = 
stopProcessorsFuture.thenRunAsync(() -> {
+            startNonSourceProcessors(result, flowContext);
         }).exceptionally(throwable -> {
             if (!result.isDone()) {
                 result.completeExceptionally(new RuntimeException("Failed to 
stop source processors while draining FlowFiles", throwable));
@@ -205,68 +174,100 @@ public abstract class AbstractConnector implements 
Connector {
         });
 
         startNonSourceFuture.thenRunAsync(() -> {
-            try {
-                ensureDrainageUnblocked();
-            } catch (final Exception e) {
-                getLogger().warn("Failed to ensure drainage is unblocked when 
draining FlowFiles", e);
+            completeDrain(result, flowContext, initialQueueSize);
+        }).exceptionally(throwable -> {
+            if (!result.isDone()) {
+                result.completeExceptionally(new RuntimeException("Failed 
while draining FlowFiles", throwable));
             }
+            return null;
+        });
 
-            Exception failureReason = null;
-            int iterations = 0;
-            while (!isGroupDrained(flowContext.getRootGroup())) {
-                if (result.isDone()) {
-                    getLogger().info("Drainage has been cancelled; will no 
longer wait for FlowFiles to drain");
-                    break;
-                }
+        return result;
+    }
 
-                // Log the current queue size every 10 seconds (20 iterations 
of 500ms) so that it's clear
-                // whether or not progress is being made.
-                if (iterations++ % 20 == 0) {
-                    final QueueSize queueSize = 
flowContext.getRootGroup().getQueueSize();
-                    getLogger().info("Waiting for {} FlowFiles ({} bytes) to 
drain",
-                        queueSize.getObjectCount(), 
NumberFormat.getNumberInstance().format(queueSize.getByteCount()));
-                }
+    private void completeDrain(final CompletableFuture<Void> result, final 
FlowContext flowContext, final QueueSize initialQueueSize) {
+        try {
+            ensureDrainageUnblocked();
+        } catch (final Exception e) {
+            getLogger().warn("Failed to ensure drainage is unblocked when 
draining FlowFiles", e);
+        }
 
-                try {
-                    Thread.sleep(500);
-                } catch (final InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    failureReason = e;
-                    break;
-                }
+        Exception failureReason = null;
+        int iterations = 0;
+        while (!isGroupDrained(flowContext.getRootGroup())) {
+            if (result.isDone()) {
+                getLogger().info("Drainage has been cancelled; will no longer 
wait for FlowFiles to drain");
+                break;
             }
 
-            // Log completion unless the result was completed exceptionally or 
cancelled.
-            if (!result.isDone()) {
-                getLogger().info("All {} FlowFiles have drained from 
Connector", initialQueueSize.getObjectCount());
+            // Log the current queue size every 10 seconds (20 iterations of 
500ms) so that it's clear
+            // whether or not progress is being made.
+            if (iterations++ % 20 == 0) {
+                final QueueSize queueSize = 
flowContext.getRootGroup().getQueueSize();
+                getLogger().info("Waiting for {} FlowFiles ({} bytes) to 
drain",
+                    queueSize.getObjectCount(), 
NumberFormat.getNumberInstance().format(queueSize.getByteCount()));
             }
 
             try {
-                stop(flowContext);
-            } catch (final Exception e) {
-                getLogger().warn("Failed to stop source Processors after 
draining FlowFiles", e);
-                if (failureReason == null) {
-                    failureReason = e;
-                } else {
-                    failureReason.addSuppressed(e);
-                }
+                Thread.sleep(500);
+            } catch (final InterruptedException e) {
+                Thread.currentThread().interrupt();
+                failureReason = e;
+                break;
             }
+        }
+
+        // Log completion unless the result was completed exceptionally or 
cancelled.
+        if (!result.isDone()) {
+            getLogger().info("All {} FlowFiles have drained from Connector", 
initialQueueSize.getObjectCount());
+        }
 
-            if (failureReason != null && !result.isDone()) {
-                result.completeExceptionally(new RuntimeException("Interrupted 
while waiting for " + AbstractConnector.this + " to drain", failureReason));
+        try {
+            stop(flowContext);
+        } catch (final Exception e) {
+            getLogger().warn("Failed to stop source Processors after draining 
FlowFiles", e);
+            if (failureReason == null) {
+                failureReason = e;
+            } else {
+                failureReason.addSuppressed(e);
             }
+        }
+
+        if (failureReason != null && !result.isDone()) {
+            result.completeExceptionally(new RuntimeException("Interrupted 
while waiting for " + AbstractConnector.this + " to drain", failureReason));
+        }
+
+        if (!result.isDone()) {
+            result.complete(null);
+        }
+    }
+
+    private void startNonSourceProcessors(final CompletableFuture<Void> 
result, final FlowContext flowContext) {
+        if (result.isDone()) {
+            return;
+        }
+
+        final CompletableFuture<Void> enableServices = 
flowContext.getRootGroup().getLifecycle().enableControllerServices(
+            ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY,
+            ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS);
+
+        try {
+            // Wait for all referenced services to be enabled.
+            enableServices.get();
 
             if (!result.isDone()) {
-                result.complete(null);
+                getLogger().info("Starting all non-source processors to 
facilitate drainage of FlowFiles");
+                startNonSourceProcessors(flowContext).get();
             }
-        }).exceptionally(throwable -> {
-            if (!result.isDone()) {
-                result.completeExceptionally(new RuntimeException("Failed 
while draining FlowFiles", throwable));
+        } catch (final Exception e) {
+            try {
+                
flowContext.getRootGroup().getLifecycle().disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get();
+            } catch (final Exception e1) {
+                e.addSuppressed(e1);
             }
-            return null;
-        });
 
-        return result;
+            result.completeExceptionally(new RuntimeException("Failed to start 
non-source processors while draining FlowFiles", e.getCause()));
+        }
     }
 
     /**

Reply via email to