This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new d1fa36a92b NIFI-15519 Skipped saving Flow Configuration on shutdown to keep expected Processor states (#10864)
d1fa36a92b is described below

commit d1fa36a92b05cf95017f4d655e5069d2f16eff32
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Feb 6 21:34:49 2026 +0100

    NIFI-15519 Skipped saving Flow Configuration on shutdown to keep expected Processor states (#10864)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi/controller/StandardFlowService.java       |  30 ++++-
 .../restart/ProcessorAutoResumeAfterRestartIT.java | 146 +++++++++++++++++++++
 2 files changed, 173 insertions(+), 3 deletions(-)

diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 7755030322..5c6f7864ae 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -311,6 +311,19 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
                 return;
             }
 
+            // Flush any pending save while processors are still running, preserving their
+            // RUNNING states in flow.json.gz. This must happen before controller.shutdown()
+            // which sets all processor desired states to STOPPED.
+            final SaveHolder pendingSave = saveHolder.getAndSet(null);
+            if (pendingSave != null) {
+                try {
+                    dao.save(controller, pendingSave.shouldArchive);
+                    logger.info("Flushed pending flow save before shutdown");
+                } catch (final Exception e) {
+                    logger.error("Failed to flush pending flow save before shutdown", e);
+                }
+            }
+
             running.set(false);
 
             // Stop Cluster Coordinator before Node Protocol Sender
@@ -334,6 +347,10 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
             if (!controller.isTerminated()) {
                 controller.shutdown(force);
             }
+
+            // Clear any save requests triggered during shutdown (e.g. by stopping processors).
+            // These would contain incorrect processor states (STOPPED mapped to ENABLED).
+            saveHolder.set(null);
         } finally {
             writeLock.unlock();
         }
@@ -357,9 +374,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
                 logger.warn("Scheduling service did not gracefully shutdown within configured {} second window", gracefulShutdownSeconds);
             }
         }
-
-        // Ensure that our background save reporting task has a chance to run, because we've now shut down the executor, which could cause the save reporting task to get canceled.
-        saveReportingTask.run();
     }
 
     @Override
@@ -1026,6 +1040,16 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
                     }
                     writeLock.lock();
                     try {
+                        // Skip saving during shutdown to preserve RUNNING processor states in flow.json.gz.
+                        // During graceful shutdown, processor desired states are set to STOPPED before this
+                        // save executes. Saving at that point would persist ENABLED states instead of RUNNING,
+                        // which prevents auto-resume of processors on the next startup.
+                        if (!running.get()) {
+                            StandardFlowService.this.saveHolder.set(null);
+                            logger.info("Skipping flow controller save because service is no longer running");
+                            return;
+                        }
+
                         dao.save(controller, holder.shouldArchive);
                         // Nulling it out if it is still set to our current SaveHolder.  Otherwise leave it alone because it means
                         // another save is already pending.
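
The change above boils down to an ordering rule: drain the pending save while processors still report RUNNING, shut the controller down, then discard anything queued during the shutdown. A minimal sketch of that ordering, condensed from the diff above (illustrative only; locking, logging, and error handling are omitted and this is not the full StandardFlowService.stop() method):

    // saveHolder is the AtomicReference<SaveHolder> and dao the flow configuration
    // DAO referenced in the diff above.
    public void stop(final boolean force) {
        // 1. Persist any queued save while processor states are still RUNNING
        final SaveHolder pending = saveHolder.getAndSet(null);
        if (pending != null) {
            dao.save(controller, pending.shouldArchive);
        }

        running.set(false);

        // 2. Shutting down the controller moves processors to STOPPED and may
        //    queue new save requests that would record the wrong states
        if (!controller.isTerminated()) {
            controller.shutdown(force);
        }

        // 3. Discard those late requests so flow.json.gz keeps the RUNNING states
        saveHolder.set(null);
    }
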
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/restart/ProcessorAutoResumeAfterRestartIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/restart/ProcessorAutoResumeAfterRestartIT.java
new file mode 100644
index 0000000000..709f6a53cb
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/restart/ProcessorAutoResumeAfterRestartIT.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.restart;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.tests.system.NiFiInstance;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.zip.GZIPInputStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ProcessorAutoResumeAfterRestartIT extends NiFiSystemIT {
+
+    @Test
+    public void testRunningProcessorsResumeAfterRestart() throws NiFiClientException, IOException, InterruptedException {
+        final ProcessorEntity generate = getClientUtil().createProcessor(GENERATE_FLOWFILE);
+        final ProcessorEntity terminate = getClientUtil().createProcessor(TERMINATE_FLOWFILE);
+        getClientUtil().createConnection(generate, terminate, SUCCESS);
+
+        getClientUtil().updateProcessorSchedulingPeriod(generate, "10 sec");
+        getClientUtil().startProcessor(generate);
+        getClientUtil().startProcessor(terminate);
+        getClientUtil().waitForRunningProcessor(generate.getId());
+        getClientUtil().waitForRunningProcessor(terminate.getId());
+
+        final Set<String> runningProcessorIds = Set.of(generate.getId(), terminate.getId());
+
+        // Wait for the flow to be saved at least once with RUNNING states.
+        // The SaveReportingTask runs every 500ms and the default write delay is 500ms,
+        // so 3 seconds is sufficient for at least one successful save.
+        Thread.sleep(3000);
+
+        // Trigger a new save request by making a flow modification, then immediately stop NiFi.
+        // Each REST API modification calls saveFlowChanges() which sets a saveHolder with a 500ms delay.
+        // By stopping NiFi immediately after the modification, the pending save has not yet been processed.
+        // During NiFi's shutdown sequence, the pending save would execute after all processors have been
+        // stopped, persisting ENABLED states instead of RUNNING. The fix in StandardFlowService.stop()
+        // flushes any pending save before stopping the controller, preserving RUNNING states.
+        final ProcessorEntity addedBeforeShutdown = getClientUtil().createProcessor(TERMINATE_FLOWFILE);
+
+        final NiFiInstance nifiInstance = getNiFiInstance();
+        nifiInstance.stop();
+
+        // After shutdown, verify that flow.json.gz still has RUNNING states for the started processors.
+        // Without the fix, the shutdown race condition would cause processors to be saved with ENABLED
+        // states instead of RUNNING, preventing auto-resume on the next startup.
+        final File confDir = new File(nifiInstance.getInstanceDirectory(), "conf");
+        final File flowJsonGz = new File(confDir, "flow.json.gz");
+
+        final Map<String, String> processorStates = getProcessorScheduledStates(flowJsonGz);
+        assertFalse(processorStates.isEmpty(), "Expected processors in flow.json.gz");
+        for (final String processorId : runningProcessorIds) {
+            final String state = processorStates.get(processorId);
+            assertEquals("RUNNING", state);
+        }
+
+        assertTrue(processorStates.containsKey(addedBeforeShutdown.getId()));
+
+        nifiInstance.start(true);
+        getClientUtil().waitForRunningProcessor(generate.getId());
+        getClientUtil().waitForRunningProcessor(terminate.getId());
+
+        final ProcessorEntity generateAfterRestart = getNifiClient().getProcessorClient().getProcessor(generate.getId());
+        assertEquals("RUNNING", generateAfterRestart.getComponent().getState());
+
+        final ProcessorEntity terminateAfterRestart = getNifiClient().getProcessorClient().getProcessor(terminate.getId());
+        assertEquals("RUNNING", terminateAfterRestart.getComponent().getState());
+
+        final ProcessorEntity addedAfterRestart = getNifiClient().getProcessorClient().getProcessor(addedBeforeShutdown.getId());
+        assertNotNull(addedAfterRestart);
+        assertEquals("STOPPED", addedAfterRestart.getComponent().getState());
+    }
+
+    private Map<String, String> getProcessorScheduledStates(final File flowJsonGz) throws IOException {
+        final byte[] decompressed = decompress(flowJsonGz);
+        final ObjectMapper mapper = new ObjectMapper();
+        final JsonNode root = mapper.readTree(decompressed);
+        final Map<String, String> states = new HashMap<>();
+        final JsonNode rootGroup = root.path("rootGroup");
+        collectProcessorStates(rootGroup, states);
+        return states;
+    }
+
+    private void collectProcessorStates(final JsonNode group, final Map<String, String> states) {
+        final JsonNode processors = group.path("processors");
+        if (processors.isArray()) {
+            for (final JsonNode processor : processors) {
+                final JsonNode instanceId = processor.path("instanceIdentifier");
+                final JsonNode scheduledState = processor.path("scheduledState");
+                if (!instanceId.isMissingNode() && !scheduledState.isMissingNode()) {
+                    states.put(instanceId.asText(), scheduledState.asText());
+                }
+            }
+        }
+        final JsonNode childGroups = group.path("processGroups");
+        if (childGroups.isArray()) {
+            for (final JsonNode childGroup : childGroups) {
+                collectProcessorStates(childGroup, states);
+            }
+        }
+    }
+
+    private byte[] decompress(final File gzipFile) throws IOException {
+        try (final InputStream fis = Files.newInputStream(gzipFile.toPath());
+             final GZIPInputStream gzis = new GZIPInputStream(fis);
+             final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            final byte[] buffer = new byte[8192];
+            int len;
+            while ((len = gzis.read(buffer)) != -1) {
+                baos.write(buffer, 0, len);
+            }
+            return baos.toByteArray();
+        }
+    }
+}
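
For context, the timing the test relies on can be pictured with a small stand-alone model (illustrative names only; NiFi's real implementation uses the SaveHolder, flow configuration DAO, and save reporting task referenced in the diff above): each flow modification queues a save that a periodic task writes out after a short delay, so a shutdown can land in the window between the modification and the write.

    import java.time.Duration;
    import java.time.Instant;
    import java.util.concurrent.atomic.AtomicReference;

    // Stand-alone model of the delayed-save window the test exploits.
    // None of these names exist in NiFi; they only mirror the mechanism
    // described in the test comments above.
    class DelayedSaveModel {
        private final AtomicReference<Instant> pendingSaveTime = new AtomicReference<>();

        void saveFlowChanges(final Duration writeDelay) {
            // each REST modification queues a save to run after the write delay
            pendingSaveTime.set(Instant.now().plus(writeDelay));
        }

        void saveReportingTask() {
            // runs periodically; persists the flow once the delay has elapsed
            final Instant due = pendingSaveTime.get();
            if (due != null && !Instant.now().isBefore(due)) {
                persistFlow();
                pendingSaveTime.compareAndSet(due, null);
            }
        }

        private void persistFlow() {
            // placeholder for writing flow.json.gz with the current processor states
        }
    }

Without the flush in StandardFlowService.stop(), a shutdown that begins inside that delay window would let the queued save run only after the processors had been stopped, which is exactly the outcome the assertions on flow.json.gz guard against.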