This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 82ee8a204b NIFI-14710 - Stateless should trigger failure callback when queues are not empty after process group is stopped (#10074)
82ee8a204b is described below

commit 82ee8a204b05363aa80d1db5233584dc54cbd7cd
Author: Pierre Villard <[email protected]>
AuthorDate: Wed Jul 9 22:44:44 2025 +0200

    NIFI-14710 - Stateless should trigger failure callback when queues are not empty after process group is stopped (#10074)
    
    NIFI-14710 - Stateless should trigger failure callback when queues are not empty after process group is stopped
---
 .../nifi/stateless/flow/StandardStatelessFlow.java | 34 ++++++++--
 .../stateless/flow/TestStandardStatelessFlow.java  | 77 ++++++++++++++++++++++
 2 files changed, 107 insertions(+), 4 deletions(-)

diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
index 2aa2d2a8d9..7af043966a 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
@@ -144,6 +144,16 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
                                  final ProcessContextFactory 
processContextFactory, final RepositoryContextFactory repositoryContextFactory, 
final DataflowDefinition dataflowDefinition,
                                  final StatelessStateManagerProvider 
stateManagerProvider, final ProcessScheduler processScheduler, final 
BulletinRepository bulletinRepository,
                                  final LifecycleStateManager 
lifecycleStateManager, final Duration componentEnableTimeout, final 
StatsTracker statsTracker) {
+        this(rootGroup, reportingTasks, controllerServiceProvider, 
processContextFactory, repositoryContextFactory, dataflowDefinition,
+                stateManagerProvider, processScheduler, bulletinRepository, 
lifecycleStateManager, componentEnableTimeout, statsTracker,
+                new AsynchronousCommitTracker(rootGroup));
+    }
+
+    public StandardStatelessFlow(final ProcessGroup rootGroup, final 
List<ReportingTaskNode> reportingTasks, final ControllerServiceProvider 
controllerServiceProvider,
+                                 final ProcessContextFactory 
processContextFactory, final RepositoryContextFactory repositoryContextFactory, 
final DataflowDefinition dataflowDefinition,
+                                 final StatelessStateManagerProvider 
stateManagerProvider, final ProcessScheduler processScheduler, final 
BulletinRepository bulletinRepository,
+                                 final LifecycleStateManager 
lifecycleStateManager, final Duration componentEnableTimeout, final 
StatsTracker statsTracker,
+                                 final AsynchronousCommitTracker 
commitTracker) {
         this.rootGroup = rootGroup;
         this.allConnections = rootGroup.findAllConnections();
         this.reportingTasks = reportingTasks;
@@ -156,7 +166,7 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
         this.transactionThresholdMeter = new 
TransactionThresholdMeter(dataflowDefinition.getTransactionThresholds());
         this.bulletinRepository = bulletinRepository;
         this.componentEnableTimeoutMillis = componentEnableTimeout.toMillis();
-        this.tracker = new AsynchronousCommitTracker(rootGroup);
+        this.tracker = commitTracker;
         this.lifecycleStateManager = lifecycleStateManager;
         this.inputPorts = new ArrayList<>(rootGroup.getInputPorts());
         this.statsTracker = statsTracker;
@@ -395,8 +405,16 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
             } else {
                 final boolean gracefullyStopped = 
waitForProcessorThreadsToComplete(runningProcessors, gracefulShutdownDuration);
                 if (gracefullyStopped) {
-                    logger.info("All Processors have finished running; 
triggering session callbacks");
-                    tracker.triggerCallbacks();
+                    if (rootGroup.isDataQueuedForProcessing()) {
+                        final int queuedCount = allConnections.stream()
+                                .mapToInt(conn -> 
conn.getFlowFileQueue().size().getObjectCount())
+                                .sum();
+                        logger.warn("All Processors finished running but {} 
FlowFiles remain queued; treating session as failure", queuedCount);
+                        tracker.triggerFailureCallbacks(new 
TerminatedTaskException());
+                    } else {
+                        logger.info("All Processors have finished running; 
triggering session callbacks");
+                        tracker.triggerCallbacks();
+                    }
                 } else {
                     logger.warn("{} Processors did not finish running within 
the graceful shutdown period of {} millis. Interrupting all running components. 
Processors still running: {}",
                         runningProcessors.size(), 
gracefulShutdownDuration.toMillis(), runningProcessors);
@@ -406,7 +424,15 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
             }
         } else {
             waitForProcessorThreadsToComplete(runningProcessors, 
gracefulShutdownDuration);
-            tracker.triggerCallbacks();
+            if (rootGroup.isDataQueuedForProcessing()) {
+                final int queuedCount = allConnections.stream()
+                        .mapToInt(conn -> 
conn.getFlowFileQueue().size().getObjectCount())
+                        .sum();
+                logger.warn("{} FlowFiles remain queued after shutdown; 
treating session as failure", queuedCount);
+                tracker.triggerFailureCallbacks(new TerminatedTaskException());
+            } else {
+                tracker.triggerCallbacks();
+            }
         }
 
         if (runDataflowExecutor != null) {
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/flow/TestStandardStatelessFlow.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/flow/TestStandardStatelessFlow.java
new file mode 100644
index 0000000000..4920b0bf75
--- /dev/null
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/flow/TestStandardStatelessFlow.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stateless.flow;
+
+import org.apache.nifi.components.state.StatelessStateManagerProvider;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.repository.metrics.tracking.StatsTracker;
+import org.apache.nifi.controller.scheduling.LifecycleStateManager;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.exception.TerminatedTaskException;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.stateless.engine.ProcessContextFactory;
+import org.apache.nifi.stateless.repository.RepositoryContextFactory;
+import org.apache.nifi.stateless.session.AsynchronousCommitTracker;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestStandardStatelessFlow {
+
+    @Test
+    public void testShutdownWithQueuedFlowFilesTriggersFailure() throws 
Exception {
+        final ProcessGroup rootGroup = mock(ProcessGroup.class);
+        when(rootGroup.getName()).thenReturn("root");
+        when(rootGroup.getInputPorts()).thenReturn(Collections.emptySet());
+        when(rootGroup.getOutputPorts()).thenReturn(Collections.emptySet());
+        
when(rootGroup.findAllProcessors()).thenReturn(Collections.emptyList());
+        
when(rootGroup.stopComponents(any())).thenReturn(CompletableFuture.completedFuture(null));
+        
when(rootGroup.findAllRemoteProcessGroups()).thenReturn(Collections.emptyList());
+        when(rootGroup.isDataQueuedForProcessing()).thenReturn(true);
+
+        final ControllerServiceProvider controllerServiceProvider = 
mock(ControllerServiceProvider.class);
+        final ProcessContextFactory processContextFactory = 
mock(ProcessContextFactory.class);
+        final RepositoryContextFactory repositoryContextFactory = 
mock(RepositoryContextFactory.class);
+        final DataflowDefinition dataflowDefinition = 
mock(DataflowDefinition.class);
+        final StatelessStateManagerProvider stateManagerProvider = 
mock(StatelessStateManagerProvider.class);
+        final ProcessScheduler processScheduler = mock(ProcessScheduler.class);
+        final BulletinRepository bulletinRepository = 
mock(BulletinRepository.class);
+        final LifecycleStateManager lifecycleStateManager = 
mock(LifecycleStateManager.class);
+        final StatsTracker statsTracker = mock(StatsTracker.class);
+
+        final AsynchronousCommitTracker tracker = spy(new 
AsynchronousCommitTracker(rootGroup));
+        final StandardStatelessFlow flow = new 
StandardStatelessFlow(rootGroup, List.of(), controllerServiceProvider, 
processContextFactory,
+                repositoryContextFactory, dataflowDefinition, 
stateManagerProvider, processScheduler, bulletinRepository,
+                lifecycleStateManager, Duration.ZERO, statsTracker, tracker);
+
+        flow.shutdown(false, false, Duration.ZERO);
+
+        verify(tracker, 
times(1)).triggerFailureCallbacks(any(TerminatedTaskException.class));
+    }
+
+}

Reply via email to