This is an automated email from the ASF dual-hosted git repository.

briansolo1985 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 6f958fe068 NIFI-14230 Implement start/stop flow C2 operations in MiNiFi
6f958fe068 is described below

commit 6f958fe068d67e96f93830d3244c57dda753f9d2
Author: Peter Kedvessy <[email protected]>
AuthorDate: Tue Feb 4 09:01:26 2025 +0100

    NIFI-14230 Implement start/stop flow C2 operations in MiNiFi
    
    This closes #9694.
    
    Signed-off-by: Ferenc Kis <[email protected]>
---
 .../nifi/c2/client/service/C2HeartbeatFactory.java |  15 +-
 .../client/service/model/RuntimeInfoWrapper.java   |   9 +-
 .../service/operation/FlowStateStrategy.java}      |  27 +---
 .../operation/StartFlowOperationHandler.java       |  77 ++++++++++
 .../operation/StopFlowOperationHandler.java        |  77 ++++++++++
 .../c2/client/service/C2HeartbeatFactoryTest.java  |  13 +-
 .../DescribeManifestOperationHandlerTest.java      |   3 +-
 .../operation/StartFlowOperationHandlerTest.java   |  95 ++++++++++++
 .../operation/StopFlowOperationHandlerTest.java    |  95 ++++++++++++
 .../org/apache/nifi/c2/protocol/api/FlowInfo.java  |  10 ++
 .../apache/nifi/c2/protocol/api/OperandType.java   |   3 +-
 .../apache/nifi/c2/protocol/api/OperationType.java |   5 +-
 .../api/{OperandType.java => RunStatus.java}       |  27 +---
 .../apache/nifi/minifi/c2/C2NifiClientService.java |  42 +++++-
 .../c2/command/DefaultFlowStateStrategy.java       | 103 +++++++++++++
 .../c2/command/DefaultFlowStateStrategyTest.java   | 165 +++++++++++++++++++++
 16 files changed, 695 insertions(+), 71 deletions(-)

diff --git 
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java
 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java
index 240a3b84d7..1689cbf4a8 100644
--- 
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java
+++ 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java
@@ -35,7 +35,6 @@ import java.net.NetworkInterface;
 import java.net.SocketException;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -48,13 +47,10 @@ import org.apache.nifi.c2.protocol.api.AgentManifest;
 import org.apache.nifi.c2.protocol.api.AgentRepositories;
 import org.apache.nifi.c2.protocol.api.AgentResourceConsumption;
 import org.apache.nifi.c2.protocol.api.AgentStatus;
-import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
-import org.apache.nifi.c2.protocol.api.ProcessorStatus;
 import org.apache.nifi.c2.protocol.api.ResourceInfo;
 import org.apache.nifi.c2.protocol.api.C2Heartbeat;
 import org.apache.nifi.c2.protocol.api.DeviceInfo;
 import org.apache.nifi.c2.protocol.api.FlowInfo;
-import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
 import org.apache.nifi.c2.protocol.api.NetworkInfo;
 import org.apache.nifi.c2.protocol.api.SupportedOperation;
 import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash;
