This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 3c174764e2 NIFI-15394 Included scoped Controller Services when
Clearing Bulletins for Process Groups (#10699)
3c174764e2 is described below
commit 3c174764e2c01a9c9053ba78df94f41d824308da
Author: Pierre Villard <[email protected]>
AuthorDate: Tue Dec 30 17:26:05 2025 +0100
NIFI-15394 Included scoped Controller Services when Clearing Bulletins for
Process Groups (#10699)
Signed-off-by: David Handermann <[email protected]>
---
.../java/org/apache/nifi/web/api/FlowResource.java | 23 +++++-
.../org/apache/nifi/web/api/TestFlowResource.java | 89 +++++++++++++++++++++-
2 files changed, 108 insertions(+), 4 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
index 29f487fdf8..32f9c513a1 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
@@ -1318,6 +1318,13 @@ public class FlowResource extends ApplicationResource {
.collect(Collectors.toSet())
);
+ // Collect Controller Service IDs to distinguish them from local
connectables during authorization
+ final Set<String> controllerServiceIds =
serviceFacade.filterComponents(id, group ->
+ group.findAllControllerServices().stream()
+ .map(cs -> cs.getIdentifier())
+ .collect(Collectors.toSet())
+ );
+
// if the components are not specified, gather all authorized
components
if (clearBulletinsForGroupRequestEntity.getComponents() == null) {
// get component IDs that the user has write access to
@@ -1344,6 +1351,11 @@ public class FlowResource extends ApplicationResource {
.filter(remoteProcessGroup ->
remoteProcessGroup.isAuthorized(authorizer, RequestAction.WRITE,
NiFiUserUtils.getNiFiUser()))
.forEach(remoteProcessGroup ->
componentIds.add(remoteProcessGroup.getIdentifier()));
+ // find all controller services with write permissions
+ group.findAllControllerServices().stream()
+ .filter(controllerService ->
controllerService.isAuthorized(authorizer, RequestAction.WRITE,
NiFiUserUtils.getNiFiUser()))
+ .forEach(controllerService ->
componentIds.add(controllerService.getIdentifier()));
+
return componentIds;
});
@@ -1367,9 +1379,14 @@ public class FlowResource extends ApplicationResource {
// ensure access to every component being cleared
final Set<String> requestComponentsToClear =
clearBulletinsForGroupRequestEntity.getComponents();
requestComponentsToClear.forEach(componentId -> {
- final Authorizable authorizable =
remoteProcessGroupIds.contains(componentId)
- ? lookup.getRemoteProcessGroup(componentId)
- : lookup.getLocalConnectable(componentId);
+ final Authorizable authorizable;
+ if (remoteProcessGroupIds.contains(componentId)) {
+ authorizable =
lookup.getRemoteProcessGroup(componentId);
+ } else if (controllerServiceIds.contains(componentId))
{
+ authorizable =
lookup.getControllerService(componentId).getAuthorizable();
+ } else {
+ authorizable =
lookup.getLocalConnectable(componentId);
+ }
authorizable.authorize(authorizer,
RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
});
},
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestFlowResource.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestFlowResource.java
index 6724dbfdd2..ed3135e20e 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestFlowResource.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestFlowResource.java
@@ -28,11 +28,14 @@ import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.common.TextFormat;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.validation.DisabledServiceValidationResult;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessingPerformanceStatus;
import org.apache.nifi.flow.ExecutionEngine;
import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
import org.apache.nifi.prometheusutil.BulletinMetricsRegistry;
import org.apache.nifi.prometheusutil.ClusterMetricsRegistry;
@@ -47,6 +50,7 @@ import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.ComponentDifferenceDTO;
import org.apache.nifi.web.api.dto.DifferenceDTO;
import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
+import org.apache.nifi.web.api.entity.ClearBulletinsForGroupRequestEntity;
import org.apache.nifi.web.api.entity.FlowComparisonEntity;
import org.apache.nifi.web.api.request.FlowMetricsProducer;
import org.apache.nifi.web.api.request.FlowMetricsReportingStrategy;
@@ -67,6 +71,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -90,6 +95,7 @@ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -515,6 +521,87 @@ public class TestFlowResource {
verify(serviceFacade, never()).authorizeAccess(any());
}
+ @Test
+ public void testClearBulletinsIncludesControllerServices() {
+ // This test verifies that when clearing bulletins for a process group,
+ // controller services are included in the components to clear
+
+ final ClearBulletinsForGroupRequestEntity entity = new
ClearBulletinsForGroupRequestEntity();
+ entity.setId(PROCESS_GROUP_ID);
+ entity.setFromTimestamp(Instant.now());
+ // components is null, so clearBulletins will gather all authorized
components
+
+ when(niFiProperties.isNode()).thenReturn(false);
+ resource.httpServletRequest = new MockHttpServletRequest();
+
+ // Capture the function passed to filterComponents (called 3 times)
+ final ArgumentCaptor<Function<ProcessGroup, Set<String>>> filterCaptor
= ArgumentCaptor.captor();
+ when(serviceFacade.filterComponents(eq(PROCESS_GROUP_ID),
filterCaptor.capture())).thenReturn(Set.of());
+
+ final Response response = resource.clearBulletins(PROCESS_GROUP_ID,
entity);
+
+ assertNotNull(response);
+
+ // Get the third captured function (the one that gathers writable
component IDs)
+ // First call: RPG IDs, Second call: Controller Service IDs, Third
call: writable component IDs
+ final List<Function<ProcessGroup, Set<String>>> capturedFunctions =
filterCaptor.getAllValues();
+ assertEquals(3, capturedFunctions.size(), "filterComponents should be
called 3 times");
+
+ final Function<ProcessGroup, Set<String>> writableComponentsFunction =
capturedFunctions.get(2);
+
+ // Create a mock process group with processors, ports, and controller
services
+ final ProcessGroup processGroup = mock(ProcessGroup.class);
+
+ // Mock a processor with write permissions
+ final ProcessorNode processor = mock(ProcessorNode.class);
+ when(processor.getIdentifier()).thenReturn("processor-1");
+ when(processor.isAuthorized(any(), any(), any())).thenReturn(true);
+
+ // Mock an input port with write permissions
+ final Port inputPort = mock(Port.class);
+ when(inputPort.getIdentifier()).thenReturn("input-port-1");
+ when(inputPort.isAuthorized(any(), any(), any())).thenReturn(true);
+
+ // Mock an output port with write permissions
+ final Port outputPort = mock(Port.class);
+ when(outputPort.getIdentifier()).thenReturn("output-port-1");
+ when(outputPort.isAuthorized(any(), any(), any())).thenReturn(true);
+
+ // Mock a remote process group
+ final RemoteProcessGroup remoteProcessGroup =
mock(RemoteProcessGroup.class);
+ when(remoteProcessGroup.getIdentifier()).thenReturn("rpg-1");
+ when(remoteProcessGroup.isAuthorized(any(), any(),
any())).thenReturn(true);
+
+ // Mock a controller service with write permissions
+ final ControllerServiceNode controllerService =
mock(ControllerServiceNode.class);
+
when(controllerService.getIdentifier()).thenReturn("controller-service-1");
+ when(controllerService.isAuthorized(any(), any(),
any())).thenReturn(true);
+
+ // Mock another controller service without write permissions (should
be excluded)
+ final ControllerServiceNode unauthorizedControllerService =
mock(ControllerServiceNode.class);
+
lenient().when(unauthorizedControllerService.getIdentifier()).thenReturn("controller-service-2");
+ when(unauthorizedControllerService.isAuthorized(any(), any(),
any())).thenReturn(false);
+
+ // Set up the process group to return all components
+ when(processGroup.findAllProcessors()).thenReturn(List.of(processor));
+ when(processGroup.findAllInputPorts()).thenReturn(List.of(inputPort));
+
when(processGroup.findAllOutputPorts()).thenReturn(List.of(outputPort));
+
when(processGroup.findAllRemoteProcessGroups()).thenReturn(List.of(remoteProcessGroup));
+
when(processGroup.findAllControllerServices()).thenReturn(Set.of(controllerService,
unauthorizedControllerService));
+
+ // Apply the captured function to our mock process group
+ final Set<String> componentIds =
writableComponentsFunction.apply(processGroup);
+
+ // Verify that all authorized components are included
+ assertTrue(componentIds.contains("processor-1"), "Processor should be
included");
+ assertTrue(componentIds.contains("input-port-1"), "Input port should
be included");
+ assertTrue(componentIds.contains("output-port-1"), "Output port should
be included");
+ assertTrue(componentIds.contains("rpg-1"), "Remote process group
should be included");
+ assertTrue(componentIds.contains("controller-service-1"), "Authorized
controller service should be included");
+ assertFalse(componentIds.contains("controller-service-2"),
"Unauthorized controller service should be excluded");
+ assertEquals(5, componentIds.size(), "Should have exactly 5 authorized
components");
+ }
+
private void setUpGetVersionDifference() {
doReturn(getDifferences()).when(serviceFacade).getVersionDifference(anyString(),
any(FlowVersionLocation.class), any(FlowVersionLocation.class));
}
@@ -734,4 +821,4 @@ public class TestFlowResource {
return new Sample(name, labelNames, labelValues, value);
}
}
-}
\ No newline at end of file
+}