This is an automated email from the ASF dual-hosted git repository.
briansolo1985 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 6f958fe068 NIFI-14230 Implement start/stop flow C2 operations in MiNiFi
6f958fe068 is described below
commit 6f958fe068d67e96f93830d3244c57dda753f9d2
Author: Peter Kedvessy <[email protected]>
AuthorDate: Tue Feb 4 09:01:26 2025 +0100
NIFI-14230 Implement start/stop flow C2 operations in MiNiFi
This closes #9694.
Signed-off-by: Ferenc Kis <[email protected]>
---
.../nifi/c2/client/service/C2HeartbeatFactory.java | 15 +-
.../client/service/model/RuntimeInfoWrapper.java | 9 +-
.../service/operation/FlowStateStrategy.java} | 27 +---
.../operation/StartFlowOperationHandler.java | 77 ++++++++++
.../operation/StopFlowOperationHandler.java | 77 ++++++++++
.../c2/client/service/C2HeartbeatFactoryTest.java | 13 +-
.../DescribeManifestOperationHandlerTest.java | 3 +-
.../operation/StartFlowOperationHandlerTest.java | 95 ++++++++++++
.../operation/StopFlowOperationHandlerTest.java | 95 ++++++++++++
.../org/apache/nifi/c2/protocol/api/FlowInfo.java | 10 ++
.../apache/nifi/c2/protocol/api/OperandType.java | 3 +-
.../apache/nifi/c2/protocol/api/OperationType.java | 5 +-
.../api/{OperandType.java => RunStatus.java} | 27 +---
.../apache/nifi/minifi/c2/C2NifiClientService.java | 42 +++++-
.../c2/command/DefaultFlowStateStrategy.java | 103 +++++++++++++
.../c2/command/DefaultFlowStateStrategyTest.java | 165 +++++++++++++++++++++
16 files changed, 695 insertions(+), 71 deletions(-)
diff --git
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java
index 240a3b84d7..1689cbf4a8 100644
---
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java
+++
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java
@@ -35,7 +35,6 @@ import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Collections;
import java.util.Comparator;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -48,13 +47,10 @@ import org.apache.nifi.c2.protocol.api.AgentManifest;
import org.apache.nifi.c2.protocol.api.AgentRepositories;
import org.apache.nifi.c2.protocol.api.AgentResourceConsumption;
import org.apache.nifi.c2.protocol.api.AgentStatus;
-import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
-import org.apache.nifi.c2.protocol.api.ProcessorStatus;
import org.apache.nifi.c2.protocol.api.ResourceInfo;
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.DeviceInfo;
import org.apache.nifi.c2.protocol.api.FlowInfo;
-import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
import org.apache.nifi.c2.protocol.api.NetworkInfo;
import org.apache.nifi.c2.protocol.api.SupportedOperation;
import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash;
@@ -92,7 +88,7 @@ public class C2HeartbeatFactory {
heartbeat.setAgentInfo(getAgentInfo(runtimeInfoWrapper.getAgentRepositories(),
runtimeInfoWrapper.getManifest()));
heartbeat.setDeviceInfo(generateDeviceInfo());
- heartbeat.setFlowInfo(getFlowInfo(runtimeInfoWrapper.getQueueStatus(),
runtimeInfoWrapper.getProcessorBulletins(),
runtimeInfoWrapper.getProcessorStatus()));
+ heartbeat.setFlowInfo(getFlowInfo(runtimeInfoWrapper));
heartbeat.setCreated(System.currentTimeMillis());
ResourceInfo resourceInfo = new ResourceInfo();
@@ -102,11 +98,12 @@ public class C2HeartbeatFactory {
return heartbeat;
}
- private FlowInfo getFlowInfo(Map<String, FlowQueueStatus> queueStatus,
List<ProcessorBulletin> processorBulletins, List<ProcessorStatus>
processorStatus) {
+ private FlowInfo getFlowInfo(RuntimeInfoWrapper runtimeInfoWrapper) {
FlowInfo flowInfo = new FlowInfo();
- flowInfo.setQueues(queueStatus);
- flowInfo.setProcessorBulletins(processorBulletins);
- flowInfo.setProcessorStatuses(processorStatus);
+ flowInfo.setQueues(runtimeInfoWrapper.getQueueStatus());
+
flowInfo.setProcessorBulletins(runtimeInfoWrapper.getProcessorBulletins());
+ flowInfo.setProcessorStatuses(runtimeInfoWrapper.getProcessorStatus());
+ flowInfo.setRunStatus(runtimeInfoWrapper.getRunStatus());
Optional.ofNullable(flowIdHolder.getFlowId()).ifPresent(flowInfo::setFlowId);
return flowInfo;
}
diff --git
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java
index 7cf064408c..3237fa8e67 100644
---
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java
+++
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java
@@ -22,6 +22,7 @@ import org.apache.nifi.c2.protocol.api.AgentRepositories;
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
import org.apache.nifi.c2.protocol.api.ProcessorStatus;
+import org.apache.nifi.c2.protocol.api.RunStatus;
import org.apache.nifi.c2.protocol.component.api.RuntimeManifest;
public class RuntimeInfoWrapper {
@@ -30,14 +31,16 @@ public class RuntimeInfoWrapper {
final Map<String, FlowQueueStatus> queueStatus;
final List<ProcessorBulletin> processorBulletins;
final List<ProcessorStatus> processorStatus;
+ final RunStatus runStatus;
public RuntimeInfoWrapper(AgentRepositories repos, RuntimeManifest
manifest, Map<String, FlowQueueStatus> queueStatus, List<ProcessorBulletin>
processorBulletins,
- List<ProcessorStatus> processorStatus) {
+ List<ProcessorStatus> processorStatus, RunStatus
runStatus) {
this.repos = repos;
this.manifest = manifest;
this.queueStatus = queueStatus;
this.processorBulletins = processorBulletins;
this.processorStatus = processorStatus;
+ this.runStatus = runStatus;
}
public AgentRepositories getAgentRepositories() {
@@ -59,4 +62,8 @@ public class RuntimeInfoWrapper {
public List<ProcessorStatus> getProcessorStatus() {
return processorStatus;
}
+
+ public RunStatus getRunStatus() {
+ return runStatus;
+ }
}
diff --git
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/FlowStateStrategy.java
similarity index 59%
copy from
c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
copy to
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/FlowStateStrategy.java
index f2fcff3f27..074365ad56 100644
---
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
+++
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/FlowStateStrategy.java
@@ -15,30 +15,13 @@
* limitations under the License.
*/
-package org.apache.nifi.c2.protocol.api;
+package org.apache.nifi.c2.client.service.operation;
-import java.util.Arrays;
-import java.util.Optional;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
-public enum OperandType {
+public interface FlowStateStrategy {
- CONFIGURATION,
- CONNECTION,
- DEBUG,
- MANIFEST,
- REPOSITORY,
- PROPERTIES,
- ASSET,
- RESOURCE;
+ OperationState start();
- public static Optional<OperandType> fromString(String value) {
- return Arrays.stream(values())
- .filter(operandType -> operandType.name().equalsIgnoreCase(value))
- .findAny();
- }
-
- @Override
- public String toString() {
- return super.toString().toLowerCase();
- }
+ OperationState stop();
}
diff --git
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/StartFlowOperationHandler.java
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/StartFlowOperationHandler.java
new file mode 100644
index 0000000000..478e6b1b7d
--- /dev/null
+++
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/StartFlowOperationHandler.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static java.util.Optional.ofNullable;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+
+public class StartFlowOperationHandler implements C2OperationHandler {
+
+ public static final String NOT_APPLIED_DETAILS = "Failed to start flow,
please check the log for errors";
+ public static final String PARTIALLY_APPLIED_DETAILS = "Some components
failed to start, please check the log for errors";
+ public static final String FULLY_APPLIED_DETAILS = "Flow started";
+ public static final String UNEXPECTED_DETAILS = "Unexpected status, please
check the log for errors";
+
+ private final FlowStateStrategy flowStateStrategy;
+
+ public StartFlowOperationHandler(FlowStateStrategy flowStateStrategy) {
+ this.flowStateStrategy = flowStateStrategy;
+ }
+
+ @Override
+ public OperationType getOperationType() {
+ return OperationType.START;
+ }
+
+ @Override
+ public OperandType getOperandType() {
+ return OperandType.FLOW;
+ }
+
+ @Override
+ public Map<String, Object> getProperties() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public C2OperationAck handle(C2Operation operation) {
+ String operationId =
ofNullable(operation.getIdentifier()).orElse(EMPTY);
+ C2OperationState.OperationState operationState =
flowStateStrategy.start();
+
+ C2OperationState resultState = operationState(
+ operationState,
+ switch (operationState) {
+ case NOT_APPLIED -> NOT_APPLIED_DETAILS;
+ case FULLY_APPLIED -> FULLY_APPLIED_DETAILS;
+ case PARTIALLY_APPLIED -> PARTIALLY_APPLIED_DETAILS;
+ default -> UNEXPECTED_DETAILS;
+ }
+ );
+
+ return operationAck(operationId, resultState);
+ }
+}
diff --git
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/StopFlowOperationHandler.java
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/StopFlowOperationHandler.java
new file mode 100644
index 0000000000..29afb6e0ab
--- /dev/null
+++
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/StopFlowOperationHandler.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static java.util.Optional.ofNullable;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+
+public class StopFlowOperationHandler implements C2OperationHandler {
+
+ public static final String NOT_APPLIED_DETAILS = "Failed to stop flow,
please check the log for errors";
+ public static final String FULLY_APPLIED_DETAILS = "Flow stopped";
+ public static final String PARTIALLY_APPLIED_DETAILS = "Some components
failed to stop, please check the log for errors";
+ public static final String UNEXPECTED_DETAILS = "Unexpected status, please
check the log for errors";
+
+ private final FlowStateStrategy flowStateStrategy;
+
+ public StopFlowOperationHandler(FlowStateStrategy flowStateStrategy) {
+ this.flowStateStrategy = flowStateStrategy;
+ }
+
+ @Override
+ public OperationType getOperationType() {
+ return OperationType.STOP;
+ }
+
+ @Override
+ public OperandType getOperandType() {
+ return OperandType.FLOW;
+ }
+
+ @Override
+ public Map<String, Object> getProperties() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public C2OperationAck handle(C2Operation operation) {
+ String operationId =
ofNullable(operation.getIdentifier()).orElse(EMPTY);
+ C2OperationState.OperationState operationState =
flowStateStrategy.stop();
+
+ C2OperationState resultState = operationState(
+ operationState,
+ switch (operationState) {
+ case NOT_APPLIED -> NOT_APPLIED_DETAILS;
+ case FULLY_APPLIED -> FULLY_APPLIED_DETAILS;
+ case PARTIALLY_APPLIED -> PARTIALLY_APPLIED_DETAILS;
+ default -> UNEXPECTED_DETAILS;
+ }
+ );
+
+ return operationAck(operationId, resultState);
+ }
+}
diff --git
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java
index fdc4056929..bbbdd54c7a 100644
---
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java
+++
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.c2.client.service;
+import static org.apache.nifi.c2.protocol.api.RunStatus.RUNNING;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -124,13 +125,14 @@ public class C2HeartbeatFactoryTest {
List<ProcessorBulletin> processorBulletins = new ArrayList<>();
List<ProcessorStatus> processorStatus = new ArrayList<>();
- C2Heartbeat heartbeat = c2HeartbeatFactory.create(new
RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins,
processorStatus));
+ C2Heartbeat heartbeat = c2HeartbeatFactory.create(new
RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins,
processorStatus, RUNNING));
assertEquals(repos,
heartbeat.getAgentInfo().getStatus().getRepositories());
assertEquals(manifest, heartbeat.getAgentInfo().getAgentManifest());
assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues());
assertEquals(processorBulletins,
heartbeat.getFlowInfo().getProcessorBulletins());
assertEquals(processorStatus,
heartbeat.getFlowInfo().getProcessorStatuses());
+ assertEquals(RUNNING, heartbeat.getFlowInfo().getRunStatus());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
}
@@ -145,13 +147,14 @@ public class C2HeartbeatFactoryTest {
List<ProcessorBulletin> processorBulletins = new ArrayList<>();
List<ProcessorStatus> processorStatus = new ArrayList<>();
- C2Heartbeat heartbeat = c2HeartbeatFactory.create(new
RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins,
processorStatus));
+ C2Heartbeat heartbeat = c2HeartbeatFactory.create(new
RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins,
processorStatus, RUNNING));
assertEquals(repos,
heartbeat.getAgentInfo().getStatus().getRepositories());
assertNull(heartbeat.getAgentInfo().getAgentManifest());
assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues());
assertEquals(processorBulletins,
heartbeat.getFlowInfo().getProcessorBulletins());
assertEquals(processorStatus,
heartbeat.getFlowInfo().getProcessorStatuses());
+ assertEquals(RUNNING, heartbeat.getFlowInfo().getRunStatus());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
}
@@ -168,10 +171,11 @@ public class C2HeartbeatFactoryTest {
when(manifestHashProvider.calculateManifestHash(manifest.getBundles(),
Collections.emptySet())).thenReturn(MANIFEST_HASH);
when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash());
- C2Heartbeat heartbeat = c2HeartbeatFactory.create(new
RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new
ArrayList<>(), new ArrayList<>()));
+ C2Heartbeat heartbeat = c2HeartbeatFactory.create(new
RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new
ArrayList<>(), new ArrayList<>(), RUNNING));
assertEquals(MANIFEST_HASH,
heartbeat.getAgentInfo().getAgentManifestHash());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
+ assertEquals(RUNNING, heartbeat.getFlowInfo().getRunStatus());
}
@Test
@@ -184,10 +188,11 @@ public class C2HeartbeatFactoryTest {
when(manifestHashProvider.calculateManifestHash(manifest.getBundles(),
supportedOperations)).thenReturn(MANIFEST_HASH);
when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash());
- C2Heartbeat heartbeat = c2HeartbeatFactory.create(new
RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new
ArrayList<>(), new ArrayList<>()));
+ C2Heartbeat heartbeat = c2HeartbeatFactory.create(new
RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new
ArrayList<>(), new ArrayList<>(), RUNNING));
assertEquals(MANIFEST_HASH,
heartbeat.getAgentInfo().getAgentManifestHash());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
+ assertEquals(RUNNING, heartbeat.getFlowInfo().getRunStatus());
}
private RuntimeManifest createManifest() {
diff --git
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java
index aeb7d55917..0de0a94142 100644
---
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java
+++
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.c2.client.service.operation;
+import static org.apache.nifi.c2.protocol.api.RunStatus.RUNNING;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;
@@ -58,7 +59,7 @@ public class DescribeManifestOperationHandlerTest {
void testDescribeManifestOperationHandlerPopulatesAckSuccessfully() {
RuntimeManifest manifest = new RuntimeManifest();
manifest.setIdentifier("manifestId");
- RuntimeInfoWrapper runtimeInfoWrapper = new RuntimeInfoWrapper(null,
manifest, null, null, null);
+ RuntimeInfoWrapper runtimeInfoWrapper = new RuntimeInfoWrapper(null,
manifest, null, null, null, RUNNING);
C2Heartbeat heartbeat = new C2Heartbeat();
AgentInfo agentInfo = new AgentInfo();
diff --git
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/StartFlowOperationHandlerTest.java
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/StartFlowOperationHandlerTest.java
new file mode 100644
index 0000000000..be07253f76
--- /dev/null
+++
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/StartFlowOperationHandlerTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import java.util.stream.Stream;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static
org.apache.nifi.c2.client.service.operation.StartFlowOperationHandler.FULLY_APPLIED_DETAILS;
+import static
org.apache.nifi.c2.client.service.operation.StartFlowOperationHandler.NOT_APPLIED_DETAILS;
+import static
org.apache.nifi.c2.client.service.operation.StartFlowOperationHandler.PARTIALLY_APPLIED_DETAILS;
+import static
org.apache.nifi.c2.client.service.operation.StartFlowOperationHandler.UNEXPECTED_DETAILS;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO_OPERATION;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.PARTIALLY_APPLIED;
+import static org.apache.nifi.c2.protocol.api.OperandType.FLOW;
+import static org.apache.nifi.c2.protocol.api.OperationType.START;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class StartFlowOperationHandlerTest {
+
+ private static final String OPERATION_ID = "operation id";
+
+ @Mock
+ private FlowStateStrategy flowStateStrategy;
+
+ @InjectMocks
+ private StartFlowOperationHandler victim;
+
+ @Test
+ public void testOperationAndOperandTypes() {
+ assertEquals(START, victim.getOperationType());
+ assertEquals(FLOW, victim.getOperandType());
+ }
+
+ @ParameterizedTest(name = "operationId={0} ackOperationId={1} ackState={2}
ackDetails={3}")
+ @MethodSource("testHandleArguments")
+ public void testHandle(String operationId, String ackOperationId,
OperationState ackState, String ackDetails) {
+ when(flowStateStrategy.start()).thenReturn(ackState);
+
+ C2OperationAck result = victim.handle(anOperation(operationId));
+
+ assertEquals(ackOperationId, result.getOperationId());
+ assertEquals(ackState, result.getOperationState().getState());
+ assertEquals(ackDetails, result.getOperationState().getDetails());
+ }
+
+ private C2Operation anOperation(String identifier) {
+ C2Operation operation = new C2Operation();
+ operation.setIdentifier(identifier);
+
+ return operation;
+ }
+
+ private static Stream<Arguments> testHandleArguments() {
+ return Stream.of(
+ Arguments.of(null, EMPTY, NOT_APPLIED, NOT_APPLIED_DETAILS),
+ Arguments.of(null, EMPTY, FULLY_APPLIED,
FULLY_APPLIED_DETAILS),
+ Arguments.of(null, EMPTY, PARTIALLY_APPLIED,
PARTIALLY_APPLIED_DETAILS),
+ Arguments.of(null, EMPTY, NO_OPERATION, UNEXPECTED_DETAILS),
+ Arguments.of(OPERATION_ID, OPERATION_ID, NOT_APPLIED,
NOT_APPLIED_DETAILS),
+ Arguments.of(OPERATION_ID, OPERATION_ID, FULLY_APPLIED,
FULLY_APPLIED_DETAILS),
+ Arguments.of(OPERATION_ID, OPERATION_ID, PARTIALLY_APPLIED,
PARTIALLY_APPLIED_DETAILS),
+ Arguments.of(OPERATION_ID, OPERATION_ID, NO_OPERATION,
UNEXPECTED_DETAILS)
+ );
+ }
+}
\ No newline at end of file
diff --git
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/StopFlowOperationHandlerTest.java
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/StopFlowOperationHandlerTest.java
new file mode 100644
index 0000000000..348bc6c34e
--- /dev/null
+++
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/StopFlowOperationHandlerTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import java.util.stream.Stream;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static
org.apache.nifi.c2.client.service.operation.StopFlowOperationHandler.FULLY_APPLIED_DETAILS;
+import static
org.apache.nifi.c2.client.service.operation.StopFlowOperationHandler.NOT_APPLIED_DETAILS;
+import static
org.apache.nifi.c2.client.service.operation.StopFlowOperationHandler.PARTIALLY_APPLIED_DETAILS;
+import static
org.apache.nifi.c2.client.service.operation.StopFlowOperationHandler.UNEXPECTED_DETAILS;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO_OPERATION;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.PARTIALLY_APPLIED;
+import static org.apache.nifi.c2.protocol.api.OperandType.FLOW;
+import static org.apache.nifi.c2.protocol.api.OperationType.STOP;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class StopFlowOperationHandlerTest {
+
+ private static final String OPERATION_ID = "operation id";
+
+ @Mock
+ private FlowStateStrategy flowStateStrategy;
+
+ @InjectMocks
+ private StopFlowOperationHandler victim;
+
+ @Test
+ public void testOperationAndOperandTypes() {
+ assertEquals(STOP, victim.getOperationType());
+ assertEquals(FLOW, victim.getOperandType());
+ }
+
+ @ParameterizedTest(name = "operationId={0} ackOperationId={1} ackState={2}
ackDetails={3}")
+ @MethodSource("testHandleArguments")
+ public void testHandle(String operationId, String ackOperationId,
C2OperationState.OperationState ackState, String ackDetails) {
+ when(flowStateStrategy.stop()).thenReturn(ackState);
+
+ C2OperationAck result = victim.handle(anOperation(operationId));
+
+ assertEquals(ackOperationId, result.getOperationId());
+ assertEquals(ackState, result.getOperationState().getState());
+ assertEquals(ackDetails, result.getOperationState().getDetails());
+ }
+
+ private C2Operation anOperation(String identifier) {
+ C2Operation operation = new C2Operation();
+ operation.setIdentifier(identifier);
+
+ return operation;
+ }
+
+ private static Stream<Arguments> testHandleArguments() {
+ return Stream.of(
+ Arguments.of(null, EMPTY, NOT_APPLIED, NOT_APPLIED_DETAILS),
+ Arguments.of(null, EMPTY, FULLY_APPLIED,
FULLY_APPLIED_DETAILS),
+ Arguments.of(null, EMPTY, PARTIALLY_APPLIED,
PARTIALLY_APPLIED_DETAILS),
+ Arguments.of(null, EMPTY, NO_OPERATION, UNEXPECTED_DETAILS),
+ Arguments.of(OPERATION_ID, OPERATION_ID, NOT_APPLIED,
NOT_APPLIED_DETAILS),
+ Arguments.of(OPERATION_ID, OPERATION_ID, FULLY_APPLIED,
FULLY_APPLIED_DETAILS),
+ Arguments.of(OPERATION_ID, OPERATION_ID, PARTIALLY_APPLIED,
PARTIALLY_APPLIED_DETAILS),
+ Arguments.of(OPERATION_ID, OPERATION_ID, NO_OPERATION,
UNEXPECTED_DETAILS)
+ );
+ }
+}
\ No newline at end of file
diff --git
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FlowInfo.java
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FlowInfo.java
index 0060ce98d7..d10b877f5a 100644
---
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FlowInfo.java
+++
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FlowInfo.java
@@ -32,6 +32,7 @@ public class FlowInfo implements Serializable {
private Map<String, FlowQueueStatus> queues;
private List<ProcessorBulletin> processorBulletins;
private List<ProcessorStatus> processorStatuses;
+ private RunStatus runStatus;
@Schema(description = "A unique identifier of the flow currently deployed
on the agent")
public String getFlowId() {
@@ -86,4 +87,13 @@ public class FlowInfo implements Serializable {
public void setProcessorStatuses(List<ProcessorStatus> processorStatuses) {
this.processorStatuses = processorStatuses;
}
+
+ @Schema(description = "Run status of the flow")
+ public RunStatus getRunStatus() {
+ return runStatus;
+ }
+
+ public void setRunStatus(RunStatus runStatus) {
+ this.runStatus = runStatus;
+ }
}
diff --git
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
index f2fcff3f27..09b0ea4e5e 100644
---
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
+++
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
@@ -29,7 +29,8 @@ public enum OperandType {
REPOSITORY,
PROPERTIES,
ASSET,
- RESOURCE;
+ RESOURCE,
+ FLOW;
public static Optional<OperandType> fromString(String value) {
return Arrays.stream(values())
diff --git
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java
index e948e31f64..80b0ec7145 100644
---
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java
+++
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java
@@ -21,6 +21,7 @@ import static
org.apache.nifi.c2.protocol.api.OperandType.ASSET;
import static org.apache.nifi.c2.protocol.api.OperandType.CONFIGURATION;
import static org.apache.nifi.c2.protocol.api.OperandType.CONNECTION;
import static org.apache.nifi.c2.protocol.api.OperandType.DEBUG;
+import static org.apache.nifi.c2.protocol.api.OperandType.FLOW;
import static org.apache.nifi.c2.protocol.api.OperandType.MANIFEST;
import static org.apache.nifi.c2.protocol.api.OperandType.PROPERTIES;
import static org.apache.nifi.c2.protocol.api.OperandType.RESOURCE;
@@ -40,8 +41,8 @@ public enum OperationType {
DESCRIBE(MANIFEST),
UPDATE(CONFIGURATION, ASSET, PROPERTIES),
RESTART,
- START,
- STOP,
+ START(FLOW),
+ STOP(FLOW),
PAUSE,
REPLICATE,
SUBSCRIBE,
diff --git
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/RunStatus.java
similarity index 62%
copy from
c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
copy to
c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/RunStatus.java
index f2fcff3f27..49f78e2803 100644
---
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
+++
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/RunStatus.java
@@ -17,28 +17,7 @@
package org.apache.nifi.c2.protocol.api;
-import java.util.Arrays;
-import java.util.Optional;
-
-public enum OperandType {
-
- CONFIGURATION,
- CONNECTION,
- DEBUG,
- MANIFEST,
- REPOSITORY,
- PROPERTIES,
- ASSET,
- RESOURCE;
-
- public static Optional<OperandType> fromString(String value) {
- return Arrays.stream(values())
- .filter(operandType -> operandType.name().equalsIgnoreCase(value))
- .findAny();
- }
-
- @Override
- public String toString() {
- return super.toString().toLowerCase();
- }
+public enum RunStatus {
+ RUNNING,
+ STOPPED,
}
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
index 793af58951..4493f6194e 100644
---
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
@@ -25,6 +25,8 @@ import static
java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toMap;
+import static org.apache.nifi.c2.protocol.api.RunStatus.RUNNING;
+import static org.apache.nifi.c2.protocol.api.RunStatus.STOPPED;
import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_AGENT_CLASS;
import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_AGENT_HEARTBEAT_PERIOD;
import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_AGENT_IDENTIFIER;
@@ -82,8 +84,11 @@ import
org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
import
org.apache.nifi.c2.client.service.operation.DescribeManifestOperationHandler;
import
org.apache.nifi.c2.client.service.operation.EmptyOperandPropertiesProvider;
+import org.apache.nifi.c2.client.service.operation.FlowStateStrategy;
import org.apache.nifi.c2.client.service.operation.OperandPropertiesProvider;
import org.apache.nifi.c2.client.service.operation.OperationQueueDAO;
+import org.apache.nifi.c2.client.service.operation.StartFlowOperationHandler;
+import org.apache.nifi.c2.client.service.operation.StopFlowOperationHandler;
import org.apache.nifi.c2.client.service.operation.SupportedOperationsProvider;
import
org.apache.nifi.c2.client.service.operation.SyncResourceOperationHandler;
import
org.apache.nifi.c2.client.service.operation.TransferDebugOperationHandler;
@@ -97,15 +102,19 @@ import
org.apache.nifi.c2.protocol.api.AgentRepositoryStatus;
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
import org.apache.nifi.c2.protocol.api.ProcessorStatus;
+import org.apache.nifi.c2.protocol.api.RunStatus;
import org.apache.nifi.c2.serializer.C2JacksonSerializer;
import org.apache.nifi.c2.serializer.C2Serializer;
import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.Triggerable;
import org.apache.nifi.diagnostics.SystemDiagnostics;
import org.apache.nifi.encrypt.PropertyEncryptorBuilder;
-import org.apache.nifi.extension.manifest.parser.ExtensionManifestParser;
import
org.apache.nifi.extension.manifest.parser.jaxb.JAXBExtensionManifestParser;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.manifest.RuntimeManifestService;
import org.apache.nifi.manifest.StandardRuntimeManifestService;
+import org.apache.nifi.minifi.c2.command.DefaultFlowStateStrategy;
import org.apache.nifi.minifi.c2.command.DefaultUpdateConfigurationStrategy;
import org.apache.nifi.minifi.c2.command.PropertiesPersister;
import org.apache.nifi.minifi.c2.command.TransferDebugCommandHelper;
@@ -138,7 +147,6 @@ public class C2NifiClientService {
private final ExecutorService operationManagerExecutorService;
private final FlowController flowController;
- private final ExtensionManifestParser extensionManifestParser;
private final RuntimeManifestService runtimeManifestService;
private final SupportedOperationsProvider supportedOperationsProvider;
private final C2HeartbeatManager c2HeartbeatManager;
@@ -150,13 +158,11 @@ public class C2NifiClientService {
this.heartbeatManagerExecutorService = newScheduledThreadPool(1);
this.operationManagerExecutorService = newSingleThreadExecutor();
- this.extensionManifestParser = new JAXBExtensionManifestParser();
-
C2ClientConfig clientConfig = generateClientConfig(niFiProperties);
this.runtimeManifestService = new StandardRuntimeManifestService(
ExtensionManagerHolder.getExtensionManager(),
- extensionManifestParser,
+ new JAXBExtensionManifestParser(),
clientConfig.getRuntimeManifestIdentifier(),
clientConfig.getRuntimeType()
);
@@ -239,6 +245,7 @@ public class C2NifiClientService {
updateAssetCommandHelper.createAssetDirectory();
UpdatePropertiesPropertyProvider updatePropertiesPropertyProvider =
new UpdatePropertiesPropertyProvider(bootstrapConfigFileLocation);
PropertiesPersister propertiesPersister = new
PropertiesPersister(updatePropertiesPropertyProvider,
bootstrapConfigFileLocation);
+ FlowStateStrategy defaultFlowStateStrategy = new
DefaultFlowStateStrategy(flowController);
FlowPropertyEncryptor flowPropertyEncryptor = new
StandardFlowPropertyEncryptor(
new
PropertyEncryptorBuilder(niFiProperties.getProperty(SENSITIVE_PROPS_KEY))
@@ -259,7 +266,8 @@ public class C2NifiClientService {
UpdateAssetOperationHandler.create(client,
emptyOperandPropertiesProvider,
updateAssetCommandHelper::assetUpdatePrecondition,
updateAssetCommandHelper::assetPersistFunction),
new
UpdatePropertiesOperationHandler(updatePropertiesPropertyProvider,
propertiesPersister::persistProperties),
- SyncResourceOperationHandler.create(client,
emptyOperandPropertiesProvider, new
DefaultSyncResourceStrategy(resourceRepository), c2Serializer)
+ SyncResourceOperationHandler.create(client,
emptyOperandPropertiesProvider, new
DefaultSyncResourceStrategy(resourceRepository), c2Serializer),
+ new StartFlowOperationHandler(defaultFlowStateStrategy), new
StopFlowOperationHandler(defaultFlowStateStrategy)
));
}
@@ -298,7 +306,9 @@ public class C2NifiClientService {
agentManifest,
getQueueStatus(),
getBulletins(processorBulletinLimit),
- getProcessorStatus(processorStatusEnabled));
+ getProcessorStatus(processorStatusEnabled),
+ getRunStatus()
+ );
}
private AgentRepositories getAgentRepositories() {
@@ -408,4 +418,22 @@ public class C2NifiClientService {
result.setTerminatedThreadCount(processorStatus.getTerminatedThreadCount());
return result;
}
+
+ private RunStatus getRunStatus() {
+ ProcessGroup processGroup =
flowController.getFlowManager().getRootGroup();
+ return isProcessGroupRunning(processGroup)
+ ||
processGroup.getProcessGroups().stream().anyMatch(this::isProcessGroupRunning)
? RUNNING : STOPPED;
+ }
+
+ private boolean isProcessGroupRunning(ProcessGroup processGroup) {
+ return anyProcessorRunning(processGroup) ||
anyRemoteProcessGroupTransmitting(processGroup);
+ }
+
+ private boolean anyProcessorRunning(ProcessGroup processGroup) {
+ return
processGroup.getProcessors().stream().anyMatch(Triggerable::isRunning);
+ }
+
+ private boolean anyRemoteProcessGroupTransmitting(ProcessGroup
processGroup) {
+ return
processGroup.getRemoteProcessGroups().stream().anyMatch(RemoteProcessGroup::isTransmitting);
+ }
}
\ No newline at end of file
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultFlowStateStrategy.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultFlowStateStrategy.java
new file mode 100644
index 0000000000..48f0faf391
--- /dev/null
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultFlowStateStrategy.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import org.apache.nifi.c2.client.service.operation.FlowStateStrategy;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.PARTIALLY_APPLIED;
+
+public class DefaultFlowStateStrategy implements FlowStateStrategy {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DefaultFlowStateStrategy.class);
+
+ private final FlowController flowController;
+
+ public DefaultFlowStateStrategy(FlowController flowController) {
+ this.flowController = flowController;
+ }
+
+ @Override
+ public OperationState start() {
+ try {
+ ProcessGroup rootProcessGroup =
flowController.getFlowManager().getRootGroup();
+ if (rootProcessGroup != null) {
+ startProcessGroup(rootProcessGroup);
+ LOGGER.info("Flow started");
+ return FULLY_APPLIED;
+ } else {
+ LOGGER.error("Failed to start flow as the root process group
is null.");
+ return NOT_APPLIED;
+ }
+ } catch (Exception e) {
+ LOGGER.error("Failed to start flow as one or more components
failed to stop.", e);
+ return PARTIALLY_APPLIED;
+ }
+ }
+
+ @Override
+ public OperationState stop() {
+ try {
+ ProcessGroup rootProcessGroup =
flowController.getFlowManager().getRootGroup();
+ if (rootProcessGroup != null) {
+ stopProcessGroup(rootProcessGroup);
+ LOGGER.info("Flow stopped");
+ return FULLY_APPLIED;
+ } else {
+ LOGGER.error("Failed to stop flow as the root process group is
null.");
+ return NOT_APPLIED;
+ }
+ } catch (Exception e) {
+ LOGGER.error("Failed to stop flow as one or more components failed
to stop.", e);
+ return PARTIALLY_APPLIED;
+ }
+ }
+
+ private void startProcessGroup(ProcessGroup processGroup) {
+ processGroup.startProcessing();
+
processGroup.getRemoteProcessGroups().forEach(RemoteProcessGroup::startTransmitting);
+
+ processGroup.getProcessGroups().forEach(this::startProcessGroup);
+ }
+
+ private void stopProcessGroup(ProcessGroup processGroup) {
+ waitForStopOrLogTimeOut(processGroup.stopProcessing());
+ processGroup.getRemoteProcessGroups().stream()
+ .map(RemoteProcessGroup::stopTransmitting)
+ .forEach(this::waitForStopOrLogTimeOut);
+
+ processGroup.getProcessGroups().forEach(this::stopProcessGroup);
+ }
+
+ private void waitForStopOrLogTimeOut(Future<?> future) {
+ try {
+ future.get(10, TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ LOGGER.warn("Unable to stop component within defined interval", e);
+ }
+ }
+}
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultFlowStateStrategyTest.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultFlowStateStrategyTest.java
new file mode 100644
index 0000000000..b8dc847395
--- /dev/null
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultFlowStateStrategyTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.PARTIALLY_APPLIED;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class DefaultFlowStateStrategyTest {
+
+ @Mock
+ private FlowController flowController;
+ @Mock
+ private FlowManager flowManager;
+
+ @InjectMocks
+ private DefaultFlowStateStrategy victim;
+
+ @BeforeEach
+ public void initTests() {
+ when(flowController.getFlowManager()).thenReturn(flowManager);
+ }
+
+ @Test
+ public void testStartFullyApplied() {
+ ProcessGroup rootProcessGroup = mock(ProcessGroup.class);
+ ProcessGroup nestedProcessGroup = mock(ProcessGroup.class);
+ RemoteProcessGroup remoteProcessGroup = mock(RemoteProcessGroup.class);
+ Set<RemoteProcessGroup> remoteProcessGroups =
Set.of(remoteProcessGroup);
+ RemoteProcessGroup nestedRemoteProcessGroup =
mock(RemoteProcessGroup.class);
+ Set<RemoteProcessGroup> nestedRemoteProcessGroups =
Set.of(nestedRemoteProcessGroup);
+
+ when(flowManager.getRootGroup()).thenReturn(rootProcessGroup);
+
when(rootProcessGroup.getRemoteProcessGroups()).thenReturn(remoteProcessGroups);
+
when(rootProcessGroup.getProcessGroups()).thenReturn(Set.of(nestedProcessGroup));
+
when(nestedProcessGroup.getRemoteProcessGroups()).thenReturn(nestedRemoteProcessGroups);
+
+ OperationState result = victim.start();
+
+ assertEquals(FULLY_APPLIED, result);
+ verify(rootProcessGroup).startProcessing();
+ verify(remoteProcessGroup).startTransmitting();
+ verify(nestedProcessGroup).startProcessing();
+ verify(nestedRemoteProcessGroup).startTransmitting();
+ }
+
+ @Test
+ public void testStartPartiallyApplied() {
+ ProcessGroup rootProcessGroup = mock(ProcessGroup.class);
+ RemoteProcessGroup remoteProcessGroup = mock(RemoteProcessGroup.class);
+ Set<RemoteProcessGroup> remoteProcessGroups =
Set.of(remoteProcessGroup);
+
+ when(flowManager.getRootGroup()).thenReturn(rootProcessGroup);
+
when(rootProcessGroup.getRemoteProcessGroups()).thenReturn(remoteProcessGroups);
+ doThrow(new
RuntimeException()).when(remoteProcessGroup).startTransmitting();
+
+ OperationState result = victim.start();
+
+ assertEquals(PARTIALLY_APPLIED, result);
+ }
+
+ @Test
+ public void testStartNotApplied() {
+ when(flowManager.getRootGroup()).thenReturn(null);
+
+ OperationState result = victim.start();
+
+ assertEquals(NOT_APPLIED, result);
+ }
+
+ @Test
+ public void testStopFullyApplied() {
+ ProcessGroup rootProcessGroup = mock(ProcessGroup.class);
+ RemoteProcessGroup remoteProcessGroup = mock(RemoteProcessGroup.class);
+ Set<RemoteProcessGroup> remoteProcessGroups =
Set.of(remoteProcessGroup);
+ CompletableFuture<Void> rootProcessGroupStopFuture =
mock(CompletableFuture.class);
+ Future remoteProcessGroupStopFuture = mock(Future.class);
+ ProcessGroup nestedProcessGroup = mock(ProcessGroup.class);
+ RemoteProcessGroup nestedRemoteProcessGroup =
mock(RemoteProcessGroup.class);
+ Set<RemoteProcessGroup> nestedRemoteProcessGroups =
Set.of(nestedRemoteProcessGroup);
+ CompletableFuture<Void> nestedProcessGroupStopFuture =
mock(CompletableFuture.class);
+ Future nestedRemoteProcessGroupStopFuture = mock(Future.class);
+
+ when(flowManager.getRootGroup()).thenReturn(rootProcessGroup);
+
when(rootProcessGroup.getRemoteProcessGroups()).thenReturn(remoteProcessGroups);
+
when(rootProcessGroup.stopProcessing()).thenReturn(rootProcessGroupStopFuture);
+
when(remoteProcessGroup.stopTransmitting()).thenReturn(remoteProcessGroupStopFuture);
+
when(rootProcessGroup.getProcessGroups()).thenReturn(Set.of(nestedProcessGroup));
+
when(nestedProcessGroup.getRemoteProcessGroups()).thenReturn(nestedRemoteProcessGroups);
+
when(nestedProcessGroup.stopProcessing()).thenReturn(nestedProcessGroupStopFuture);
+
when(nestedRemoteProcessGroup.stopTransmitting()).thenReturn(nestedRemoteProcessGroupStopFuture);
+
+ OperationState result = victim.stop();
+
+ assertEquals(FULLY_APPLIED, result);
+ assertDoesNotThrow(() -> rootProcessGroupStopFuture.get());
+ assertDoesNotThrow(() -> remoteProcessGroupStopFuture.get());
+ assertDoesNotThrow(() -> nestedProcessGroupStopFuture.get());
+ assertDoesNotThrow(() -> nestedRemoteProcessGroupStopFuture.get());
+ }
+
+ @Test
+ public void testStopPartiallyApplied() {
+ ProcessGroup rootProcessGroup = mock(ProcessGroup.class);
+ RemoteProcessGroup remoteProcessGroup = mock(RemoteProcessGroup.class);
+ Set<RemoteProcessGroup> remoteProcessGroups =
Set.of(remoteProcessGroup);
+ CompletableFuture<Void> rootProcessGroupStopFuture =
mock(CompletableFuture.class);
+
+ when(flowManager.getRootGroup()).thenReturn(rootProcessGroup);
+
when(rootProcessGroup.getRemoteProcessGroups()).thenReturn(remoteProcessGroups);
+
when(rootProcessGroup.stopProcessing()).thenReturn(rootProcessGroupStopFuture);
+ when(remoteProcessGroup.stopTransmitting()).thenThrow(new
RuntimeException());
+
+ OperationState result = victim.stop();
+
+ assertEquals(PARTIALLY_APPLIED, result);
+ }
+
+ @Test
+ public void testStopNotApplied() {
+ when(flowManager.getRootGroup()).thenReturn(null);
+
+ OperationState result = victim.stop();
+
+ assertEquals(NOT_APPLIED, result);
+ }
+}
\ No newline at end of file