@@ -92,7 +88,7 @@ public class C2HeartbeatFactory {
 
         
heartbeat.setAgentInfo(getAgentInfo(runtimeInfoWrapper.getAgentRepositories(), 
runtimeInfoWrapper.getManifest()));
         heartbeat.setDeviceInfo(generateDeviceInfo());
-        heartbeat.setFlowInfo(getFlowInfo(runtimeInfoWrapper.getQueueStatus(), 
runtimeInfoWrapper.getProcessorBulletins(), 
runtimeInfoWrapper.getProcessorStatus()));
+        heartbeat.setFlowInfo(getFlowInfo(runtimeInfoWrapper));
         heartbeat.setCreated(System.currentTimeMillis());
 
         ResourceInfo resourceInfo = new ResourceInfo();
@@ -102,11 +98,12 @@ public class C2HeartbeatFactory {
         return heartbeat;
     }
 
-    private FlowInfo getFlowInfo(Map<String, FlowQueueStatus> queueStatus, 
List<ProcessorBulletin> processorBulletins, List<ProcessorStatus> 
processorStatus) {
+    private FlowInfo getFlowInfo(RuntimeInfoWrapper runtimeInfoWrapper) {
         FlowInfo flowInfo = new FlowInfo();
-        flowInfo.setQueues(queueStatus);
-        flowInfo.setProcessorBulletins(processorBulletins);
-        flowInfo.setProcessorStatuses(processorStatus);
+        flowInfo.setQueues(runtimeInfoWrapper.getQueueStatus());
+        
flowInfo.setProcessorBulletins(runtimeInfoWrapper.getProcessorBulletins());
+        flowInfo.setProcessorStatuses(runtimeInfoWrapper.getProcessorStatus());
+        flowInfo.setRunStatus(runtimeInfoWrapper.getRunStatus());
         
Optional.ofNullable(flowIdHolder.getFlowId()).ifPresent(flowInfo::setFlowId);
         return flowInfo;
     }
diff --git 
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java
 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java
index 7cf064408c..3237fa8e67 100644
--- 
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java
+++ 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java
@@ -22,6 +22,7 @@ import org.apache.nifi.c2.protocol.api.AgentRepositories;
 import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
 import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
 import org.apache.nifi.c2.protocol.api.ProcessorStatus;
+import org.apache.nifi.c2.protocol.api.RunStatus;
 import org.apache.nifi.c2.protocol.component.api.RuntimeManifest;
 
 public class RuntimeInfoWrapper {
@@ -30,14 +31,16 @@ public class RuntimeInfoWrapper {
     final Map<String, FlowQueueStatus> queueStatus;
     final List<ProcessorBulletin> processorBulletins;
     final List<ProcessorStatus> processorStatus;
+    final RunStatus runStatus;
 
     public RuntimeInfoWrapper(AgentRepositories repos, RuntimeManifest 
manifest, Map<String, FlowQueueStatus> queueStatus, List<ProcessorBulletin> 
processorBulletins,
-                              List<ProcessorStatus> processorStatus) {
+                              List<ProcessorStatus> processorStatus, RunStatus 
runStatus) {
         this.repos = repos;
         this.manifest = manifest;
         this.queueStatus = queueStatus;
         this.processorBulletins = processorBulletins;
         this.processorStatus = processorStatus;
+        this.runStatus = runStatus;
     }
 
     public AgentRepositories getAgentRepositories() {
@@ -59,4 +62,8 @@ public class RuntimeInfoWrapper {
     public List<ProcessorStatus> getProcessorStatus() {
         return processorStatus;
     }
+
+    public RunStatus getRunStatus() {
+        return runStatus;
+    }
 }
diff --git 
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/FlowStateStrategy.java
similarity index 59%
copy from 
c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
copy to 
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/FlowStateStrategy.java
index f2fcff3f27..074365ad56 100644
--- 
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
+++ 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/FlowStateStrategy.java
@@ -15,30 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.nifi.c2.protocol.api;
+package org.apache.nifi.c2.client.service.operation;
 
-import java.util.Arrays;
-import java.util.Optional;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
 
-public enum OperandType {
+public interface FlowStateStrategy {
 
-    CONFIGURATION,
-    CONNECTION,
-    DEBUG,
-    MANIFEST,
-    REPOSITORY,
-    PROPERTIES,
-    ASSET,
-    RESOURCE;
+    OperationState start();
 
-    public static Optional<OperandType> fromString(String value) {
-        return Arrays.stream(values())
-            .filter(operandType -> operandType.name().equalsIgnoreCase(value))
-            .findAny();
-    }
-
-    @Override
-    public String toString() {
-        return super.toString().toLowerCase();
-    }
+    OperationState stop();
 }
diff --git 
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/StartFlowOperationHandler.java
 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/StartFlowOperationHandler.java
new file mode 100644
index 0000000000..478e6b1b7d
--- /dev/null
+++ 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/StartFlowOperationHandler.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static java.util.Optional.ofNullable;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+
+public class StartFlowOperationHandler implements C2OperationHandler {
+
+    public static final String NOT_APPLIED_DETAILS = "Failed to start flow, 
please check the log for errors";
+    public static final String PARTIALLY_APPLIED_DETAILS = "Some components 
failed to start, please check the log for errors";
+    public static final String FULLY_APPLIED_DETAILS = "Flow started";
+    public static final String UNEXPECTED_DETAILS = "Unexpected status, please 
check the log for errors";
+
+    private final FlowStateStrategy flowStateStrategy;
+
+    public StartFlowOperationHandler(FlowStateStrategy flowStateStrategy) {
+        this.flowStateStrategy = flowStateStrategy;
+    }
+
+    @Override
+    public OperationType getOperationType() {
+        return OperationType.START;
+    }
+
+    @Override
+    public OperandType getOperandType() {
+        return OperandType.FLOW;
+    }
+
+    @Override
+    public Map<String, Object> getProperties() {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public C2OperationAck handle(C2Operation operation) {
+        String operationId = 
ofNullable(operation.getIdentifier()).orElse(EMPTY);
+        C2OperationState.OperationState operationState = 
flowStateStrategy.start();
+
+        C2OperationState resultState = operationState(
+                operationState,
+                switch (operationState) {
+                    case NOT_APPLIED -> NOT_APPLIED_DETAILS;
+                    case FULLY_APPLIED -> FULLY_APPLIED_DETAILS;
+                    case PARTIALLY_APPLIED -> PARTIALLY_APPLIED_DETAILS;
+                    default -> UNEXPECTED_DETAILS;
+                }
+        );
+
+        return operationAck(operationId, resultState);
+    }
+}
diff --git 
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/StopFlowOperationHandler.java
 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/StopFlowOperationHandler.java
new file mode 100644
index 0000000000..29afb6e0ab
--- /dev/null
+++ 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/StopFlowOperationHandler.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static java.util.Optional.ofNullable;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+
+public class StopFlowOperationHandler implements C2OperationHandler {
+
+    public static final String NOT_APPLIED_DETAILS = "Failed to stop flow, 
please check the log for errors";
+    public static final String FULLY_APPLIED_DETAILS = "Flow stopped";
+    public static final String PARTIALLY_APPLIED_DETAILS = "Some components 
failed to stop, please check the log for errors";
+    public static final String UNEXPECTED_DETAILS = "Unexpected status, please 
check the log for errors";
+
+    private final FlowStateStrategy flowStateStrategy;
+
+    public StopFlowOperationHandler(FlowStateStrategy flowStateStrategy) {
+        this.flowStateStrategy = flowStateStrategy;
+    }
+
+    @Override
+    public OperationType getOperationType() {
+        return OperationType.STOP;
+    }
+
+    @Override
+    public OperandType getOperandType() {
+        return OperandType.FLOW;
+    }
+
+    @Override
+    public Map<String, Object> getProperties() {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public C2OperationAck handle(C2Operation operation) {
+        String operationId = 
ofNullable(operation.getIdentifier()).orElse(EMPTY);
+        C2OperationState.OperationState operationState = 
flowStateStrategy.stop();
+
+        C2OperationState resultState = operationState(
+                operationState,
+                switch (operationState) {
+                    case NOT_APPLIED -> NOT_APPLIED_DETAILS;
+                    case FULLY_APPLIED -> FULLY_APPLIED_DETAILS;
+                    case PARTIALLY_APPLIED -> PARTIALLY_APPLIED_DETAILS;
+                    default -> UNEXPECTED_DETAILS;
+                }
+        );
+
+        return operationAck(operationId, resultState);
+    }
+}
diff --git 
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java
 
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java
index fdc4056929..bbbdd54c7a 100644
--- 
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java
+++ 
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.c2.client.service;
 
+import static org.apache.nifi.c2.protocol.api.RunStatus.RUNNING;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -124,13 +125,14 @@ public class C2HeartbeatFactoryTest {
         List<ProcessorBulletin> processorBulletins = new ArrayList<>();
         List<ProcessorStatus> processorStatus = new ArrayList<>();
 
-        C2Heartbeat heartbeat = c2HeartbeatFactory.create(new 
RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins, 
processorStatus));
+        C2Heartbeat heartbeat = c2HeartbeatFactory.create(new 
RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins, 
processorStatus, RUNNING));
 
         assertEquals(repos, 
heartbeat.getAgentInfo().getStatus().getRepositories());
         assertEquals(manifest, heartbeat.getAgentInfo().getAgentManifest());
         assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues());
         assertEquals(processorBulletins, 
heartbeat.getFlowInfo().getProcessorBulletins());
         assertEquals(processorStatus, 
heartbeat.getFlowInfo().getProcessorStatuses());
+        assertEquals(RUNNING, heartbeat.getFlowInfo().getRunStatus());
         assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
     }
 
@@ -145,13 +147,14 @@ public class C2HeartbeatFactoryTest {
         List<ProcessorBulletin> processorBulletins = new ArrayList<>();
         List<ProcessorStatus> processorStatus = new ArrayList<>();
 
-        C2Heartbeat heartbeat = c2HeartbeatFactory.create(new 
RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins, 
processorStatus));
+        C2Heartbeat heartbeat = c2HeartbeatFactory.create(new 
RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins, 
processorStatus, RUNNING));
 
         assertEquals(repos, 
heartbeat.getAgentInfo().getStatus().getRepositories());
         assertNull(heartbeat.getAgentInfo().getAgentManifest());
         assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues());
         assertEquals(processorBulletins, 
heartbeat.getFlowInfo().getProcessorBulletins());
         assertEquals(processorStatus, 
heartbeat.getFlowInfo().getProcessorStatuses());
+        assertEquals(RUNNING, heartbeat.getFlowInfo().getRunStatus());
         assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
     }
 
@@ -168,10 +171,11 @@ public class C2HeartbeatFactoryTest {
         when(manifestHashProvider.calculateManifestHash(manifest.getBundles(), 
Collections.emptySet())).thenReturn(MANIFEST_HASH);
         
when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash());
 
-        C2Heartbeat heartbeat = c2HeartbeatFactory.create(new 
RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new 
ArrayList<>(), new ArrayList<>()));
+        C2Heartbeat heartbeat = c2HeartbeatFactory.create(new 
RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new 
ArrayList<>(), new ArrayList<>(), RUNNING));
 
         assertEquals(MANIFEST_HASH, 
heartbeat.getAgentInfo().getAgentManifestHash());
         assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
