This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 2fa98c2777 NIFI-14693: Restoring Component State in StatelessDataflow API
2fa98c2777 is described below
commit 2fa98c277752d03b5affd3426a5f2d3d3fe9fb59
Author: Joe Gresock <[email protected]>
AuthorDate: Wed Jun 25 21:54:08 2025 -0400
NIFI-14693: Restoring Component State in StatelessDataflow API
Signed-off-by: Pierre Villard <[email protected]>
This closes #10049.
---
.../nifi/stateless/flow/StatelessDataflow.java | 8 ++++
.../nifi/stateless/flow/StandardStatelessFlow.java | 47 ++++++++++++++++++++++
2 files changed, 55 insertions(+)
diff --git
a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
index 9ae5efc882..758b153371 100644
---
a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
+++
b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
@@ -94,10 +94,18 @@ public interface StatelessDataflow {
boolean isFlowFileQueued();
+    /**
+     * @return True if any component in the dataflow, such as a processor or reporting task, carries the
+     *         {@link org.apache.nifi.annotation.behavior.Stateful} annotation
+     */
+ boolean isStateful();
+
void purge();
Map<String, String> getComponentStates(Scope scope);
+ void setComponentStates(Map<String, String> componentStates, Scope scope);
+
BulletinRepository getBulletinRepository();
OptionalLong getCounter(String componentId, String counterName);
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
index 215a717d41..2aa2d2a8d9 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
@@ -51,6 +51,7 @@ import
org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.service.StandardConfigurationContext;
+import org.apache.nifi.controller.state.StandardStateMap;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
@@ -135,6 +136,7 @@ public class StandardStatelessFlow implements
StatelessDataflow {
private volatile ExecutorService runDataflowExecutor;
private volatile ScheduledExecutorService backgroundTaskExecutor;
private volatile boolean initialized = false;
+ private volatile Boolean stateful = null;
private volatile boolean shutdown = false;
private volatile boolean manageControllerServices = true;
@@ -711,6 +713,17 @@ public class StandardStatelessFlow implements
StatelessDataflow {
}
}
+    /**
+     * Indicates whether this dataflow contains any stateful component. The answer is computed on
+     * first use and cached in the {@code stateful} field for subsequent calls.
+     *
+     * @return {@code true} if any reporting task, or the root process group, is reported as stateful
+     */
+    @Override
+    public boolean isStateful() {
+        // Read the volatile field once so the null check and the return value agree.
+        Boolean cached = stateful;
+        if (cached == null) {
+            // '||' short-circuits, so the root group is only inspected when no reporting task is stateful.
+            cached = reportingTasks.stream().anyMatch(this::isStateful) || isStateful(rootGroup);
+            // Cache the outcome in both cases. Previously a positive reporting-task match was returned
+            // without being stored, so the reporting tasks were re-scanned on every call.
+            stateful = cached;
+        }
+        return cached;
+    }
private boolean isStateful(final ProcessGroup processGroup) {
final boolean hasStatefulProcessor =
processGroup.getProcessors().stream().anyMatch(this::isStateful);
@@ -873,6 +886,40 @@ public class StandardStatelessFlow implements
StatelessDataflow {
return serializedStateMaps;
}
+    /**
+     * Restores component state from its serialized form by deserializing each entry and handing the
+     * result to the state manager provider.
+     *
+     * @param componentStates serialized state maps keyed by component ID
+     * @param scope the scope passed through to the state manager provider
+     */
+    @Override
+    public void setComponentStates(final Map<String, String> componentStates, final Scope scope) {
+        stateManagerProvider.updateComponentsStates(deserializeStateMaps(componentStates), scope);
+    }
+
+    /**
+     * Converts serialized component states into {@link StateMap} instances keyed by component ID.
+     * Entries that cannot be turned into a state map are logged and skipped rather than failing the call.
+     *
+     * @param componentStates serialized state keyed by component ID; may be {@code null}
+     * @return the state maps that could be deserialized; empty when {@code componentStates} is {@code null}
+     */
+    private Map<String, StateMap> deserializeStateMaps(final Map<String, String> componentStates) {
+        if (componentStates == null) {
+            return Collections.emptyMap();
+        }
+
+        final Map<String, StateMap> deserializedStateMaps = new HashMap<>();
+
+        for (final Map.Entry<String, String> entry : componentStates.entrySet()) {
+            final String componentId = entry.getKey();
+            final String serialized = entry.getValue();
+
+            final SerializableStateMap deserialized;
+            try {
+                deserialized = objectMapper.readValue(serialized, SerializableStateMap.class);
+            } catch (final Exception e) {
+                // We want to avoid throwing an Exception here because if we do, we may never be able to run
+                // the flow again, at least not without destroying all state that exists for the component.
+                // Would be better to simply skip the state for this component
+                logger.error("Failed to deserialize components' state for component with ID {}. State will be reset to empty", componentId, e);
+                continue;
+            }
+
+            // NOTE(review): presumably objectMapper is a Jackson ObjectMapper, which returns null (rather than
+            // throwing) when the serialized value is the JSON literal null. Skip such entries for the same
+            // reason as above instead of letting a NullPointerException escape.
+            if (deserialized == null) {
+                logger.error("Serialized state for component with ID {} did not contain a state map. State will be reset to empty", componentId);
+                continue;
+            }
+
+            final StateMap stateMap = new StandardStateMap(deserialized.getStateValues(), Optional.ofNullable(deserialized.getVersion()));
+            deserializedStateMaps.put(componentId, stateMap);
+        }
+
+        return deserializedStateMaps;
+    }
+
@Override
public BulletinRepository getBulletinRepository() {
return bulletinRepository;