This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/NIFI-15258 by this push:
     new a65897d912 NIFI-15538: When starting/stopping components allow 
specifying whethe… (#10843)
a65897d912 is described below

commit a65897d912a7e3eaad17be68069f4463cd01f00e
Author: Mark Payne <[email protected]>
AuthorDate: Tue Feb 3 14:17:07 2026 -0500

    NIFI-15538: When starting/stopping components allow specifying whethe… 
(#10843)
    
    * NIFI-15538: When starting/stopping components allow specifying whether or 
not the action should be recursive; code cleanup and simplification to use a 
Virtual Thread to execute code sequentially instead of chaining 
CompletableFutures.
    
    * NIFI-15538: Addressed review feedback
    
    ---------
    
    Co-authored-by: Mark Payne <[email protected]>
---
 .../StandaloneProcessGroupLifecycle.java           | 192 ++++++++---
 .../connector/StandardConnectorNodeIT.java         |   5 +-
 .../AuthorizingProcessGroupLifecycle.java          |  44 ++-
 .../tests/system/ComponentLifecycleConnector.java  | 366 +++++++++++++++++++++
 .../org.apache.nifi.components.connector.Connector |   1 +
 .../system/connectors/ConnectorLifecycleIT.java    | 222 +++++++++++++
 6 files changed, 781 insertions(+), 49 deletions(-)

diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupLifecycle.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupLifecycle.java
index ccc2a0a8f3..28649b2777 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupLifecycle.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupLifecycle.java
@@ -182,8 +182,8 @@ public class StandaloneProcessGroupLifecycle implements 
ProcessGroupLifecycle {
     }
 
     @Override
-    public CompletableFuture<Void> startProcessors() {
-        final Collection<ProcessorNode> processors = 
processGroup.getProcessors();
+    public CompletableFuture<Void> startProcessors(final boolean recursive) {
+        final Collection<ProcessorNode> processors = recursive ? 
processGroup.findAllProcessors() : processGroup.getProcessors();
         final List<CompletableFuture<Void>> startFutures = new ArrayList<>();
         for (final ProcessorNode processor : processors) {
             // If Processor is not valid, perform validation again to ensure 
that the status is up to date.
@@ -192,7 +192,7 @@ public class StandaloneProcessGroupLifecycle implements 
ProcessGroupLifecycle {
                 processor.performValidation();
             }
 
-            startFutures.add(processGroup.startProcessor(processor, true));
+            
startFutures.add(processor.getProcessGroup().startProcessor(processor, true));
         }
 
         return CompletableFuture.allOf(startFutures.toArray(new 
CompletableFuture[0]));
@@ -204,74 +204,175 @@ public class StandaloneProcessGroupLifecycle implements 
ProcessGroupLifecycle {
             return statelessGroupLifecycle.start();
         }
 
-        final CompletableFuture<Void> enableServicesFuture = 
enableControllerServices(serviceReferenceScope, 
ControllerServiceReferenceHierarchy.DIRECT_SERVICES_ONLY);
-        final CompletableFuture<Void> startAllComponents = 
enableServicesFuture.thenRunAsync(this::startPorts)
-                .thenRunAsync(this::startRemoteProcessGroups)
-                .thenCompose(v -> startProcessors());
+        final CompletableFuture<Void> result = new CompletableFuture<>();
 
-        final List<CompletableFuture<Void>> childGroupFutures = new 
ArrayList<>();
-        for (final ProcessGroup childGroup : processGroup.getProcessGroups()) {
-            final String childGroupId = 
childGroup.getVersionedComponentId().orElse(null);
-            if (childGroupId == null) {
-                logger.warn("Encountered child Process Group {} without a 
Versioned Component ID. Skipping start of child group.", 
childGroup.getIdentifier());
-                continue;
-            }
+        Thread.startVirtualThread(() -> {
+            try {
+                enableControllerServices(serviceReferenceScope, 
ControllerServiceReferenceHierarchy.DIRECT_SERVICES_ONLY).get();
+                startPorts(false).get();
+                startRemoteProcessGroups(false).get();
+                startProcessors(false).get();
 
-            final ProcessGroupLifecycle childLifecycle = 
childGroupLifecycleFactory.apply(childGroupId);
-            final CompletableFuture<Void> childFuture = 
childLifecycle.start(serviceReferenceScope);
-            childGroupFutures.add(childFuture);
-        }
+                final List<CompletableFuture<Void>> childGroupFutures = new 
ArrayList<>();
+                for (final ProcessGroup childGroup : 
processGroup.getProcessGroups()) {
+                    final String childGroupId = 
childGroup.getVersionedComponentId().orElse(null);
+                    if (childGroupId == null) {
+                        logger.warn("Encountered child Process Group {} 
without a Versioned Component ID. Skipping start of child group.", 
childGroup.getIdentifier());
+                        continue;
+                    }
+
+                    final ProcessGroupLifecycle childLifecycle = 
childGroupLifecycleFactory.apply(childGroupId);
+                    
childGroupFutures.add(childLifecycle.start(serviceReferenceScope));
+                }
 
-        final CompletableFuture<Void> compositeChildFutures = 
CompletableFuture.allOf(childGroupFutures.toArray(new CompletableFuture[0]));
-        return CompletableFuture.allOf(startAllComponents, 
compositeChildFutures);
+                CompletableFuture.allOf(childGroupFutures.toArray(new 
CompletableFuture[0])).get();
+                result.complete(null);
+            } catch (final Exception e) {
+                result.completeExceptionally(e);
+            }
+        });
+
+        return result;
     }
 
-    private void startPorts() {
-        for (final Port inputPort : processGroup.getInputPorts()) {
-            processGroup.startInputPort(inputPort);
+    @Override
+    public CompletableFuture<Void> startPorts(final boolean recursive) {
+        logger.debug("{} starting all ports", this);
+        final Collection<Port> inputPorts = recursive ? 
processGroup.findAllInputPorts() : processGroup.getInputPorts();
+        for (final Port inputPort : inputPorts) {
+            inputPort.getProcessGroup().startInputPort(inputPort);
         }
-        for (final Port outputPort : processGroup.getOutputPorts()) {
-            processGroup.startOutputPort(outputPort);
+
+        final Collection<Port> outputPorts = recursive ? 
processGroup.findAllOutputPorts() : processGroup.getOutputPorts();
+        for (final Port outputPort : outputPorts) {
+            outputPort.getProcessGroup().startOutputPort(outputPort);
         }
+
+        logger.info("{} started all ports", this);
+        return CompletableFuture.completedFuture(null);
     }
 
-    private void stopPorts() {
-        for (final Port inputPort : processGroup.getInputPorts()) {
-            processGroup.stopInputPort(inputPort);
+    @Override
+    public CompletableFuture<Void> stopPorts(final boolean recursive) {
+        logger.debug("{} stopping all ports", this);
+
+        final Collection<Port> inputPorts = recursive ? 
processGroup.findAllInputPorts() : processGroup.getInputPorts();
+        for (final Port inputPort : inputPorts) {
+            inputPort.getProcessGroup().stopInputPort(inputPort);
         }
-        for (final Port outputPort : processGroup.getOutputPorts()) {
-            processGroup.stopOutputPort(outputPort);
+
+        final Collection<Port> outputPorts = recursive ? 
processGroup.findAllOutputPorts() : processGroup.getOutputPorts();
+        for (final Port outputPort : outputPorts) {
+            outputPort.getProcessGroup().stopOutputPort(outputPort);
         }
+
+        logger.info("{} stopped all ports", this);
+        return CompletableFuture.completedFuture(null);
     }
 
-    private void startRemoteProcessGroups() {
-        for (final RemoteProcessGroup rpg : 
processGroup.getRemoteProcessGroups()) {
+    @Override
+    public CompletableFuture<Void> startRemoteProcessGroups(final boolean 
recursive) {
+        logger.debug("{} starting all Remote Process Groups", this);
+
+        final Collection<RemoteProcessGroup> rpgs = recursive ? 
processGroup.findAllRemoteProcessGroups() : 
processGroup.getRemoteProcessGroups();
+        for (final RemoteProcessGroup rpg : rpgs) {
             rpg.startTransmitting();
         }
+
+        logger.info("{} started all Remote Process Groups", this);
+        return CompletableFuture.completedFuture(null);
     }
 
-    private CompletableFuture<Void> stopRemoteProcessGroups() {
+    @Override
+    public CompletableFuture<Void> stopRemoteProcessGroups(final boolean 
recursive) {
+        logger.debug("{} stopping all Remote Process Groups", this);
         final List<CompletableFuture<Void>> stopFutures = new ArrayList<>();
 
-        for (final RemoteProcessGroup rpg : 
processGroup.getRemoteProcessGroups()) {
+        final Collection<RemoteProcessGroup> rpgs = recursive ? 
processGroup.findAllRemoteProcessGroups() : 
processGroup.getRemoteProcessGroups();
+        for (final RemoteProcessGroup rpg : rpgs) {
             stopFutures.add(rpg.stopTransmitting());
         }
 
+        logger.info("{} stopped all Remote Process Groups", this);
+        return CompletableFuture.allOf(stopFutures.toArray(new 
CompletableFuture[0]));
+    }
+
+    @Override
+    public CompletableFuture<Void> startStatelessGroups(final boolean 
recursive) {
+        logger.debug("{} starting all Stateless Process Groups", this);
+        final List<CompletableFuture<Void>> startFutures = new ArrayList<>();
+
+        final Collection<ProcessGroup> processGroups = 
processGroup.getProcessGroups();
+        for (final ProcessGroup childGroup : processGroups) {
+            final String childGroupId = 
childGroup.getVersionedComponentId().orElse(null);
+            if (childGroupId == null) {
+                logger.warn("Encountered stateless child Process Group {} 
without a Versioned Component ID. Skipping start.", childGroup.getIdentifier());
+                continue;
+            }
+
+            final ProcessGroupLifecycle childLifecycle = 
childGroupLifecycleFactory.apply(childGroupId);
+
+            if (childGroup.resolveExecutionEngine() == 
ExecutionEngine.STATELESS) {
+                
startFutures.add(childLifecycle.start(ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY));
+            } else if (recursive) {
+                startFutures.add(childLifecycle.startStatelessGroups(true));
+            }
+        }
+
+        logger.info("{} started all Stateless Process Groups", this);
+        return CompletableFuture.allOf(startFutures.toArray(new 
CompletableFuture[0]));
+    }
+
+    @Override
+    public CompletableFuture<Void> stopStatelessGroups(final boolean 
recursive) {
+        logger.debug("{} stopping all Stateless Process Groups", this);
+        final List<CompletableFuture<Void>> stopFutures = new ArrayList<>();
+
+        for (final ProcessGroup childGroup : processGroup.getProcessGroups()) {
+            final String childGroupId = 
childGroup.getVersionedComponentId().orElse(null);
+            if (childGroupId == null) {
+                logger.warn("Encountered stateless child Process Group {} 
without a Versioned Component ID. Skipping stop.", childGroup.getIdentifier());
+                continue;
+            }
+
+            final ProcessGroupLifecycle childLifecycle = 
childGroupLifecycleFactory.apply(childGroupId);
+
+            if (childGroup.resolveExecutionEngine() == 
ExecutionEngine.STATELESS) {
+                stopFutures.add(childLifecycle.stop());
+            } else if (recursive) {
+                stopFutures.add(childLifecycle.stopStatelessGroups(true));
+            }
+        }
+
+        logger.info("{} stopped all Stateless Process Groups", this);
         return CompletableFuture.allOf(stopFutures.toArray(new 
CompletableFuture[0]));
     }
 
     @Override
     public CompletableFuture<Void> stop() {
+        logger.debug("Stopping Process Group {}", 
processGroup.getIdentifier());
         if (processGroup.resolveExecutionEngine() == 
ExecutionEngine.STATELESS) {
             return statelessGroupLifecycle.stop();
         }
 
-        final CompletableFuture<Void> stopProcessorsFuture = stopProcessors();
+        final CompletableFuture<Void> result = new CompletableFuture<>();
+
+        Thread.startVirtualThread(() -> {
+            try {
+                stopProcessors(false).get();
+                stopChildren().get();
+                stopPorts(false).get();
+                stopRemoteProcessGroups(false).get();
+                
disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get();
+
+                logger.info("Stopped Process Group {}", 
processGroup.getIdentifier());
+                result.complete(null);
+            } catch (final Exception e) {
+                result.completeExceptionally(e);
+            }
+        });
 
-        return stopProcessorsFuture.thenCompose(voidValue -> stopChildren())
-            .thenRunAsync(this::stopPorts)
-            .thenCompose(voidValue -> stopRemoteProcessGroups())
-            .thenCompose(voidValue -> 
disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS));
+        return result;
     }
 
     private CompletableFuture<Void> stopChildren() {
@@ -292,11 +393,11 @@ public class StandaloneProcessGroupLifecycle implements 
ProcessGroupLifecycle {
     }
 
     @Override
-    public CompletableFuture<Void> stopProcessors() {
-        final Collection<ProcessorNode> processors = 
processGroup.getProcessors();
+    public CompletableFuture<Void> stopProcessors(final boolean recursive) {
+        final Collection<ProcessorNode> processors = recursive ? 
processGroup.findAllProcessors() : processGroup.getProcessors();
         final List<CompletableFuture<Void>> stopFutures = new ArrayList<>();
         for (final ProcessorNode processor : processors) {
-            final CompletableFuture<Void> stopFuture = 
processGroup.stopProcessor(processor);
+            final CompletableFuture<Void> stopFuture = 
processor.getProcessGroup().stopProcessor(processor);
             stopFutures.add(stopFuture);
         }
 
@@ -318,4 +419,11 @@ public class StandaloneProcessGroupLifecycle implements 
ProcessGroupLifecycle {
         }
         return total;
     }
+
+    @Override
+    public String toString() {
+        return "StandaloneProcessGroupLifecycle[" +
+            "processGroup=" + processGroup.getIdentifier() +
+            "]";
+    }
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
index 53d7337248..afdf4f7062 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
@@ -496,7 +496,7 @@ public class StandardConnectorNodeIT {
     }
 
     @Test
-    public void testPurgeFlowFilesEmptiesQueues() throws FlowUpdateException, 
ExecutionException, InterruptedException, TimeoutException {
+    public void testPurgeFlowFilesEmptiesQueues() throws ExecutionException, 
InterruptedException, TimeoutException {
         final ConnectorNode connectorNode = initializeDynamicFlowConnector();
         final ProcessGroup rootGroup = 
connectorNode.getActiveFlowContext().getManagedProcessGroup();
 
@@ -511,7 +511,7 @@ public class StandardConnectorNodeIT {
     }
 
     @Test
-    public void testPurgeFlowFilesMultipleQueues() throws FlowUpdateException, 
ExecutionException, InterruptedException, TimeoutException {
+    public void testPurgeFlowFilesMultipleQueues() throws ExecutionException, 
InterruptedException, TimeoutException {
         final ConnectorNode connectorNode = initializeDynamicFlowConnector();
         final ProcessGroup rootGroup = 
connectorNode.getActiveFlowContext().getManagedProcessGroup();
 
@@ -545,7 +545,6 @@ public class StandardConnectorNodeIT {
         connectorNode.stop(componentLifecycleThreadPool);
     }
 
-
     private List<String> getConfigurationStepNames(final ConnectorNode 
connectorNode) {
         return connectorNode.getConfigurationSteps().stream()
             .map(ConfigurationStep::getName)
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingProcessGroupLifecycle.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingProcessGroupLifecycle.java
index 94f822d724..891e3e95ef 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingProcessGroupLifecycle.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingProcessGroupLifecycle.java
@@ -62,9 +62,9 @@ public class AuthorizingProcessGroupLifecycle implements 
ProcessGroupLifecycle {
     }
 
     @Override
-    public CompletableFuture<Void> startProcessors() {
+    public CompletableFuture<Void> startProcessors(final boolean recursive) {
         authContext.authorizeWrite();
-        return delegate.startProcessors();
+        return delegate.startProcessors(recursive);
     }
 
     @Override
@@ -80,9 +80,45 @@ public class AuthorizingProcessGroupLifecycle implements 
ProcessGroupLifecycle {
     }
 
     @Override
-    public CompletableFuture<Void> stopProcessors() {
+    public CompletableFuture<Void> stopProcessors(final boolean recursive) {
         authContext.authorizeWrite();
-        return delegate.stopProcessors();
+        return delegate.stopProcessors(recursive);
+    }
+
+    @Override
+    public CompletableFuture<Void> startPorts(final boolean recursive) {
+        authContext.authorizeWrite();
+        return delegate.startPorts(recursive);
+    }
+
+    @Override
+    public CompletableFuture<Void> stopPorts(final boolean recursive) {
+        authContext.authorizeWrite();
+        return delegate.stopPorts(recursive);
+    }
+
+    @Override
+    public CompletableFuture<Void> startRemoteProcessGroups(final boolean 
recursive) {
+        authContext.authorizeWrite();
+        return delegate.startRemoteProcessGroups(recursive);
+    }
+
+    @Override
+    public CompletableFuture<Void> stopRemoteProcessGroups(final boolean 
recursive) {
+        authContext.authorizeWrite();
+        return delegate.stopRemoteProcessGroups(recursive);
+    }
+
+    @Override
+    public CompletableFuture<Void> startStatelessGroups(final boolean 
recursive) {
+        authContext.authorizeWrite();
+        return delegate.startStatelessGroups(recursive);
+    }
+
+    @Override
+    public CompletableFuture<Void> stopStatelessGroups(final boolean 
recursive) {
+        authContext.authorizeWrite();
+        return delegate.stopStatelessGroups(recursive);
     }
 
     @Override
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ComponentLifecycleConnector.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ComponentLifecycleConnector.java
new file mode 100644
index 0000000000..40b4b41475
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ComponentLifecycleConnector.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.connectors.tests.system;
+
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.connector.AbstractConnector;
+import org.apache.nifi.components.connector.ConfigurationStep;
+import org.apache.nifi.components.connector.FlowUpdateException;
+import org.apache.nifi.components.connector.components.FlowContext;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ComponentType;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.ConnectableComponentType;
+import org.apache.nifi.flow.ControllerServiceAPI;
+import org.apache.nifi.flow.ExecutionEngine;
+import org.apache.nifi.flow.PortType;
+import org.apache.nifi.flow.Position;
+import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedExternalFlow;
+import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Test Connector designed to verify the complete component lifecycle.
+ * Creates a flow with:
+ * - A processor at the root level
+ * - A child process group with input and output ports
+ * - A processor within the child group
+ * - A stateless group with a processor
+ *
+ * This allows testing that start/stop operations properly handle all 
component types recursively.
+ */
+public class ComponentLifecycleConnector extends AbstractConnector {
+
+    public static final String ROOT_PROCESSOR_ID = "root-processor-id";
+    public static final String CHILD_GROUP_ID = "child-group-id";
+    public static final String CHILD_INPUT_PORT_ID = "child-input-port-id";
+    public static final String CHILD_OUTPUT_PORT_ID = "child-output-port-id";
+    public static final String CHILD_PROCESSOR_ID = "child-processor-id";
+    public static final String STATELESS_GROUP_ID = "stateless-group-id";
+    public static final String STATELESS_PROCESSOR_ID = 
"stateless-processor-id";
+    public static final String STATELESS_INPUT_PORT_ID = 
"stateless-input-port-id";
+    public static final String ROOT_CONTROLLER_SERVICE_ID = 
"root-controller-service-id";
+    public static final String CHILD_CONTROLLER_SERVICE_ID = 
"child-controller-service-id";
+
+    private static final Bundle SYSTEM_TEST_EXTENSIONS_BUNDLE = createBundle();
+
+    @Override
+    protected void onStepConfigured(final String stepName, final FlowContext 
workingContext) {
+    }
+
+    @Override
+    public VersionedExternalFlow getInitialFlow() {
+        final VersionedProcessGroup rootGroup = createRootGroup();
+        final VersionedExternalFlow flow = new VersionedExternalFlow();
+        flow.setFlowContents(rootGroup);
+        return flow;
+    }
+
+    private VersionedProcessGroup createRootGroup() {
+        final VersionedProcessGroup rootGroup = new VersionedProcessGroup();
+        rootGroup.setIdentifier(UUID.randomUUID().toString());
+        rootGroup.setName("Component Lifecycle Root");
+        rootGroup.setPosition(new Position(0, 0));
+        rootGroup.setProcessors(new HashSet<>());
+        rootGroup.setProcessGroups(new HashSet<>());
+        rootGroup.setConnections(new HashSet<>());
+        rootGroup.setInputPorts(new HashSet<>());
+        rootGroup.setOutputPorts(new HashSet<>());
+        rootGroup.setControllerServices(new HashSet<>());
+        rootGroup.setLabels(new HashSet<>());
+        rootGroup.setFunnels(new HashSet<>());
+        rootGroup.setRemoteProcessGroups(new HashSet<>());
+        rootGroup.setScheduledState(ScheduledState.ENABLED);
+        rootGroup.setExecutionEngine(ExecutionEngine.STANDARD);
+        rootGroup.setComponentType(ComponentType.PROCESS_GROUP);
+
+        // Create root-level Controller Service
+        final VersionedControllerService rootControllerService = 
createControllerService(ROOT_CONTROLLER_SERVICE_ID, "Root Count Service", 
rootGroup.getIdentifier());
+        rootGroup.getControllerServices().add(rootControllerService);
+
+        // Create root-level processor (GenerateFlowFile)
+        final VersionedProcessor rootProcessor = 
createProcessor(ROOT_PROCESSOR_ID, "Root GenerateFlowFile",
+            "org.apache.nifi.processors.tests.system.GenerateFlowFile", new 
Position(100, 100));
+        rootProcessor.setGroupIdentifier(rootGroup.getIdentifier());
+        rootProcessor.setSchedulingPeriod("10 sec");
+        rootGroup.getProcessors().add(rootProcessor);
+
+        // Create root-level processor (TerminateFlowFile)
+        final VersionedProcessor rootTerminateProcessor = 
createProcessor("root-terminate-processor-id", "Root TerminateFlowFile",
+            "org.apache.nifi.processors.tests.system.TerminateFlowFile", new 
Position(300, 100));
+        rootTerminateProcessor.setGroupIdentifier(rootGroup.getIdentifier());
+        rootGroup.getProcessors().add(rootTerminateProcessor);
+
+        // Create child process group with ports and processor
+        final VersionedProcessGroup childGroup = 
createChildGroup(rootGroup.getIdentifier());
+        rootGroup.getProcessGroups().add(childGroup);
+
+        // Create connection from root processor to child group's input port
+        final VersionedConnection rootToChildConnection = createConnection(
+            createConnectableComponent(ROOT_PROCESSOR_ID, "Root 
GenerateFlowFile", ConnectableComponentType.PROCESSOR, 
rootGroup.getIdentifier()),
+            Set.of("success"),
+            createConnectableComponent(CHILD_INPUT_PORT_ID, "Child Input", 
ConnectableComponentType.INPUT_PORT, CHILD_GROUP_ID),
+            rootGroup.getIdentifier()
+        );
+        rootGroup.getConnections().add(rootToChildConnection);
+
+        // Create connection from child group's output port to root terminate 
processor
+        final VersionedConnection childToRootConnection = createConnection(
+            createConnectableComponent(CHILD_OUTPUT_PORT_ID, "Child Output", 
ConnectableComponentType.OUTPUT_PORT, CHILD_GROUP_ID),
+            Set.of(""),
+            createConnectableComponent("root-terminate-processor-id", "Root 
TerminateFlowFile", ConnectableComponentType.PROCESSOR, 
rootGroup.getIdentifier()),
+            rootGroup.getIdentifier()
+        );
+        rootGroup.getConnections().add(childToRootConnection);
+
+        return rootGroup;
+    }
+
+    private VersionedProcessGroup createChildGroup(final String parentGroupId) 
{
+        final VersionedProcessGroup childGroup = new VersionedProcessGroup();
+        childGroup.setIdentifier(CHILD_GROUP_ID);
+        childGroup.setName("Child Group");
+        childGroup.setPosition(new Position(100, 300));
+        childGroup.setProcessors(new HashSet<>());
+        childGroup.setProcessGroups(new HashSet<>());
+        childGroup.setConnections(new HashSet<>());
+        childGroup.setInputPorts(new HashSet<>());
+        childGroup.setOutputPorts(new HashSet<>());
+        childGroup.setControllerServices(new HashSet<>());
+        childGroup.setLabels(new HashSet<>());
+        childGroup.setFunnels(new HashSet<>());
+        childGroup.setRemoteProcessGroups(new HashSet<>());
+        childGroup.setScheduledState(ScheduledState.ENABLED);
+        childGroup.setExecutionEngine(ExecutionEngine.STANDARD);
+        childGroup.setComponentType(ComponentType.PROCESS_GROUP);
+        childGroup.setGroupIdentifier(parentGroupId);
+
+        // Create Controller Service in child group
+        final VersionedControllerService childControllerService = 
createControllerService(CHILD_CONTROLLER_SERVICE_ID, "Child Count Service", 
CHILD_GROUP_ID);
+        childGroup.getControllerServices().add(childControllerService);
+
+        // Create input port
+        final VersionedPort inputPort = createPort(CHILD_INPUT_PORT_ID, "Child 
Input", true, CHILD_GROUP_ID);
+        childGroup.getInputPorts().add(inputPort);
+
+        // Create output port
+        final VersionedPort outputPort = createPort(CHILD_OUTPUT_PORT_ID, 
"Child Output", false, CHILD_GROUP_ID);
+        childGroup.getOutputPorts().add(outputPort);
+
+        // Create processor in child group
+        final VersionedProcessor childProcessor = 
createProcessor(CHILD_PROCESSOR_ID, "Child Terminate",
+            "org.apache.nifi.processors.tests.system.PassThrough", new 
Position(100, 100));
+        childProcessor.setGroupIdentifier(CHILD_GROUP_ID);
+        childGroup.getProcessors().add(childProcessor);
+
+        // Create stateless group
+        final VersionedProcessGroup statelessGroup = 
createStatelessGroup(CHILD_GROUP_ID);
+        childGroup.getProcessGroups().add(statelessGroup);
+
+        // Connection: input port -> child processor
+        final VersionedConnection inputToProcessor = createConnection(
+            createConnectableComponent(CHILD_INPUT_PORT_ID, "Child Input", 
ConnectableComponentType.INPUT_PORT, CHILD_GROUP_ID),
+            Set.of(""),
+            createConnectableComponent(CHILD_PROCESSOR_ID, "Child Terminate", 
ConnectableComponentType.PROCESSOR, CHILD_GROUP_ID),
+            CHILD_GROUP_ID
+        );
+        childGroup.getConnections().add(inputToProcessor);
+
+        // Connection: input port -> stateless group
+        final VersionedConnection inputToStateless = createConnection(
+            createConnectableComponent(CHILD_INPUT_PORT_ID, "Child Input", 
ConnectableComponentType.INPUT_PORT, CHILD_GROUP_ID),
+            Set.of(""),
+            createConnectableComponent(STATELESS_INPUT_PORT_ID, "Stateless 
Input", ConnectableComponentType.INPUT_PORT, STATELESS_GROUP_ID),
+            CHILD_GROUP_ID
+        );
+        childGroup.getConnections().add(inputToStateless);
+
+        // Connection: child processor -> output port
+        final VersionedConnection processorToOutput = createConnection(
+            createConnectableComponent(CHILD_PROCESSOR_ID, "Child Terminate", 
ConnectableComponentType.PROCESSOR, CHILD_GROUP_ID),
+            Set.of("success"),
+            createConnectableComponent(CHILD_OUTPUT_PORT_ID, "Child Output", 
ConnectableComponentType.OUTPUT_PORT, CHILD_GROUP_ID),
+            CHILD_GROUP_ID
+        );
+        childGroup.getConnections().add(processorToOutput);
+
+        return childGroup;
+    }
+
+    private VersionedProcessGroup createStatelessGroup(final String 
parentGroupId) {
+        final VersionedProcessGroup statelessGroup = new 
VersionedProcessGroup();
+        statelessGroup.setIdentifier(STATELESS_GROUP_ID);
+        statelessGroup.setName("Stateless Group");
+        statelessGroup.setPosition(new Position(400, 100));
+        statelessGroup.setProcessors(new HashSet<>());
+        statelessGroup.setProcessGroups(new HashSet<>());
+        statelessGroup.setConnections(new HashSet<>());
+        statelessGroup.setInputPorts(new HashSet<>());
+        statelessGroup.setOutputPorts(new HashSet<>());
+        statelessGroup.setControllerServices(new HashSet<>());
+        statelessGroup.setLabels(new HashSet<>());
+        statelessGroup.setFunnels(new HashSet<>());
+        statelessGroup.setRemoteProcessGroups(new HashSet<>());
+        statelessGroup.setScheduledState(ScheduledState.ENABLED);
+        statelessGroup.setExecutionEngine(ExecutionEngine.STATELESS);
+        statelessGroup.setStatelessFlowTimeout("1 min");
+        statelessGroup.setComponentType(ComponentType.PROCESS_GROUP);
+        statelessGroup.setGroupIdentifier(parentGroupId);
+
+        // Create input port for stateless group
+        final VersionedPort statelessInput = 
createPort(STATELESS_INPUT_PORT_ID, "Stateless Input", true, 
STATELESS_GROUP_ID);
+        statelessGroup.getInputPorts().add(statelessInput);
+
+        // Create processor in stateless group
+        final VersionedProcessor statelessProcessor = 
createProcessor(STATELESS_PROCESSOR_ID, "Stateless Terminate",
+            "org.apache.nifi.processors.tests.system.TerminateFlowFile", new 
Position(100, 100));
+        statelessProcessor.setGroupIdentifier(STATELESS_GROUP_ID);
+        statelessGroup.getProcessors().add(statelessProcessor);
+
+        // Connection: input port -> processor
+        final VersionedConnection inputToProcessor = createConnection(
+            createConnectableComponent(STATELESS_INPUT_PORT_ID, "Stateless 
Input", ConnectableComponentType.INPUT_PORT, STATELESS_GROUP_ID),
+            Set.of(""),
+            createConnectableComponent(STATELESS_PROCESSOR_ID, "Stateless 
Terminate", ConnectableComponentType.PROCESSOR, STATELESS_GROUP_ID),
+            STATELESS_GROUP_ID
+        );
+        statelessGroup.getConnections().add(inputToProcessor);
+
+        return statelessGroup;
+    }
+
+    private VersionedProcessor createProcessor(final String id, final String 
name, final String type, final Position position) {
+        final VersionedProcessor processor = new VersionedProcessor();
+        processor.setIdentifier(id);
+        processor.setName(name);
+        processor.setType(type);
+        processor.setBundle(SYSTEM_TEST_EXTENSIONS_BUNDLE);
+        processor.setPosition(position);
+        processor.setProperties(Map.of());
+        processor.setPropertyDescriptors(Map.of());
+        processor.setSchedulingPeriod("0 sec");
+        processor.setSchedulingStrategy("TIMER_DRIVEN");
+        processor.setExecutionNode("ALL");
+        processor.setPenaltyDuration("30 sec");
+        processor.setYieldDuration("1 sec");
+        processor.setBulletinLevel("WARN");
+        processor.setRunDurationMillis(0L);
+        processor.setConcurrentlySchedulableTaskCount(1);
+        processor.setAutoTerminatedRelationships(Set.of());
+        processor.setScheduledState(ScheduledState.ENABLED);
+        processor.setRetryCount(0);
+        processor.setRetriedRelationships(Set.of());
+        processor.setComponentType(ComponentType.PROCESSOR);
+        return processor;
+    }
+
+    private VersionedPort createPort(final String id, final String name, final 
boolean isInput, final String groupId) {
+        final VersionedPort port = new VersionedPort();
+        port.setIdentifier(id);
+        port.setName(name);
+        port.setPosition(new Position(isInput ? 0 : 200, 0));
+        port.setType(isInput ? PortType.INPUT_PORT : PortType.OUTPUT_PORT);
+        port.setComponentType(isInput ? ComponentType.INPUT_PORT : 
ComponentType.OUTPUT_PORT);
+        port.setConcurrentlySchedulableTaskCount(1);
+        port.setScheduledState(ScheduledState.ENABLED);
+        port.setAllowRemoteAccess(false);
+        port.setGroupIdentifier(groupId);
+        return port;
+    }
+
+    private VersionedControllerService createControllerService(final String 
id, final String name, final String groupId) {
+        final VersionedControllerService service = new 
VersionedControllerService();
+        service.setIdentifier(id);
+        service.setName(name);
+        
service.setType("org.apache.nifi.cs.tests.system.StandardCountService");
+        service.setBundle(SYSTEM_TEST_EXTENSIONS_BUNDLE);
+        service.setGroupIdentifier(groupId);
+        service.setProperties(Map.of());
+        service.setPropertyDescriptors(Map.of());
+        service.setScheduledState(ScheduledState.ENABLED);
+        service.setBulletinLevel("WARN");
+
+        final ControllerServiceAPI serviceApi = new ControllerServiceAPI();
+        serviceApi.setType("org.apache.nifi.cs.tests.system.CountService");
+        serviceApi.setBundle(SYSTEM_TEST_EXTENSIONS_BUNDLE);
+        
service.setControllerServiceApis(Collections.singletonList(serviceApi));
+
+        return service;
+    }
+
+    private VersionedConnection createConnection(final ConnectableComponent 
source, final Set<String> relationships,
+                                                  final ConnectableComponent 
destination, final String groupId) {
+        final VersionedConnection connection = new VersionedConnection();
+        connection.setIdentifier(UUID.randomUUID().toString());
+        connection.setName("");
+        connection.setSource(source);
+        connection.setDestination(destination);
+        connection.setSelectedRelationships(relationships);
+        connection.setBackPressureObjectThreshold(10000L);
+        connection.setBackPressureDataSizeThreshold("1 GB");
+        connection.setFlowFileExpiration("0 sec");
+        connection.setLabelIndex(0);
+        connection.setzIndex(0L);
+        connection.setComponentType(ComponentType.CONNECTION);
+        connection.setGroupIdentifier(groupId);
+        return connection;
+    }
+
+    private ConnectableComponent createConnectableComponent(final String id, 
final String name, final ConnectableComponentType type, final String groupId) {
+        final ConnectableComponent component = new ConnectableComponent();
+        component.setId(id);
+        component.setName(name);
+        component.setType(type);
+        component.setGroupId(groupId);
+        return component;
+    }
+
+    private static Bundle createBundle() {
+        final Bundle bundle = new Bundle();
+        bundle.setGroup("org.apache.nifi");
+        bundle.setArtifact("nifi-system-test-extensions-nar");
+        bundle.setVersion("2.8.0-SNAPSHOT");
+        return bundle;
+    }
+
+    @Override
+    public List<ConfigVerificationResult> verifyConfigurationStep(final String 
stepName, final Map<String, String> propertyValueOverrides, final FlowContext 
flowContext) {
+        return List.of();
+    }
+
+    @Override
+    public List<ConfigurationStep> getConfigurationSteps() {
+        return List.of();
+    }
+
+    @Override
+    public void applyUpdate(final FlowContext workingFlowContext, final 
FlowContext activeFlowContext) throws FlowUpdateException {
+        getInitializationContext().updateFlow(activeFlowContext, 
getInitialFlow());
+    }
+}
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
index d58dae1924..d20e64c3d3 100644
--- 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
@@ -16,6 +16,7 @@
 org.apache.nifi.connectors.tests.system.AssetConnector
 org.apache.nifi.connectors.tests.system.BundleResolutionConnector
 org.apache.nifi.connectors.tests.system.CalculateConnector
+org.apache.nifi.connectors.tests.system.ComponentLifecycleConnector
 org.apache.nifi.connectors.tests.system.DataQueuingConnector
 org.apache.nifi.connectors.tests.system.GatedDataQueuingConnector
 org.apache.nifi.connectors.tests.system.NestedProcessGroupConnector
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorLifecycleIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorLifecycleIT.java
new file mode 100644
index 0000000000..5860286bcc
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorLifecycleIT.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.connectors;
+
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.groups.StatelessGroupScheduledState;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.dto.PortDTO;
+import org.apache.nifi.web.api.dto.ProcessGroupDTO;
+import org.apache.nifi.web.api.dto.ProcessorDTO;
+import org.apache.nifi.web.api.dto.flow.FlowDTO;
+import org.apache.nifi.web.api.entity.ConnectorEntity;
+import org.apache.nifi.web.api.entity.PortEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * System test that verifies the connector start/stop lifecycle correctly 
starts and stops
+ * all component types: processors, ports, and stateless groups.
+ */
+public class ConnectorLifecycleIT extends NiFiSystemIT {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(ConnectorLifecycleIT.class);
+
+
+    @Test
+    public void testStartStopStartsAndStopsAllComponents() throws 
NiFiClientException, IOException, InterruptedException {
+        logger.info("Creating ComponentLifecycleConnector");
+        final ConnectorEntity connector = 
getClientUtil().createConnector("ComponentLifecycleConnector");
+        assertNotNull(connector);
+        final String connectorId = connector.getId();
+
+        logger.info("Applying connector configuration");
+        getClientUtil().applyConnectorUpdate(connector);
+
+        logger.info("Waiting for connector to be valid");
+        getClientUtil().waitForValidConnector(connectorId);
+
+        logger.info("Starting connector {}", connectorId);
+        getClientUtil().startConnector(connectorId);
+
+        logger.info("Verifying flow has components after start");
+        verifyFlowHasComponents(connectorId);
+
+        logger.info("Verifying all processors are running");
+        waitForAllProcessorsRunning(connectorId);
+
+        logger.info("Verifying all ports are running");
+        waitForAllPortsRunning(connectorId);
+
+        logger.info("Verifying stateless group is running");
+        waitForStatelessGroupRunning(connectorId);
+
+        logger.info("Stopping connector {}", connectorId);
+        getClientUtil().stopConnector(connectorId);
+        getClientUtil().waitForConnectorStopped(connectorId);
+
+        logger.info("Verifying all processors are stopped");
+        waitForAllProcessorsStopped(connectorId);
+
+        logger.info("Verifying all ports are stopped");
+        waitForAllPortsStopped(connectorId);
+
+        logger.info("Verifying stateless group is stopped");
+        waitForStatelessGroupStopped(connectorId);
+
+        logger.info("testStartStopStartsAndStopsAllComponents completed 
successfully");
+    }
+
+    private void verifyFlowHasComponents(final String connectorId) throws 
NiFiClientException, IOException {
+        final ProcessGroupFlowEntity flowEntity = 
getNifiClient().getConnectorClient().getFlow(connectorId);
+        final FlowDTO flowDto = flowEntity.getProcessGroupFlow().getFlow();
+
+        boolean hasProcessors = !flowDto.getProcessors().isEmpty();
+        boolean hasChildGroups = !flowDto.getProcessGroups().isEmpty();
+
+        logger.info("Root group has {} processors and {} child groups",
+            flowDto.getProcessors().size(), flowDto.getProcessGroups().size());
+
+        assertTrue(hasProcessors || hasChildGroups, "Flow should have 
processors or child groups");
+    }
+
+    private void waitForAllProcessorsRunning(final String connectorId) throws 
InterruptedException {
+        waitFor(() -> allProcessorsInState(connectorId, null, 
ScheduledState.RUNNING.name()));
+    }
+
+    private void waitForAllProcessorsStopped(final String connectorId) throws 
InterruptedException {
+        waitFor(() -> allProcessorsInState(connectorId, null, 
ScheduledState.STOPPED.name()));
+    }
+
+    private boolean allProcessorsInState(final String connectorId, final 
String groupId, final String expectedState) throws NiFiClientException, 
IOException {
+        final FlowDTO flowDto = getFlow(connectorId, groupId);
+
+        for (final ProcessorEntity processorEntity : flowDto.getProcessors()) {
+            final ProcessorDTO processorDto = processorEntity.getComponent();
+            final String state = processorDto.getState();
+            if (!expectedState.equals(state) && 
!ScheduledState.DISABLED.name().equals(state)) {
+                logger.debug("Processor {} ({}) in group {} is in state {} 
(expected {})",
+                    processorDto.getName(), processorEntity.getId(), groupId, 
state, expectedState);
+                return false;
+            }
+        }
+
+        for (final ProcessGroupEntity childGroupEntity : 
flowDto.getProcessGroups()) {
+            final ProcessGroupDTO childGroupDto = 
childGroupEntity.getComponent();
+            if (!"STATELESS".equals(childGroupDto.getExecutionEngine())) {
+                if (!allProcessorsInState(connectorId, 
childGroupEntity.getId(), expectedState)) {
+                    return false;
+                }
+            }
+        }
+
+        return true;
+    }
+
+    private void waitForAllPortsRunning(final String connectorId) throws 
InterruptedException {
+        waitFor(() -> allPortsInState(connectorId, null, 
ScheduledState.RUNNING.name()));
+    }
+
+    private void waitForAllPortsStopped(final String connectorId) throws 
InterruptedException {
+        waitFor(() -> allPortsInState(connectorId, null, 
ScheduledState.STOPPED.name()));
+    }
+
+    private boolean allPortsInState(final String connectorId, final String 
groupId, final String expectedState) throws NiFiClientException, IOException {
+        final FlowDTO flowDto = getFlow(connectorId, groupId);
+
+        for (final PortEntity portEntity : flowDto.getInputPorts()) {
+            final PortDTO portDto = portEntity.getComponent();
+            final String state = portDto.getState();
+            if (!expectedState.equals(state) && 
!ScheduledState.DISABLED.name().equals(state)) {
+                logger.debug("Input port {} is in state {} (expected {})", 
portDto.getName(), state, expectedState);
+                return false;
+            }
+        }
+
+        for (final PortEntity portEntity : flowDto.getOutputPorts()) {
+            final PortDTO portDto = portEntity.getComponent();
+            final String state = portDto.getState();
+            if (!expectedState.equals(state) && 
!ScheduledState.DISABLED.name().equals(state)) {
+                logger.debug("Output port {} is in state {} (expected {})", 
portDto.getName(), state, expectedState);
+                return false;
+            }
+        }
+
+        for (final ProcessGroupEntity childGroupEntity : 
flowDto.getProcessGroups()) {
+            final ProcessGroupDTO childGroupDto = 
childGroupEntity.getComponent();
+            if (!"STATELESS".equals(childGroupDto.getExecutionEngine())) {
+                if (!allPortsInState(connectorId, childGroupEntity.getId(), 
expectedState)) {
+                    return false;
+                }
+            }
+        }
+
+        return true;
+    }
+
+    private void waitForStatelessGroupRunning(final String connectorId) throws 
InterruptedException {
+        waitFor(() -> isStatelessGroupInState(connectorId, 
StatelessGroupScheduledState.RUNNING.name()));
+    }
+
+    private void waitForStatelessGroupStopped(final String connectorId) throws 
InterruptedException {
+        waitFor(() -> isStatelessGroupInState(connectorId, 
StatelessGroupScheduledState.STOPPED.name()));
+    }
+
+    private boolean isStatelessGroupInState(final String connectorId, final 
String expectedState) throws NiFiClientException, IOException {
+        return findStatelessGroupInState(connectorId, null, expectedState);
+    }
+
+    private boolean findStatelessGroupInState(final String connectorId, final 
String groupId, final String expectedState) throws NiFiClientException, 
IOException {
+        final FlowDTO flowDto = getFlow(connectorId, groupId);
+
+        for (final ProcessGroupEntity childGroupEntity : 
flowDto.getProcessGroups()) {
+            final ProcessGroupDTO childGroupDto = 
childGroupEntity.getComponent();
+            if ("STATELESS".equals(childGroupDto.getExecutionEngine())) {
+                final String actualState = 
childGroupDto.getStatelessGroupScheduledState();
+                logger.debug("Stateless group {} is in state {} (expected 
{})", childGroupDto.getName(), actualState, expectedState);
+                if (expectedState.equals(actualState)) {
+                    return true;
+                }
+            } else {
+                if (findStatelessGroupInState(connectorId, 
childGroupEntity.getId(), expectedState)) {
+                    return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
+    private FlowDTO getFlow(final String connectorId, final String groupId) 
throws NiFiClientException, IOException {
+        final ProcessGroupFlowEntity flowEntity = (groupId == null)
+            ? getNifiClient().getConnectorClient().getFlow(connectorId)
+            : getNifiClient().getConnectorClient().getFlow(connectorId, 
groupId);
+
+        return flowEntity.getProcessGroupFlow().getFlow();
+    }
+}

Reply via email to