This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi-api.git

commit 50012d9b62e99e8fec58a784b1f5c9fd9b5af8d3
Author: Mark Payne <[email protected]>
AuthorDate: Tue Feb 3 14:16:49 2026 -0500

    NIFI-15538: When starting/stopping components allow specifying whether or
    not the action should be recursive; code cleanup and simplification to use a
    Virtual Thread to execute code sequentially instead of chaining
    CompletableFutures. (#61)
    
    Co-authored-by: Mark Payne <[email protected]>
---
 .../components/connector/AbstractConnector.java    | 138 ++++++++++++++-------
 .../components/ProcessGroupLifecycle.java          |  16 ++-
 2 files changed, 107 insertions(+), 47 deletions(-)

diff --git 
a/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java 
b/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java
index 7689858..c1c6326 100644
--- a/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java
+++ b/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java
@@ -43,6 +43,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
@@ -54,6 +55,7 @@ public abstract class AbstractConnector implements Connector {
     private volatile ConnectorInitializationContext initializationContext;
     private volatile ComponentLog logger;
     private volatile CompletableFuture<Void> prepareUpdateFuture;
+    private String description; // effectively final
 
     protected abstract void onStepConfigured(final String stepName, final 
FlowContext workingContext) throws FlowUpdateException;
 
@@ -62,6 +64,7 @@ public abstract class AbstractConnector implements Connector {
     public final void initialize(final ConnectorInitializationContext context) 
{
         this.initializationContext = context;
         this.logger = context.getLogger();
+        this.description = getClass().getSimpleName() + "[id=" + 
context.getIdentifier() + "]";
 
         init();
     }
@@ -87,7 +90,30 @@ public abstract class AbstractConnector implements Connector 
{
     @Override
     public void start(final FlowContext context) throws FlowUpdateException {
         final ProcessGroupLifecycle lifecycle = 
context.getRootGroup().getLifecycle();
-        
lifecycle.start(ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY);
+        final CompletableFuture<Void> enableServicesFuture = 
lifecycle.enableControllerServices(
+            ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY,
+            ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS);
+
+        try {
+            enableServicesFuture.get();
+        } catch (final Exception e) {
+            
lifecycle.disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS);
+            throw new FlowUpdateException("Failed to enable Controller 
Services while starting Connector", e);
+        }
+
+        try {
+            lifecycle.startProcessors(true).get();
+            lifecycle.startPorts(true).get();
+            lifecycle.startStatelessGroups(true).get();
+            lifecycle.startRemoteProcessGroups(true).get();
+        } catch (final Exception e) {
+            logger.error("Failed to start components for {}", this, e);
+            try {
+                stop(context);
+            } catch (final Exception stopException) {
+                e.addSuppressed(new FlowUpdateException("Failed to stop 
Connector cleanly", stopException));
+            }
+        }
     }
 
     @Override
@@ -100,13 +126,16 @@ public abstract class AbstractConnector implements 
Connector {
     }
 
     private CompletableFuture<Void> stopAsync(final FlowContext context) {
-        final ProcessGroupFacade rootGroup = context.getRootGroup();
-        final ProcessGroupLifecycle lifecycle = rootGroup.getLifecycle();
+        final CompletableFuture<Void> result = new CompletableFuture<>();
 
-        final CompletableFuture<Void> stopProcessorsFuture = 
lifecycle.stopProcessors()
-            .orTimeout(1, TimeUnit.MINUTES)
-            .exceptionally(throwable -> {
-                if (throwable instanceof TimeoutException || 
throwable.getCause() instanceof TimeoutException) {
+        Thread.startVirtualThread(() -> {
+            try {
+                final ProcessGroupFacade rootGroup = context.getRootGroup();
+                final ProcessGroupLifecycle lifecycle = 
rootGroup.getLifecycle();
+
+                try {
+                    lifecycle.stopProcessors(true).get(1, TimeUnit.MINUTES);
+                } catch (final TimeoutException e) {
                     final List<ProcessorFacade> running = 
findProcessors(rootGroup, processor ->
                         processor.getLifecycle().getState() != 
ProcessorState.STOPPED && processor.getLifecycle().getState() != 
ProcessorState.DISABLED);
 
@@ -114,21 +143,22 @@ public abstract class AbstractConnector implements 
Connector {
                         getLogger().warn("After waiting 60 seconds for all 
Processors to stop, {} are still running. Terminating now.", running.size());
                         running.forEach(processor -> 
processor.getLifecycle().terminate());
                     }
-
-                    // Continue with the chain after handling timeout
-                    return null;
-                } else {
-                    throw new RuntimeException("Failed to stop all 
Processors", throwable);
+                } catch (final ExecutionException e) {
+                    throw new RuntimeException("Failed to stop all 
Processors", e.getCause());
                 }
-            });
 
-        return stopProcessorsFuture.thenRunAsync(() -> {
-            try {
-                
lifecycle.disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get();
+                lifecycle.stopPorts(true).get(1, TimeUnit.MINUTES);
+                lifecycle.stopRemoteProcessGroups(true).get(1, 
TimeUnit.MINUTES);
+                lifecycle.stopStatelessGroups(true).get(2, TimeUnit.MINUTES);
+                
lifecycle.disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get(2,
 TimeUnit.MINUTES);
+
+                result.complete(null);
             } catch (final Exception e) {
-                throw new RuntimeException("Failed to complete disabling of 
all Controller Services", e);
+                result.completeExceptionally(e);
             }
         });
+
+        return result;
     }
 
     @Override
@@ -162,24 +192,19 @@ public abstract class AbstractConnector implements 
Connector {
         getLogger().info("Draining {} FlowFiles ({} bytes) from Connector",
             initialQueueSize.getObjectCount(), 
NumberFormat.getNumberInstance().format(initialQueueSize.getByteCount()));
 
-        final CompletableFuture<Void> stopProcessorsFuture = 
stopSourceProcessors(flowContext);
-
-        final CompletableFuture<Void> startNonSourceFuture = 
stopProcessorsFuture.thenRunAsync(() -> {
-            startNonSourceProcessors(result, flowContext);
-        }).exceptionally(throwable -> {
-            if (!result.isDone()) {
-                result.completeExceptionally(new RuntimeException("Failed to 
stop source processors while draining FlowFiles", throwable));
-            }
-            return null;
-        });
+        Thread.startVirtualThread(() -> {
+            try {
+                stopSourceComponents(flowContext).get();
+                startNonSourceComponents(result, flowContext);
 
-        startNonSourceFuture.thenRunAsync(() -> {
-            completeDrain(result, flowContext, initialQueueSize);
-        }).exceptionally(throwable -> {
-            if (!result.isDone()) {
-                result.completeExceptionally(new RuntimeException("Failed 
while draining FlowFiles", throwable));
+                if (!result.isDone()) {
+                    completeDrain(result, flowContext, initialQueueSize);
+                }
+            } catch (final Exception e) {
+                if (!result.isDone()) {
+                    result.completeExceptionally(new RuntimeException("Failed 
while draining FlowFiles", e));
+                }
             }
-            return null;
         });
 
         return result;
@@ -242,7 +267,7 @@ public abstract class AbstractConnector implements 
Connector {
         }
     }
 
-    private void startNonSourceProcessors(final CompletableFuture<Void> 
result, final FlowContext flowContext) {
+    private void startNonSourceComponents(final CompletableFuture<Void> 
result, final FlowContext flowContext) {
         if (result.isDone()) {
             return;
         }
@@ -256,8 +281,8 @@ public abstract class AbstractConnector implements 
Connector {
             enableServices.get();
 
             if (!result.isDone()) {
-                getLogger().info("Starting all non-source processors to 
facilitate drainage of FlowFiles");
-                startNonSourceProcessors(flowContext).get();
+                getLogger().info("Starting all non-source components to 
facilitate drainage of FlowFiles");
+                startNonSourceComponents(flowContext).get();
             }
         } catch (final Exception e) {
             try {
@@ -266,7 +291,7 @@ public abstract class AbstractConnector implements 
Connector {
                 e.addSuppressed(e1);
             }
 
-            result.completeExceptionally(new RuntimeException("Failed to start 
non-source processors while draining FlowFiles", e.getCause()));
+            result.completeExceptionally(new RuntimeException("Failed to start 
non-source components while draining FlowFiles", e.getCause()));
         }
     }
 
@@ -380,25 +405,44 @@ public abstract class AbstractConnector implements 
Connector {
         return group.getQueueSize().getObjectCount() == 0;
     }
 
-    protected CompletableFuture<Void> stopSourceProcessors(final FlowContext 
context) {
-        final List<ProcessorFacade> sourceProcessors = 
getSourceProcessors(context.getRootGroup());
+    protected CompletableFuture<Void> stopSourceComponents(final FlowContext 
context) {
+        return stopSourceComponents(context.getRootGroup());
+    }
 
+    private CompletableFuture<Void> stopSourceComponents(final 
ProcessGroupFacade group) {
         final List<CompletableFuture<Void>> stopFutures = new ArrayList<>();
+
+        final List<ProcessorFacade> sourceProcessors = 
getSourceProcessors(group);
         for (final ProcessorFacade sourceProcessor : sourceProcessors) {
-            final CompletableFuture<Void> stopFuture = 
sourceProcessor.getLifecycle().stop();
-            stopFutures.add(stopFuture);
+            stopFutures.add(sourceProcessor.getLifecycle().stop());
+        }
+
+        for (final ProcessGroupFacade childGroup : group.getProcessGroups()) {
+            stopFutures.add(stopSourceComponents(childGroup));
         }
 
         return CompletableFuture.allOf(stopFutures.toArray(new 
CompletableFuture[0]));
     }
 
-    protected CompletableFuture<Void> startNonSourceProcessors(final 
FlowContext flowContext) {
-        final List<ProcessorFacade> nonSourceProcessors = 
getNonSourceProcessors(flowContext.getRootGroup());
+    protected CompletableFuture<Void> startNonSourceComponents(final 
FlowContext flowContext) {
+        return startNonSourceComponents(flowContext.getRootGroup());
+    }
 
+    private CompletableFuture<Void> startNonSourceComponents(final 
ProcessGroupFacade group) {
         final List<CompletableFuture<Void>> startFutures = new ArrayList<>();
+
+        final List<ProcessorFacade> nonSourceProcessors = 
getNonSourceProcessors(group);
         for (final ProcessorFacade nonSourceProcessor : nonSourceProcessors) {
-            final CompletableFuture<Void> startFuture = 
nonSourceProcessor.getLifecycle().start();
-            startFutures.add(startFuture);
+            startFutures.add(nonSourceProcessor.getLifecycle().start());
+        }
+
+        final ProcessGroupLifecycle lifecycle = group.getLifecycle();
+        startFutures.add(lifecycle.startPorts(false));
+        startFutures.add(lifecycle.startRemoteProcessGroups(false));
+        startFutures.add(lifecycle.startStatelessGroups(false));
+
+        for (final ProcessGroupFacade childGroup : group.getProcessGroups()) {
+            startFutures.add(startNonSourceComponents(childGroup));
         }
 
         return CompletableFuture.allOf(startFutures.toArray(new 
CompletableFuture[0]));
@@ -693,4 +737,8 @@ public abstract class AbstractConnector implements 
Connector {
         return Collections.emptyList();
     }
 
+    @Override
+    public String toString() {
+        return description;
+    }
 }
diff --git 
a/src/main/java/org/apache/nifi/components/connector/components/ProcessGroupLifecycle.java
 
b/src/main/java/org/apache/nifi/components/connector/components/ProcessGroupLifecycle.java
index 8cc3ebb..144094a 100644
--- 
a/src/main/java/org/apache/nifi/components/connector/components/ProcessGroupLifecycle.java
+++ 
b/src/main/java/org/apache/nifi/components/connector/components/ProcessGroupLifecycle.java
@@ -30,13 +30,25 @@ public interface ProcessGroupLifecycle {
 
     CompletableFuture<Void> disableControllerServices(Collection<String> 
serviceIdentifiers);
 
-    CompletableFuture<Void> startProcessors();
+    CompletableFuture<Void> startProcessors(boolean recursive);
 
     CompletableFuture<Void> start(ControllerServiceReferenceScope 
serviceReferenceScope);
 
     CompletableFuture<Void> stop();
 
-    CompletableFuture<Void> stopProcessors();
+    CompletableFuture<Void> stopProcessors(boolean recursive);
+
+    CompletableFuture<Void> startPorts(boolean recursive);
+
+    CompletableFuture<Void> stopPorts(boolean recursive);
+
+    CompletableFuture<Void> startRemoteProcessGroups(boolean recursive);
+
+    CompletableFuture<Void> stopRemoteProcessGroups(boolean recursive);
+
+    CompletableFuture<Void> startStatelessGroups(boolean recursive);
+
+    CompletableFuture<Void> stopStatelessGroups(boolean recursive);
 
     int getActiveThreadCount();
 }

Reply via email to