This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new e2225798cf NIFI-12598 Allow Starting/Stopping of PG components to be recorded in Flow Configuration History when started/stopped at PG level
e2225798cf is described below

commit e2225798cfd02c06e1d9aeb437de04e1f0359a66
Author: Nissim Shiman <[email protected]>
AuthorDate: Fri Dec 5 18:40:27 2025 +0000

    NIFI-12598 Allow Starting/Stopping of PG components to be recorded in Flow Configuration History when started/stopped at PG level
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #10696.
---
 .../org/apache/nifi/audit/ProcessGroupAuditor.java |  38 ++++++
 .../apache/nifi/audit/TestProcessGroupAuditor.java | 137 +++++++++++++++++++--
 2 files changed, 165 insertions(+), 10 deletions(-)

diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java
index 33259dac39..a7adad1307 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java
@@ -31,8 +31,10 @@ import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.parameter.ParameterContext;
 import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.web.api.dto.ProcessGroupDTO;
 import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
 import org.apache.nifi.web.dao.ProcessGroupDAO;
@@ -223,6 +225,7 @@ public class ProcessGroupAuditor extends NiFiAuditor {
         }
 
         saveUpdateProcessGroupAction(groupId, operation);
+        saveActions(getComponentActions(groupId, componentIds, operation), 
logger);
     }
 
     /**
@@ -273,6 +276,21 @@ public class ProcessGroupAuditor extends NiFiAuditor {
             port = processGroup.findOutputPort(componentId);
             if (port != null) {
                 actions.add(generateUpdateConnectableAction(port, operation, 
Component.OutputPort));
+                continue;
+            }
+
+            ProcessGroup internalProcessGroup = 
processGroup.findProcessGroup(componentId);
+            if (internalProcessGroup != null) {
+                
actions.add(generateUpdateProcessGroupAction(internalProcessGroup, operation));
+                continue;
+            }
+
+            RemoteGroupPort remoteGroupPort = 
processGroup.findRemoteGroupPort(componentId);
+            if (remoteGroupPort != null) {
+                RemoteProcessGroup remoteProcessGroup = 
remoteGroupPort.getRemoteProcessGroup();
+                if (remoteProcessGroup != null) {
+                    
actions.add(generateUpdateRemoteProcessGroupAction(remoteProcessGroup, 
operation));
+                }
             }
         }
 
@@ -402,6 +420,26 @@ public class ProcessGroupAuditor extends NiFiAuditor {
         saveAction(action, logger);
     }
 
+    private Action generateUpdateProcessGroupAction(final ProcessGroup 
processGroup, final Operation operation) {
+        final FlowChangeAction action = createFlowChangeAction();
+        action.setSourceId(processGroup.getIdentifier());
+        action.setSourceName(processGroup.getName());
+        action.setSourceType(Component.ProcessGroup);
+        action.setOperation(operation);
+
+        return action;
+    }
+
+    private Action generateUpdateRemoteProcessGroupAction(final 
RemoteProcessGroup remoteProcessGroup, final Operation operation) {
+        final FlowChangeAction action = createFlowChangeAction();
+        action.setSourceId(remoteProcessGroup.getIdentifier());
+        action.setSourceName(remoteProcessGroup.getName());
+        action.setSourceType(Component.RemoteProcessGroup);
+        action.setOperation(operation);
+
+        return action;
+    }
+
     private Action generateUpdateConnectableAction(final Connectable 
connectable, final Operation operation, final Component component) {
         final FlowChangeAction action = createFlowChangeAction();
         action.setSourceId(connectable.getIdentifier());
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessGroupAuditor.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessGroupAuditor.java
index 0e8d78b1a3..3d5f5bd1ac 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessGroupAuditor.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessGroupAuditor.java
@@ -34,6 +34,9 @@ import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.controller.service.StandardControllerServiceNode;
 import org.apache.nifi.controller.service.StandardControllerServiceProvider;
 import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.groups.StatelessGroupNode;
+import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.web.dao.impl.StandardProcessGroupDAO;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -73,10 +76,13 @@ import static org.mockito.Mockito.when;
 public class TestProcessGroupAuditor {
 
     private static final String PG_1 = "processGroup1";
+    private static final String PG_2 = "processGroup2";
     private static final String PROC_1 = "processor1";
     private static final String PROC_2 = "processor2";
     private static final String INPUT_PORT = "inputPort";
     private static final String OUTPUT_PORT = "outputPort";
+    private static final String REMOTE_PG = "remotePG";
+    private static final String REMOTE_GROUP_INPUT_PORT = 
"remoteGroupInputPort";
     private static final String CS_1 = "controllerService1";
     private static final String USER_ID = "user-id";
 
@@ -117,26 +123,137 @@ public class TestProcessGroupAuditor {
     void testVerifyStartProcessGroupAuditing() {
         final ProcessorNode processor1 = mock(StandardProcessorNode.class);
         final ProcessorNode processor2 = mock(StandardProcessorNode.class);
+        final Port inputPort = mock(Port.class);
+        final Port outputPort = mock(Port.class);
+        final ProcessGroup innerProcessGroup = mock(ProcessGroup.class);
+        final StatelessGroupNode statelessGroup = 
mock(StatelessGroupNode.class);
+        final RemoteProcessGroup remoteProcessGroup = 
mock(RemoteProcessGroup.class);
+        final RemoteGroupPort remoteGroupInputPort = 
mock(RemoteGroupPort.class);
+
+        when(processor1.getName()).thenReturn(PROC_1);
         when(processor1.getProcessGroup()).thenReturn(processGroup);
-        when(processor2.getProcessGroup()).thenReturn(processGroup);
         
when(processor1.getConnectableType()).thenReturn(ConnectableType.PROCESSOR);
+
+        when(processor2.getName()).thenReturn(PROC_2);
+        when(processor2.getProcessGroup()).thenReturn(processGroup);
         
when(processor2.getConnectableType()).thenReturn(ConnectableType.PROCESSOR);
 
+        when(inputPort.getName()).thenReturn(INPUT_PORT);
+        when(inputPort.getProcessGroup()).thenReturn(processGroup);
+        
when(inputPort.getConnectableType()).thenReturn(ConnectableType.INPUT_PORT);
+
+        when(outputPort.getName()).thenReturn(OUTPUT_PORT);
+        when(outputPort.getProcessGroup()).thenReturn(processGroup);
+        
when(outputPort.getConnectableType()).thenReturn(ConnectableType.OUTPUT_PORT);
+
+        when(innerProcessGroup.getName()).thenReturn(PG_2);
+        when(statelessGroup.getProcessGroup()).thenReturn(processGroup);
+        
when(statelessGroup.getConnectableType()).thenReturn(ConnectableType.STATELESS_GROUP);
+
+        when(remoteProcessGroup.getName()).thenReturn(REMOTE_PG);
+        when(remoteGroupInputPort.getProcessGroup()).thenReturn(processGroup);
+        
when(remoteGroupInputPort.getConnectableType()).thenReturn(ConnectableType.REMOTE_INPUT_PORT);
+
+        when(processGroup.findProcessor(PROC_1)).thenReturn(processor1);
+        when(processGroup.findProcessor(PROC_2)).thenReturn(processor2);
+
+        when(processGroup.findProcessor(INPUT_PORT)).thenReturn(null);
+        when(processGroup.findInputPort(INPUT_PORT)).thenReturn(inputPort);
+
+        when(processGroup.findProcessor(OUTPUT_PORT)).thenReturn(null);
+        when(processGroup.findInputPort(OUTPUT_PORT)).thenReturn(null);
+        when(processGroup.findOutputPort(OUTPUT_PORT)).thenReturn(outputPort);
+
+        when(processGroup.findProcessor(PG_2)).thenReturn(null);
+        when(processGroup.findInputPort(PG_2)).thenReturn(null);
+        when(processGroup.findOutputPort(PG_2)).thenReturn(null);
+        
when(processGroup.findProcessGroup(PG_2)).thenReturn(innerProcessGroup);
+
+        
when(processGroup.findProcessor(REMOTE_GROUP_INPUT_PORT)).thenReturn(null);
+        
when(processGroup.findInputPort(REMOTE_GROUP_INPUT_PORT)).thenReturn(null);
+        
when(processGroup.findOutputPort(REMOTE_GROUP_INPUT_PORT)).thenReturn(null);
+        
when(processGroup.findProcessGroup(REMOTE_GROUP_INPUT_PORT)).thenReturn(null);
+        
when(processGroup.findRemoteGroupPort(REMOTE_GROUP_INPUT_PORT)).thenReturn(remoteGroupInputPort);
+        
when(remoteGroupInputPort.getRemoteProcessGroup()).thenReturn(remoteProcessGroup);
+
         when(flowManager.getGroup(eq(PG_1))).thenReturn(processGroup);
         when(flowManager.findConnectable(eq(PROC_1))).thenReturn(processor1);
         when(flowManager.findConnectable(eq(PROC_2))).thenReturn(processor2);
+        
when(flowManager.findConnectable(eq(INPUT_PORT))).thenReturn(inputPort);
+        
when(flowManager.findConnectable(eq(OUTPUT_PORT))).thenReturn(outputPort);
+        when(flowManager.findConnectable(eq(PG_2))).thenReturn(statelessGroup);
+        
when(flowManager.findConnectable(eq(REMOTE_GROUP_INPUT_PORT))).thenReturn(remoteGroupInputPort);
+
         when(flowController.getFlowManager()).thenReturn(flowManager);
 
-        processGroupDAO.scheduleComponents(PG_1, ScheduledState.RUNNING, new 
HashSet<>(Arrays.asList(PROC_1, PROC_2)));
+        processGroupDAO.scheduleComponents(PG_1, ScheduledState.RUNNING, new 
HashSet<>(Arrays.asList(PROC_1, PG_2, PROC_2, INPUT_PORT, 
REMOTE_GROUP_INPUT_PORT, OUTPUT_PORT)));
+
+        verify(auditService, 
times(2)).addActions(argumentCaptorActions.capture());
+        final List<List<Action>> actions = 
argumentCaptorActions.getAllValues();
+        assertEquals(2, actions.size());
+        final Iterator<List<Action>> actionsIterator = actions.iterator();
 
-        verify(auditService).addActions(argumentCaptorActions.capture());
-        final List<Action> actions = argumentCaptorActions.getValue();
-        assertEquals(1, actions.size());
-        final Action action = actions.getFirst();
-        assertInstanceOf(FlowChangeAction.class, action);
-        assertEquals(USER_ID, action.getUserIdentity());
-        assertEquals("ProcessGroup", action.getSourceType().name());
-        assertEquals(Operation.Start, action.getOperation());
+        // pg started
+        final List<Action> pgActions = actionsIterator.next();
+        assertEquals(1, pgActions.size());
+        final Action pgAction = pgActions.iterator().next();
+        assertInstanceOf(FlowChangeAction.class, pgAction);
+        assertEquals(USER_ID, pgAction.getUserIdentity());
+        assertEquals("ProcessGroup", pgAction.getSourceType().name());
+        assertEquals(Operation.Start, pgAction.getOperation());
+
+        List<Action> componentActions = actionsIterator.next();
+        assertEquals(6, componentActions.size());
+
+        componentActions.sort(Comparator.comparing(Action::getSourceName));
+
+        // inputPort started
+        final Iterator<Action> actionIterator = componentActions.iterator();
+        Action componentAction = actionIterator.next();
+        assertInstanceOf(FlowChangeAction.class, componentAction);
+        assertEquals(USER_ID, componentAction.getUserIdentity());
+        assertEquals("InputPort", componentAction.getSourceType().name());
+        assertEquals(INPUT_PORT, componentAction.getSourceName());
+        assertEquals(Operation.Start, componentAction.getOperation());
+
+        // outputPort started
+        componentAction = actionIterator.next();
+        assertInstanceOf(FlowChangeAction.class, componentAction);
+        assertEquals(USER_ID, componentAction.getUserIdentity());
+        assertEquals("OutputPort", componentAction.getSourceType().name());
+        assertEquals(OUTPUT_PORT, componentAction.getSourceName());
+        assertEquals(Operation.Start, componentAction.getOperation());
+
+        // inner stateless pg started
+        componentAction = actionIterator.next();
+        assertInstanceOf(FlowChangeAction.class, componentAction);
+        assertEquals(USER_ID, componentAction.getUserIdentity());
+        assertEquals("ProcessGroup", componentAction.getSourceType().name());
+        assertEquals(PG_2, componentAction.getSourceName());
+        assertEquals(Operation.Start, componentAction.getOperation());
+
+        // processors started
+        componentAction = actionIterator.next();
+        assertInstanceOf(FlowChangeAction.class, componentAction);
+        assertEquals(USER_ID, componentAction.getUserIdentity());
+        assertEquals("Processor", componentAction.getSourceType().name());
+        assertEquals(PROC_1, componentAction.getSourceName());
+        assertEquals(Operation.Start, componentAction.getOperation());
+
+        componentAction = actionIterator.next();
+        assertInstanceOf(FlowChangeAction.class, componentAction);
+        assertEquals(USER_ID, componentAction.getUserIdentity());
+        assertEquals("Processor", componentAction.getSourceType().name());
+        assertEquals(PROC_2, componentAction.getSourceName());
+        assertEquals(Operation.Start, componentAction.getOperation());
+
+        // remote PG started
+        componentAction = actionIterator.next();
+        assertInstanceOf(FlowChangeAction.class, componentAction);
+        assertEquals(USER_ID, componentAction.getUserIdentity());
+        assertEquals("RemoteProcessGroup", 
componentAction.getSourceType().name());
+        assertEquals(REMOTE_PG, componentAction.getSourceName());
+        assertEquals(Operation.Start, componentAction.getOperation());
     }
 
     @Test

Reply via email to