+        assertEquals(RUNNING, heartbeat.getFlowInfo().getRunStatus());
     }
 
     @Test
@@ -184,10 +188,11 @@ public class C2HeartbeatFactoryTest {
         when(manifestHashProvider.calculateManifestHash(manifest.getBundles(), 
supportedOperations)).thenReturn(MANIFEST_HASH);
         
when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash());
 
-        C2Heartbeat heartbeat = c2HeartbeatFactory.create(new 
RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new 
ArrayList<>(), new ArrayList<>()));
+        C2Heartbeat heartbeat = c2HeartbeatFactory.create(new 
RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new 
ArrayList<>(), new ArrayList<>(), RUNNING));
 
         assertEquals(MANIFEST_HASH, 
heartbeat.getAgentInfo().getAgentManifestHash());
         assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
+        assertEquals(RUNNING, heartbeat.getFlowInfo().getRunStatus());
     }
 
     private RuntimeManifest createManifest() {
diff --git 
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java
 
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java
index aeb7d55917..0de0a94142 100644
--- 
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java
+++ 
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.c2.client.service.operation;
 
+import static org.apache.nifi.c2.protocol.api.RunStatus.RUNNING;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.when;
 
@@ -58,7 +59,7 @@ public class DescribeManifestOperationHandlerTest {
     void testDescribeManifestOperationHandlerPopulatesAckSuccessfully() {
         RuntimeManifest manifest = new RuntimeManifest();
         manifest.setIdentifier("manifestId");
-        RuntimeInfoWrapper runtimeInfoWrapper = new RuntimeInfoWrapper(null, 
manifest, null, null, null);
+        RuntimeInfoWrapper runtimeInfoWrapper = new RuntimeInfoWrapper(null, 
manifest, null, null, null, RUNNING);
 
         C2Heartbeat heartbeat = new C2Heartbeat();
         AgentInfo agentInfo = new AgentInfo();
diff --git 
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/StartFlowOperationHandlerTest.java
 
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/StartFlowOperationHandlerTest.java
new file mode 100644
index 0000000000..be07253f76
--- /dev/null
+++ 
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/StartFlowOperationHandlerTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import java.util.stream.Stream;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static 
org.apache.nifi.c2.client.service.operation.StartFlowOperationHandler.FULLY_APPLIED_DETAILS;
+import static 
org.apache.nifi.c2.client.service.operation.StartFlowOperationHandler.NOT_APPLIED_DETAILS;
+import static 
org.apache.nifi.c2.client.service.operation.StartFlowOperationHandler.PARTIALLY_APPLIED_DETAILS;
+import static 
org.apache.nifi.c2.client.service.operation.StartFlowOperationHandler.UNEXPECTED_DETAILS;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO_OPERATION;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.PARTIALLY_APPLIED;
+import static org.apache.nifi.c2.protocol.api.OperandType.FLOW;
+import static org.apache.nifi.c2.protocol.api.OperationType.START;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class StartFlowOperationHandlerTest {
+
+    private static final String OPERATION_ID = "operation id";
+
+    @Mock
+    private FlowStateStrategy flowStateStrategy;
+
+    @InjectMocks
+    private StartFlowOperationHandler victim;
+
+    @Test
+    public void testOperationAndOperandTypes() {
+        assertEquals(START, victim.getOperationType());
+        assertEquals(FLOW, victim.getOperandType());
+    }
+
+    @ParameterizedTest(name = "operationId={0} ackOperationId={1} ackState={2} 
ackDetails={3}")
+    @MethodSource("testHandleArguments")
+    public void testHandle(String operationId, String ackOperationId, 
OperationState ackState, String ackDetails) {
+        when(flowStateStrategy.start()).thenReturn(ackState);
+
+        C2OperationAck result = victim.handle(anOperation(operationId));
+
+        assertEquals(ackOperationId, result.getOperationId());
+        assertEquals(ackState, result.getOperationState().getState());
+        assertEquals(ackDetails, result.getOperationState().getDetails());
+    }
+
+    private C2Operation anOperation(String identifier) {
+        C2Operation operation = new C2Operation();
+        operation.setIdentifier(identifier);
+
+        return operation;
+    }
+
+    private static Stream<Arguments> testHandleArguments() {
+        return Stream.of(
+                Arguments.of(null, EMPTY, NOT_APPLIED, NOT_APPLIED_DETAILS),
+                Arguments.of(null, EMPTY, FULLY_APPLIED, 
FULLY_APPLIED_DETAILS),
+                Arguments.of(null, EMPTY, PARTIALLY_APPLIED, 
PARTIALLY_APPLIED_DETAILS),
+                Arguments.of(null, EMPTY, NO_OPERATION, UNEXPECTED_DETAILS),
+                Arguments.of(OPERATION_ID, OPERATION_ID, NOT_APPLIED, 
NOT_APPLIED_DETAILS),
+                Arguments.of(OPERATION_ID, OPERATION_ID, FULLY_APPLIED, 
FULLY_APPLIED_DETAILS),
+                Arguments.of(OPERATION_ID, OPERATION_ID, PARTIALLY_APPLIED, 
PARTIALLY_APPLIED_DETAILS),
+                Arguments.of(OPERATION_ID, OPERATION_ID, NO_OPERATION, 
UNEXPECTED_DETAILS)
+        );
+    }
+}
\ No newline at end of file
diff --git 
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/StopFlowOperationHandlerTest.java
 
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/StopFlowOperationHandlerTest.java
new file mode 100644
index 0000000000..348bc6c34e
--- /dev/null
+++ 
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/StopFlowOperationHandlerTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import java.util.stream.Stream;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static 
org.apache.nifi.c2.client.service.operation.StopFlowOperationHandler.FULLY_APPLIED_DETAILS;
+import static 
org.apache.nifi.c2.client.service.operation.StopFlowOperationHandler.NOT_APPLIED_DETAILS;
+import static 
org.apache.nifi.c2.client.service.operation.StopFlowOperationHandler.PARTIALLY_APPLIED_DETAILS;
+import static 
org.apache.nifi.c2.client.service.operation.StopFlowOperationHandler.UNEXPECTED_DETAILS;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO_OPERATION;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.PARTIALLY_APPLIED;
+import static org.apache.nifi.c2.protocol.api.OperandType.FLOW;
+import static org.apache.nifi.c2.protocol.api.OperationType.STOP;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class StopFlowOperationHandlerTest {
+
+    private static final String OPERATION_ID = "operation id";
+
+    @Mock
+    private FlowStateStrategy flowStateStrategy;
+
+    @InjectMocks
+    private StopFlowOperationHandler victim;
+
+    @Test
+    public void testOperationAndOperandTypes() {
+        assertEquals(STOP, victim.getOperationType());
+        assertEquals(FLOW, victim.getOperandType());
+    }
+
+    @ParameterizedTest(name = "operationId={0} ackOperationId={1} ackState={2} 
ackDetails={3}")
+    @MethodSource("testHandleArguments")
+    public void testHandle(String operationId, String ackOperationId, 
C2OperationState.OperationState ackState, String ackDetails) {
+        when(flowStateStrategy.stop()).thenReturn(ackState);
+
+        C2OperationAck result = victim.handle(anOperation(operationId));
+
+        assertEquals(ackOperationId, result.getOperationId());
+        assertEquals(ackState, result.getOperationState().getState());
+        assertEquals(ackDetails, result.getOperationState().getDetails());
+    }
+
+    private C2Operation anOperation(String identifier) {
+        C2Operation operation = new C2Operation();
+        operation.setIdentifier(identifier);
+
+        return operation;
+    }
+
+    private static Stream<Arguments> testHandleArguments() {
+        return Stream.of(
+                Arguments.of(null, EMPTY, NOT_APPLIED, NOT_APPLIED_DETAILS),
+                Arguments.of(null, EMPTY, FULLY_APPLIED, 
FULLY_APPLIED_DETAILS),
+                Arguments.of(null, EMPTY, PARTIALLY_APPLIED, 
PARTIALLY_APPLIED_DETAILS),
+                Arguments.of(null, EMPTY, NO_OPERATION, UNEXPECTED_DETAILS),
+                Arguments.of(OPERATION_ID, OPERATION_ID, NOT_APPLIED, 
NOT_APPLIED_DETAILS),
+                Arguments.of(OPERATION_ID, OPERATION_ID, FULLY_APPLIED, 
FULLY_APPLIED_DETAILS),
+                Arguments.of(OPERATION_ID, OPERATION_ID, PARTIALLY_APPLIED, 
PARTIALLY_APPLIED_DETAILS),
+                Arguments.of(OPERATION_ID, OPERATION_ID, NO_OPERATION, 
UNEXPECTED_DETAILS)
+        );
+    }
+}
\ No newline at end of file
diff --git 
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FlowInfo.java
 
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FlowInfo.java
index 0060ce98d7..d10b877f5a 100644
--- 
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FlowInfo.java
+++ 
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FlowInfo.java
@@ -32,6 +32,7 @@ public class FlowInfo implements Serializable {
     private Map<String, FlowQueueStatus> queues;
     private List<ProcessorBulletin> processorBulletins;
     private List<ProcessorStatus> processorStatuses;
+    private RunStatus runStatus;
 
     @Schema(description = "A unique identifier of the flow currently deployed 
on the agent")
     public String getFlowId() {
@@ -86,4 +87,13 @@ public class FlowInfo implements Serializable {
     public void setProcessorStatuses(List<ProcessorStatus> processorStatuses) {
         this.processorStatuses = processorStatuses;
     }
+
+    @Schema(description = "Run status of the flow")
+    public RunStatus getRunStatus() {
+        return runStatus;
+    }
+
+    public void setRunStatus(RunStatus runStatus) {
+        this.runStatus = runStatus;
+    }
 }
diff --git 
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
 
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
index f2fcff3f27..09b0ea4e5e 100644
--- 
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
+++ 
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
@@ -29,7 +29,8 @@ public enum OperandType {
     REPOSITORY,
     PROPERTIES,
     ASSET,
-    RESOURCE;
+    RESOURCE,
+    FLOW;
 
     public static Optional<OperandType> fromString(String value) {
         return Arrays.stream(values())
diff --git 
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java
 
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java
index e948e31f64..80b0ec7145 100644
--- 
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java
+++ 
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java
@@ -21,6 +21,7 @@ import static 
org.apache.nifi.c2.protocol.api.OperandType.ASSET;
 import static org.apache.nifi.c2.protocol.api.OperandType.CONFIGURATION;
 import static org.apache.nifi.c2.protocol.api.OperandType.CONNECTION;
 import static org.apache.nifi.c2.protocol.api.OperandType.DEBUG;
+import static org.apache.nifi.c2.protocol.api.OperandType.FLOW;
 import static org.apache.nifi.c2.protocol.api.OperandType.MANIFEST;
 import static org.apache.nifi.c2.protocol.api.OperandType.PROPERTIES;
 import static org.apache.nifi.c2.protocol.api.OperandType.RESOURCE;
@@ -40,8 +41,8 @@ public enum OperationType {
     DESCRIBE(MANIFEST),
     UPDATE(CONFIGURATION, ASSET, PROPERTIES),
     RESTART,
-    START,
-    STOP,
+    START(FLOW),
+    STOP(FLOW),
     PAUSE,
     REPLICATE,
     SUBSCRIBE,
diff --git 
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
 
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/RunStatus.java
similarity index 62%
copy from 
c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
copy to 
c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/RunStatus.java
index f2fcff3f27..49f78e2803 100644
--- 
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
+++ 
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/RunStatus.java
@@ -17,28 +17,7 @@
 
 package org.apache.nifi.c2.protocol.api;
 
-import java.util.Arrays;
-import java.util.Optional;
-
-public enum OperandType {
-
-    CONFIGURATION,
-    CONNECTION,
-    DEBUG,
-    MANIFEST,
-    REPOSITORY,
-    PROPERTIES,
-    ASSET,
-    RESOURCE;
-
-    public static Optional<OperandType> fromString(String value) {
-        return Arrays.stream(values())
-            .filter(operandType -> operandType.name().equalsIgnoreCase(value))
-            .findAny();
-    }
-
-    @Override
-    public String toString() {
-        return super.toString().toLowerCase();
-    }
+public enum RunStatus {
+    RUNNING,
+    STOPPED,
 }
diff --git 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
index 793af58951..4493f6194e 100644
--- 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
+++ 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
@@ -25,6 +25,8 @@ import static 
java.util.concurrent.Executors.newScheduledThreadPool;
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.stream.Collectors.toMap;
+import static org.apache.nifi.c2.protocol.api.RunStatus.RUNNING;
+import static org.apache.nifi.c2.protocol.api.RunStatus.STOPPED;
 import static 
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_AGENT_CLASS;
 import static 
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_AGENT_HEARTBEAT_PERIOD;
 import static 
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_AGENT_IDENTIFIER;
@@ -82,8 +84,11 @@ import 
org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
 import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
 import 
org.apache.nifi.c2.client.service.operation.DescribeManifestOperationHandler;
 import 
org.apache.nifi.c2.client.service.operation.EmptyOperandPropertiesProvider;
+import org.apache.nifi.c2.client.service.operation.FlowStateStrategy;
 import org.apache.nifi.c2.client.service.operation.OperandPropertiesProvider;
 import org.apache.nifi.c2.client.service.operation.OperationQueueDAO;
+import org.apache.nifi.c2.client.service.operation.StartFlowOperationHandler;
+import org.apache.nifi.c2.client.service.operation.StopFlowOperationHandler;
 import org.apache.nifi.c2.client.service.operation.SupportedOperationsProvider;
 import 
org.apache.nifi.c2.client.service.operation.SyncResourceOperationHandler;
 import 
org.apache.nifi.c2.client.service.operation.TransferDebugOperationHandler;
@@ -97,15 +102,19 @@ import 
org.apache.nifi.c2.protocol.api.AgentRepositoryStatus;
 import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
 import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
 import org.apache.nifi.c2.protocol.api.ProcessorStatus;
+import org.apache.nifi.c2.protocol.api.RunStatus;
 import org.apache.nifi.c2.serializer.C2JacksonSerializer;
 import org.apache.nifi.c2.serializer.C2Serializer;
 import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.Triggerable;
 import org.apache.nifi.diagnostics.SystemDiagnostics;
 import org.apache.nifi.encrypt.PropertyEncryptorBuilder;
-import org.apache.nifi.extension.manifest.parser.ExtensionManifestParser;
 import 
org.apache.nifi.extension.manifest.parser.jaxb.JAXBExtensionManifestParser;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.manifest.RuntimeManifestService;
 import org.apache.nifi.manifest.StandardRuntimeManifestService;
+import org.apache.nifi.minifi.c2.command.DefaultFlowStateStrategy;
 import org.apache.nifi.minifi.c2.command.DefaultUpdateConfigurationStrategy;
 import org.apache.nifi.minifi.c2.command.PropertiesPersister;
 import org.apache.nifi.minifi.c2.command.TransferDebugCommandHelper;
@@ -138,7 +147,6 @@ public class C2NifiClientService {
     private final ExecutorService operationManagerExecutorService;
 
     private final FlowController flowController;
-    private final ExtensionManifestParser extensionManifestParser;
     private final RuntimeManifestService runtimeManifestService;
     private final SupportedOperationsProvider supportedOperationsProvider;
     private final C2HeartbeatManager c2HeartbeatManager;
@@ -150,13 +158,11 @@ public class C2NifiClientService {
         this.heartbeatManagerExecutorService = newScheduledThreadPool(1);
         this.operationManagerExecutorService = newSingleThreadExecutor();
 
-        this.extensionManifestParser = new JAXBExtensionManifestParser();
-
         C2ClientConfig clientConfig = generateClientConfig(niFiProperties);
 
         this.runtimeManifestService = new StandardRuntimeManifestService(
             ExtensionManagerHolder.getExtensionManager(),
-            extensionManifestParser,
+            new JAXBExtensionManifestParser(),
             clientConfig.getRuntimeManifestIdentifier(),
             clientConfig.getRuntimeType()
         );
@@ -239,6 +245,7 @@ public class C2NifiClientService {
         updateAssetCommandHelper.createAssetDirectory();
         UpdatePropertiesPropertyProvider updatePropertiesPropertyProvider = 
new UpdatePropertiesPropertyProvider(bootstrapConfigFileLocation);
         PropertiesPersister propertiesPersister = new 
PropertiesPersister(updatePropertiesPropertyProvider, 
bootstrapConfigFileLocation);
+        FlowStateStrategy defaultFlowStateStrategy = new 
DefaultFlowStateStrategy(flowController);
 
         FlowPropertyEncryptor flowPropertyEncryptor = new 
StandardFlowPropertyEncryptor(
             new 
PropertyEncryptorBuilder(niFiProperties.getProperty(SENSITIVE_PROPS_KEY))
@@ -259,7 +266,8 @@ public class C2NifiClientService {
             UpdateAssetOperationHandler.create(client, 
emptyOperandPropertiesProvider,
                 updateAssetCommandHelper::assetUpdatePrecondition, 
updateAssetCommandHelper::assetPersistFunction),
             new 
UpdatePropertiesOperationHandler(updatePropertiesPropertyProvider, 
propertiesPersister::persistProperties),
-            SyncResourceOperationHandler.create(client, 
emptyOperandPropertiesProvider, new 
DefaultSyncResourceStrategy(resourceRepository), c2Serializer)
+            SyncResourceOperationHandler.create(client, 
emptyOperandPropertiesProvider, new 
DefaultSyncResourceStrategy(resourceRepository), c2Serializer),
+            new StartFlowOperationHandler(defaultFlowStateStrategy), new 
StopFlowOperationHandler(defaultFlowStateStrategy)
         ));
     }
 
@@ -298,7 +306,9 @@ public class C2NifiClientService {
                 agentManifest,
                 getQueueStatus(),
                 getBulletins(processorBulletinLimit),
-                getProcessorStatus(processorStatusEnabled));
+                getProcessorStatus(processorStatusEnabled),
+                getRunStatus()
+        );
     }
 
     private AgentRepositories getAgentRepositories() {
@@ -408,4 +418,22 @@ public class C2NifiClientService {
         
result.setTerminatedThreadCount(processorStatus.getTerminatedThreadCount());
         return result;
     }
+
+    private RunStatus getRunStatus() {
+        ProcessGroup processGroup = 
flowController.getFlowManager().getRootGroup();
+        return isProcessGroupRunning(processGroup)
+                || 
processGroup.getProcessGroups().stream().anyMatch(this::isProcessGroupRunning) 
? RUNNING : STOPPED;
+    }
+
+    private boolean isProcessGroupRunning(ProcessGroup processGroup) {
+        return anyProcessorRunning(processGroup) || 
anyRemoteProcessGroupTransmitting(processGroup);
+    }
+
+    private boolean anyProcessorRunning(ProcessGroup processGroup) {
+        return 
processGroup.getProcessors().stream().anyMatch(Triggerable::isRunning);
+    }
+
+    private boolean anyRemoteProcessGroupTransmitting(ProcessGroup 
processGroup) {
+        return 
processGroup.getRemoteProcessGroups().stream().anyMatch(RemoteProcessGroup::isTransmitting);
+    }
 }
\ No newline at end of file
diff --git 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultFlowStateStrategy.java
 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultFlowStateStrategy.java
new file mode 100644
index 0000000000..48f0faf391
--- /dev/null
+++ 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultFlowStateStrategy.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import org.apache.nifi.c2.client.service.operation.FlowStateStrategy;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.PARTIALLY_APPLIED;
+
+public class DefaultFlowStateStrategy implements FlowStateStrategy {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultFlowStateStrategy.class);
+
+    private final FlowController flowController;
+
+    public DefaultFlowStateStrategy(FlowController flowController) {
+        this.flowController = flowController;
+    }
+
+    @Override
+    public OperationState start() {
+        try {
+            ProcessGroup rootProcessGroup = 
flowController.getFlowManager().getRootGroup();
+            if (rootProcessGroup != null) {
+                startProcessGroup(rootProcessGroup);
+                LOGGER.info("Flow started");
+                return FULLY_APPLIED;
+            } else {
+                LOGGER.error("Failed to start flow as the root process group 
is null.");
+                return NOT_APPLIED;
+            }
+        } catch (Exception e) {
+            LOGGER.error("Failed to start flow as one or more components 
failed to stop.", e);
+            return PARTIALLY_APPLIED;
+        }
+    }
+
+    @Override
+    public OperationState stop() {
+        try {
+            ProcessGroup rootProcessGroup = 
flowController.getFlowManager().getRootGroup();
+            if (rootProcessGroup != null) {
+                stopProcessGroup(rootProcessGroup);
+                LOGGER.info("Flow stopped");
+                return FULLY_APPLIED;
+            } else {
+                LOGGER.error("Failed to stop flow as the root process group is 
null.");
+                return NOT_APPLIED;
+            }
+        } catch (Exception e) {
+            LOGGER.error("Failed to stop flow as one or more components failed 
to stop.", e);
+            return PARTIALLY_APPLIED;
+        }
+    }
+
+    private void startProcessGroup(ProcessGroup processGroup) {
+        processGroup.startProcessing();
+        
processGroup.getRemoteProcessGroups().forEach(RemoteProcessGroup::startTransmitting);
+
+        processGroup.getProcessGroups().forEach(this::startProcessGroup);
+    }
+
+    private void stopProcessGroup(ProcessGroup processGroup) {
+        waitForStopOrLogTimeOut(processGroup.stopProcessing());
+        processGroup.getRemoteProcessGroups().stream()
+                .map(RemoteProcessGroup::stopTransmitting)
+                .forEach(this::waitForStopOrLogTimeOut);
+
+        processGroup.getProcessGroups().forEach(this::stopProcessGroup);
+    }
+
+    private void waitForStopOrLogTimeOut(Future<?> future) {
+        try {
+            future.get(10, TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            LOGGER.warn("Unable to stop component within defined interval", e);
+        }
+    }
+}
diff --git 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultFlowStateStrategyTest.java
 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultFlowStateStrategyTest.java
new file mode 100644
index 0000000000..b8dc847395
--- /dev/null
+++ 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultFlowStateStrategyTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.PARTIALLY_APPLIED;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class DefaultFlowStateStrategyTest {
+
+    @Mock
+    private FlowController flowController;
+    @Mock
+    private FlowManager flowManager;
+
+    @InjectMocks
+    private DefaultFlowStateStrategy victim;
+
+    @BeforeEach
+    public void initTests() {
+        when(flowController.getFlowManager()).thenReturn(flowManager);
+    }
+
+    @Test
+    public void testStartFullyApplied() {
+        ProcessGroup rootProcessGroup = mock(ProcessGroup.class);
+        ProcessGroup nestedProcessGroup = mock(ProcessGroup.class);
+        RemoteProcessGroup remoteProcessGroup = mock(RemoteProcessGroup.class);
+        Set<RemoteProcessGroup> remoteProcessGroups = 
Set.of(remoteProcessGroup);
+        RemoteProcessGroup nestedRemoteProcessGroup = 
mock(RemoteProcessGroup.class);
+        Set<RemoteProcessGroup> nestedRemoteProcessGroups = 
Set.of(nestedRemoteProcessGroup);
+
+        when(flowManager.getRootGroup()).thenReturn(rootProcessGroup);
+        
when(rootProcessGroup.getRemoteProcessGroups()).thenReturn(remoteProcessGroups);
+        
when(rootProcessGroup.getProcessGroups()).thenReturn(Set.of(nestedProcessGroup));
+        
when(nestedProcessGroup.getRemoteProcessGroups()).thenReturn(nestedRemoteProcessGroups);
+
+        OperationState result = victim.start();
+
+        assertEquals(FULLY_APPLIED, result);
+        verify(rootProcessGroup).startProcessing();
+        verify(remoteProcessGroup).startTransmitting();
+        verify(nestedProcessGroup).startProcessing();
+        verify(nestedRemoteProcessGroup).startTransmitting();
+    }
+
+    @Test
+    public void testStartPartiallyApplied() {
+        ProcessGroup rootProcessGroup = mock(ProcessGroup.class);
+        RemoteProcessGroup remoteProcessGroup = mock(RemoteProcessGroup.class);
+        Set<RemoteProcessGroup> remoteProcessGroups = 
Set.of(remoteProcessGroup);
+
+        when(flowManager.getRootGroup()).thenReturn(rootProcessGroup);
+        
when(rootProcessGroup.getRemoteProcessGroups()).thenReturn(remoteProcessGroups);
+        doThrow(new 
RuntimeException()).when(remoteProcessGroup).startTransmitting();
+
+        OperationState result = victim.start();
+
+        assertEquals(PARTIALLY_APPLIED, result);
+    }
+
+    @Test
+    public void testStartNotApplied() {
+        when(flowManager.getRootGroup()).thenReturn(null);
+
+        OperationState result = victim.start();
+
+        assertEquals(NOT_APPLIED, result);
+    }
+
+    @Test
+    public void testStopFullyApplied() {
+        ProcessGroup rootProcessGroup = mock(ProcessGroup.class);
+        RemoteProcessGroup remoteProcessGroup = mock(RemoteProcessGroup.class);
+        Set<RemoteProcessGroup> remoteProcessGroups = 
Set.of(remoteProcessGroup);
+        CompletableFuture<Void> rootProcessGroupStopFuture = 
mock(CompletableFuture.class);
+        Future remoteProcessGroupStopFuture = mock(Future.class);
+        ProcessGroup nestedProcessGroup = mock(ProcessGroup.class);
+        RemoteProcessGroup nestedRemoteProcessGroup = 
mock(RemoteProcessGroup.class);
+        Set<RemoteProcessGroup> nestedRemoteProcessGroups = 
Set.of(nestedRemoteProcessGroup);
+        CompletableFuture<Void> nestedProcessGroupStopFuture = 
mock(CompletableFuture.class);
+        Future nestedRemoteProcessGroupStopFuture = mock(Future.class);
+
+        when(flowManager.getRootGroup()).thenReturn(rootProcessGroup);
+        
when(rootProcessGroup.getRemoteProcessGroups()).thenReturn(remoteProcessGroups);
+        
when(rootProcessGroup.stopProcessing()).thenReturn(rootProcessGroupStopFuture);
+        
when(remoteProcessGroup.stopTransmitting()).thenReturn(remoteProcessGroupStopFuture);
+        
when(rootProcessGroup.getProcessGroups()).thenReturn(Set.of(nestedProcessGroup));
+        
when(nestedProcessGroup.getRemoteProcessGroups()).thenReturn(nestedRemoteProcessGroups);
+        
when(nestedProcessGroup.stopProcessing()).thenReturn(nestedProcessGroupStopFuture);
+        
when(nestedRemoteProcessGroup.stopTransmitting()).thenReturn(nestedRemoteProcessGroupStopFuture);
+
+        OperationState result = victim.stop();
+
+        assertEquals(FULLY_APPLIED, result);
+        assertDoesNotThrow(() -> rootProcessGroupStopFuture.get());
+        assertDoesNotThrow(() -> remoteProcessGroupStopFuture.get());
+        assertDoesNotThrow(() -> nestedProcessGroupStopFuture.get());
+        assertDoesNotThrow(() -> nestedRemoteProcessGroupStopFuture.get());
+    }
+
+    @Test
+    public void testStopPartiallyApplied() {
+        ProcessGroup rootProcessGroup = mock(ProcessGroup.class);
+        RemoteProcessGroup remoteProcessGroup = mock(RemoteProcessGroup.class);
+        Set<RemoteProcessGroup> remoteProcessGroups = 
Set.of(remoteProcessGroup);
+        CompletableFuture<Void> rootProcessGroupStopFuture = 
mock(CompletableFuture.class);
+
+        when(flowManager.getRootGroup()).thenReturn(rootProcessGroup);
+        
when(rootProcessGroup.getRemoteProcessGroups()).thenReturn(remoteProcessGroups);
+        
when(rootProcessGroup.stopProcessing()).thenReturn(rootProcessGroupStopFuture);
+        when(remoteProcessGroup.stopTransmitting()).thenThrow(new 
RuntimeException());
+
+        OperationState result = victim.stop();
+
+        assertEquals(PARTIALLY_APPLIED, result);
+    }
+
+    @Test
+    public void testStopNotApplied() {
+        when(flowManager.getRootGroup()).thenReturn(null);
+
+        OperationState result = victim.stop();
+
+        assertEquals(NOT_APPLIED, result);
+    }
+}
\ No newline at end of file

Reply via email to