This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 9cf57139f2 NIFI-14948 - Adding null safe guarding when working on a
set of flow differences (#10283)
9cf57139f2 is described below
commit 9cf57139f251370e61aa26ec1fc44304d2fb3393
Author: Pierre Villard <[email protected]>
AuthorDate: Tue Sep 9 17:28:42 2025 +0200
NIFI-14948 - Adding null safe guarding when working on a set of flow
differences (#10283)
---
.../apache/nifi/util/FlowDifferenceFilters.java | 5 ++
.../nifi/util/TestFlowDifferenceFilters.java | 16 ++++-
.../serialization/AffectedComponentSet.java | 6 ++
.../serialization/TestAffectedComponentSet.java | 16 +++++
.../apache/nifi/web/StandardNiFiServiceFacade.java | 1 +
.../nifi/web/StandardNiFiServiceFacadeTest.java | 76 +++++++++++++++++++++-
6 files changed, 117 insertions(+), 3 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
index e9dbd8ee84..09f46b9444 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
@@ -321,6 +321,11 @@ public class FlowDifferenceFilters {
return false;
}
+ // If there is no local component, this is a change affecting only the proposed flow.
+ if (fd.getComponentA() == null) {
+ return false;
+ }
+
if (fd.getComponentA().getComponentType() ==
ComponentType.CONTROLLER_SERVICE) {
return true;
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/util/TestFlowDifferenceFilters.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/util/TestFlowDifferenceFilters.java
index 9ee689dd53..7868979685 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/util/TestFlowDifferenceFilters.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/util/TestFlowDifferenceFilters.java
@@ -23,8 +23,10 @@ import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.flow.VersionedRemoteGroupPort;
import org.apache.nifi.registry.flow.diff.DifferenceType;
+import org.apache.nifi.registry.flow.diff.FlowDifference;
import org.apache.nifi.registry.flow.diff.StandardFlowDifference;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -140,5 +142,17 @@ public class TestFlowDifferenceFilters {
assertTrue(FlowDifferenceFilters.isScheduledStateNew(flowDifference));
}
-}
+ @Test
+ public void testIsLocalScheduleStateChangeWithNullComponentADoesNotNPE() {
+ // Simulate DEEP comparison producing a scheduled state change for a newly added component (no local A)
+ final FlowDifference flowDifference =
Mockito.mock(FlowDifference.class);
+
Mockito.when(flowDifference.getDifferenceType()).thenReturn(DifferenceType.SCHEDULED_STATE_CHANGED);
+ Mockito.when(flowDifference.getComponentA()).thenReturn(null);
+ Mockito.when(flowDifference.getValueA()).thenReturn("RUNNING");
+ Mockito.when(flowDifference.getValueB()).thenReturn("RUNNING");
+
+ // Should not throw and should return false since no local component
+
assertFalse(FlowDifferenceFilters.isLocalScheduleStateChange(flowDifference));
+ }
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
index f3f7356b48..5c51b2c7bd 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
@@ -379,6 +379,12 @@ public class AffectedComponentSet {
}
}
+ // With DEEP comparison, configuration differences for newly added components may reference a null local component (Component A).
+ // These do not affect any existing local components, so ignore them.
+ if (difference.getComponentA() == null) {
+ return;
+ }
+
if (differenceType == DifferenceType.COMPONENT_REMOVED &&
difference.getComponentA().getComponentType() == ComponentType.PROCESS_GROUP) {
// If a Process Group is removed, we need to consider any
component within the Process Group as affected also
addAllComponentsWithinGroup(difference.getComponentA().getInstanceIdentifier());
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/TestAffectedComponentSet.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/TestAffectedComponentSet.java
index 69db20bd76..23bb919ba9 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/TestAffectedComponentSet.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/TestAffectedComponentSet.java
@@ -20,6 +20,8 @@ package org.apache.nifi.controller.serialization;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.registry.flow.diff.DifferenceType;
+import org.apache.nifi.registry.flow.diff.FlowDifference;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
@@ -83,4 +85,18 @@ public class TestAffectedComponentSet {
assertEquals(2, setA.getComponentCount());
assertEquals(8, removed.getComponentCount());
}
+
+ @Test
+ public void testAddAffectedComponentsWithNullComponentADoesNotNPE() {
+ final AffectedComponentSet set = new AffectedComponentSet(controller);
+
+ // Simulate a DEEP comparison config difference on an added Process Group
+ final FlowDifference difference = Mockito.mock(FlowDifference.class);
+
when(difference.getDifferenceType()).thenReturn(DifferenceType.EXECUTION_ENGINE_CHANGED);
+ when(difference.getComponentA()).thenReturn(null); // Newly added PG => local component is null
+
+ // Should not throw and should not add any local components
+ set.addAffectedComponents(difference);
+ assertEquals(0, set.getComponentCount());
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 1d11e7902f..16f83e1f9b 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -5760,6 +5760,7 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
final Set<AffectedComponentEntity> affectedComponents =
comparison.getDifferences().stream()
.filter(difference -> difference.getDifferenceType() !=
DifferenceType.COMPONENT_ADDED) // components that are added are not components
that will be affected in the local flow.
.filter(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS)
+ .filter(difference -> difference.getComponentA() != null) // a difference that would not affect a local component
.filter(diff ->
!FlowDifferenceFilters.isNewPropertyWithDefaultValue(diff, flowManager))
.filter(diff ->
!FlowDifferenceFilters.isNewRelationshipAutoTerminatedAndDefaulted(diff,
proposedFlow.getContents(), flowManager))
.filter(diff ->
!FlowDifferenceFilters.isScheduledStateNew(diff))
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
index 5344f8ec15..49941a7233 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
@@ -45,10 +45,13 @@ import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.flow.ExecutionEngine;
import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.ParameterProviderReference;
+import org.apache.nifi.flow.VersionedComponent;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.flow.VersionedPropertyDescriptor;
import org.apache.nifi.flow.VersionedReportingTask;
@@ -64,6 +67,13 @@ import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.registry.flow.FlowRegistryUtil;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
+import org.apache.nifi.registry.flow.diff.DifferenceType;
+import org.apache.nifi.registry.flow.diff.FlowComparator;
+import org.apache.nifi.registry.flow.diff.FlowComparatorVersionedStrategy;
+import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
+import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
+import org.apache.nifi.registry.flow.diff.StaticDifferenceDescriptor;
import org.apache.nifi.registry.flow.mapping.FlowMappingOptions;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
@@ -87,6 +97,7 @@ import org.apache.nifi.web.api.dto.action.HistoryDTO;
import org.apache.nifi.web.api.dto.action.HistoryQueryDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.api.entity.ActionEntity;
+import org.apache.nifi.web.api.entity.AffectedComponentEntity;
import org.apache.nifi.web.api.entity.CopyRequestEntity;
import org.apache.nifi.web.api.entity.CopyResponseEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
@@ -125,6 +136,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -143,10 +155,10 @@ import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.same;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -298,6 +310,66 @@ public class StandardNiFiServiceFacadeTest {
}
+ @Test
+ public void
testGetComponentsAffectedByFlowUpdate_WithNewStatelessProcessGroup_ReproducesNPE()
{
+ final String groupId = UUID.randomUUID().toString();
+ final ProcessGroup processGroup = mock(ProcessGroup.class);
+
when(processGroupDAO.getProcessGroup(groupId)).thenReturn(processGroup);
+
when(processGroup.getAncestorServiceIds()).thenReturn(Collections.emptySet());
+
+ final FlowManager flowManager = mock(FlowManager.class);
+ final ExtensionManager extensionManager = mock(ExtensionManager.class);
+ when(flowController.getFlowManager()).thenReturn(flowManager);
+
when(flowController.getExtensionManager()).thenReturn(extensionManager);
+
+ final StandardNiFiServiceFacade serviceFacadeSpy = spy(serviceFacade);
+ final NiFiRegistryFlowMapper flowMapper =
mock(NiFiRegistryFlowMapper.class);
+
doReturn(flowMapper).when(serviceFacadeSpy).makeNiFiRegistryFlowMapper(extensionManager);
+
+ final InstantiatedVersionedProcessGroup localRoot = new
InstantiatedVersionedProcessGroup("local-root-instance", groupId);
+ when(flowMapper.mapProcessGroup(any(ProcessGroup.class),
any(ControllerServiceProvider.class), any(FlowManager.class), eq(true)))
+ .thenReturn(localRoot);
+
+ // Build proposed (updated) flow with a NEW child Process Group configured with Stateless Execution Engine
+ final VersionedProcessGroup proposedRoot = new VersionedProcessGroup();
+ proposedRoot.setIdentifier("root");
+ proposedRoot.setName("root");
+
+ final VersionedProcessGroup newChild = new VersionedProcessGroup();
+ newChild.setIdentifier("child");
+ newChild.setName("child");
+ newChild.setGroupIdentifier("root");
+ newChild.setExecutionEngine(ExecutionEngine.STATELESS);
+ proposedRoot.getProcessGroups().add(newChild);
+
+ final RegisteredFlowSnapshot updatedSnapshot = new
RegisteredFlowSnapshot();
+ updatedSnapshot.setFlowContents(proposedRoot);
+
+ final ComparableDataFlow localFlow = new
StandardComparableDataFlow("Current Flow", localRoot);
+ final ComparableDataFlow proposedFlow = new
StandardComparableDataFlow("New Flow", proposedRoot);
+ final FlowComparator flowComparator = new StandardFlowComparator(
+ localFlow,
+ proposedFlow,
+ Collections.emptySet(),
+ new StaticDifferenceDescriptor(),
+ Function.identity(),
+ VersionedComponent::getIdentifier,
+ FlowComparatorVersionedStrategy.DEEP);
+
+ final org.apache.nifi.registry.flow.diff.FlowComparison comparison =
flowComparator.compare();
+ final boolean hasExecEngineChange =
comparison.getDifferences().stream()
+ .anyMatch(d -> d.getDifferenceType() ==
DifferenceType.EXECUTION_ENGINE_CHANGED
+ && d.getComponentA() == null
+ && d.getComponentB() instanceof VersionedProcessGroup
+ && "child".equals(d.getComponentB().getIdentifier()));
+ assertTrue(hasExecEngineChange, "Expected EXECUTION_ENGINE_CHANGED
difference for Stateless child group");
+
+ // Act: Should not throw after fix; no local components are affected by a new Stateless child group
+ final Set<AffectedComponentEntity> affected =
serviceFacadeSpy.getComponentsAffectedByFlowUpdate(groupId, updatedSnapshot);
+ assertNotNull(affected);
+ assertTrue(affected.isEmpty(), "No local components should be affected
for added Stateless group");
+ }
+
private FlowChangeAction getAction(final Integer actionId, final String
processorId) {
final FlowChangeAction action = new FlowChangeAction();
action.setId(actionId);
@@ -1377,4 +1449,4 @@ public class StandardNiFiServiceFacadeTest {
verify(controllerFacade, times(1)).resetAllCounters();
verify(dtoFactory, times(1)).createCountersDto(any());
}
-}
\ No newline at end of file
+}