This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/NIFI-15258 by this push:
new a65897d912 NIFI-15538: When starting/stopping components allow
specifying whether… (#10843)
a65897d912 is described below
commit a65897d912a7e3eaad17be68069f4463cd01f00e
Author: Mark Payne <[email protected]>
AuthorDate: Tue Feb 3 14:17:07 2026 -0500
NIFI-15538: When starting/stopping components allow specifying whether…
(#10843)
* NIFI-15538: When starting/stopping components allow specifying whether or
not the action should be recursive; code cleanup and simplification to use a
Virtual Thread to execute code sequentially instead of chaining
CompletableFutures.
* NIFI-15538: Addressed review feedback
---------
Co-authored-by: Mark Payne <[email protected]>
---
.../StandaloneProcessGroupLifecycle.java | 192 ++++++++---
.../connector/StandardConnectorNodeIT.java | 5 +-
.../AuthorizingProcessGroupLifecycle.java | 44 ++-
.../tests/system/ComponentLifecycleConnector.java | 366 +++++++++++++++++++++
.../org.apache.nifi.components.connector.Connector | 1 +
.../system/connectors/ConnectorLifecycleIT.java | 222 +++++++++++++
6 files changed, 781 insertions(+), 49 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupLifecycle.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupLifecycle.java
index ccc2a0a8f3..28649b2777 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupLifecycle.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupLifecycle.java
@@ -182,8 +182,8 @@ public class StandaloneProcessGroupLifecycle implements
ProcessGroupLifecycle {
}
@Override
- public CompletableFuture<Void> startProcessors() {
- final Collection<ProcessorNode> processors =
processGroup.getProcessors();
+ public CompletableFuture<Void> startProcessors(final boolean recursive) {
+ final Collection<ProcessorNode> processors = recursive ?
processGroup.findAllProcessors() : processGroup.getProcessors();
final List<CompletableFuture<Void>> startFutures = new ArrayList<>();
for (final ProcessorNode processor : processors) {
// If Processor is not valid, perform validation again to ensure
that the status is up to date.
@@ -192,7 +192,7 @@ public class StandaloneProcessGroupLifecycle implements
ProcessGroupLifecycle {
processor.performValidation();
}
- startFutures.add(processGroup.startProcessor(processor, true));
+
startFutures.add(processor.getProcessGroup().startProcessor(processor, true));
}
return CompletableFuture.allOf(startFutures.toArray(new
CompletableFuture[0]));
@@ -204,74 +204,175 @@ public class StandaloneProcessGroupLifecycle implements
ProcessGroupLifecycle {
return statelessGroupLifecycle.start();
}
- final CompletableFuture<Void> enableServicesFuture =
enableControllerServices(serviceReferenceScope,
ControllerServiceReferenceHierarchy.DIRECT_SERVICES_ONLY);
- final CompletableFuture<Void> startAllComponents =
enableServicesFuture.thenRunAsync(this::startPorts)
- .thenRunAsync(this::startRemoteProcessGroups)
- .thenCompose(v -> startProcessors());
+ final CompletableFuture<Void> result = new CompletableFuture<>();
- final List<CompletableFuture<Void>> childGroupFutures = new
ArrayList<>();
- for (final ProcessGroup childGroup : processGroup.getProcessGroups()) {
- final String childGroupId =
childGroup.getVersionedComponentId().orElse(null);
- if (childGroupId == null) {
- logger.warn("Encountered child Process Group {} without a
Versioned Component ID. Skipping start of child group.",
childGroup.getIdentifier());
- continue;
- }
+ Thread.startVirtualThread(() -> {
+ try {
+ enableControllerServices(serviceReferenceScope,
ControllerServiceReferenceHierarchy.DIRECT_SERVICES_ONLY).get();
+ startPorts(false).get();
+ startRemoteProcessGroups(false).get();
+ startProcessors(false).get();
- final ProcessGroupLifecycle childLifecycle =
childGroupLifecycleFactory.apply(childGroupId);
- final CompletableFuture<Void> childFuture =
childLifecycle.start(serviceReferenceScope);
- childGroupFutures.add(childFuture);
- }
+ final List<CompletableFuture<Void>> childGroupFutures = new
ArrayList<>();
+ for (final ProcessGroup childGroup :
processGroup.getProcessGroups()) {
+ final String childGroupId =
childGroup.getVersionedComponentId().orElse(null);
+ if (childGroupId == null) {
+ logger.warn("Encountered child Process Group {}
without a Versioned Component ID. Skipping start of child group.",
childGroup.getIdentifier());
+ continue;
+ }
+
+ final ProcessGroupLifecycle childLifecycle =
childGroupLifecycleFactory.apply(childGroupId);
+
childGroupFutures.add(childLifecycle.start(serviceReferenceScope));
+ }
- final CompletableFuture<Void> compositeChildFutures =
CompletableFuture.allOf(childGroupFutures.toArray(new CompletableFuture[0]));
- return CompletableFuture.allOf(startAllComponents,
compositeChildFutures);
+ CompletableFuture.allOf(childGroupFutures.toArray(new
CompletableFuture[0])).get();
+ result.complete(null);
+ } catch (final Exception e) {
+ result.completeExceptionally(e);
+ }
+ });
+
+ return result;
}
- private void startPorts() {
- for (final Port inputPort : processGroup.getInputPorts()) {
- processGroup.startInputPort(inputPort);
+ @Override
+ public CompletableFuture<Void> startPorts(final boolean recursive) {
+ logger.debug("{} starting all ports", this);
+ final Collection<Port> inputPorts = recursive ?
processGroup.findAllInputPorts() : processGroup.getInputPorts();
+ for (final Port inputPort : inputPorts) {
+ inputPort.getProcessGroup().startInputPort(inputPort);
}
- for (final Port outputPort : processGroup.getOutputPorts()) {
- processGroup.startOutputPort(outputPort);
+
+ final Collection<Port> outputPorts = recursive ?
processGroup.findAllOutputPorts() : processGroup.getOutputPorts();
+ for (final Port outputPort : outputPorts) {
+ outputPort.getProcessGroup().startOutputPort(outputPort);
}
+
+ logger.info("{} started all ports", this);
+ return CompletableFuture.completedFuture(null);
}
- private void stopPorts() {
- for (final Port inputPort : processGroup.getInputPorts()) {
- processGroup.stopInputPort(inputPort);
+ @Override
+ public CompletableFuture<Void> stopPorts(final boolean recursive) {
+ logger.debug("{} stopping all ports", this);
+
+ final Collection<Port> inputPorts = recursive ?
processGroup.findAllInputPorts() : processGroup.getInputPorts();
+ for (final Port inputPort : inputPorts) {
+ inputPort.getProcessGroup().stopInputPort(inputPort);
}
- for (final Port outputPort : processGroup.getOutputPorts()) {
- processGroup.stopOutputPort(outputPort);
+
+ final Collection<Port> outputPorts = recursive ?
processGroup.findAllOutputPorts() : processGroup.getOutputPorts();
+ for (final Port outputPort : outputPorts) {
+ outputPort.getProcessGroup().stopOutputPort(outputPort);
}
+
+ logger.info("{} stopped all ports", this);
+ return CompletableFuture.completedFuture(null);
}
- private void startRemoteProcessGroups() {
- for (final RemoteProcessGroup rpg :
processGroup.getRemoteProcessGroups()) {
+ @Override
+ public CompletableFuture<Void> startRemoteProcessGroups(final boolean
recursive) {
+ logger.debug("{} starting all Remote Process Groups", this);
+
+ final Collection<RemoteProcessGroup> rpgs = recursive ?
processGroup.findAllRemoteProcessGroups() :
processGroup.getRemoteProcessGroups();
+ for (final RemoteProcessGroup rpg : rpgs) {
rpg.startTransmitting();
}
+
+ logger.info("{} started all Remote Process Groups", this);
+ return CompletableFuture.completedFuture(null);
}
- private CompletableFuture<Void> stopRemoteProcessGroups() {
+ @Override
+ public CompletableFuture<Void> stopRemoteProcessGroups(final boolean
recursive) {
+ logger.debug("{} stopping all Remote Process Groups", this);
final List<CompletableFuture<Void>> stopFutures = new ArrayList<>();
- for (final RemoteProcessGroup rpg :
processGroup.getRemoteProcessGroups()) {
+ final Collection<RemoteProcessGroup> rpgs = recursive ?
processGroup.findAllRemoteProcessGroups() :
processGroup.getRemoteProcessGroups();
+ for (final RemoteProcessGroup rpg : rpgs) {
stopFutures.add(rpg.stopTransmitting());
}
+ logger.info("{} stopped all Remote Process Groups", this);
+ return CompletableFuture.allOf(stopFutures.toArray(new
CompletableFuture[0]));
+ }
+
+ @Override
+ public CompletableFuture<Void> startStatelessGroups(final boolean
recursive) {
+ logger.debug("{} starting all Stateless Process Groups", this);
+ final List<CompletableFuture<Void>> startFutures = new ArrayList<>();
+
+ final Collection<ProcessGroup> processGroups =
processGroup.getProcessGroups();
+ for (final ProcessGroup childGroup : processGroups) {
+ final String childGroupId =
childGroup.getVersionedComponentId().orElse(null);
+ if (childGroupId == null) {
+ logger.warn("Encountered stateless child Process Group {}
without a Versioned Component ID. Skipping start.", childGroup.getIdentifier());
+ continue;
+ }
+
+ final ProcessGroupLifecycle childLifecycle =
childGroupLifecycleFactory.apply(childGroupId);
+
+ if (childGroup.resolveExecutionEngine() ==
ExecutionEngine.STATELESS) {
+
startFutures.add(childLifecycle.start(ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY));
+ } else if (recursive) {
+ startFutures.add(childLifecycle.startStatelessGroups(true));
+ }
+ }
+
+ logger.info("{} started all Stateless Process Groups", this);
+ return CompletableFuture.allOf(startFutures.toArray(new
CompletableFuture[0]));
+ }
+
+ @Override
+ public CompletableFuture<Void> stopStatelessGroups(final boolean
recursive) {
+ logger.debug("{} stopping all Stateless Process Groups", this);
+ final List<CompletableFuture<Void>> stopFutures = new ArrayList<>();
+
+ for (final ProcessGroup childGroup : processGroup.getProcessGroups()) {
+ final String childGroupId =
childGroup.getVersionedComponentId().orElse(null);
+ if (childGroupId == null) {
+ logger.warn("Encountered stateless child Process Group {}
without a Versioned Component ID. Skipping stop.", childGroup.getIdentifier());
+ continue;
+ }
+
+ final ProcessGroupLifecycle childLifecycle =
childGroupLifecycleFactory.apply(childGroupId);
+
+ if (childGroup.resolveExecutionEngine() ==
ExecutionEngine.STATELESS) {
+ stopFutures.add(childLifecycle.stop());
+ } else if (recursive) {
+ stopFutures.add(childLifecycle.stopStatelessGroups(true));
+ }
+ }
+
+ logger.info("{} stopped all Stateless Process Groups", this);
return CompletableFuture.allOf(stopFutures.toArray(new
CompletableFuture[0]));
}
@Override
public CompletableFuture<Void> stop() {
+ logger.debug("Stopping Process Group {}",
processGroup.getIdentifier());
if (processGroup.resolveExecutionEngine() ==
ExecutionEngine.STATELESS) {
return statelessGroupLifecycle.stop();
}
- final CompletableFuture<Void> stopProcessorsFuture = stopProcessors();
+ final CompletableFuture<Void> result = new CompletableFuture<>();
+
+ Thread.startVirtualThread(() -> {
+ try {
+ stopProcessors(false).get();
+ stopChildren().get();
+ stopPorts(false).get();
+ stopRemoteProcessGroups(false).get();
+
disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get();
+
+ logger.info("Stopped Process Group {}",
processGroup.getIdentifier());
+ result.complete(null);
+ } catch (final Exception e) {
+ result.completeExceptionally(e);
+ }
+ });
- return stopProcessorsFuture.thenCompose(voidValue -> stopChildren())
- .thenRunAsync(this::stopPorts)
- .thenCompose(voidValue -> stopRemoteProcessGroups())
- .thenCompose(voidValue ->
disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS));
+ return result;
}
private CompletableFuture<Void> stopChildren() {
@@ -292,11 +393,11 @@ public class StandaloneProcessGroupLifecycle implements
ProcessGroupLifecycle {
}
@Override
- public CompletableFuture<Void> stopProcessors() {
- final Collection<ProcessorNode> processors =
processGroup.getProcessors();
+ public CompletableFuture<Void> stopProcessors(final boolean recursive) {
+ final Collection<ProcessorNode> processors = recursive ?
processGroup.findAllProcessors() : processGroup.getProcessors();
final List<CompletableFuture<Void>> stopFutures = new ArrayList<>();
for (final ProcessorNode processor : processors) {
- final CompletableFuture<Void> stopFuture =
processGroup.stopProcessor(processor);
+ final CompletableFuture<Void> stopFuture =
processor.getProcessGroup().stopProcessor(processor);
stopFutures.add(stopFuture);
}
@@ -318,4 +419,11 @@ public class StandaloneProcessGroupLifecycle implements
ProcessGroupLifecycle {
}
return total;
}
+
+ @Override
+ public String toString() {
+ return "StandaloneProcessGroupLifecycle[" +
+ "processGroup=" + processGroup.getIdentifier() +
+ "]";
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
index 53d7337248..afdf4f7062 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
@@ -496,7 +496,7 @@ public class StandardConnectorNodeIT {
}
@Test
- public void testPurgeFlowFilesEmptiesQueues() throws FlowUpdateException,
ExecutionException, InterruptedException, TimeoutException {
+ public void testPurgeFlowFilesEmptiesQueues() throws ExecutionException,
InterruptedException, TimeoutException {
final ConnectorNode connectorNode = initializeDynamicFlowConnector();
final ProcessGroup rootGroup =
connectorNode.getActiveFlowContext().getManagedProcessGroup();
@@ -511,7 +511,7 @@ public class StandardConnectorNodeIT {
}
@Test
- public void testPurgeFlowFilesMultipleQueues() throws FlowUpdateException,
ExecutionException, InterruptedException, TimeoutException {
+ public void testPurgeFlowFilesMultipleQueues() throws ExecutionException,
InterruptedException, TimeoutException {
final ConnectorNode connectorNode = initializeDynamicFlowConnector();
final ProcessGroup rootGroup =
connectorNode.getActiveFlowContext().getManagedProcessGroup();
@@ -545,7 +545,6 @@ public class StandardConnectorNodeIT {
connectorNode.stop(componentLifecycleThreadPool);
}
-
private List<String> getConfigurationStepNames(final ConnectorNode
connectorNode) {
return connectorNode.getConfigurationSteps().stream()
.map(ConfigurationStep::getName)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingProcessGroupLifecycle.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingProcessGroupLifecycle.java
index 94f822d724..891e3e95ef 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingProcessGroupLifecycle.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/connector/authorization/AuthorizingProcessGroupLifecycle.java
@@ -62,9 +62,9 @@ public class AuthorizingProcessGroupLifecycle implements
ProcessGroupLifecycle {
}
@Override
- public CompletableFuture<Void> startProcessors() {
+ public CompletableFuture<Void> startProcessors(final boolean recursive) {
authContext.authorizeWrite();
- return delegate.startProcessors();
+ return delegate.startProcessors(recursive);
}
@Override
@@ -80,9 +80,45 @@ public class AuthorizingProcessGroupLifecycle implements
ProcessGroupLifecycle {
}
@Override
- public CompletableFuture<Void> stopProcessors() {
+ public CompletableFuture<Void> stopProcessors(final boolean recursive) {
authContext.authorizeWrite();
- return delegate.stopProcessors();
+ return delegate.stopProcessors(recursive);
+ }
+
+ @Override
+ public CompletableFuture<Void> startPorts(final boolean recursive) {
+ authContext.authorizeWrite();
+ return delegate.startPorts(recursive);
+ }
+
+ @Override
+ public CompletableFuture<Void> stopPorts(final boolean recursive) {
+ authContext.authorizeWrite();
+ return delegate.stopPorts(recursive);
+ }
+
+ @Override
+ public CompletableFuture<Void> startRemoteProcessGroups(final boolean
recursive) {
+ authContext.authorizeWrite();
+ return delegate.startRemoteProcessGroups(recursive);
+ }
+
+ @Override
+ public CompletableFuture<Void> stopRemoteProcessGroups(final boolean
recursive) {
+ authContext.authorizeWrite();
+ return delegate.stopRemoteProcessGroups(recursive);
+ }
+
+ @Override
+ public CompletableFuture<Void> startStatelessGroups(final boolean
recursive) {
+ authContext.authorizeWrite();
+ return delegate.startStatelessGroups(recursive);
+ }
+
+ @Override
+ public CompletableFuture<Void> stopStatelessGroups(final boolean
recursive) {
+ authContext.authorizeWrite();
+ return delegate.stopStatelessGroups(recursive);
}
@Override
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ComponentLifecycleConnector.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ComponentLifecycleConnector.java
new file mode 100644
index 0000000000..40b4b41475
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ComponentLifecycleConnector.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.connectors.tests.system;
+
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.connector.AbstractConnector;
+import org.apache.nifi.components.connector.ConfigurationStep;
+import org.apache.nifi.components.connector.FlowUpdateException;
+import org.apache.nifi.components.connector.components.FlowContext;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ComponentType;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.ConnectableComponentType;
+import org.apache.nifi.flow.ControllerServiceAPI;
+import org.apache.nifi.flow.ExecutionEngine;
+import org.apache.nifi.flow.PortType;
+import org.apache.nifi.flow.Position;
+import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedExternalFlow;
+import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Test Connector designed to verify the complete component lifecycle.
+ * Creates a flow with:
+ * - A processor at the root level
+ * - A child process group with input and output ports
+ * - A processor within the child group
+ * - A stateless group with a processor
+ *
+ * This allows testing that start/stop operations properly handle all
component types recursively.
+ */
+public class ComponentLifecycleConnector extends AbstractConnector {
+
+ public static final String ROOT_PROCESSOR_ID = "root-processor-id";
+ public static final String CHILD_GROUP_ID = "child-group-id";
+ public static final String CHILD_INPUT_PORT_ID = "child-input-port-id";
+ public static final String CHILD_OUTPUT_PORT_ID = "child-output-port-id";
+ public static final String CHILD_PROCESSOR_ID = "child-processor-id";
+ public static final String STATELESS_GROUP_ID = "stateless-group-id";
+ public static final String STATELESS_PROCESSOR_ID =
"stateless-processor-id";
+ public static final String STATELESS_INPUT_PORT_ID =
"stateless-input-port-id";
+ public static final String ROOT_CONTROLLER_SERVICE_ID =
"root-controller-service-id";
+ public static final String CHILD_CONTROLLER_SERVICE_ID =
"child-controller-service-id";
+
+ private static final Bundle SYSTEM_TEST_EXTENSIONS_BUNDLE = createBundle();
+
+ @Override
+ protected void onStepConfigured(final String stepName, final FlowContext
workingContext) {
+ }
+
+ @Override
+ public VersionedExternalFlow getInitialFlow() {
+ final VersionedProcessGroup rootGroup = createRootGroup();
+ final VersionedExternalFlow flow = new VersionedExternalFlow();
+ flow.setFlowContents(rootGroup);
+ return flow;
+ }
+
+ private VersionedProcessGroup createRootGroup() {
+ final VersionedProcessGroup rootGroup = new VersionedProcessGroup();
+ rootGroup.setIdentifier(UUID.randomUUID().toString());
+ rootGroup.setName("Component Lifecycle Root");
+ rootGroup.setPosition(new Position(0, 0));
+ rootGroup.setProcessors(new HashSet<>());
+ rootGroup.setProcessGroups(new HashSet<>());
+ rootGroup.setConnections(new HashSet<>());
+ rootGroup.setInputPorts(new HashSet<>());
+ rootGroup.setOutputPorts(new HashSet<>());
+ rootGroup.setControllerServices(new HashSet<>());
+ rootGroup.setLabels(new HashSet<>());
+ rootGroup.setFunnels(new HashSet<>());
+ rootGroup.setRemoteProcessGroups(new HashSet<>());
+ rootGroup.setScheduledState(ScheduledState.ENABLED);
+ rootGroup.setExecutionEngine(ExecutionEngine.STANDARD);
+ rootGroup.setComponentType(ComponentType.PROCESS_GROUP);
+
+ // Create root-level Controller Service
+ final VersionedControllerService rootControllerService =
createControllerService(ROOT_CONTROLLER_SERVICE_ID, "Root Count Service",
rootGroup.getIdentifier());
+ rootGroup.getControllerServices().add(rootControllerService);
+
+ // Create root-level processor (GenerateFlowFile)
+ final VersionedProcessor rootProcessor =
createProcessor(ROOT_PROCESSOR_ID, "Root GenerateFlowFile",
+ "org.apache.nifi.processors.tests.system.GenerateFlowFile", new
Position(100, 100));
+ rootProcessor.setGroupIdentifier(rootGroup.getIdentifier());
+ rootProcessor.setSchedulingPeriod("10 sec");
+ rootGroup.getProcessors().add(rootProcessor);
+
+ // Create root-level processor (TerminateFlowFile)
+ final VersionedProcessor rootTerminateProcessor =
createProcessor("root-terminate-processor-id", "Root TerminateFlowFile",
+ "org.apache.nifi.processors.tests.system.TerminateFlowFile", new
Position(300, 100));
+ rootTerminateProcessor.setGroupIdentifier(rootGroup.getIdentifier());
+ rootGroup.getProcessors().add(rootTerminateProcessor);
+
+ // Create child process group with ports and processor
+ final VersionedProcessGroup childGroup =
createChildGroup(rootGroup.getIdentifier());
+ rootGroup.getProcessGroups().add(childGroup);
+
+ // Create connection from root processor to child group's input port
+ final VersionedConnection rootToChildConnection = createConnection(
+ createConnectableComponent(ROOT_PROCESSOR_ID, "Root
GenerateFlowFile", ConnectableComponentType.PROCESSOR,
rootGroup.getIdentifier()),
+ Set.of("success"),
+ createConnectableComponent(CHILD_INPUT_PORT_ID, "Child Input",
ConnectableComponentType.INPUT_PORT, CHILD_GROUP_ID),
+ rootGroup.getIdentifier()
+ );
+ rootGroup.getConnections().add(rootToChildConnection);
+
+ // Create connection from child group's output port to root terminate
processor
+ final VersionedConnection childToRootConnection = createConnection(
+ createConnectableComponent(CHILD_OUTPUT_PORT_ID, "Child Output",
ConnectableComponentType.OUTPUT_PORT, CHILD_GROUP_ID),
+ Set.of(""),
+ createConnectableComponent("root-terminate-processor-id", "Root
TerminateFlowFile", ConnectableComponentType.PROCESSOR,
rootGroup.getIdentifier()),
+ rootGroup.getIdentifier()
+ );
+ rootGroup.getConnections().add(childToRootConnection);
+
+ return rootGroup;
+ }
+
+ private VersionedProcessGroup createChildGroup(final String parentGroupId)
{
+ final VersionedProcessGroup childGroup = new VersionedProcessGroup();
+ childGroup.setIdentifier(CHILD_GROUP_ID);
+ childGroup.setName("Child Group");
+ childGroup.setPosition(new Position(100, 300));
+ childGroup.setProcessors(new HashSet<>());
+ childGroup.setProcessGroups(new HashSet<>());
+ childGroup.setConnections(new HashSet<>());
+ childGroup.setInputPorts(new HashSet<>());
+ childGroup.setOutputPorts(new HashSet<>());
+ childGroup.setControllerServices(new HashSet<>());
+ childGroup.setLabels(new HashSet<>());
+ childGroup.setFunnels(new HashSet<>());
+ childGroup.setRemoteProcessGroups(new HashSet<>());
+ childGroup.setScheduledState(ScheduledState.ENABLED);
+ childGroup.setExecutionEngine(ExecutionEngine.STANDARD);
+ childGroup.setComponentType(ComponentType.PROCESS_GROUP);
+ childGroup.setGroupIdentifier(parentGroupId);
+
+ // Create Controller Service in child group
+ final VersionedControllerService childControllerService =
createControllerService(CHILD_CONTROLLER_SERVICE_ID, "Child Count Service",
CHILD_GROUP_ID);
+ childGroup.getControllerServices().add(childControllerService);
+
+ // Create input port
+ final VersionedPort inputPort = createPort(CHILD_INPUT_PORT_ID, "Child
Input", true, CHILD_GROUP_ID);
+ childGroup.getInputPorts().add(inputPort);
+
+ // Create output port
+ final VersionedPort outputPort = createPort(CHILD_OUTPUT_PORT_ID,
"Child Output", false, CHILD_GROUP_ID);
+ childGroup.getOutputPorts().add(outputPort);
+
+ // Create processor in child group
+ final VersionedProcessor childProcessor =
createProcessor(CHILD_PROCESSOR_ID, "Child Terminate",
+ "org.apache.nifi.processors.tests.system.PassThrough", new
Position(100, 100));
+ childProcessor.setGroupIdentifier(CHILD_GROUP_ID);
+ childGroup.getProcessors().add(childProcessor);
+
+ // Create stateless group
+ final VersionedProcessGroup statelessGroup =
createStatelessGroup(CHILD_GROUP_ID);
+ childGroup.getProcessGroups().add(statelessGroup);
+
+ // Connection: input port -> child processor
+ final VersionedConnection inputToProcessor = createConnection(
+ createConnectableComponent(CHILD_INPUT_PORT_ID, "Child Input",
ConnectableComponentType.INPUT_PORT, CHILD_GROUP_ID),
+ Set.of(""),
+ createConnectableComponent(CHILD_PROCESSOR_ID, "Child Terminate",
ConnectableComponentType.PROCESSOR, CHILD_GROUP_ID),
+ CHILD_GROUP_ID
+ );
+ childGroup.getConnections().add(inputToProcessor);
+
+ // Connection: input port -> stateless group
+ final VersionedConnection inputToStateless = createConnection(
+ createConnectableComponent(CHILD_INPUT_PORT_ID, "Child Input",
ConnectableComponentType.INPUT_PORT, CHILD_GROUP_ID),
+ Set.of(""),
+ createConnectableComponent(STATELESS_INPUT_PORT_ID, "Stateless
Input", ConnectableComponentType.INPUT_PORT, STATELESS_GROUP_ID),
+ CHILD_GROUP_ID
+ );
+ childGroup.getConnections().add(inputToStateless);
+
+ // Connection: child processor -> output port
+ final VersionedConnection processorToOutput = createConnection(
+ createConnectableComponent(CHILD_PROCESSOR_ID, "Child Terminate",
ConnectableComponentType.PROCESSOR, CHILD_GROUP_ID),
+ Set.of("success"),
+ createConnectableComponent(CHILD_OUTPUT_PORT_ID, "Child Output",
ConnectableComponentType.OUTPUT_PORT, CHILD_GROUP_ID),
+ CHILD_GROUP_ID
+ );
+ childGroup.getConnections().add(processorToOutput);
+
+ return childGroup;
+ }
+
+ private VersionedProcessGroup createStatelessGroup(final String
parentGroupId) {
+ final VersionedProcessGroup statelessGroup = new
VersionedProcessGroup();
+ statelessGroup.setIdentifier(STATELESS_GROUP_ID);
+ statelessGroup.setName("Stateless Group");
+ statelessGroup.setPosition(new Position(400, 100));
+ statelessGroup.setProcessors(new HashSet<>());
+ statelessGroup.setProcessGroups(new HashSet<>());
+ statelessGroup.setConnections(new HashSet<>());
+ statelessGroup.setInputPorts(new HashSet<>());
+ statelessGroup.setOutputPorts(new HashSet<>());
+ statelessGroup.setControllerServices(new HashSet<>());
+ statelessGroup.setLabels(new HashSet<>());
+ statelessGroup.setFunnels(new HashSet<>());
+ statelessGroup.setRemoteProcessGroups(new HashSet<>());
+ statelessGroup.setScheduledState(ScheduledState.ENABLED);
+ statelessGroup.setExecutionEngine(ExecutionEngine.STATELESS);
+ statelessGroup.setStatelessFlowTimeout("1 min");
+ statelessGroup.setComponentType(ComponentType.PROCESS_GROUP);
+ statelessGroup.setGroupIdentifier(parentGroupId);
+
+ // Create input port for stateless group
+ final VersionedPort statelessInput =
createPort(STATELESS_INPUT_PORT_ID, "Stateless Input", true,
STATELESS_GROUP_ID);
+ statelessGroup.getInputPorts().add(statelessInput);
+
+ // Create processor in stateless group
+ final VersionedProcessor statelessProcessor =
createProcessor(STATELESS_PROCESSOR_ID, "Stateless Terminate",
+ "org.apache.nifi.processors.tests.system.TerminateFlowFile", new
Position(100, 100));
+ statelessProcessor.setGroupIdentifier(STATELESS_GROUP_ID);
+ statelessGroup.getProcessors().add(statelessProcessor);
+
+ // Connection: input port -> processor
+ final VersionedConnection inputToProcessor = createConnection(
+ createConnectableComponent(STATELESS_INPUT_PORT_ID, "Stateless
Input", ConnectableComponentType.INPUT_PORT, STATELESS_GROUP_ID),
+ Set.of(""),
+ createConnectableComponent(STATELESS_PROCESSOR_ID, "Stateless
Terminate", ConnectableComponentType.PROCESSOR, STATELESS_GROUP_ID),
+ STATELESS_GROUP_ID
+ );
+ statelessGroup.getConnections().add(inputToProcessor);
+
+ return statelessGroup;
+ }
+
+ private VersionedProcessor createProcessor(final String id, final String
name, final String type, final Position position) {
+ final VersionedProcessor processor = new VersionedProcessor();
+ processor.setIdentifier(id);
+ processor.setName(name);
+ processor.setType(type);
+ processor.setBundle(SYSTEM_TEST_EXTENSIONS_BUNDLE);
+ processor.setPosition(position);
+ processor.setProperties(Map.of());
+ processor.setPropertyDescriptors(Map.of());
+ processor.setSchedulingPeriod("0 sec");
+ processor.setSchedulingStrategy("TIMER_DRIVEN");
+ processor.setExecutionNode("ALL");
+ processor.setPenaltyDuration("30 sec");
+ processor.setYieldDuration("1 sec");
+ processor.setBulletinLevel("WARN");
+ processor.setRunDurationMillis(0L);
+ processor.setConcurrentlySchedulableTaskCount(1);
+ processor.setAutoTerminatedRelationships(Set.of());
+ processor.setScheduledState(ScheduledState.ENABLED);
+ processor.setRetryCount(0);
+ processor.setRetriedRelationships(Set.of());
+ processor.setComponentType(ComponentType.PROCESSOR);
+ return processor;
+ }
+
+ private VersionedPort createPort(final String id, final String name, final
boolean isInput, final String groupId) {
+ final VersionedPort port = new VersionedPort();
+ port.setIdentifier(id);
+ port.setName(name);
+ port.setPosition(new Position(isInput ? 0 : 200, 0));
+ port.setType(isInput ? PortType.INPUT_PORT : PortType.OUTPUT_PORT);
+ port.setComponentType(isInput ? ComponentType.INPUT_PORT :
ComponentType.OUTPUT_PORT);
+ port.setConcurrentlySchedulableTaskCount(1);
+ port.setScheduledState(ScheduledState.ENABLED);
+ port.setAllowRemoteAccess(false);
+ port.setGroupIdentifier(groupId);
+ return port;
+ }
+
+ private VersionedControllerService createControllerService(final String
id, final String name, final String groupId) {
+ final VersionedControllerService service = new
VersionedControllerService();
+ service.setIdentifier(id);
+ service.setName(name);
+
service.setType("org.apache.nifi.cs.tests.system.StandardCountService");
+ service.setBundle(SYSTEM_TEST_EXTENSIONS_BUNDLE);
+ service.setGroupIdentifier(groupId);
+ service.setProperties(Map.of());
+ service.setPropertyDescriptors(Map.of());
+ service.setScheduledState(ScheduledState.ENABLED);
+ service.setBulletinLevel("WARN");
+
+ final ControllerServiceAPI serviceApi = new ControllerServiceAPI();
+ serviceApi.setType("org.apache.nifi.cs.tests.system.CountService");
+ serviceApi.setBundle(SYSTEM_TEST_EXTENSIONS_BUNDLE);
+
service.setControllerServiceApis(Collections.singletonList(serviceApi));
+
+ return service;
+ }
+
+ private VersionedConnection createConnection(final ConnectableComponent
source, final Set<String> relationships,
+ final ConnectableComponent
destination, final String groupId) {
+ final VersionedConnection connection = new VersionedConnection();
+ connection.setIdentifier(UUID.randomUUID().toString());
+ connection.setName("");
+ connection.setSource(source);
+ connection.setDestination(destination);
+ connection.setSelectedRelationships(relationships);
+ connection.setBackPressureObjectThreshold(10000L);
+ connection.setBackPressureDataSizeThreshold("1 GB");
+ connection.setFlowFileExpiration("0 sec");
+ connection.setLabelIndex(0);
+ connection.setzIndex(0L);
+ connection.setComponentType(ComponentType.CONNECTION);
+ connection.setGroupIdentifier(groupId);
+ return connection;
+ }
+
+ private ConnectableComponent createConnectableComponent(final String id,
final String name, final ConnectableComponentType type, final String groupId) {
+ final ConnectableComponent component = new ConnectableComponent();
+ component.setId(id);
+ component.setName(name);
+ component.setType(type);
+ component.setGroupId(groupId);
+ return component;
+ }
+
+ private static Bundle createBundle() {
+ final Bundle bundle = new Bundle();
+ bundle.setGroup("org.apache.nifi");
+ bundle.setArtifact("nifi-system-test-extensions-nar");
+ bundle.setVersion("2.8.0-SNAPSHOT");
+ return bundle;
+ }
+
+ @Override
+ public List<ConfigVerificationResult> verifyConfigurationStep(final String
stepName, final Map<String, String> propertyValueOverrides, final FlowContext
flowContext) {
+ return List.of();
+ }
+
+ @Override
+ public List<ConfigurationStep> getConfigurationSteps() {
+ return List.of();
+ }
+
+ @Override
+ public void applyUpdate(final FlowContext workingFlowContext, final
FlowContext activeFlowContext) throws FlowUpdateException {
+ getInitializationContext().updateFlow(activeFlowContext,
getInitialFlow());
+ }
+}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
index d58dae1924..d20e64c3d3 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
@@ -16,6 +16,7 @@
org.apache.nifi.connectors.tests.system.AssetConnector
org.apache.nifi.connectors.tests.system.BundleResolutionConnector
org.apache.nifi.connectors.tests.system.CalculateConnector
+org.apache.nifi.connectors.tests.system.ComponentLifecycleConnector
org.apache.nifi.connectors.tests.system.DataQueuingConnector
org.apache.nifi.connectors.tests.system.GatedDataQueuingConnector
org.apache.nifi.connectors.tests.system.NestedProcessGroupConnector
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorLifecycleIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorLifecycleIT.java
new file mode 100644
index 0000000000..5860286bcc
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorLifecycleIT.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.connectors;
+
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.groups.StatelessGroupScheduledState;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.dto.PortDTO;
+import org.apache.nifi.web.api.dto.ProcessGroupDTO;
+import org.apache.nifi.web.api.dto.ProcessorDTO;
+import org.apache.nifi.web.api.dto.flow.FlowDTO;
+import org.apache.nifi.web.api.entity.ConnectorEntity;
+import org.apache.nifi.web.api.entity.PortEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * System test that verifies the connector start/stop lifecycle correctly starts and stops
+ * all component types: processors, ports, and stateless groups.
+ */
+public class ConnectorLifecycleIT extends NiFiSystemIT {
+
+    private static final Logger logger = LoggerFactory.getLogger(ConnectorLifecycleIT.class);
+
+    /** Execution-engine value that marks a child process group as a stateless group in this test. */
+    private static final String STATELESS_ENGINE = "STATELESS";
+
+    /**
+     * Creates a ComponentLifecycleConnector, applies its configuration, starts it and waits for
+     * processors, ports and the stateless group to report running; then stops it and waits for
+     * the same components to report stopped.
+     */
+    @Test
+    public void testStartStopStartsAndStopsAllComponents() throws NiFiClientException, IOException, InterruptedException {
+        logger.info("Creating ComponentLifecycleConnector");
+        final ConnectorEntity connector = getClientUtil().createConnector("ComponentLifecycleConnector");
+        assertNotNull(connector);
+        final String connectorId = connector.getId();
+
+        logger.info("Applying connector configuration");
+        getClientUtil().applyConnectorUpdate(connector);
+
+        logger.info("Waiting for connector to be valid");
+        getClientUtil().waitForValidConnector(connectorId);
+
+        logger.info("Starting connector {}", connectorId);
+        getClientUtil().startConnector(connectorId);
+
+        logger.info("Verifying flow has components after start");
+        verifyFlowHasComponents(connectorId);
+
+        logger.info("Verifying all processors are running");
+        waitForAllProcessorsRunning(connectorId);
+
+        logger.info("Verifying all ports are running");
+        waitForAllPortsRunning(connectorId);
+
+        logger.info("Verifying stateless group is running");
+        waitForStatelessGroupRunning(connectorId);
+
+        logger.info("Stopping connector {}", connectorId);
+        getClientUtil().stopConnector(connectorId);
+        getClientUtil().waitForConnectorStopped(connectorId);
+
+        logger.info("Verifying all processors are stopped");
+        waitForAllProcessorsStopped(connectorId);
+
+        logger.info("Verifying all ports are stopped");
+        waitForAllPortsStopped(connectorId);
+
+        logger.info("Verifying stateless group is stopped");
+        waitForStatelessGroupStopped(connectorId);
+
+        logger.info("testStartStopStartsAndStopsAllComponents completed successfully");
+    }
+
+    /**
+     * Asserts that the connector's root group contains at least one processor or one child group.
+     */
+    private void verifyFlowHasComponents(final String connectorId) throws NiFiClientException, IOException {
+        final FlowDTO flowDto = getFlow(connectorId, null);
+
+        final boolean hasProcessors = !flowDto.getProcessors().isEmpty();
+        final boolean hasChildGroups = !flowDto.getProcessGroups().isEmpty();
+
+        logger.info("Root group has {} processors and {} child groups",
+            flowDto.getProcessors().size(), flowDto.getProcessGroups().size());
+
+        assertTrue(hasProcessors || hasChildGroups, "Flow should have processors or child groups");
+    }
+
+    private void waitForAllProcessorsRunning(final String connectorId) throws InterruptedException {
+        waitFor(() -> allProcessorsInState(connectorId, null, ScheduledState.RUNNING.name()));
+    }
+
+    private void waitForAllProcessorsStopped(final String connectorId) throws InterruptedException {
+        waitFor(() -> allProcessorsInState(connectorId, null, ScheduledState.STOPPED.name()));
+    }
+
+    /**
+     * Checks every processor in the given group, and recursively in every child group that is not
+     * a stateless group, against the expected state. Disabled processors are accepted as-is.
+     *
+     * @param connectorId the connector whose flow is inspected
+     * @param groupId the group to inspect, or {@code null} for the connector's root group
+     * @param expectedState the scheduled-state name each processor must report
+     * @return {@code true} if no processor is in a state other than the expected one or DISABLED
+     */
+    private boolean allProcessorsInState(final String connectorId, final String groupId, final String expectedState) throws NiFiClientException, IOException {
+        final FlowDTO flowDto = getFlow(connectorId, groupId);
+
+        for (final ProcessorEntity processorEntity : flowDto.getProcessors()) {
+            final ProcessorDTO processorDto = processorEntity.getComponent();
+            final String state = processorDto.getState();
+            if (!isExpectedOrDisabled(expectedState, state)) {
+                logger.debug("Processor {} ({}) in group {} is in state {} (expected {})",
+                    processorDto.getName(), processorEntity.getId(), groupId, state, expectedState);
+                return false;
+            }
+        }
+
+        for (final ProcessGroupEntity childGroupEntity : flowDto.getProcessGroups()) {
+            final ProcessGroupDTO childGroupDto = childGroupEntity.getComponent();
+            // Stateless groups are verified separately through their own scheduled state.
+            if (!STATELESS_ENGINE.equals(childGroupDto.getExecutionEngine())
+                    && !allProcessorsInState(connectorId, childGroupEntity.getId(), expectedState)) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    private void waitForAllPortsRunning(final String connectorId) throws InterruptedException {
+        waitFor(() -> allPortsInState(connectorId, null, ScheduledState.RUNNING.name()));
+    }
+
+    private void waitForAllPortsStopped(final String connectorId) throws InterruptedException {
+        waitFor(() -> allPortsInState(connectorId, null, ScheduledState.STOPPED.name()));
+    }
+
+    /**
+     * Checks every input and output port in the given group, and recursively in every child group
+     * that is not a stateless group, against the expected state. Disabled ports are accepted as-is.
+     *
+     * @param connectorId the connector whose flow is inspected
+     * @param groupId the group to inspect, or {@code null} for the connector's root group
+     * @param expectedState the scheduled-state name each port must report
+     * @return {@code true} if no port is in a state other than the expected one or DISABLED
+     */
+    private boolean allPortsInState(final String connectorId, final String groupId, final String expectedState) throws NiFiClientException, IOException {
+        final FlowDTO flowDto = getFlow(connectorId, groupId);
+
+        // Input ports are checked before output ports, matching the order in which mismatches are logged.
+        if (!portsInState(flowDto.getInputPorts(), "Input", expectedState)
+                || !portsInState(flowDto.getOutputPorts(), "Output", expectedState)) {
+            return false;
+        }
+
+        for (final ProcessGroupEntity childGroupEntity : flowDto.getProcessGroups()) {
+            final ProcessGroupDTO childGroupDto = childGroupEntity.getComponent();
+            if (!STATELESS_ENGINE.equals(childGroupDto.getExecutionEngine())
+                    && !allPortsInState(connectorId, childGroupEntity.getId(), expectedState)) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Returns {@code false} at the first port whose state is neither the expected one nor DISABLED.
+     *
+     * @param ports the ports to check
+     * @param portKind label used in the debug message ("Input" or "Output")
+     * @param expectedState the scheduled-state name each port must report
+     */
+    private boolean portsInState(final Iterable<PortEntity> ports, final String portKind, final String expectedState) {
+        for (final PortEntity portEntity : ports) {
+            final PortDTO portDto = portEntity.getComponent();
+            final String state = portDto.getState();
+            if (!isExpectedOrDisabled(expectedState, state)) {
+                logger.debug("{} port {} is in state {} (expected {})", portKind, portDto.getName(), state, expectedState);
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** A component passes when it reports the expected state or is DISABLED. */
+    private static boolean isExpectedOrDisabled(final String expectedState, final String actualState) {
+        return expectedState.equals(actualState) || ScheduledState.DISABLED.name().equals(actualState);
+    }
+
+    private void waitForStatelessGroupRunning(final String connectorId) throws InterruptedException {
+        waitFor(() -> isStatelessGroupInState(connectorId, StatelessGroupScheduledState.RUNNING.name()));
+    }
+
+    private void waitForStatelessGroupStopped(final String connectorId) throws InterruptedException {
+        waitFor(() -> isStatelessGroupInState(connectorId, StatelessGroupScheduledState.STOPPED.name()));
+    }
+
+    private boolean isStatelessGroupInState(final String connectorId, final String expectedState) throws NiFiClientException, IOException {
+        return findStatelessGroupInState(connectorId, null, expectedState);
+    }
+
+    /**
+     * Searches the given group's child groups, descending into children that are not stateless
+     * groups, for a stateless group in the expected state.
+     * <p>
+     * Returns {@code true} as soon as ANY stateless group matches; it does not require every
+     * stateless group to match, and it returns {@code false} when no stateless group is found.
+     *
+     * @param connectorId the connector whose flow is inspected
+     * @param groupId the group to inspect, or {@code null} for the connector's root group
+     * @param expectedState the stateless-group scheduled-state name to look for
+     */
+    private boolean findStatelessGroupInState(final String connectorId, final String groupId, final String expectedState) throws NiFiClientException, IOException {
+        final FlowDTO flowDto = getFlow(connectorId, groupId);
+
+        for (final ProcessGroupEntity childGroupEntity : flowDto.getProcessGroups()) {
+            final ProcessGroupDTO childGroupDto = childGroupEntity.getComponent();
+            if (STATELESS_ENGINE.equals(childGroupDto.getExecutionEngine())) {
+                final String actualState = childGroupDto.getStatelessGroupScheduledState();
+                logger.debug("Stateless group {} is in state {} (expected {})", childGroupDto.getName(), actualState, expectedState);
+                if (expectedState.equals(actualState)) {
+                    return true;
+                }
+            } else if (findStatelessGroupInState(connectorId, childGroupEntity.getId(), expectedState)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Fetches the flow of the connector's root group when {@code groupId} is {@code null},
+     * otherwise the flow of the identified group within the connector.
+     */
+    private FlowDTO getFlow(final String connectorId, final String groupId) throws NiFiClientException, IOException {
+        final ProcessGroupFlowEntity flowEntity = (groupId == null)
+            ? getNifiClient().getConnectorClient().getFlow(connectorId)
+            : getNifiClient().getConnectorClient().getFlow(connectorId, groupId);
+
+        return flowEntity.getProcessGroupFlow().getFlow();
+    }
+}