This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new df97c696a6 NIFI-15017 Add start/stop C2 command for processors in MiNiFi
df97c696a6 is described below

commit df97c696a692ff3c5f61b733f81a70f41311430e
Author: Mihai <[email protected]>
AuthorDate: Mon Sep 22 18:25:24 2025 +0300

    NIFI-15017 Add start/stop C2 command for processors in MiNiFi
    
    Closes #10353
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 .../service/operation/ProcessorStateStrategy.java} |  30 +----
 .../operation/StartProcessorOperationHandler.java  |  81 ++++++++++++++
 .../operation/StopProcessorOperationHandler.java   |  81 ++++++++++++++
 .../StartProcessorOperationHandlerTest.java        | 111 ++++++++++++++++++
 .../StopProcessorOperationHandlerTest.java         | 111 ++++++++++++++++++
 .../apache/nifi/c2/protocol/api/OperandType.java   |   1 +
 .../apache/nifi/c2/protocol/api/OperationType.java |   5 +-
 .../nifi/c2/protocol/api/ProcessorStatus.java      |  10 ++
 .../apache/nifi/minifi/c2/C2NifiClientService.java |  15 ++-
 .../c2/command/DefaultProcessorStateStrategy.java  |  77 +++++++++++++
 .../command/DefaultProcessorStateStrategyTest.java | 124 +++++++++++++++++++++
 11 files changed, 618 insertions(+), 28 deletions(-)

diff --git 
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/ProcessorStateStrategy.java
similarity index 58%
copy from 
c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
copy to 
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/ProcessorStateStrategy.java
index 09b0ea4e5e..ea624da53c 100644
--- 
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
+++ 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/ProcessorStateStrategy.java
@@ -15,31 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.nifi.c2.protocol.api;
+package org.apache.nifi.c2.client.service.operation;
 
-import java.util.Arrays;
-import java.util.Optional;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
 
