This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/NIFI-15258 by this push:
     new 304b95d159 NIFI-15640: NPE in the applyFlow should NOT crash the 
runtime. (#10931)
304b95d159 is described below

commit 304b95d15949ac0055616be7514f0576f8a1a175
Author: Bob Paulin <[email protected]>
AuthorDate: Tue Feb 24 09:51:48 2026 -0600

    NIFI-15640: NPE in the applyFlow should NOT crash the runtime. (#10931)
    
    * NIFI-15640: NPE in the applyFlow should NOT crash the runtime.
    
    * Leave the flow in a state of UPDATE_FAILED
    * With validationState of INVALID.
    
    * NIFI-15640: NPE in the applyFlow should NOT crash the runtime.
    
    * Code Review Feedback
---
 .../nifi/components/connector/ConnectorNode.java   |  9 +++++
 .../connector/StandardConnectorNode.java           |  9 +++++
 .../connector/StandardConnectorRepository.java     |  1 +
 .../serialization/VersionedFlowSynchronizer.java   |  5 +--
 .../connector/TestStandardConnectorRepository.java | 38 ++++++++++++++++++++++
 5 files changed, 58 insertions(+), 4 deletions(-)

diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
index 6cafcbbaf1..93e90c1adb 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
@@ -263,6 +263,15 @@ public interface ConnectorNode extends 
ComponentAuthorizable, VersionedComponent
     void inheritConfiguration(List<VersionedConfigurationStep> 
activeFlowConfiguration, List<VersionedConfigurationStep> 
workingFlowConfiguration,
         Bundle flowContextBundle) throws FlowUpdateException;
 
+    /**
+     * Marks the connector as invalid with the given subject and explanation. 
This is used when a flow update
+     * fails during initialization or flow synchronization to indicate the 
connector cannot operate.
+     *
+     * @param subject the subject of the validation failure
+     * @param explanation the reason the connector is invalid
+     */
+    void markInvalid(final String subject, final String explanation);
+
     /**
      * Returns the list of available actions that can be performed on this 
Connector.
      * Each action includes whether it is currently allowed and, if not, the 
reason why.
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
index 438ea78c64..3496a66908 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
@@ -316,6 +316,15 @@ public class StandardConnectorNode implements 
ConnectorNode {
         logger.debug("Aborted update for {}", this);
     }
 
+    @Override
+    public void markInvalid(final String subject, final String explanation) {
+        validationState.set(new ValidationState(ValidationStatus.INVALID, 
List.of(new ValidationResult.Builder()
+                .subject(subject)
+                .valid(false)
+                .explanation(explanation)
+                .build())));
+    }
+
     @Override
     public void setConfiguration(final String stepName, final 
StepConfiguration configuration) throws FlowUpdateException {
         setConfiguration(stepName, configuration, false);
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
index 2a13f6acde..4abe276c56 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
@@ -346,6 +346,7 @@ public class StandardConnectorRepository implements 
ConnectorRepository {
         } catch (final Exception e) {
             logger.error("Failed to inherit configuration for {}", connector, 
e);
             connector.abortUpdate(e);
+            connector.markInvalid("Flow Update Failure", "The flow could not 
be updated: " + e.getMessage());
             throw e;
         }
     }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
index f1e1211f84..df93e2c638 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
@@ -1175,9 +1175,6 @@ public class VersionedFlowSynchronizer implements 
FlowSynchronizer {
 
         connectorRepository.updateConnector(connectorNode, 
versionedConnector.getName());
 
-        // TODO: We don't want to throw an Exception here. Consider handling 
Connectors first so that we can get all Connectors in a state of
-        // prepareForUpdate. If any fails, we can restore them and throw an 
Exception. We don't want to be throwing an Exception in the middle
-        // of updating the flow.
         try {
             final List<VersionedConfigurationStep> activeFlowConfig = 
versionedConnector.getActiveFlowConfiguration();
             final List<VersionedConfigurationStep> workingFlowConfig = 
versionedConnector.getWorkingFlowConfiguration();
@@ -1190,7 +1187,7 @@ public class VersionedFlowSynchronizer implements 
FlowSynchronizer {
                 connectorRepository.stopConnector(connectorNode);
             }
         } catch (final FlowUpdateException e) {
-            throw new RuntimeException(connectorNode + " failed to inherit 
configuration", e);
+            logger.error("{} failed to inherit configuration", connectorNode, 
e);
         }
     }
 
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java
index 5bd9986f72..4089b27481 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java
@@ -48,6 +48,7 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.when;
@@ -759,6 +760,43 @@ public class TestStandardConnectorRepository {
         verify(assetManager, never()).deleteAsset(anyString());
     }
 
+    @Test
+    public void 
testInheritConfigurationFailureCallsAbortUpdateAndMarkInvalid() throws 
FlowUpdateException {
+        final StandardConnectorRepository repository = 
createRepositoryWithProvider(null);
+
+        final ConnectorNode connector = mock(ConnectorNode.class);
+        when(connector.getIdentifier()).thenReturn("connector-1");
+        doThrow(new FlowUpdateException("Simulated failure"))
+            .when(connector).inheritConfiguration(any(), any(), any());
+
+        repository.addConnector(connector);
+
+        assertThrows(FlowUpdateException.class, () -> 
repository.inheritConfiguration(connector, List.of(), List.of(), null));
+
+        verify(connector).abortUpdate(any(FlowUpdateException.class));
+        verify(connector).markInvalid(eq("Flow Update Failure"), eq("The flow 
could not be updated: Simulated failure"));
+    }
+
+    @Test
+    public void testApplyUpdateFailureCallsAbortUpdateButNotMarkInvalid() 
throws FlowUpdateException {
+        final AssetManager assetManager = mock(AssetManager.class);
+        when(assetManager.getAssets("connector-1")).thenReturn(List.of());
+        final StandardConnectorRepository repository = 
createRepositoryWithProviderAndAssetManager(null, assetManager);
+
+        final ConnectorNode connector = mock(ConnectorNode.class);
+        when(connector.getIdentifier()).thenReturn("connector-1");
+        when(connector.getDesiredState()).thenReturn(ConnectorState.STOPPED);
+        doThrow(new FlowUpdateException("Simulated failure"))
+            .when(connector).prepareForUpdate();
+
+        repository.addConnector(connector);
+
+        repository.applyUpdate(connector, mock(ConnectorUpdateContext.class));
+
+        verify(connector, 
timeout(5000)).abortUpdate(any(FlowUpdateException.class));
+        verify(connector, never()).markInvalid(anyString(), anyString());
+    }
+
     // --- Helper Methods ---
 
     private StandardConnectorRepository 
createRepositoryWithProviderAndAssetManager(

Reply via email to