This is an automated email from the ASF dual-hosted git repository. markap14 pushed a commit to branch NIFI-15258 in repository https://gitbox.apache.org/repos/asf/nifi-api.git
commit 50012d9b62e99e8fec58a784b1f5c9fd9b5af8d3 Author: Mark Payne <[email protected]> AuthorDate: Tue Feb 3 14:16:49 2026 -0500 NIFI-15538: When starting/stopping components allow specifying whether or not the action should be recursive; code cleanup and simplification to use a Virtual Thread to execute code sequentially instead of chaining CompletableFutures. (#61) Co-authored-by: Mark Payne <[email protected]> --- .../components/connector/AbstractConnector.java | 138 ++++++++++++++------- .../components/ProcessGroupLifecycle.java | 16 ++- 2 files changed, 107 insertions(+), 47 deletions(-) diff --git a/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java b/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java index 7689858..c1c6326 100644 --- a/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java +++ b/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java @@ -43,6 +43,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; @@ -54,6 +55,7 @@ public abstract class AbstractConnector implements Connector { private volatile ConnectorInitializationContext initializationContext; private volatile ComponentLog logger; private volatile CompletableFuture<Void> prepareUpdateFuture; + private String description; // effectively final protected abstract void onStepConfigured(final String stepName, final FlowContext workingContext) throws FlowUpdateException; @@ -62,6 +64,7 @@ public abstract class AbstractConnector implements Connector { public final void initialize(final ConnectorInitializationContext context) { this.initializationContext = context; this.logger = context.getLogger(); + this.description = getClass().getSimpleName() + "[id=" + context.getIdentifier() + "]"; init(); } @@ -87,7 +90,30 @@ public abstract class AbstractConnector implements Connector { @Override public void start(final FlowContext context) throws FlowUpdateException { final ProcessGroupLifecycle lifecycle = context.getRootGroup().getLifecycle(); - lifecycle.start(ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY); + final CompletableFuture<Void> enableServicesFuture = lifecycle.enableControllerServices( + ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY, + ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS); + + try { + enableServicesFuture.get(); + } catch (final Exception e) { + lifecycle.disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS); + throw new FlowUpdateException("Failed to enable Controller Services while starting Connector", e); + } + + try { + lifecycle.startProcessors(true).get(); + lifecycle.startPorts(true).get(); + lifecycle.startStatelessGroups(true).get(); + lifecycle.startRemoteProcessGroups(true).get(); + } catch (final Exception e) { + logger.error("Failed to start components for {}", this, e); + try { + stop(context); + } catch (final Exception stopException) { + e.addSuppressed(new FlowUpdateException("Failed to stop Connector cleanly", stopException)); + } + } } @Override @@ -100,13 +126,16 @@ public abstract class AbstractConnector implements Connector { } private CompletableFuture<Void> stopAsync(final FlowContext context) { - final ProcessGroupFacade rootGroup = context.getRootGroup(); - final ProcessGroupLifecycle lifecycle = rootGroup.getLifecycle(); + final CompletableFuture<Void> result = new CompletableFuture<>(); - final CompletableFuture<Void> stopProcessorsFuture = lifecycle.stopProcessors() - .orTimeout(1, TimeUnit.MINUTES) - .exceptionally(throwable -> { - if (throwable instanceof TimeoutException || throwable.getCause() instanceof TimeoutException) { + Thread.startVirtualThread(() -> { + try { + final ProcessGroupFacade rootGroup = context.getRootGroup(); + final ProcessGroupLifecycle lifecycle = rootGroup.getLifecycle(); + + try { + lifecycle.stopProcessors(true).get(1, TimeUnit.MINUTES); + } catch (final TimeoutException e) { final List<ProcessorFacade> running = findProcessors(rootGroup, processor -> processor.getLifecycle().getState() != ProcessorState.STOPPED && processor.getLifecycle().getState() != ProcessorState.DISABLED); @@ -114,21 +143,22 @@ public abstract class AbstractConnector implements Connector { getLogger().warn("After waiting 60 seconds for all Processors to stop, {} are still running. Terminating now.", running.size()); running.forEach(processor -> processor.getLifecycle().terminate()); } - - // Continue with the chain after handling timeout - return null; - } else { - throw new RuntimeException("Failed to stop all Processors", throwable); + } catch (final ExecutionException e) { + throw new RuntimeException("Failed to stop all Processors", e.getCause()); } - }); - return stopProcessorsFuture.thenRunAsync(() -> { - try { - lifecycle.disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get(); + lifecycle.stopPorts(true).get(1, TimeUnit.MINUTES); + lifecycle.stopRemoteProcessGroups(true).get(1, TimeUnit.MINUTES); + lifecycle.stopStatelessGroups(true).get(2, TimeUnit.MINUTES); + lifecycle.disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get(2, TimeUnit.MINUTES); + + result.complete(null); } catch (final Exception e) { - throw new RuntimeException("Failed to complete disabling of all Controller Services", e); + result.completeExceptionally(e); } }); + + return result; } @Override @@ -162,24 +192,19 @@ public abstract class AbstractConnector implements Connector { getLogger().info("Draining {} FlowFiles ({} bytes) from Connector", initialQueueSize.getObjectCount(), NumberFormat.getNumberInstance().format(initialQueueSize.getByteCount())); - final CompletableFuture<Void> stopProcessorsFuture = stopSourceProcessors(flowContext); - - final CompletableFuture<Void> startNonSourceFuture = stopProcessorsFuture.thenRunAsync(() -> { - startNonSourceProcessors(result, flowContext); - }).exceptionally(throwable -> { - if (!result.isDone()) { - result.completeExceptionally(new RuntimeException("Failed to stop source processors while draining FlowFiles", throwable)); - } - return null; - }); + Thread.startVirtualThread(() -> { + try { + stopSourceComponents(flowContext).get(); + startNonSourceComponents(result, flowContext); - startNonSourceFuture.thenRunAsync(() -> { - completeDrain(result, flowContext, initialQueueSize); - }).exceptionally(throwable -> { - if (!result.isDone()) { - result.completeExceptionally(new RuntimeException("Failed while draining FlowFiles", throwable)); + if (!result.isDone()) { + completeDrain(result, flowContext, initialQueueSize); + } + } catch (final Exception e) { + if (!result.isDone()) { + result.completeExceptionally(new RuntimeException("Failed while draining FlowFiles", e)); + } } - return null; }); return result; @@ -242,7 +267,7 @@ public abstract class AbstractConnector implements Connector { } } - private void startNonSourceProcessors(final CompletableFuture<Void> result, final FlowContext flowContext) { + private void startNonSourceComponents(final CompletableFuture<Void> result, final FlowContext flowContext) { if (result.isDone()) { return; } @@ -256,8 +281,8 @@ public abstract class AbstractConnector implements Connector { enableServices.get(); if (!result.isDone()) { - getLogger().info("Starting all non-source processors to facilitate drainage of FlowFiles"); - startNonSourceProcessors(flowContext).get(); + getLogger().info("Starting all non-source components to facilitate drainage of FlowFiles"); + startNonSourceComponents(flowContext).get(); } } catch (final Exception e) { try { @@ -266,7 +291,7 @@ public abstract class AbstractConnector implements Connector { e.addSuppressed(e1); } - result.completeExceptionally(new RuntimeException("Failed to start non-source processors while draining FlowFiles", e.getCause())); + result.completeExceptionally(new RuntimeException("Failed to start non-source components while draining FlowFiles", e.getCause())); } } @@ -380,25 +405,44 @@ public abstract class AbstractConnector implements Connector { return group.getQueueSize().getObjectCount() == 0; } - protected CompletableFuture<Void> stopSourceProcessors(final FlowContext context) { - final List<ProcessorFacade> sourceProcessors = getSourceProcessors(context.getRootGroup()); + protected CompletableFuture<Void> stopSourceComponents(final FlowContext context) { + return stopSourceComponents(context.getRootGroup()); + } + private CompletableFuture<Void> stopSourceComponents(final ProcessGroupFacade group) { final List<CompletableFuture<Void>> stopFutures = new ArrayList<>(); + + final List<ProcessorFacade> sourceProcessors = getSourceProcessors(group); for (final ProcessorFacade sourceProcessor : sourceProcessors) { - final CompletableFuture<Void> stopFuture = sourceProcessor.getLifecycle().stop(); - stopFutures.add(stopFuture); + stopFutures.add(sourceProcessor.getLifecycle().stop()); + } + + for (final ProcessGroupFacade childGroup : group.getProcessGroups()) { + stopFutures.add(stopSourceComponents(childGroup)); } return CompletableFuture.allOf(stopFutures.toArray(new CompletableFuture[0])); } - protected CompletableFuture<Void> startNonSourceProcessors(final FlowContext flowContext) { - final List<ProcessorFacade> nonSourceProcessors = getNonSourceProcessors(flowContext.getRootGroup()); + protected CompletableFuture<Void> startNonSourceComponents(final FlowContext flowContext) { + return startNonSourceComponents(flowContext.getRootGroup()); + } + private CompletableFuture<Void> startNonSourceComponents(final ProcessGroupFacade group) { final List<CompletableFuture<Void>> startFutures = new ArrayList<>(); + + final List<ProcessorFacade> nonSourceProcessors = getNonSourceProcessors(group); for (final ProcessorFacade nonSourceProcessor : nonSourceProcessors) { - final CompletableFuture<Void> startFuture = nonSourceProcessor.getLifecycle().start(); - startFutures.add(startFuture); + startFutures.add(nonSourceProcessor.getLifecycle().start()); + } + + final ProcessGroupLifecycle lifecycle = group.getLifecycle(); + startFutures.add(lifecycle.startPorts(false)); + startFutures.add(lifecycle.startRemoteProcessGroups(false)); + startFutures.add(lifecycle.startStatelessGroups(false)); + + for (final ProcessGroupFacade childGroup : group.getProcessGroups()) { + startFutures.add(startNonSourceComponents(childGroup)); } return CompletableFuture.allOf(startFutures.toArray(new CompletableFuture[0])); @@ -693,4 +737,8 @@ public abstract class AbstractConnector implements Connector { return Collections.emptyList(); } + @Override + public String toString() { + return description; + } } diff --git a/src/main/java/org/apache/nifi/components/connector/components/ProcessGroupLifecycle.java b/src/main/java/org/apache/nifi/components/connector/components/ProcessGroupLifecycle.java index 8cc3ebb..144094a 100644 --- a/src/main/java/org/apache/nifi/components/connector/components/ProcessGroupLifecycle.java +++ b/src/main/java/org/apache/nifi/components/connector/components/ProcessGroupLifecycle.java @@ -30,13 +30,25 @@ public interface ProcessGroupLifecycle { CompletableFuture<Void> disableControllerServices(Collection<String> serviceIdentifiers); - CompletableFuture<Void> startProcessors(); + CompletableFuture<Void> startProcessors(boolean recursive); CompletableFuture<Void> start(ControllerServiceReferenceScope serviceReferenceScope); CompletableFuture<Void> stop(); - CompletableFuture<Void> stopProcessors(); + CompletableFuture<Void> stopProcessors(boolean recursive); + + CompletableFuture<Void> startPorts(boolean recursive); + + CompletableFuture<Void> stopPorts(boolean recursive); + + CompletableFuture<Void> startRemoteProcessGroups(boolean recursive); + + CompletableFuture<Void> stopRemoteProcessGroups(boolean recursive); + + CompletableFuture<Void> startStatelessGroups(boolean recursive); + + CompletableFuture<Void> stopStatelessGroups(boolean recursive); int getActiveThreadCount(); }