-public enum OperandType {
-
-    CONFIGURATION,
-    CONNECTION,
-    DEBUG,
-    MANIFEST,
-    REPOSITORY,
-    PROPERTIES,
-    ASSET,
-    RESOURCE,
-    FLOW;
-
-    public static Optional<OperandType> fromString(String value) {
-        return Arrays.stream(values())
-            .filter(operandType -> operandType.name().equalsIgnoreCase(value))
-            .findAny();
-    }
-
-    @Override
-    public String toString() {
-        return super.toString().toLowerCase();
-    }
+public interface ProcessorStateStrategy {
+    OperationState startProcessor(String processorId);
+    OperationState stopProcessor(String processorId);
 }
diff --git 
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/StartProcessorOperationHandler.java
 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/StartProcessorOperationHandler.java
new file mode 100644
index 0000000000..4a2dbdf09f
--- /dev/null
+++ 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/StartProcessorOperationHandler.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static java.util.Optional.ofNullable;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+
+public class StartProcessorOperationHandler implements C2OperationHandler {
+
+    public static final String PROCESSOR_ID_ARG = 
StopProcessorOperationHandler.PROCESSOR_ID_ARG;
+    public static final String NOT_APPLIED_DETAILS = "Failed to start 
processor (not found or invalid state)";
+    public static final String FULLY_APPLIED_DETAILS = "Processor started";
+    public static final String PARTIALLY_APPLIED_DETAILS = "Processor start 
partially applied";
+
+    private final ProcessorStateStrategy processorStateStrategy;
+
+    public StartProcessorOperationHandler(ProcessorStateStrategy 
processorStateStrategy) {
+        this.processorStateStrategy = processorStateStrategy;
+    }
+
+    @Override
+    public OperationType getOperationType() {
+        return OperationType.START;
+    }
+
+    @Override
+    public OperandType getOperandType() {
+        return OperandType.PROCESSOR;
+    }
+
+    @Override
+    public Map<String, Object> getProperties() {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public C2OperationAck handle(C2Operation operation) {
+        String operationId = 
ofNullable(operation.getIdentifier()).orElse(EMPTY);
+        String processorId = ofNullable(operation.getArgs()).map(a -> 
a.get(PROCESSOR_ID_ARG)).map(Object::toString).orElse(null);
+        C2OperationState.OperationState opState;
+        if (processorId == null) {
+            opState = NOT_APPLIED;
+        } else {
+            opState = processorStateStrategy.startProcessor(processorId);
+        }
+
+        String details = switch (opState) {
+            case NOT_APPLIED -> NOT_APPLIED_DETAILS;
+            case FULLY_APPLIED -> FULLY_APPLIED_DETAILS;
+            case PARTIALLY_APPLIED -> PARTIALLY_APPLIED_DETAILS;
+            default -> PARTIALLY_APPLIED_DETAILS;
+        };
+
+        return operationAck(operationId, operationState(opState, details));
+    }
+}
diff --git 
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/StopProcessorOperationHandler.java
 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/StopProcessorOperationHandler.java
new file mode 100644
index 0000000000..eea5f4a25f
--- /dev/null
+++ 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/StopProcessorOperationHandler.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static java.util.Optional.ofNullable;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+
+public class StopProcessorOperationHandler implements C2OperationHandler {
+
+    public static final String PROCESSOR_ID_ARG = "processorId";
+    public static final String NOT_APPLIED_DETAILS = "Failed to stop processor 
(not found)";
+    public static final String FULLY_APPLIED_DETAILS = "Processor stopped";
+    public static final String PARTIALLY_APPLIED_DETAILS = "Processor stop 
partially applied";
+
+    private final ProcessorStateStrategy processorStateStrategy;
+
+    public StopProcessorOperationHandler(ProcessorStateStrategy 
processorStateStrategy) {
+        this.processorStateStrategy = processorStateStrategy;
+    }
+
+    @Override
+    public OperationType getOperationType() {
+        return OperationType.STOP;
+    }
+
+    @Override
+    public OperandType getOperandType() {
+        return OperandType.PROCESSOR;
+    }
+
+    @Override
+    public Map<String, Object> getProperties() {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public C2OperationAck handle(C2Operation operation) {
+        String operationId = 
ofNullable(operation.getIdentifier()).orElse(EMPTY);
+        String processorId = ofNullable(operation.getArgs()).map(a -> 
a.get(PROCESSOR_ID_ARG)).map(Object::toString).orElse(null);
+        C2OperationState.OperationState opState;
+        if (processorId == null) {
+            opState = NOT_APPLIED;
+        } else {
+            opState = processorStateStrategy.stopProcessor(processorId);
+        }
+
+        String details = switch (opState) {
+            case NOT_APPLIED -> NOT_APPLIED_DETAILS;
+            case FULLY_APPLIED -> FULLY_APPLIED_DETAILS;
+            case PARTIALLY_APPLIED -> PARTIALLY_APPLIED_DETAILS;
+            default -> PARTIALLY_APPLIED_DETAILS;
+        };
+
+        return operationAck(operationId, operationState(opState, details));
+    }
+}
diff --git 
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/StartProcessorOperationHandlerTest.java
 
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/StartProcessorOperationHandlerTest.java
new file mode 100644
index 0000000000..ebffca1c1e
--- /dev/null
+++ 
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/StartProcessorOperationHandlerTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static 
org.apache.nifi.c2.client.service.operation.StartProcessorOperationHandler.FULLY_APPLIED_DETAILS;
+import static 
org.apache.nifi.c2.client.service.operation.StartProcessorOperationHandler.NOT_APPLIED_DETAILS;
+import static 
org.apache.nifi.c2.client.service.operation.StartProcessorOperationHandler.PARTIALLY_APPLIED_DETAILS;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO_OPERATION;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.PARTIALLY_APPLIED;
+import static org.apache.nifi.c2.protocol.api.OperandType.PROCESSOR;
+import static org.apache.nifi.c2.protocol.api.OperationType.START;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class StartProcessorOperationHandlerTest {
+
+    private static final String OPERATION_ID = "operation id";
+    private static final String PROCESSOR_ID = "processor id";
+
+    @Mock
+    private ProcessorStateStrategy processorStateStrategy;
+
+    @InjectMocks
+    private StartProcessorOperationHandler victim;
+
+    @Test
+    public void testOperationAndOperandTypes() {
+        assertEquals(START, victim.getOperationType());
+        assertEquals(PROCESSOR, victim.getOperandType());
+    }
+
+    @ParameterizedTest(name = "operationId={0} ackOperationId={1} state={2} 
details={3}")
+    @MethodSource("handleArguments")
+    public void testHandle(String operationId, String expectedAckOperationId, 
C2OperationState.OperationState state, String expectedDetails) {
+        
when(processorStateStrategy.startProcessor(PROCESSOR_ID)).thenReturn(state);
+
+        C2Operation operation = anOperation(operationId, PROCESSOR_ID);
+        C2OperationAck ack = victim.handle(operation);
+
+        assertEquals(expectedAckOperationId, ack.getOperationId());
+        assertEquals(state, ack.getOperationState().getState());
+        assertEquals(expectedDetails, ack.getOperationState().getDetails());
+    }
+
+    private static Stream<Arguments> handleArguments() {
+        return Stream.of(
+            Arguments.of(null, EMPTY, NOT_APPLIED, NOT_APPLIED_DETAILS),
+            Arguments.of(null, EMPTY, FULLY_APPLIED, FULLY_APPLIED_DETAILS),
+            Arguments.of(null, EMPTY, PARTIALLY_APPLIED, 
PARTIALLY_APPLIED_DETAILS),
+            Arguments.of(null, EMPTY, NO_OPERATION, PARTIALLY_APPLIED_DETAILS),
+            Arguments.of(OPERATION_ID, OPERATION_ID, NOT_APPLIED, 
NOT_APPLIED_DETAILS),
+            Arguments.of(OPERATION_ID, OPERATION_ID, FULLY_APPLIED, 
FULLY_APPLIED_DETAILS),
+            Arguments.of(OPERATION_ID, OPERATION_ID, PARTIALLY_APPLIED, 
PARTIALLY_APPLIED_DETAILS),
+            Arguments.of(OPERATION_ID, OPERATION_ID, NO_OPERATION, 
PARTIALLY_APPLIED_DETAILS)
+        );
+    }
+
+    @Test
+    public void testHandleMissingProcessorId() {
+        C2Operation operation = new C2Operation();
+        operation.setIdentifier(OPERATION_ID);
+        C2OperationAck ack = victim.handle(operation);
+        assertEquals(OPERATION_ID, ack.getOperationId());
+        assertEquals(NOT_APPLIED, ack.getOperationState().getState());
+        assertEquals(NOT_APPLIED_DETAILS, 
ack.getOperationState().getDetails());
+    }
+
+    private C2Operation anOperation(String operationId, String processorId) {
+        C2Operation operation = new C2Operation();
+        operation.setIdentifier(operationId);
+        Map<String, Object> args = new HashMap<>();
+        args.put(StartProcessorOperationHandler.PROCESSOR_ID_ARG, processorId);
+        operation.setArgs(args);
+        return operation;
+    }
+}
diff --git 
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/StopProcessorOperationHandlerTest.java
 
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/StopProcessorOperationHandlerTest.java
new file mode 100644
index 0000000000..ffdc468f8d
--- /dev/null
+++ 
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/StopProcessorOperationHandlerTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static 
org.apache.nifi.c2.client.service.operation.StopProcessorOperationHandler.FULLY_APPLIED_DETAILS;
+import static 
org.apache.nifi.c2.client.service.operation.StopProcessorOperationHandler.NOT_APPLIED_DETAILS;
+import static 
org.apache.nifi.c2.client.service.operation.StopProcessorOperationHandler.PARTIALLY_APPLIED_DETAILS;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO_OPERATION;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.PARTIALLY_APPLIED;
+import static org.apache.nifi.c2.protocol.api.OperandType.PROCESSOR;
+import static org.apache.nifi.c2.protocol.api.OperationType.STOP;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class StopProcessorOperationHandlerTest {
+
+    private static final String OPERATION_ID = "operation id";
+    private static final String PROCESSOR_ID = "processor id";
+
+    @Mock
+    private ProcessorStateStrategy processorStateStrategy;
+
+    @InjectMocks
+    private StopProcessorOperationHandler victim;
+
+    @Test
+    public void testOperationAndOperandTypes() {
+        assertEquals(STOP, victim.getOperationType());
+        assertEquals(PROCESSOR, victim.getOperandType());
+    }
+
+    @ParameterizedTest(name = "operationId={0} ackOperationId={1} state={2} 
details={3}")
+    @MethodSource("handleArguments")
+    public void testHandle(String operationId, String expectedAckOperationId, 
C2OperationState.OperationState state, String expectedDetails) {
+        
when(processorStateStrategy.stopProcessor(PROCESSOR_ID)).thenReturn(state);
+
+        C2Operation operation = anOperation(operationId, PROCESSOR_ID);
+        C2OperationAck ack = victim.handle(operation);
+
+        assertEquals(expectedAckOperationId, ack.getOperationId());
+        assertEquals(state, ack.getOperationState().getState());
+        assertEquals(expectedDetails, ack.getOperationState().getDetails());
+    }
+
+    private static Stream<Arguments> handleArguments() {
+        return Stream.of(
+            Arguments.of(null, EMPTY, NOT_APPLIED, NOT_APPLIED_DETAILS),
+            Arguments.of(null, EMPTY, FULLY_APPLIED, FULLY_APPLIED_DETAILS),
+            Arguments.of(null, EMPTY, PARTIALLY_APPLIED, 
PARTIALLY_APPLIED_DETAILS),
+            Arguments.of(null, EMPTY, NO_OPERATION, PARTIALLY_APPLIED_DETAILS),
+            Arguments.of(OPERATION_ID, OPERATION_ID, NOT_APPLIED, 
NOT_APPLIED_DETAILS),
+            Arguments.of(OPERATION_ID, OPERATION_ID, FULLY_APPLIED, 
FULLY_APPLIED_DETAILS),
+            Arguments.of(OPERATION_ID, OPERATION_ID, PARTIALLY_APPLIED, 
PARTIALLY_APPLIED_DETAILS),
+            Arguments.of(OPERATION_ID, OPERATION_ID, NO_OPERATION, 
PARTIALLY_APPLIED_DETAILS)
+        );
+    }
+
+    @Test
+    public void testHandleMissingProcessorId() {
+        C2Operation operation = new C2Operation();
+        operation.setIdentifier(OPERATION_ID);
+        C2OperationAck ack = victim.handle(operation);
+        assertEquals(OPERATION_ID, ack.getOperationId());
+        assertEquals(NOT_APPLIED, ack.getOperationState().getState());
+        assertEquals(NOT_APPLIED_DETAILS, 
ack.getOperationState().getDetails());
+    }
+
+    private C2Operation anOperation(String operationId, String processorId) {
+        C2Operation operation = new C2Operation();
+        operation.setIdentifier(operationId);
+        Map<String, Object> args = new HashMap<>();
+        args.put(StopProcessorOperationHandler.PROCESSOR_ID_ARG, processorId);
+        operation.setArgs(args);
+        return operation;
+    }
+}
diff --git 
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
 
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
index 09b0ea4e5e..2a0ff8552c 100644
--- 
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
+++ 
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
@@ -30,6 +30,7 @@ public enum OperandType {
     PROPERTIES,
     ASSET,
     RESOURCE,
+    PROCESSOR,
     FLOW;
 
     public static Optional<OperandType> fromString(String value) {
diff --git 
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java
 
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java
index 80b0ec7145..e02116e949 100644
--- 
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java
+++ 
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java
@@ -24,6 +24,7 @@ import static 
org.apache.nifi.c2.protocol.api.OperandType.DEBUG;
 import static org.apache.nifi.c2.protocol.api.OperandType.FLOW;
 import static org.apache.nifi.c2.protocol.api.OperandType.MANIFEST;
 import static org.apache.nifi.c2.protocol.api.OperandType.PROPERTIES;
+import static org.apache.nifi.c2.protocol.api.OperandType.PROCESSOR;
 import static org.apache.nifi.c2.protocol.api.OperandType.RESOURCE;
 
 import java.util.Arrays;
@@ -41,8 +42,8 @@ public enum OperationType {
     DESCRIBE(MANIFEST),
     UPDATE(CONFIGURATION, ASSET, PROPERTIES),
     RESTART,
-    START(FLOW),
-    STOP(FLOW),
+    START(FLOW, PROCESSOR),
+    STOP(FLOW, PROCESSOR),
     PAUSE,
     REPLICATE,
     SUBSCRIBE,
diff --git 
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ProcessorStatus.java
 
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ProcessorStatus.java
index a172200138..3c1ab90584 100644
--- 
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ProcessorStatus.java
+++ 
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ProcessorStatus.java
@@ -37,6 +37,7 @@ public class ProcessorStatus implements Serializable {
     private long processingNanos;
     private int activeThreadCount;
     private int terminatedThreadCount;
+    private RunStatus runStatus;
 
     @Schema(description = "The id of the processor")
     public String getId() {
@@ -145,4 +146,13 @@ public class ProcessorStatus implements Serializable {
     public void setTerminatedThreadCount(int terminatedThreadCount) {
         this.terminatedThreadCount = terminatedThreadCount;
     }
+
+    @Schema(description = "The run status of the processor (e.g. RUNNING or 
STOPPED)")
+    public RunStatus getRunStatus() {
+        return runStatus;
+    }
+
+    public void setRunStatus(RunStatus runStatus) {
+        this.runStatus = runStatus;
+    }
 }
diff --git 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
index f2fa27b891..35bd635b5f 100644
--- 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
+++ 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
@@ -86,10 +86,13 @@ import 
org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
 import 
org.apache.nifi.c2.client.service.operation.DescribeManifestOperationHandler;
 import 
org.apache.nifi.c2.client.service.operation.EmptyOperandPropertiesProvider;
 import org.apache.nifi.c2.client.service.operation.FlowStateStrategy;
+import org.apache.nifi.c2.client.service.operation.ProcessorStateStrategy;
 import org.apache.nifi.c2.client.service.operation.OperandPropertiesProvider;
 import org.apache.nifi.c2.client.service.operation.OperationQueueDAO;
 import org.apache.nifi.c2.client.service.operation.StartFlowOperationHandler;
 import org.apache.nifi.c2.client.service.operation.StopFlowOperationHandler;
+import 
org.apache.nifi.c2.client.service.operation.StartProcessorOperationHandler;
+import 
org.apache.nifi.c2.client.service.operation.StopProcessorOperationHandler;
 import org.apache.nifi.c2.client.service.operation.SupportedOperationsProvider;
 import 
org.apache.nifi.c2.client.service.operation.SyncResourceOperationHandler;
 import 
org.apache.nifi.c2.client.service.operation.TransferDebugOperationHandler;
@@ -116,6 +119,7 @@ import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.manifest.RuntimeManifestService;
 import org.apache.nifi.manifest.StandardRuntimeManifestService;
 import org.apache.nifi.minifi.c2.command.DefaultFlowStateStrategy;
+import org.apache.nifi.minifi.c2.command.DefaultProcessorStateStrategy;
 import org.apache.nifi.minifi.c2.command.DefaultUpdateConfigurationStrategy;
 import org.apache.nifi.minifi.c2.command.PropertiesPersister;
 import org.apache.nifi.minifi.c2.command.TransferDebugCommandHelper;
@@ -251,6 +255,7 @@ public class C2NifiClientService {
         UpdatePropertiesPropertyProvider updatePropertiesPropertyProvider = 
new UpdatePropertiesPropertyProvider(bootstrapConfigFileLocation);
         PropertiesPersister propertiesPersister = new 
PropertiesPersister(updatePropertiesPropertyProvider, 
bootstrapConfigFileLocation);
         FlowStateStrategy defaultFlowStateStrategy = new 
DefaultFlowStateStrategy(flowController);
+        ProcessorStateStrategy defaultProcessorStateStrategy = new 
DefaultProcessorStateStrategy(flowController);
         FlowPropertyAssetReferenceResolver flowPropertyAssetReferenceResolver 
= new 
StandardFlowPropertyAssetReferenceResolverService(resourceRepository::getAbsolutePath);
 
         FlowPropertyEncryptor flowPropertyEncryptor = new 
StandardFlowPropertyEncryptor(
@@ -278,7 +283,10 @@ public class C2NifiClientService {
                 updateAssetCommandHelper::assetUpdatePrecondition, 
updateAssetCommandHelper::assetPersistFunction),
             new 
UpdatePropertiesOperationHandler(updatePropertiesPropertyProvider, 
propertiesPersister::persistProperties),
             SyncResourceOperationHandler.create(client, new 
SyncResourcePropertyProvider(), new 
DefaultSyncResourceStrategy(resourceRepository), c2Serializer),
-            new StartFlowOperationHandler(defaultFlowStateStrategy), new 
StopFlowOperationHandler(defaultFlowStateStrategy)
+            new StartFlowOperationHandler(defaultFlowStateStrategy),
+            new StopFlowOperationHandler(defaultFlowStateStrategy),
+            new StartProcessorOperationHandler(defaultProcessorStateStrategy),
+            new StopProcessorOperationHandler(defaultProcessorStateStrategy)
         ));
     }
 
@@ -427,6 +435,11 @@ public class C2NifiClientService {
         result.setProcessingNanos(processorStatus.getProcessingNanos());
         result.setActiveThreadCount(processorStatus.getActiveThreadCount());
         
result.setTerminatedThreadCount(processorStatus.getTerminatedThreadCount());
+        result.setRunStatus(
+                processorStatus.getRunStatus() == 
org.apache.nifi.controller.status.RunStatus.Running
+                        ? RUNNING
+                        : STOPPED
+        );
         return result;
     }
 
diff --git 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultProcessorStateStrategy.java
 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultProcessorStateStrategy.java
new file mode 100644
index 0000000000..c8bb49d16b
--- /dev/null
+++ 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultProcessorStateStrategy.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.apache.nifi.c2.client.service.operation.ProcessorStateStrategy;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.ProcessorNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.function.BiConsumer;
+
+
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+
+/**
+ * {@link ProcessorStateStrategy} implementation that starts and stops a single
+ * processor through the {@link FlowController}.
+ *
+ * <p>Both operations resolve the processor by id via the controller's flow manager,
+ * read the id of its parent process group from the resolved node, and then delegate
+ * to the controller. The outcome is reported as an {@link OperationState}.</p>
+ */
+public class DefaultProcessorStateStrategy implements ProcessorStateStrategy {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultProcessorStateStrategy.class);
+
+    private final FlowController flowController;
+
+    /**
+     * @param flowController controller used both to look up processors and to change their state
+     */
+    public DefaultProcessorStateStrategy(FlowController flowController) {
+        this.flowController = flowController;
+    }
+
+    /**
+     * Requests a start of the processor with the given id.
+     *
+     * @param processorId id of the processor to start
+     * @return {@code FULLY_APPLIED} when the start request completed without an exception,
+     *         {@code NOT_APPLIED} when the processor was not found or an exception occurred
+     */
+    @Override
+    public OperationState startProcessor(String processorId) {
+        return changeState(processorId, (id, groupId) -> {
+            flowController.startProcessor(groupId, id, true);
+            LOGGER.info("Started processor {} (group={})", id, groupId);
+        });
+    }
+
+    /**
+     * Requests a stop of the processor with the given id.
+     *
+     * @param processorId id of the processor to stop
+     * @return {@code FULLY_APPLIED} when the stop request completed without an exception,
+     *         {@code NOT_APPLIED} when the processor was not found or an exception occurred
+     */
+    @Override
+    public OperationState stopProcessor(String processorId) {
+        return changeState(processorId, (id, groupId) -> {
+            flowController.stopProcessor(groupId, id);
+            LOGGER.info("Stopped processor {} (group={})", id, groupId);
+        });
+    }
+
+    /**
+     * Shared lookup-and-apply routine behind start and stop.
+     *
+     * @param processorId id of the processor whose state should change
+     * @param action callback receiving (processor id, parent group id), in that order
+     * @return the resulting operation state
+     */
+    private OperationState changeState(String processorId, BiConsumer<String, String> action) {
+        try {
+            ProcessorNode processorNode = flowController.getFlowManager().getProcessorNode(processorId);
+            if (processorNode != null) {
+                action.accept(processorId, processorNode.getProcessGroupIdentifier());
+                return FULLY_APPLIED;
+            }
+            LOGGER.warn("Processor with id {} not found", processorId);
+        } catch (Exception e) {
+            // Any failure during lookup or the state change is logged and reported, never rethrown.
+            LOGGER.error("Failed to change state for processor {}", processorId, e);
+        }
+        return NOT_APPLIED;
+    }
+}
diff --git 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultProcessorStateStrategyTest.java
 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultProcessorStateStrategyTest.java
new file mode 100644
index 0000000000..155dfbca3d
--- /dev/null
+++ 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultProcessorStateStrategyTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static 
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link DefaultProcessorStateStrategy}, exercised against a mocked
+ * {@link FlowController} and {@link FlowManager}.
+ */
+@ExtendWith(MockitoExtension.class)
+public class DefaultProcessorStateStrategyTest {
+
+    private static final String PROCESSOR_ID = "e2f2b9f6-1a7a-4b65-9e58-9a2b3f6a0c01";
+    private static final String GROUP_ID = "0b0d53b7-3b5d-4d88-a3f0-2d6c5dc9d8bb";
+
+    @Mock
+    private FlowController flowController;
+
+    @Mock
+    private FlowManager flowManager;
+
+    @InjectMocks
+    private DefaultProcessorStateStrategy victim;
+
+    @BeforeEach
+    public void init() {
+        when(flowController.getFlowManager()).thenReturn(flowManager);
+    }
+
+    // ---- start ----
+
+    @Test
+    public void testStartProcessorFullyApplied() {
+        givenProcessorExistsInGroup();
+
+        assertEquals(FULLY_APPLIED, victim.startProcessor(PROCESSOR_ID));
+        verify(flowController).startProcessor(GROUP_ID, PROCESSOR_ID, true);
+    }
+
+    @Test
+    public void testStartProcessorNotAppliedWhenMissing() {
+        givenProcessorIsMissing();
+
+        assertEquals(NOT_APPLIED, victim.startProcessor(PROCESSOR_ID));
+    }
+
+    @Test
+    public void testStartProcessorNotAppliedOnError() {
+        givenProcessorExistsInGroup();
+        doThrow(new RuntimeException("boom")).when(flowController).startProcessor(GROUP_ID, PROCESSOR_ID, true);
+
+        assertEquals(NOT_APPLIED, victim.startProcessor(PROCESSOR_ID));
+    }
+
+    // ---- stop ----
+
+    @Test
+    public void testStopProcessorFullyApplied() {
+        givenProcessorExistsInGroup();
+
+        assertEquals(FULLY_APPLIED, victim.stopProcessor(PROCESSOR_ID));
+        verify(flowController).stopProcessor(GROUP_ID, PROCESSOR_ID);
+    }
+
+    @Test
+    public void testStopProcessorNotAppliedOnError() {
+        givenProcessorExistsInGroup();
+        doThrow(new RuntimeException("boom")).when(flowController).stopProcessor(GROUP_ID, PROCESSOR_ID);
+
+        assertEquals(NOT_APPLIED, victim.stopProcessor(PROCESSOR_ID));
+    }
+
+    @Test
+    public void testStopProcessorNotAppliedWhenMissing() {
+        givenProcessorIsMissing();
+
+        assertEquals(NOT_APPLIED, victim.stopProcessor(PROCESSOR_ID));
+    }
+
+    // ---- fixtures ----
+
+    /** Stubs the flow manager to resolve PROCESSOR_ID to a node whose parent group is GROUP_ID. */
+    private void givenProcessorExistsInGroup() {
+        ProcessorNode processorNode = mock(ProcessorNode.class);
+        when(flowManager.getProcessorNode(PROCESSOR_ID)).thenReturn(processorNode);
+        when(processorNode.getProcessGroupIdentifier()).thenReturn(GROUP_ID);
+    }
+
+    /** Stubs the flow manager to report that no processor exists for PROCESSOR_ID. */
+    private void givenProcessorIsMissing() {
+        when(flowManager.getProcessorNode(PROCESSOR_ID)).thenReturn(null);
+    }
+}


Reply via email to