This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi-api.git
The following commit(s) were added to refs/heads/NIFI-15258 by this push:
new 80de11f NIFI-15538: When starting/stopping components allow
specifying whether or not the action should be recursive; code cleanup and
simplification to use a Virtual Thread to execute code sequentially instead of
chaining CompletableFutures. (#61)
80de11f is described below
commit 80de11fcfd1b6ba4dd08c55f592d1e42b4b4095f
Author: Mark Payne <[email protected]>
AuthorDate: Tue Feb 3 14:16:49 2026 -0500
NIFI-15538: When starting/stopping components allow specifying whether or
not the action should be recursive; code cleanup and simplification to use a
Virtual Thread to execute code sequentially instead of chaining
CompletableFutures. (#61)
Co-authored-by: Mark Payne <[email protected]>
---
.../components/connector/AbstractConnector.java | 138 ++++++++++++++-------
.../components/ProcessGroupLifecycle.java | 16 ++-
2 files changed, 107 insertions(+), 47 deletions(-)
diff --git
a/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java
b/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java
index 7689858..c1c6326 100644
--- a/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java
+++ b/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java
@@ -43,6 +43,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
@@ -54,6 +55,7 @@ public abstract class AbstractConnector implements Connector {
private volatile ConnectorInitializationContext initializationContext;
private volatile ComponentLog logger;
private volatile CompletableFuture<Void> prepareUpdateFuture;
+ private String description; // effectively final
protected abstract void onStepConfigured(final String stepName, final
FlowContext workingContext) throws FlowUpdateException;
@@ -62,6 +64,7 @@ public abstract class AbstractConnector implements Connector {
public final void initialize(final ConnectorInitializationContext context)
{
this.initializationContext = context;
this.logger = context.getLogger();
+ this.description = getClass().getSimpleName() + "[id=" +
context.getIdentifier() + "]";
init();
}
@@ -87,7 +90,30 @@ public abstract class AbstractConnector implements Connector
{
@Override
public void start(final FlowContext context) throws FlowUpdateException {
final ProcessGroupLifecycle lifecycle =
context.getRootGroup().getLifecycle();
-
lifecycle.start(ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY);
+ final CompletableFuture<Void> enableServicesFuture =
lifecycle.enableControllerServices(
+ ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY,
+ ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS);
+
+ try {
+ enableServicesFuture.get();
+ } catch (final Exception e) {
+
lifecycle.disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS);
+ throw new FlowUpdateException("Failed to enable Controller
Services while starting Connector", e);
+ }
+
+ try {
+ lifecycle.startProcessors(true).get();
+ lifecycle.startPorts(true).get();
+ lifecycle.startStatelessGroups(true).get();
+ lifecycle.startRemoteProcessGroups(true).get();
+ } catch (final Exception e) {
+ logger.error("Failed to start components for {}", this, e);
+ try {
+ stop(context);
+ } catch (final Exception stopException) {
+ e.addSuppressed(new FlowUpdateException("Failed to stop
Connector cleanly", stopException));
+ }
+ }
}
@Override
@@ -100,13 +126,16 @@ public abstract class AbstractConnector implements
Connector {
}
private CompletableFuture<Void> stopAsync(final FlowContext context) {
- final ProcessGroupFacade rootGroup = context.getRootGroup();
- final ProcessGroupLifecycle lifecycle = rootGroup.getLifecycle();
+ final CompletableFuture<Void> result = new CompletableFuture<>();
- final CompletableFuture<Void> stopProcessorsFuture =
lifecycle.stopProcessors()
- .orTimeout(1, TimeUnit.MINUTES)
- .exceptionally(throwable -> {
- if (throwable instanceof TimeoutException ||
throwable.getCause() instanceof TimeoutException) {
+ Thread.startVirtualThread(() -> {
+ try {
+ final ProcessGroupFacade rootGroup = context.getRootGroup();
+ final ProcessGroupLifecycle lifecycle =
rootGroup.getLifecycle();
+
+ try {
+ lifecycle.stopProcessors(true).get(1, TimeUnit.MINUTES);
+ } catch (final TimeoutException e) {
final List<ProcessorFacade> running =
findProcessors(rootGroup, processor ->
processor.getLifecycle().getState() !=
ProcessorState.STOPPED && processor.getLifecycle().getState() !=
ProcessorState.DISABLED);
@@ -114,21 +143,22 @@ public abstract class AbstractConnector implements
Connector {
getLogger().warn("After waiting 60 seconds for all
Processors to stop, {} are still running. Terminating now.", running.size());
running.forEach(processor ->
processor.getLifecycle().terminate());
}
-
- // Continue with the chain after handling timeout
- return null;
- } else {
- throw new RuntimeException("Failed to stop all
Processors", throwable);
+ } catch (final ExecutionException e) {
+ throw new RuntimeException("Failed to stop all
Processors", e.getCause());
}
- });
- return stopProcessorsFuture.thenRunAsync(() -> {
- try {
-
lifecycle.disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get();
+ lifecycle.stopPorts(true).get(1, TimeUnit.MINUTES);
+ lifecycle.stopRemoteProcessGroups(true).get(1,
TimeUnit.MINUTES);
+ lifecycle.stopStatelessGroups(true).get(2, TimeUnit.MINUTES);
+
lifecycle.disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get(2,
TimeUnit.MINUTES);
+
+ result.complete(null);
} catch (final Exception e) {
- throw new RuntimeException("Failed to complete disabling of
all Controller Services", e);
+ result.completeExceptionally(e);
}
});
+
+ return result;
}
@Override
@@ -162,24 +192,19 @@ public abstract class AbstractConnector implements
Connector {
getLogger().info("Draining {} FlowFiles ({} bytes) from Connector",
initialQueueSize.getObjectCount(),
NumberFormat.getNumberInstance().format(initialQueueSize.getByteCount()));
- final CompletableFuture<Void> stopProcessorsFuture =
stopSourceProcessors(flowContext);
-
- final CompletableFuture<Void> startNonSourceFuture =
stopProcessorsFuture.thenRunAsync(() -> {
- startNonSourceProcessors(result, flowContext);
- }).exceptionally(throwable -> {
- if (!result.isDone()) {
- result.completeExceptionally(new RuntimeException("Failed to
stop source processors while draining FlowFiles", throwable));
- }
- return null;
- });
+ Thread.startVirtualThread(() -> {
+ try {
+ stopSourceComponents(flowContext).get();
+ startNonSourceComponents(result, flowContext);
- startNonSourceFuture.thenRunAsync(() -> {
- completeDrain(result, flowContext, initialQueueSize);
- }).exceptionally(throwable -> {
- if (!result.isDone()) {
- result.completeExceptionally(new RuntimeException("Failed
while draining FlowFiles", throwable));
+ if (!result.isDone()) {
+ completeDrain(result, flowContext, initialQueueSize);
+ }
+ } catch (final Exception e) {
+ if (!result.isDone()) {
+ result.completeExceptionally(new RuntimeException("Failed
while draining FlowFiles", e));
+ }
}
- return null;
});
return result;
@@ -242,7 +267,7 @@ public abstract class AbstractConnector implements
Connector {
}
}
- private void startNonSourceProcessors(final CompletableFuture<Void>
result, final FlowContext flowContext) {
+ private void startNonSourceComponents(final CompletableFuture<Void>
result, final FlowContext flowContext) {
if (result.isDone()) {
return;
}
@@ -256,8 +281,8 @@ public abstract class AbstractConnector implements
Connector {
enableServices.get();
if (!result.isDone()) {
- getLogger().info("Starting all non-source processors to
facilitate drainage of FlowFiles");
- startNonSourceProcessors(flowContext).get();
+ getLogger().info("Starting all non-source components to
facilitate drainage of FlowFiles");
+ startNonSourceComponents(flowContext).get();
}
} catch (final Exception e) {
try {
@@ -266,7 +291,7 @@ public abstract class AbstractConnector implements
Connector {
e.addSuppressed(e1);
}
- result.completeExceptionally(new RuntimeException("Failed to start
non-source processors while draining FlowFiles", e.getCause()));
+ result.completeExceptionally(new RuntimeException("Failed to start
non-source components while draining FlowFiles", e.getCause()));
}
}
@@ -380,25 +405,44 @@ public abstract class AbstractConnector implements
Connector {
return group.getQueueSize().getObjectCount() == 0;
}
- protected CompletableFuture<Void> stopSourceProcessors(final FlowContext
context) {
- final List<ProcessorFacade> sourceProcessors =
getSourceProcessors(context.getRootGroup());
+ protected CompletableFuture<Void> stopSourceComponents(final FlowContext
context) {
+ return stopSourceComponents(context.getRootGroup());
+ }
+ private CompletableFuture<Void> stopSourceComponents(final
ProcessGroupFacade group) {
final List<CompletableFuture<Void>> stopFutures = new ArrayList<>();
+
+ final List<ProcessorFacade> sourceProcessors =
getSourceProcessors(group);
for (final ProcessorFacade sourceProcessor : sourceProcessors) {
- final CompletableFuture<Void> stopFuture =
sourceProcessor.getLifecycle().stop();
- stopFutures.add(stopFuture);
+ stopFutures.add(sourceProcessor.getLifecycle().stop());
+ }
+
+ for (final ProcessGroupFacade childGroup : group.getProcessGroups()) {
+ stopFutures.add(stopSourceComponents(childGroup));
}
return CompletableFuture.allOf(stopFutures.toArray(new
CompletableFuture[0]));
}
- protected CompletableFuture<Void> startNonSourceProcessors(final
FlowContext flowContext) {
- final List<ProcessorFacade> nonSourceProcessors =
getNonSourceProcessors(flowContext.getRootGroup());
+ protected CompletableFuture<Void> startNonSourceComponents(final
FlowContext flowContext) {
+ return startNonSourceComponents(flowContext.getRootGroup());
+ }
+ private CompletableFuture<Void> startNonSourceComponents(final
ProcessGroupFacade group) {
final List<CompletableFuture<Void>> startFutures = new ArrayList<>();
+
+ final List<ProcessorFacade> nonSourceProcessors =
getNonSourceProcessors(group);
for (final ProcessorFacade nonSourceProcessor : nonSourceProcessors) {
- final CompletableFuture<Void> startFuture =
nonSourceProcessor.getLifecycle().start();
- startFutures.add(startFuture);
+ startFutures.add(nonSourceProcessor.getLifecycle().start());
+ }
+
+ final ProcessGroupLifecycle lifecycle = group.getLifecycle();
+ startFutures.add(lifecycle.startPorts(false));
+ startFutures.add(lifecycle.startRemoteProcessGroups(false));
+ startFutures.add(lifecycle.startStatelessGroups(false));
+
+ for (final ProcessGroupFacade childGroup : group.getProcessGroups()) {
+ startFutures.add(startNonSourceComponents(childGroup));
}
return CompletableFuture.allOf(startFutures.toArray(new
CompletableFuture[0]));
@@ -693,4 +737,8 @@ public abstract class AbstractConnector implements
Connector {
return Collections.emptyList();
}
+ @Override
+ public String toString() {
+ return description;
+ }
}
diff --git
a/src/main/java/org/apache/nifi/components/connector/components/ProcessGroupLifecycle.java
b/src/main/java/org/apache/nifi/components/connector/components/ProcessGroupLifecycle.java
index 8cc3ebb..144094a 100644
---
a/src/main/java/org/apache/nifi/components/connector/components/ProcessGroupLifecycle.java
+++
b/src/main/java/org/apache/nifi/components/connector/components/ProcessGroupLifecycle.java
@@ -30,13 +30,25 @@ public interface ProcessGroupLifecycle {
CompletableFuture<Void> disableControllerServices(Collection<String>
serviceIdentifiers);
- CompletableFuture<Void> startProcessors();
+ CompletableFuture<Void> startProcessors(boolean recursive);
CompletableFuture<Void> start(ControllerServiceReferenceScope
serviceReferenceScope);
CompletableFuture<Void> stop();
- CompletableFuture<Void> stopProcessors();
+ CompletableFuture<Void> stopProcessors(boolean recursive);
+
+ CompletableFuture<Void> startPorts(boolean recursive);
+
+ CompletableFuture<Void> stopPorts(boolean recursive);
+
+ CompletableFuture<Void> startRemoteProcessGroups(boolean recursive);
+
+ CompletableFuture<Void> stopRemoteProcessGroups(boolean recursive);
+
+ CompletableFuture<Void> startStatelessGroups(boolean recursive);
+
+ CompletableFuture<Void> stopStatelessGroups(boolean recursive);
int getActiveThreadCount();
}