This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/NIFI-15258 by this push:
new 304b95d159 NIFI-15640: NPE in the applyFlow should NOT crash the
runtime. (#10931)
304b95d159 is described below
commit 304b95d15949ac0055616be7514f0576f8a1a175
Author: Bob Paulin <[email protected]>
AuthorDate: Tue Feb 24 09:51:48 2026 -0600
NIFI-15640: NPE in the applyFlow should NOT crash the runtime. (#10931)
* NIFI-15640: NPE in the applyFlow should NOT crash the runtime.
* Leave the flow in a state of UPDATE_FAILED
* With validationState of INVALID.
* NIFI-15640: NPE in the applyFlow should NOT crash the runtime.
* Code Review Feedback
---
.../nifi/components/connector/ConnectorNode.java | 9 +++++
.../connector/StandardConnectorNode.java | 9 +++++
.../connector/StandardConnectorRepository.java | 1 +
.../serialization/VersionedFlowSynchronizer.java | 5 +--
.../connector/TestStandardConnectorRepository.java | 38 ++++++++++++++++++++++
5 files changed, 58 insertions(+), 4 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
index 6cafcbbaf1..93e90c1adb 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java
@@ -263,6 +263,15 @@ public interface ConnectorNode extends
ComponentAuthorizable, VersionedComponent
void inheritConfiguration(List<VersionedConfigurationStep>
activeFlowConfiguration, List<VersionedConfigurationStep>
workingFlowConfiguration,
Bundle flowContextBundle) throws FlowUpdateException;
+ /**
+ * Marks the connector as invalid with the given subject and explanation.
This is used when a flow update
+ * fails during initialization or flow synchronization to indicate the
connector cannot operate.
+ *
+ * @param subject the subject of the validation failure
+ * @param explanation the reason the connector is invalid
+ */
+ void markInvalid(final String subject, final String explanation);
+
/**
* Returns the list of available actions that can be performed on this
Connector.
* Each action includes whether it is currently allowed and, if not, the
reason why.
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
index 438ea78c64..3496a66908 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
@@ -316,6 +316,15 @@ public class StandardConnectorNode implements
ConnectorNode {
logger.debug("Aborted update for {}", this);
}
+ @Override
+ public void markInvalid(final String subject, final String explanation) {
+ validationState.set(new ValidationState(ValidationStatus.INVALID,
List.of(new ValidationResult.Builder()
+ .subject(subject)
+ .valid(false)
+ .explanation(explanation)
+ .build())));
+ }
+
@Override
public void setConfiguration(final String stepName, final
StepConfiguration configuration) throws FlowUpdateException {
setConfiguration(stepName, configuration, false);
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
index 2a13f6acde..4abe276c56 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
@@ -346,6 +346,7 @@ public class StandardConnectorRepository implements
ConnectorRepository {
} catch (final Exception e) {
logger.error("Failed to inherit configuration for {}", connector,
e);
connector.abortUpdate(e);
+ connector.markInvalid("Flow Update Failure", "The flow could not
be updated: " + e.getMessage());
throw e;
}
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
index f1e1211f84..df93e2c638 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
@@ -1175,9 +1175,6 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
connectorRepository.updateConnector(connectorNode,
versionedConnector.getName());
- // TODO: We don't want to throw an Exception here. Consider handling
Connectors first so that we can get all Connectors in a state of
- // prepareForUpdate. If any fails, we can restore them and throw an
Exception. We don't want to be throwing an Exception in the middle
- // of updating the flow.
try {
final List<VersionedConfigurationStep> activeFlowConfig =
versionedConnector.getActiveFlowConfiguration();
final List<VersionedConfigurationStep> workingFlowConfig =
versionedConnector.getWorkingFlowConfiguration();
@@ -1190,7 +1187,7 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
connectorRepository.stopConnector(connectorNode);
}
} catch (final FlowUpdateException e) {
- throw new RuntimeException(connectorNode + " failed to inherit
configuration", e);
+ logger.error("{} failed to inherit configuration", connectorNode,
e);
}
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java
index 5bd9986f72..4089b27481 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java
@@ -48,6 +48,7 @@ import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
@@ -759,6 +760,43 @@ public class TestStandardConnectorRepository {
verify(assetManager, never()).deleteAsset(anyString());
}
+ @Test
+ public void
testInheritConfigurationFailureCallsAbortUpdateAndMarkInvalid() throws
FlowUpdateException {
+ final StandardConnectorRepository repository =
createRepositoryWithProvider(null);
+
+ final ConnectorNode connector = mock(ConnectorNode.class);
+ when(connector.getIdentifier()).thenReturn("connector-1");
+ doThrow(new FlowUpdateException("Simulated failure"))
+ .when(connector).inheritConfiguration(any(), any(), any());
+
+ repository.addConnector(connector);
+
+ assertThrows(FlowUpdateException.class, () ->
repository.inheritConfiguration(connector, List.of(), List.of(), null));
+
+ verify(connector).abortUpdate(any(FlowUpdateException.class));
+ verify(connector).markInvalid(eq("Flow Update Failure"), eq("The flow
could not be updated: Simulated failure"));
+ }
+
+ @Test
+ public void testApplyUpdateFailureCallsAbortUpdateButNotMarkInvalid()
throws FlowUpdateException {
+ final AssetManager assetManager = mock(AssetManager.class);
+ when(assetManager.getAssets("connector-1")).thenReturn(List.of());
+ final StandardConnectorRepository repository =
createRepositoryWithProviderAndAssetManager(null, assetManager);
+
+ final ConnectorNode connector = mock(ConnectorNode.class);
+ when(connector.getIdentifier()).thenReturn("connector-1");
+ when(connector.getDesiredState()).thenReturn(ConnectorState.STOPPED);
+ doThrow(new FlowUpdateException("Simulated failure"))
+ .when(connector).prepareForUpdate();
+
+ repository.addConnector(connector);
+
+ repository.applyUpdate(connector, mock(ConnectorUpdateContext.class));
+
+ verify(connector,
timeout(5000)).abortUpdate(any(FlowUpdateException.class));
+ verify(connector, never()).markInvalid(anyString(), anyString());
+ }
+
// --- Helper Methods ---
private StandardConnectorRepository
createRepositoryWithProviderAndAssetManager(