This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new df97c696a6 NIFI-15017 Add start/stop C2 command for processors in MiNiFi
df97c696a6 is described below
commit df97c696a692ff3c5f61b733f81a70f41311430e
Author: Mihai <[email protected]>
AuthorDate: Mon Sep 22 18:25:24 2025 +0300
NIFI-15017 Add start/stop C2 command for processors in MiNiFi
Closes #10353
Signed-off-by: Marton Szasz <[email protected]>
---
.../service/operation/ProcessorStateStrategy.java} | 30 +----
.../operation/StartProcessorOperationHandler.java | 81 ++++++++++++++
.../operation/StopProcessorOperationHandler.java | 81 ++++++++++++++
.../StartProcessorOperationHandlerTest.java | 111 ++++++++++++++++++
.../StopProcessorOperationHandlerTest.java | 111 ++++++++++++++++++
.../apache/nifi/c2/protocol/api/OperandType.java | 1 +
.../apache/nifi/c2/protocol/api/OperationType.java | 5 +-
.../nifi/c2/protocol/api/ProcessorStatus.java | 10 ++
.../apache/nifi/minifi/c2/C2NifiClientService.java | 15 ++-
.../c2/command/DefaultProcessorStateStrategy.java | 77 +++++++++++++
.../command/DefaultProcessorStateStrategyTest.java | 124 +++++++++++++++++++++
11 files changed, 618 insertions(+), 28 deletions(-)
diff --git
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/ProcessorStateStrategy.java
similarity index 58%
copy from
c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
copy to
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/ProcessorStateStrategy.java
index 09b0ea4e5e..ea624da53c 100644
---
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
+++
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/ProcessorStateStrategy.java
@@ -15,31 +15,11 @@
* limitations under the License.
*/
-package org.apache.nifi.c2.protocol.api;
+package org.apache.nifi.c2.client.service.operation;
-import java.util.Arrays;
-import java.util.Optional;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
-public enum OperandType {
-
- CONFIGURATION,
- CONNECTION,
- DEBUG,
- MANIFEST,
- REPOSITORY,
- PROPERTIES,
- ASSET,
- RESOURCE,
- FLOW;
-
- public static Optional<OperandType> fromString(String value) {
- return Arrays.stream(values())
- .filter(operandType -> operandType.name().equalsIgnoreCase(value))
- .findAny();
- }
-
- @Override
- public String toString() {
- return super.toString().toLowerCase();
- }
+public interface ProcessorStateStrategy {
+ OperationState startProcessor(String processorId);
+ OperationState stopProcessor(String processorId);
}
diff --git
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/StartProcessorOperationHandler.java
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/StartProcessorOperationHandler.java
new file mode 100644
index 0000000000..4a2dbdf09f
--- /dev/null
+++
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/StartProcessorOperationHandler.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static java.util.Optional.ofNullable;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+
+public class StartProcessorOperationHandler implements C2OperationHandler {
+
+ public static final String PROCESSOR_ID_ARG =
StopProcessorOperationHandler.PROCESSOR_ID_ARG;
+ public static final String NOT_APPLIED_DETAILS = "Failed to start
processor (not found or invalid state)";
+ public static final String FULLY_APPLIED_DETAILS = "Processor started";
+ public static final String PARTIALLY_APPLIED_DETAILS = "Processor start
partially applied";
+
+ private final ProcessorStateStrategy processorStateStrategy;
+
+ public StartProcessorOperationHandler(ProcessorStateStrategy
processorStateStrategy) {
+ this.processorStateStrategy = processorStateStrategy;
+ }
+
+ @Override
+ public OperationType getOperationType() {
+ return OperationType.START;
+ }
+
+ @Override
+ public OperandType getOperandType() {
+ return OperandType.PROCESSOR;
+ }
+
+ @Override
+ public Map<String, Object> getProperties() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public C2OperationAck handle(C2Operation operation) {
+ String operationId =
ofNullable(operation.getIdentifier()).orElse(EMPTY);
+ String processorId = ofNullable(operation.getArgs()).map(a ->
a.get(PROCESSOR_ID_ARG)).map(Object::toString).orElse(null);
+ C2OperationState.OperationState opState;
+ if (processorId == null) {
+ opState = NOT_APPLIED;
+ } else {
+ opState = processorStateStrategy.startProcessor(processorId);
+ }
+
+ String details = switch (opState) {
+ case NOT_APPLIED -> NOT_APPLIED_DETAILS;
+ case FULLY_APPLIED -> FULLY_APPLIED_DETAILS;
+ case PARTIALLY_APPLIED -> PARTIALLY_APPLIED_DETAILS;
+ default -> PARTIALLY_APPLIED_DETAILS;
+ };
+
+ return operationAck(operationId, operationState(opState, details));
+ }
+}
diff --git
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/StopProcessorOperationHandler.java
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/StopProcessorOperationHandler.java
new file mode 100644
index 0000000000..eea5f4a25f
--- /dev/null
+++
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/StopProcessorOperationHandler.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static java.util.Optional.ofNullable;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+
+public class StopProcessorOperationHandler implements C2OperationHandler {
+
+ public static final String PROCESSOR_ID_ARG = "processorId";
+ public static final String NOT_APPLIED_DETAILS = "Failed to stop processor
(not found)";
+ public static final String FULLY_APPLIED_DETAILS = "Processor stopped";
+ public static final String PARTIALLY_APPLIED_DETAILS = "Processor stop
partially applied";
+
+ private final ProcessorStateStrategy processorStateStrategy;
+
+ public StopProcessorOperationHandler(ProcessorStateStrategy
processorStateStrategy) {
+ this.processorStateStrategy = processorStateStrategy;
+ }
+
+ @Override
+ public OperationType getOperationType() {
+ return OperationType.STOP;
+ }
+
+ @Override
+ public OperandType getOperandType() {
+ return OperandType.PROCESSOR;
+ }
+
+ @Override
+ public Map<String, Object> getProperties() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public C2OperationAck handle(C2Operation operation) {
+ String operationId =
ofNullable(operation.getIdentifier()).orElse(EMPTY);
+ String processorId = ofNullable(operation.getArgs()).map(a ->
a.get(PROCESSOR_ID_ARG)).map(Object::toString).orElse(null);
+ C2OperationState.OperationState opState;
+ if (processorId == null) {
+ opState = NOT_APPLIED;
+ } else {
+ opState = processorStateStrategy.stopProcessor(processorId);
+ }
+
+ String details = switch (opState) {
+ case NOT_APPLIED -> NOT_APPLIED_DETAILS;
+ case FULLY_APPLIED -> FULLY_APPLIED_DETAILS;
+ case PARTIALLY_APPLIED -> PARTIALLY_APPLIED_DETAILS;
+ default -> PARTIALLY_APPLIED_DETAILS;
+ };
+
+ return operationAck(operationId, operationState(opState, details));
+ }
+}
diff --git
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/StartProcessorOperationHandlerTest.java
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/StartProcessorOperationHandlerTest.java
new file mode 100644
index 0000000000..ebffca1c1e
--- /dev/null
+++
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/StartProcessorOperationHandlerTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static
org.apache.nifi.c2.client.service.operation.StartProcessorOperationHandler.FULLY_APPLIED_DETAILS;
+import static
org.apache.nifi.c2.client.service.operation.StartProcessorOperationHandler.NOT_APPLIED_DETAILS;
+import static
org.apache.nifi.c2.client.service.operation.StartProcessorOperationHandler.PARTIALLY_APPLIED_DETAILS;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO_OPERATION;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.PARTIALLY_APPLIED;
+import static org.apache.nifi.c2.protocol.api.OperandType.PROCESSOR;
+import static org.apache.nifi.c2.protocol.api.OperationType.START;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class StartProcessorOperationHandlerTest {
+
+ private static final String OPERATION_ID = "operation id";
+ private static final String PROCESSOR_ID = "processor id";
+
+ @Mock
+ private ProcessorStateStrategy processorStateStrategy;
+
+ @InjectMocks
+ private StartProcessorOperationHandler victim;
+
+ @Test
+ public void testOperationAndOperandTypes() {
+ assertEquals(START, victim.getOperationType());
+ assertEquals(PROCESSOR, victim.getOperandType());
+ }
+
+ @ParameterizedTest(name = "operationId={0} ackOperationId={1} state={2}
details={3}")
+ @MethodSource("handleArguments")
+ public void testHandle(String operationId, String expectedAckOperationId,
C2OperationState.OperationState state, String expectedDetails) {
+
when(processorStateStrategy.startProcessor(PROCESSOR_ID)).thenReturn(state);
+
+ C2Operation operation = anOperation(operationId, PROCESSOR_ID);
+ C2OperationAck ack = victim.handle(operation);
+
+ assertEquals(expectedAckOperationId, ack.getOperationId());
+ assertEquals(state, ack.getOperationState().getState());
+ assertEquals(expectedDetails, ack.getOperationState().getDetails());
+ }
+
+ private static Stream<Arguments> handleArguments() {
+ return Stream.of(
+ Arguments.of(null, EMPTY, NOT_APPLIED, NOT_APPLIED_DETAILS),
+ Arguments.of(null, EMPTY, FULLY_APPLIED, FULLY_APPLIED_DETAILS),
+ Arguments.of(null, EMPTY, PARTIALLY_APPLIED,
PARTIALLY_APPLIED_DETAILS),
+ Arguments.of(null, EMPTY, NO_OPERATION, PARTIALLY_APPLIED_DETAILS),
+ Arguments.of(OPERATION_ID, OPERATION_ID, NOT_APPLIED,
NOT_APPLIED_DETAILS),
+ Arguments.of(OPERATION_ID, OPERATION_ID, FULLY_APPLIED,
FULLY_APPLIED_DETAILS),
+ Arguments.of(OPERATION_ID, OPERATION_ID, PARTIALLY_APPLIED,
PARTIALLY_APPLIED_DETAILS),
+ Arguments.of(OPERATION_ID, OPERATION_ID, NO_OPERATION,
PARTIALLY_APPLIED_DETAILS)
+ );
+ }
+
+ @Test
+ public void testHandleMissingProcessorId() {
+ C2Operation operation = new C2Operation();
+ operation.setIdentifier(OPERATION_ID);
+ C2OperationAck ack = victim.handle(operation);
+ assertEquals(OPERATION_ID, ack.getOperationId());
+ assertEquals(NOT_APPLIED, ack.getOperationState().getState());
+ assertEquals(NOT_APPLIED_DETAILS,
ack.getOperationState().getDetails());
+ }
+
+ private C2Operation anOperation(String operationId, String processorId) {
+ C2Operation operation = new C2Operation();
+ operation.setIdentifier(operationId);
+ Map<String, Object> args = new HashMap<>();
+ args.put(StartProcessorOperationHandler.PROCESSOR_ID_ARG, processorId);
+ operation.setArgs(args);
+ return operation;
+ }
+}
diff --git
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/StopProcessorOperationHandlerTest.java
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/StopProcessorOperationHandlerTest.java
new file mode 100644
index 0000000000..ffdc468f8d
--- /dev/null
+++
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/StopProcessorOperationHandlerTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static
org.apache.nifi.c2.client.service.operation.StopProcessorOperationHandler.FULLY_APPLIED_DETAILS;
+import static
org.apache.nifi.c2.client.service.operation.StopProcessorOperationHandler.NOT_APPLIED_DETAILS;
+import static
org.apache.nifi.c2.client.service.operation.StopProcessorOperationHandler.PARTIALLY_APPLIED_DETAILS;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO_OPERATION;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.PARTIALLY_APPLIED;
+import static org.apache.nifi.c2.protocol.api.OperandType.PROCESSOR;
+import static org.apache.nifi.c2.protocol.api.OperationType.STOP;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class StopProcessorOperationHandlerTest {
+
+ private static final String OPERATION_ID = "operation id";
+ private static final String PROCESSOR_ID = "processor id";
+
+ @Mock
+ private ProcessorStateStrategy processorStateStrategy;
+
+ @InjectMocks
+ private StopProcessorOperationHandler victim;
+
+ @Test
+ public void testOperationAndOperandTypes() {
+ assertEquals(STOP, victim.getOperationType());
+ assertEquals(PROCESSOR, victim.getOperandType());
+ }
+
+ @ParameterizedTest(name = "operationId={0} ackOperationId={1} state={2}
details={3}")
+ @MethodSource("handleArguments")
+ public void testHandle(String operationId, String expectedAckOperationId,
C2OperationState.OperationState state, String expectedDetails) {
+
when(processorStateStrategy.stopProcessor(PROCESSOR_ID)).thenReturn(state);
+
+ C2Operation operation = anOperation(operationId, PROCESSOR_ID);
+ C2OperationAck ack = victim.handle(operation);
+
+ assertEquals(expectedAckOperationId, ack.getOperationId());
+ assertEquals(state, ack.getOperationState().getState());
+ assertEquals(expectedDetails, ack.getOperationState().getDetails());
+ }
+
+ private static Stream<Arguments> handleArguments() {
+ return Stream.of(
+ Arguments.of(null, EMPTY, NOT_APPLIED, NOT_APPLIED_DETAILS),
+ Arguments.of(null, EMPTY, FULLY_APPLIED, FULLY_APPLIED_DETAILS),
+ Arguments.of(null, EMPTY, PARTIALLY_APPLIED,
PARTIALLY_APPLIED_DETAILS),
+ Arguments.of(null, EMPTY, NO_OPERATION, PARTIALLY_APPLIED_DETAILS),
+ Arguments.of(OPERATION_ID, OPERATION_ID, NOT_APPLIED,
NOT_APPLIED_DETAILS),
+ Arguments.of(OPERATION_ID, OPERATION_ID, FULLY_APPLIED,
FULLY_APPLIED_DETAILS),
+ Arguments.of(OPERATION_ID, OPERATION_ID, PARTIALLY_APPLIED,
PARTIALLY_APPLIED_DETAILS),
+ Arguments.of(OPERATION_ID, OPERATION_ID, NO_OPERATION,
PARTIALLY_APPLIED_DETAILS)
+ );
+ }
+
+ @Test
+ public void testHandleMissingProcessorId() {
+ C2Operation operation = new C2Operation();
+ operation.setIdentifier(OPERATION_ID);
+ C2OperationAck ack = victim.handle(operation);
+ assertEquals(OPERATION_ID, ack.getOperationId());
+ assertEquals(NOT_APPLIED, ack.getOperationState().getState());
+ assertEquals(NOT_APPLIED_DETAILS,
ack.getOperationState().getDetails());
+ }
+
+ private C2Operation anOperation(String operationId, String processorId) {
+ C2Operation operation = new C2Operation();
+ operation.setIdentifier(operationId);
+ Map<String, Object> args = new HashMap<>();
+ args.put(StopProcessorOperationHandler.PROCESSOR_ID_ARG, processorId);
+ operation.setArgs(args);
+ return operation;
+ }
+}
diff --git
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
index 09b0ea4e5e..2a0ff8552c 100644
---
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
+++
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
@@ -30,6 +30,7 @@ public enum OperandType {
PROPERTIES,
ASSET,
RESOURCE,
+ PROCESSOR,
FLOW;
public static Optional<OperandType> fromString(String value) {
diff --git
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java
index 80b0ec7145..e02116e949 100644
---
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java
+++
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java
@@ -24,6 +24,7 @@ import static
org.apache.nifi.c2.protocol.api.OperandType.DEBUG;
import static org.apache.nifi.c2.protocol.api.OperandType.FLOW;
import static org.apache.nifi.c2.protocol.api.OperandType.MANIFEST;
import static org.apache.nifi.c2.protocol.api.OperandType.PROPERTIES;
+import static org.apache.nifi.c2.protocol.api.OperandType.PROCESSOR;
import static org.apache.nifi.c2.protocol.api.OperandType.RESOURCE;
import java.util.Arrays;
@@ -41,8 +42,8 @@ public enum OperationType {
DESCRIBE(MANIFEST),
UPDATE(CONFIGURATION, ASSET, PROPERTIES),
RESTART,
- START(FLOW),
- STOP(FLOW),
+ START(FLOW, PROCESSOR),
+ STOP(FLOW, PROCESSOR),
PAUSE,
REPLICATE,
SUBSCRIBE,
diff --git
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ProcessorStatus.java
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ProcessorStatus.java
index a172200138..3c1ab90584 100644
---
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ProcessorStatus.java
+++
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ProcessorStatus.java
@@ -37,6 +37,7 @@ public class ProcessorStatus implements Serializable {
private long processingNanos;
private int activeThreadCount;
private int terminatedThreadCount;
+ private RunStatus runStatus;
@Schema(description = "The id of the processor")
public String getId() {
@@ -145,4 +146,13 @@ public class ProcessorStatus implements Serializable {
public void setTerminatedThreadCount(int terminatedThreadCount) {
this.terminatedThreadCount = terminatedThreadCount;
}
+
+ @Schema(description = "The run status of the processor (e.g. RUNNING or
STOPPED)")
+ public RunStatus getRunStatus() {
+ return runStatus;
+ }
+
+ public void setRunStatus(RunStatus runStatus) {
+ this.runStatus = runStatus;
+ }
}
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
index f2fa27b891..35bd635b5f 100644
---
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
@@ -86,10 +86,13 @@ import
org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
import
org.apache.nifi.c2.client.service.operation.DescribeManifestOperationHandler;
import
org.apache.nifi.c2.client.service.operation.EmptyOperandPropertiesProvider;
import org.apache.nifi.c2.client.service.operation.FlowStateStrategy;
+import org.apache.nifi.c2.client.service.operation.ProcessorStateStrategy;
import org.apache.nifi.c2.client.service.operation.OperandPropertiesProvider;
import org.apache.nifi.c2.client.service.operation.OperationQueueDAO;
import org.apache.nifi.c2.client.service.operation.StartFlowOperationHandler;
import org.apache.nifi.c2.client.service.operation.StopFlowOperationHandler;
+import
org.apache.nifi.c2.client.service.operation.StartProcessorOperationHandler;
+import
org.apache.nifi.c2.client.service.operation.StopProcessorOperationHandler;
import org.apache.nifi.c2.client.service.operation.SupportedOperationsProvider;
import
org.apache.nifi.c2.client.service.operation.SyncResourceOperationHandler;
import
org.apache.nifi.c2.client.service.operation.TransferDebugOperationHandler;
@@ -116,6 +119,7 @@ import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.manifest.RuntimeManifestService;
import org.apache.nifi.manifest.StandardRuntimeManifestService;
import org.apache.nifi.minifi.c2.command.DefaultFlowStateStrategy;
+import org.apache.nifi.minifi.c2.command.DefaultProcessorStateStrategy;
import org.apache.nifi.minifi.c2.command.DefaultUpdateConfigurationStrategy;
import org.apache.nifi.minifi.c2.command.PropertiesPersister;
import org.apache.nifi.minifi.c2.command.TransferDebugCommandHelper;
@@ -251,6 +255,7 @@ public class C2NifiClientService {
UpdatePropertiesPropertyProvider updatePropertiesPropertyProvider =
new UpdatePropertiesPropertyProvider(bootstrapConfigFileLocation);
PropertiesPersister propertiesPersister = new
PropertiesPersister(updatePropertiesPropertyProvider,
bootstrapConfigFileLocation);
FlowStateStrategy defaultFlowStateStrategy = new
DefaultFlowStateStrategy(flowController);
+ ProcessorStateStrategy defaultProcessorStateStrategy = new
DefaultProcessorStateStrategy(flowController);
FlowPropertyAssetReferenceResolver flowPropertyAssetReferenceResolver
= new
StandardFlowPropertyAssetReferenceResolverService(resourceRepository::getAbsolutePath);
FlowPropertyEncryptor flowPropertyEncryptor = new
StandardFlowPropertyEncryptor(
@@ -278,7 +283,10 @@ public class C2NifiClientService {
updateAssetCommandHelper::assetUpdatePrecondition,
updateAssetCommandHelper::assetPersistFunction),
new
UpdatePropertiesOperationHandler(updatePropertiesPropertyProvider,
propertiesPersister::persistProperties),
SyncResourceOperationHandler.create(client, new
SyncResourcePropertyProvider(), new
DefaultSyncResourceStrategy(resourceRepository), c2Serializer),
- new StartFlowOperationHandler(defaultFlowStateStrategy), new
StopFlowOperationHandler(defaultFlowStateStrategy)
+ new StartFlowOperationHandler(defaultFlowStateStrategy),
+ new StopFlowOperationHandler(defaultFlowStateStrategy),
+ new StartProcessorOperationHandler(defaultProcessorStateStrategy),
+ new StopProcessorOperationHandler(defaultProcessorStateStrategy)
));
}
@@ -427,6 +435,11 @@ public class C2NifiClientService {
result.setProcessingNanos(processorStatus.getProcessingNanos());
result.setActiveThreadCount(processorStatus.getActiveThreadCount());
result.setTerminatedThreadCount(processorStatus.getTerminatedThreadCount());
+ result.setRunStatus(
+ processorStatus.getRunStatus() ==
org.apache.nifi.controller.status.RunStatus.Running
+ ? RUNNING
+ : STOPPED
+ );
return result;
}
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultProcessorStateStrategy.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultProcessorStateStrategy.java
new file mode 100644
index 0000000000..c8bb49d16b
--- /dev/null
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultProcessorStateStrategy.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.apache.nifi.c2.client.service.operation.ProcessorStateStrategy;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.ProcessorNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.function.BiConsumer;
+
+
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+
+public class DefaultProcessorStateStrategy implements ProcessorStateStrategy {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DefaultProcessorStateStrategy.class);
+
+ private final FlowController flowController;
+
+ public DefaultProcessorStateStrategy(FlowController flowController) {
+ this.flowController = flowController;
+ }
+
+ @Override
+ public OperationState startProcessor(String processorId) {
+ return changeState(processorId, this::start);
+ }
+
+ @Override
+ public OperationState stopProcessor(String processorId) {
+ return changeState(processorId, this::stop);
+ }
+
+ private OperationState changeState(String processorId, BiConsumer<String,
String> action) {
+ try {
+ ProcessorNode node =
flowController.getFlowManager().getProcessorNode(processorId);
+ if (node == null) {
+ LOGGER.warn("Processor with id {} not found", processorId);
+ return NOT_APPLIED;
+ }
+ String parentGroupId = node.getProcessGroupIdentifier();
+ action.accept(processorId, parentGroupId);
+ return FULLY_APPLIED;
+ } catch (Exception e) {
+ LOGGER.error("Failed to change state for processor {}",
processorId, e);
+ return NOT_APPLIED;
+ }
+ }
+
+ private void start(String processorId, String parentGroupId) {
+ flowController.startProcessor(parentGroupId, processorId, true);
+ LOGGER.info("Started processor {} (group={})", processorId,
parentGroupId);
+ }
+
+ private void stop(String processorId, String parentGroupId) {
+ flowController.stopProcessor(parentGroupId, processorId);
+ LOGGER.info("Stopped processor {} (group={})", processorId,
parentGroupId);
+ }
+}
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultProcessorStateStrategyTest.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultProcessorStateStrategyTest.java
new file mode 100644
index 0000000000..155dfbca3d
--- /dev/null
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultProcessorStateStrategyTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link DefaultProcessorStateStrategy} running against a mocked
+ * {@link FlowController} and {@link FlowManager}.
+ */
+@ExtendWith(MockitoExtension.class)
+public class DefaultProcessorStateStrategyTest {
+
+    private static final String PROCESSOR_ID = "e2f2b9f6-1a7a-4b65-9e58-9a2b3f6a0c01";
+    private static final String GROUP_ID = "0b0d53b7-3b5d-4d88-a3f0-2d6c5dc9d8bb";
+
+    @Mock
+    private FlowController flowController;
+
+    @Mock
+    private FlowManager flowManager;
+
+    @InjectMocks
+    private DefaultProcessorStateStrategy victim;
+
+    @BeforeEach
+    public void init() {
+        when(flowController.getFlowManager()).thenReturn(flowManager);
+    }
+
+    // ---- start ----
+
+    @Test
+    public void testStartProcessorFullyApplied() {
+        givenProcessorExists();
+
+        assertEquals(FULLY_APPLIED, victim.startProcessor(PROCESSOR_ID));
+        verify(flowController).startProcessor(GROUP_ID, PROCESSOR_ID, true);
+    }
+
+    @Test
+    public void testStartProcessorNotAppliedWhenMissing() {
+        givenProcessorMissing();
+
+        assertEquals(NOT_APPLIED, victim.startProcessor(PROCESSOR_ID));
+    }
+
+    @Test
+    public void testStartProcessorNotAppliedOnError() {
+        givenProcessorExists();
+        doThrow(new RuntimeException("boom")).when(flowController).startProcessor(GROUP_ID, PROCESSOR_ID, true);
+
+        assertEquals(NOT_APPLIED, victim.startProcessor(PROCESSOR_ID));
+    }
+
+    // ---- stop ----
+
+    @Test
+    public void testStopProcessorFullyApplied() {
+        givenProcessorExists();
+
+        assertEquals(FULLY_APPLIED, victim.stopProcessor(PROCESSOR_ID));
+        verify(flowController).stopProcessor(GROUP_ID, PROCESSOR_ID);
+    }
+
+    @Test
+    public void testStopProcessorNotAppliedOnError() {
+        givenProcessorExists();
+        doThrow(new RuntimeException("boom")).when(flowController).stopProcessor(GROUP_ID, PROCESSOR_ID);
+
+        assertEquals(NOT_APPLIED, victim.stopProcessor(PROCESSOR_ID));
+    }
+
+    @Test
+    public void testStopProcessorNotAppliedWhenMissing() {
+        givenProcessorMissing();
+
+        assertEquals(NOT_APPLIED, victim.stopProcessor(PROCESSOR_ID));
+    }
+
+    // ---- fixtures ----
+
+    /** Stubs the flow manager to return a processor node that belongs to {@code GROUP_ID}. */
+    private void givenProcessorExists() {
+        ProcessorNode existing = mock(ProcessorNode.class);
+        when(flowManager.getProcessorNode(PROCESSOR_ID)).thenReturn(existing);
+        when(existing.getProcessGroupIdentifier()).thenReturn(GROUP_ID);
+    }
+
+    /** Stubs the flow manager to report that no processor exists for {@code PROCESSOR_ID}. */
+    private void givenProcessorMissing() {
+        when(flowManager.getProcessorNode(PROCESSOR_ID)).thenReturn(null);
+    }
+}