This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 2fa98c2777 NIFI-14693: Restoring Component State in StatelessDataflow 
API
2fa98c2777 is described below

commit 2fa98c277752d03b5affd3426a5f2d3d3fe9fb59
Author: Joe Gresock <[email protected]>
AuthorDate: Wed Jun 25 21:54:08 2025 -0400

    NIFI-14693: Restoring Component State in StatelessDataflow API
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #10049.
---
 .../nifi/stateless/flow/StatelessDataflow.java     |  8 ++++
 .../nifi/stateless/flow/StandardStatelessFlow.java | 47 ++++++++++++++++++++++
 2 files changed, 55 insertions(+)

diff --git 
a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
 
b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
index 9ae5efc882..758b153371 100644
--- 
a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
+++ 
b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
@@ -94,10 +94,18 @@ public interface StatelessDataflow {
 
     boolean isFlowFileQueued();
 
+    /**
+     *
+     * @return True if there are any processors in the dataflow with the 
{@link org.apache.nifi.annotation.behavior.Stateful} annotation
+     */
+    boolean isStateful();
+
     void purge();
 
     Map<String, String> getComponentStates(Scope scope);
 
+    void setComponentStates(Map<String, String> componentStates, Scope scope);
+
     BulletinRepository getBulletinRepository();
 
     OptionalLong getCounter(String componentId, String counterName);
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
index 215a717d41..2aa2d2a8d9 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
@@ -51,6 +51,7 @@ import 
org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.controller.service.StandardConfigurationContext;
+import org.apache.nifi.controller.state.StandardStateMap;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
@@ -135,6 +136,7 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
     private volatile ExecutorService runDataflowExecutor;
     private volatile ScheduledExecutorService backgroundTaskExecutor;
     private volatile boolean initialized = false;
+    private volatile Boolean stateful = null;
     private volatile boolean shutdown = false;
     private volatile boolean manageControllerServices = true;
 
@@ -711,6 +713,17 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
         }
     }
 
+    @Override
+    public boolean isStateful() {
+        if (stateful == null) {
+            final boolean hasStatefulReportingTask = 
reportingTasks.stream().anyMatch(this::isStateful);
+            if (hasStatefulReportingTask) {
+                return true;
+            }
+            stateful = isStateful(rootGroup);
+        }
+        return stateful;
+    }
 
     private boolean isStateful(final ProcessGroup processGroup) {
         final boolean hasStatefulProcessor = 
processGroup.getProcessors().stream().anyMatch(this::isStateful);
@@ -873,6 +886,40 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
         return serializedStateMaps;
     }
 
+    @Override
+    public void setComponentStates(final Map<String, String> componentStates, 
final Scope scope) {
+        final Map<String, StateMap> stateMaps = 
deserializeStateMaps(componentStates);
+        stateManagerProvider.updateComponentsStates(stateMaps, scope);
+    }
+
+    private Map<String, StateMap> deserializeStateMaps(final Map<String, 
String> componentStates) {
+        if (componentStates == null) {
+            return Collections.emptyMap();
+        }
+
+        final Map<String, StateMap> deserializedStateMaps = new HashMap<>();
+
+        for (final Map.Entry<String, String> entry : 
componentStates.entrySet()) {
+            final String componentId = entry.getKey();
+            final String serialized = entry.getValue();
+
+            final SerializableStateMap deserialized;
+            try {
+                deserialized = objectMapper.readValue(serialized, 
SerializableStateMap.class);
+            } catch (final Exception e) {
+                // We want to avoid throwing an Exception here because if we 
do, we may never be able to run the flow again, at least not without
+                // destroying all state that exists for the component. Would 
be better to simply skip the state for this component
+                logger.error("Failed to deserialized components' state for 
component with ID {}. State will be reset to empty", componentId, e);
+                continue;
+            }
+
+            final StateMap stateMap = new 
StandardStateMap(deserialized.getStateValues(), 
Optional.ofNullable(deserialized.getVersion()));
+            deserializedStateMaps.put(componentId, stateMap);
+        }
+
+        return deserializedStateMaps;
+    }
+
     @Override
     public BulletinRepository getBulletinRepository() {
         return bulletinRepository;

Reply via email to