This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 82ee8a204b NIFI-14710 - Stateless should trigger failure callback when
queues are not empty after process group is stopped (#10074)
82ee8a204b is described below
commit 82ee8a204b05363aa80d1db5233584dc54cbd7cd
Author: Pierre Villard <[email protected]>
AuthorDate: Wed Jul 9 22:44:44 2025 +0200
NIFI-14710 - Stateless should trigger failure callback when queues are not
empty after process group is stopped (#10074)
NIFI-14710 - Stateless should trigger failure callback when queues are not
empty after process group is stopped
---
.../nifi/stateless/flow/StandardStatelessFlow.java | 34 ++++++++--
.../stateless/flow/TestStandardStatelessFlow.java | 77 ++++++++++++++++++++++
2 files changed, 107 insertions(+), 4 deletions(-)
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
index 2aa2d2a8d9..7af043966a 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
@@ -144,6 +144,16 @@ public class StandardStatelessFlow implements
StatelessDataflow {
final ProcessContextFactory
processContextFactory, final RepositoryContextFactory repositoryContextFactory,
final DataflowDefinition dataflowDefinition,
final StatelessStateManagerProvider
stateManagerProvider, final ProcessScheduler processScheduler, final
BulletinRepository bulletinRepository,
final LifecycleStateManager
lifecycleStateManager, final Duration componentEnableTimeout, final
StatsTracker statsTracker) {
+ this(rootGroup, reportingTasks, controllerServiceProvider,
processContextFactory, repositoryContextFactory, dataflowDefinition,
+ stateManagerProvider, processScheduler, bulletinRepository,
lifecycleStateManager, componentEnableTimeout, statsTracker,
+ new AsynchronousCommitTracker(rootGroup));
+ }
+
+ public StandardStatelessFlow(final ProcessGroup rootGroup, final
List<ReportingTaskNode> reportingTasks, final ControllerServiceProvider
controllerServiceProvider,
+ final ProcessContextFactory
processContextFactory, final RepositoryContextFactory repositoryContextFactory,
final DataflowDefinition dataflowDefinition,
+ final StatelessStateManagerProvider
stateManagerProvider, final ProcessScheduler processScheduler, final
BulletinRepository bulletinRepository,
+ final LifecycleStateManager
lifecycleStateManager, final Duration componentEnableTimeout, final
StatsTracker statsTracker,
+ final AsynchronousCommitTracker
commitTracker) {
this.rootGroup = rootGroup;
this.allConnections = rootGroup.findAllConnections();
this.reportingTasks = reportingTasks;
@@ -156,7 +166,7 @@ public class StandardStatelessFlow implements
StatelessDataflow {
this.transactionThresholdMeter = new
TransactionThresholdMeter(dataflowDefinition.getTransactionThresholds());
this.bulletinRepository = bulletinRepository;
this.componentEnableTimeoutMillis = componentEnableTimeout.toMillis();
- this.tracker = new AsynchronousCommitTracker(rootGroup);
+ this.tracker = commitTracker;
this.lifecycleStateManager = lifecycleStateManager;
this.inputPorts = new ArrayList<>(rootGroup.getInputPorts());
this.statsTracker = statsTracker;
@@ -395,8 +405,16 @@ public class StandardStatelessFlow implements
StatelessDataflow {
} else {
final boolean gracefullyStopped =
waitForProcessorThreadsToComplete(runningProcessors, gracefulShutdownDuration);
if (gracefullyStopped) {
- logger.info("All Processors have finished running;
triggering session callbacks");
- tracker.triggerCallbacks();
+ if (rootGroup.isDataQueuedForProcessing()) {
+ final int queuedCount = allConnections.stream()
+ .mapToInt(conn ->
conn.getFlowFileQueue().size().getObjectCount())
+ .sum();
+ logger.warn("All Processors finished running but {}
FlowFiles remain queued; treating session as failure", queuedCount);
+ tracker.triggerFailureCallbacks(new
TerminatedTaskException());
+ } else {
+ logger.info("All Processors have finished running;
triggering session callbacks");
+ tracker.triggerCallbacks();
+ }
} else {
logger.warn("{} Processors did not finish running within
the graceful shutdown period of {} millis. Interrupting all running components.
Processors still running: {}",
runningProcessors.size(),
gracefulShutdownDuration.toMillis(), runningProcessors);
@@ -406,7 +424,15 @@ public class StandardStatelessFlow implements
StatelessDataflow {
}
} else {
waitForProcessorThreadsToComplete(runningProcessors,
gracefulShutdownDuration);
- tracker.triggerCallbacks();
+ if (rootGroup.isDataQueuedForProcessing()) {
+ final int queuedCount = allConnections.stream()
+ .mapToInt(conn ->
conn.getFlowFileQueue().size().getObjectCount())
+ .sum();
+ logger.warn("{} FlowFiles remain queued after shutdown;
treating session as failure", queuedCount);
+ tracker.triggerFailureCallbacks(new TerminatedTaskException());
+ } else {
+ tracker.triggerCallbacks();
+ }
}
if (runDataflowExecutor != null) {
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/flow/TestStandardStatelessFlow.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/flow/TestStandardStatelessFlow.java
new file mode 100644
index 0000000000..4920b0bf75
--- /dev/null
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/flow/TestStandardStatelessFlow.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stateless.flow;
+
+import org.apache.nifi.components.state.StatelessStateManagerProvider;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.repository.metrics.tracking.StatsTracker;
+import org.apache.nifi.controller.scheduling.LifecycleStateManager;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.exception.TerminatedTaskException;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.stateless.engine.ProcessContextFactory;
+import org.apache.nifi.stateless.repository.RepositoryContextFactory;
+import org.apache.nifi.stateless.session.AsynchronousCommitTracker;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestStandardStatelessFlow {
+
+ @Test
+ public void testShutdownWithQueuedFlowFilesTriggersFailure() throws
Exception {
+ final ProcessGroup rootGroup = mock(ProcessGroup.class);
+ when(rootGroup.getName()).thenReturn("root");
+ when(rootGroup.getInputPorts()).thenReturn(Collections.emptySet());
+ when(rootGroup.getOutputPorts()).thenReturn(Collections.emptySet());
+
when(rootGroup.findAllProcessors()).thenReturn(Collections.emptyList());
+
when(rootGroup.stopComponents(any())).thenReturn(CompletableFuture.completedFuture(null));
+
when(rootGroup.findAllRemoteProcessGroups()).thenReturn(Collections.emptyList());
+ when(rootGroup.isDataQueuedForProcessing()).thenReturn(true);
+
+ final ControllerServiceProvider controllerServiceProvider =
mock(ControllerServiceProvider.class);
+ final ProcessContextFactory processContextFactory =
mock(ProcessContextFactory.class);
+ final RepositoryContextFactory repositoryContextFactory =
mock(RepositoryContextFactory.class);
+ final DataflowDefinition dataflowDefinition =
mock(DataflowDefinition.class);
+ final StatelessStateManagerProvider stateManagerProvider =
mock(StatelessStateManagerProvider.class);
+ final ProcessScheduler processScheduler = mock(ProcessScheduler.class);
+ final BulletinRepository bulletinRepository =
mock(BulletinRepository.class);
+ final LifecycleStateManager lifecycleStateManager =
mock(LifecycleStateManager.class);
+ final StatsTracker statsTracker = mock(StatsTracker.class);
+
+ final AsynchronousCommitTracker tracker = spy(new
AsynchronousCommitTracker(rootGroup));
+ final StandardStatelessFlow flow = new
StandardStatelessFlow(rootGroup, List.of(), controllerServiceProvider,
processContextFactory,
+ repositoryContextFactory, dataflowDefinition,
stateManagerProvider, processScheduler, bulletinRepository,
+ lifecycleStateManager, Duration.ZERO, statsTracker, tracker);
+
+ flow.shutdown(false, false, Duration.ZERO);
+
+ verify(tracker,
times(1)).triggerFailureCallbacks(any(TerminatedTaskException.class));
+ }
+
+}