This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new d276109835 NIFI-15303 Filter Invalid when Enabling Controller Services
d276109835 is described below
commit d27610983553c24f79ffb9c87ec538441ddc938b
Author: exceptionfactory <[email protected]>
AuthorDate: Fri Dec 5 22:12:48 2025 -0600
NIFI-15303 Filter Invalid when Enabling Controller Services
- Updated the activate Controller Services method to enable valid nodes and
nodes that are invalid only because other Controller Services they depend on are disabled
- Avoided enabling Controller Services that are invalid for other reasons
that require manual intervention
Signed-off-by: Pierre Villard <[email protected]>
This closes #10611.
---
.../java/org/apache/nifi/web/api/FlowResource.java | 24 +++++-
.../org/apache/nifi/web/api/TestFlowResource.java | 91 +++++++++++++++++++++-
2 files changed, 111 insertions(+), 4 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
index a53fe83b13..d1406f2bfb 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
@@ -64,6 +64,8 @@ import
org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.validation.DisabledServiceValidationResult;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
@@ -1176,7 +1178,7 @@ public class FlowResource extends ApplicationResource {
final Predicate<ControllerServiceNode> filter;
if (ControllerServiceState.ENABLED.equals(desiredState)) {
- filter = service -> !service.isActive();
+ filter = this::isControllerServiceNodeEligibleForEnabling;
} else {
filter = ControllerServiceNode::isActive;
}
@@ -1242,6 +1244,26 @@ public class FlowResource extends ApplicationResource {
);
}
+ private boolean isControllerServiceNodeEligibleForEnabling(final
ControllerServiceNode controllerServiceNode) {
+ final boolean eligibleForEnabling;
+
+ if (controllerServiceNode.isActive()) {
+ // Active Controller Services are enabled
+ eligibleForEnabling = false;
+ } else {
+ final Collection<ValidationResult> validationErrors =
controllerServiceNode.getValidationErrors();
+ if (validationErrors == null || validationErrors.isEmpty()) {
+ // VALID or VALIDATING Controller Services can be enabled
+ eligibleForEnabling = true;
+ } else {
+ // INVALID Controller Services can be enabled when Validation
Results are limited to other disabled Controller Services
+ eligibleForEnabling =
validationErrors.stream().allMatch(DisabledServiceValidationResult.class::isInstance);
+ }
+ }
+
+ return eligibleForEnabling;
+ }
+
/**
* Clears bulletins for components in the specified Process Group.
*
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestFlowResource.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestFlowResource.java
index b524b91360..40476c4e76 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestFlowResource.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestFlowResource.java
@@ -26,8 +26,13 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
import io.prometheus.client.Collector.MetricFamilySamples.Sample;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.common.TextFormat;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.validation.DisabledServiceValidationResult;
+import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessingPerformanceStatus;
+import org.apache.nifi.flow.ExecutionEngine;
+import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
import org.apache.nifi.prometheusutil.BulletinMetricsRegistry;
import org.apache.nifi.prometheusutil.ClusterMetricsRegistry;
@@ -36,16 +41,19 @@ import org.apache.nifi.prometheusutil.JvmMetricsRegistry;
import org.apache.nifi.prometheusutil.NiFiMetricsRegistry;
import org.apache.nifi.prometheusutil.PrometheusMetricsUtil;
import org.apache.nifi.registry.flow.FlowVersionLocation;
+import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.ComponentDifferenceDTO;
import org.apache.nifi.web.api.dto.DifferenceDTO;
+import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
import org.apache.nifi.web.api.entity.FlowComparisonEntity;
import org.apache.nifi.web.api.request.FlowMetricsProducer;
import org.apache.nifi.web.api.request.FlowMetricsReportingStrategy;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -53,6 +61,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.StreamingOutput;
+import org.springframework.mock.web.MockHttpServletRequest;
+
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -64,6 +74,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -74,9 +85,11 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anySet;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@@ -106,12 +119,19 @@ public class TestFlowResource {
private static final String SAMPLE_BUCKET_ID_B =
"42998285-d06c-41dd-a757-7a14ab9673f4";
private static final String SAMPLE_FLOW_ID_B =
"e6483662-9226-41c1-adec-10357af97ce2";
+ private static final String PROCESS_GROUP_ID =
"00000000-0000-0000-0000-000000000000";
+ private static final String FIRST_SERVICE_ID =
"00000000-0000-0000-0000-000000000001";
+ private static final String SECOND_SERVICE_ID =
"00000000-0000-0000-0000-000000000002";
+
@InjectMocks
private FlowResource resource = new FlowResource();
@Mock
private NiFiServiceFacade serviceFacade;
+ @Mock
+ private NiFiProperties niFiProperties;
+
@Test
public void testGetFlowMetricsProducerInvalid() {
assertThrows(ResourceNotFoundException.class, () ->
resource.getFlowMetrics(String.class.toString(), Collections.emptySet(), null,
null, null, null));
@@ -413,7 +433,73 @@ public class TestFlowResource {
final FlowComparisonEntity entity = (FlowComparisonEntity)
response.getEntity();
final List<DifferenceDTO> differences =
entity.getComponentDifferences().stream().map(ComponentDifferenceDTO::getDifferences).flatMap(Collection::stream).collect(Collectors.toList());
assertEquals(1, differences.size());
- assertEquals(createDifference("Position Changed", "Position was
changed"), differences.get(0));
+ assertEquals(createDifference("Position Changed", "Position was
changed"), differences.getFirst());
+ }
+
+ @Test
+ public void testActivateControllerServicesStateEnabled() {
+ final ActivateControllerServicesEntity entity = new
ActivateControllerServicesEntity();
+ entity.setId(PROCESS_GROUP_ID);
+ entity.setState("ENABLED");
+
+ when(niFiProperties.isNode()).thenReturn(false);
+ resource.httpServletRequest = new MockHttpServletRequest();
+
+ final ArgumentCaptor<Function<ProcessGroup, Set<String>>>
revisionsCaptor = ArgumentCaptor.captor();
+ when(serviceFacade.getRevisionsFromGroup(eq(PROCESS_GROUP_ID),
revisionsCaptor.capture())).thenReturn(Set.of());
+
+ final Response response =
resource.activateControllerServices(PROCESS_GROUP_ID, entity);
+
+ assertNotNull(response);
+
+ final Function<ProcessGroup, Set<String>> revisionsFunction =
revisionsCaptor.getValue();
+ final ProcessGroup processGroup = mock(ProcessGroup.class);
+
+ // Set Process Group properties required for standard inclusion
+
when(processGroup.resolveExecutionEngine()).thenReturn(ExecutionEngine.STANDARD);
+
+ // Mock inactive Controller Service for inclusion
+ final ControllerServiceNode inactiveControllerServiceNode =
mock(ControllerServiceNode.class);
+ when(inactiveControllerServiceNode.isActive()).thenReturn(false);
+
when(inactiveControllerServiceNode.getIdentifier()).thenReturn(FIRST_SERVICE_ID);
+
when(inactiveControllerServiceNode.getProcessGroup()).thenReturn(processGroup);
+ when(inactiveControllerServiceNode.isAuthorized(any(), any(),
any())).thenReturn(true);
+
+ // Mock disabled and otherwise valid Controller Service for inclusion
+ final ControllerServiceNode disabledControllerServiceNode =
mock(ControllerServiceNode.class);
+ when(disabledControllerServiceNode.isActive()).thenReturn(false);
+
when(disabledControllerServiceNode.getIdentifier()).thenReturn(SECOND_SERVICE_ID);
+
when(disabledControllerServiceNode.getProcessGroup()).thenReturn(processGroup);
+ when(disabledControllerServiceNode.isAuthorized(any(), any(),
any())).thenReturn(true);
+ final Collection<ValidationResult> disabledValidationErrors = List.of(
+ new DisabledServiceValidationResult("Controller Service",
SECOND_SERVICE_ID)
+ );
+
when(disabledControllerServiceNode.getValidationErrors()).thenReturn(disabledValidationErrors);
+
+ // Mock invalid Controller Service for exclusion
+ final ControllerServiceNode invalidControllerServiceNode =
mock(ControllerServiceNode.class);
+ when(invalidControllerServiceNode.isActive()).thenReturn(false);
+ final Collection<ValidationResult> excludedValidationErrors = List.of(
+ new ValidationResult.Builder().valid(false).build()
+ );
+
when(invalidControllerServiceNode.getValidationErrors()).thenReturn(excludedValidationErrors);
+
+ // Prepare Controller Service Nodes for evaluation
+ final Set<ControllerServiceNode> controllerServiceNodes = Set.of(
+ inactiveControllerServiceNode,
+ disabledControllerServiceNode,
+ invalidControllerServiceNode
+ );
+
when(processGroup.findAllControllerServices()).thenReturn(controllerServiceNodes);
+
+ final Set<String> serviceIds = revisionsFunction.apply(processGroup);
+ assertNotNull(serviceIds);
+
+ final Set<String> expectedServicesIds = Set.of(
+ FIRST_SERVICE_ID,
+ SECOND_SERVICE_ID
+ );
+ assertEquals(expectedServicesIds, serviceIds);
}
private void setUpGetVersionDifference() {
@@ -465,8 +551,7 @@ public class TestFlowResource {
final StreamingOutput streamingOutput = (StreamingOutput)
response.getEntity();
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
streamingOutput.write(outputStream);
- final byte[] outputBytes = outputStream.toByteArray();
- return new String(outputBytes, StandardCharsets.UTF_8);
+ return outputStream.toString(StandardCharsets.UTF_8);
}
private List<CollectorRegistry> getCollectorRegistries(boolean
includeProcessorPerfStatus) {