This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new e2225798cf NIFI-12598 Allow Starting/Stopping of PG components to be
recorded in Flow Configuration History when started/stopped at PG level
e2225798cf is described below
commit e2225798cfd02c06e1d9aeb437de04e1f0359a66
Author: Nissim Shiman <[email protected]>
AuthorDate: Fri Dec 5 18:40:27 2025 +0000
NIFI-12598 Allow Starting/Stopping of PG components to be recorded in Flow
Configuration History when started/stopped at PG level
Signed-off-by: Pierre Villard <[email protected]>
This closes #10696.
---
.../org/apache/nifi/audit/ProcessGroupAuditor.java | 38 ++++++
.../apache/nifi/audit/TestProcessGroupAuditor.java | 137 +++++++++++++++++++--
2 files changed, 165 insertions(+), 10 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java
index 33259dac39..a7adad1307 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java
@@ -31,8 +31,10 @@ import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import org.apache.nifi.web.dao.ProcessGroupDAO;
@@ -223,6 +225,7 @@ public class ProcessGroupAuditor extends NiFiAuditor {
}
saveUpdateProcessGroupAction(groupId, operation);
+ saveActions(getComponentActions(groupId, componentIds, operation),
logger);
}
/**
@@ -273,6 +276,21 @@ public class ProcessGroupAuditor extends NiFiAuditor {
port = processGroup.findOutputPort(componentId);
if (port != null) {
actions.add(generateUpdateConnectableAction(port, operation,
Component.OutputPort));
+ continue;
+ }
+
+ ProcessGroup internalProcessGroup =
processGroup.findProcessGroup(componentId);
+ if (internalProcessGroup != null) {
+
actions.add(generateUpdateProcessGroupAction(internalProcessGroup, operation));
+ continue;
+ }
+
+ RemoteGroupPort remoteGroupPort =
processGroup.findRemoteGroupPort(componentId);
+ if (remoteGroupPort != null) {
+ RemoteProcessGroup remoteProcessGroup =
remoteGroupPort.getRemoteProcessGroup();
+ if (remoteProcessGroup != null) {
+
actions.add(generateUpdateRemoteProcessGroupAction(remoteProcessGroup,
operation));
+ }
}
}
@@ -402,6 +420,26 @@ public class ProcessGroupAuditor extends NiFiAuditor {
saveAction(action, logger);
}
+ private Action generateUpdateProcessGroupAction(final ProcessGroup
processGroup, final Operation operation) {
+ final FlowChangeAction action = createFlowChangeAction();
+ action.setSourceId(processGroup.getIdentifier());
+ action.setSourceName(processGroup.getName());
+ action.setSourceType(Component.ProcessGroup);
+ action.setOperation(operation);
+
+ return action;
+ }
+
+ private Action generateUpdateRemoteProcessGroupAction(final
RemoteProcessGroup remoteProcessGroup, final Operation operation) {
+ final FlowChangeAction action = createFlowChangeAction();
+ action.setSourceId(remoteProcessGroup.getIdentifier());
+ action.setSourceName(remoteProcessGroup.getName());
+ action.setSourceType(Component.RemoteProcessGroup);
+ action.setOperation(operation);
+
+ return action;
+ }
+
private Action generateUpdateConnectableAction(final Connectable
connectable, final Operation operation, final Component component) {
final FlowChangeAction action = createFlowChangeAction();
action.setSourceId(connectable.getIdentifier());
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessGroupAuditor.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessGroupAuditor.java
index 0e8d78b1a3..3d5f5bd1ac 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessGroupAuditor.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessGroupAuditor.java
@@ -34,6 +34,9 @@ import
org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.service.StandardControllerServiceNode;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.groups.StatelessGroupNode;
+import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.web.dao.impl.StandardProcessGroupDAO;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -73,10 +76,13 @@ import static org.mockito.Mockito.when;
public class TestProcessGroupAuditor {
private static final String PG_1 = "processGroup1";
+ private static final String PG_2 = "processGroup2";
private static final String PROC_1 = "processor1";
private static final String PROC_2 = "processor2";
private static final String INPUT_PORT = "inputPort";
private static final String OUTPUT_PORT = "outputPort";
+ private static final String REMOTE_PG = "remotePG";
+ private static final String REMOTE_GROUP_INPUT_PORT =
"remoteGroupInputPort";
private static final String CS_1 = "controllerService1";
private static final String USER_ID = "user-id";
@@ -117,26 +123,137 @@ public class TestProcessGroupAuditor {
void testVerifyStartProcessGroupAuditing() {
final ProcessorNode processor1 = mock(StandardProcessorNode.class);
final ProcessorNode processor2 = mock(StandardProcessorNode.class);
+ final Port inputPort = mock(Port.class);
+ final Port outputPort = mock(Port.class);
+ final ProcessGroup innerProcessGroup = mock(ProcessGroup.class);
+ final StatelessGroupNode statelessGroup =
mock(StatelessGroupNode.class);
+ final RemoteProcessGroup remoteProcessGroup =
mock(RemoteProcessGroup.class);
+ final RemoteGroupPort remoteGroupInputPort =
mock(RemoteGroupPort.class);
+
+ when(processor1.getName()).thenReturn(PROC_1);
when(processor1.getProcessGroup()).thenReturn(processGroup);
- when(processor2.getProcessGroup()).thenReturn(processGroup);
when(processor1.getConnectableType()).thenReturn(ConnectableType.PROCESSOR);
+
+ when(processor2.getName()).thenReturn(PROC_2);
+ when(processor2.getProcessGroup()).thenReturn(processGroup);
when(processor2.getConnectableType()).thenReturn(ConnectableType.PROCESSOR);
+ when(inputPort.getName()).thenReturn(INPUT_PORT);
+ when(inputPort.getProcessGroup()).thenReturn(processGroup);
+
when(inputPort.getConnectableType()).thenReturn(ConnectableType.INPUT_PORT);
+
+ when(outputPort.getName()).thenReturn(OUTPUT_PORT);
+ when(outputPort.getProcessGroup()).thenReturn(processGroup);
+
when(outputPort.getConnectableType()).thenReturn(ConnectableType.OUTPUT_PORT);
+
+ when(innerProcessGroup.getName()).thenReturn(PG_2);
+ when(statelessGroup.getProcessGroup()).thenReturn(processGroup);
+
when(statelessGroup.getConnectableType()).thenReturn(ConnectableType.STATELESS_GROUP);
+
+ when(remoteProcessGroup.getName()).thenReturn(REMOTE_PG);
+ when(remoteGroupInputPort.getProcessGroup()).thenReturn(processGroup);
+
when(remoteGroupInputPort.getConnectableType()).thenReturn(ConnectableType.REMOTE_INPUT_PORT);
+
+ when(processGroup.findProcessor(PROC_1)).thenReturn(processor1);
+ when(processGroup.findProcessor(PROC_2)).thenReturn(processor2);
+
+ when(processGroup.findProcessor(INPUT_PORT)).thenReturn(null);
+ when(processGroup.findInputPort(INPUT_PORT)).thenReturn(inputPort);
+
+ when(processGroup.findProcessor(OUTPUT_PORT)).thenReturn(null);
+ when(processGroup.findInputPort(OUTPUT_PORT)).thenReturn(null);
+ when(processGroup.findOutputPort(OUTPUT_PORT)).thenReturn(outputPort);
+
+ when(processGroup.findProcessor(PG_2)).thenReturn(null);
+ when(processGroup.findInputPort(PG_2)).thenReturn(null);
+ when(processGroup.findOutputPort(PG_2)).thenReturn(null);
+
when(processGroup.findProcessGroup(PG_2)).thenReturn(innerProcessGroup);
+
+
when(processGroup.findProcessor(REMOTE_GROUP_INPUT_PORT)).thenReturn(null);
+
when(processGroup.findInputPort(REMOTE_GROUP_INPUT_PORT)).thenReturn(null);
+
when(processGroup.findOutputPort(REMOTE_GROUP_INPUT_PORT)).thenReturn(null);
+
when(processGroup.findProcessGroup(REMOTE_GROUP_INPUT_PORT)).thenReturn(null);
+
when(processGroup.findRemoteGroupPort(REMOTE_GROUP_INPUT_PORT)).thenReturn(remoteGroupInputPort);
+
when(remoteGroupInputPort.getRemoteProcessGroup()).thenReturn(remoteProcessGroup);
+
when(flowManager.getGroup(eq(PG_1))).thenReturn(processGroup);
when(flowManager.findConnectable(eq(PROC_1))).thenReturn(processor1);
when(flowManager.findConnectable(eq(PROC_2))).thenReturn(processor2);
+
when(flowManager.findConnectable(eq(INPUT_PORT))).thenReturn(inputPort);
+
when(flowManager.findConnectable(eq(OUTPUT_PORT))).thenReturn(outputPort);
+ when(flowManager.findConnectable(eq(PG_2))).thenReturn(statelessGroup);
+
when(flowManager.findConnectable(eq(REMOTE_GROUP_INPUT_PORT))).thenReturn(remoteGroupInputPort);
+
when(flowController.getFlowManager()).thenReturn(flowManager);
- processGroupDAO.scheduleComponents(PG_1, ScheduledState.RUNNING, new
HashSet<>(Arrays.asList(PROC_1, PROC_2)));
+ processGroupDAO.scheduleComponents(PG_1, ScheduledState.RUNNING, new
HashSet<>(Arrays.asList(PROC_1, PG_2, PROC_2, INPUT_PORT,
REMOTE_GROUP_INPUT_PORT, OUTPUT_PORT)));
+
+ verify(auditService,
times(2)).addActions(argumentCaptorActions.capture());
+ final List<List<Action>> actions =
argumentCaptorActions.getAllValues();
+ assertEquals(2, actions.size());
+ final Iterator<List<Action>> actionsIterator = actions.iterator();
- verify(auditService).addActions(argumentCaptorActions.capture());
- final List<Action> actions = argumentCaptorActions.getValue();
- assertEquals(1, actions.size());
- final Action action = actions.getFirst();
- assertInstanceOf(FlowChangeAction.class, action);
- assertEquals(USER_ID, action.getUserIdentity());
- assertEquals("ProcessGroup", action.getSourceType().name());
- assertEquals(Operation.Start, action.getOperation());
+ // pg started
+ final List<Action> pgActions = actionsIterator.next();
+ assertEquals(1, pgActions.size());
+ final Action pgAction = pgActions.iterator().next();
+ assertInstanceOf(FlowChangeAction.class, pgAction);
+ assertEquals(USER_ID, pgAction.getUserIdentity());
+ assertEquals("ProcessGroup", pgAction.getSourceType().name());
+ assertEquals(Operation.Start, pgAction.getOperation());
+
+ List<Action> componentActions = actionsIterator.next();
+ assertEquals(6, componentActions.size());
+
+ componentActions.sort(Comparator.comparing(Action::getSourceName));
+
+ // inputPort started
+ final Iterator<Action> actionIterator = componentActions.iterator();
+ Action componentAction = actionIterator.next();
+ assertInstanceOf(FlowChangeAction.class, componentAction);
+ assertEquals(USER_ID, componentAction.getUserIdentity());
+ assertEquals("InputPort", componentAction.getSourceType().name());
+ assertEquals(INPUT_PORT, componentAction.getSourceName());
+ assertEquals(Operation.Start, componentAction.getOperation());
+
+ // outputPort started
+ componentAction = actionIterator.next();
+ assertInstanceOf(FlowChangeAction.class, componentAction);
+ assertEquals(USER_ID, componentAction.getUserIdentity());
+ assertEquals("OutputPort", componentAction.getSourceType().name());
+ assertEquals(OUTPUT_PORT, componentAction.getSourceName());
+ assertEquals(Operation.Start, componentAction.getOperation());
+
+ // inner stateless pg started
+ componentAction = actionIterator.next();
+ assertInstanceOf(FlowChangeAction.class, componentAction);
+ assertEquals(USER_ID, componentAction.getUserIdentity());
+ assertEquals("ProcessGroup", componentAction.getSourceType().name());
+ assertEquals(PG_2, componentAction.getSourceName());
+ assertEquals(Operation.Start, componentAction.getOperation());
+
+ // processors started
+ componentAction = actionIterator.next();
+ assertInstanceOf(FlowChangeAction.class, componentAction);
+ assertEquals(USER_ID, componentAction.getUserIdentity());
+ assertEquals("Processor", componentAction.getSourceType().name());
+ assertEquals(PROC_1, componentAction.getSourceName());
+ assertEquals(Operation.Start, componentAction.getOperation());
+
+ componentAction = actionIterator.next();
+ assertInstanceOf(FlowChangeAction.class, componentAction);
+ assertEquals(USER_ID, componentAction.getUserIdentity());
+ assertEquals("Processor", componentAction.getSourceType().name());
+ assertEquals(PROC_2, componentAction.getSourceName());
+ assertEquals(Operation.Start, componentAction.getOperation());
+
+ // remote PG started
+ componentAction = actionIterator.next();
+ assertInstanceOf(FlowChangeAction.class, componentAction);
+ assertEquals(USER_ID, componentAction.getUserIdentity());
+ assertEquals("RemoteProcessGroup",
componentAction.getSourceType().name());
+ assertEquals(REMOTE_PG, componentAction.getSourceName());
+ assertEquals(Operation.Start, componentAction.getOperation());
}
@Test