This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new 0f9bd919bd0 KAFKA-19994: TaskManager may not close all tasks on task
timeouts (#21155)
0f9bd919bd0 is described below
commit 0f9bd919bd0da5280db1691cb25399ccc6cd7362
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Dec 16 09:40:24 2025 +0100
KAFKA-19994: TaskManager may not close all tasks on task timeouts (#21155)
When a TimeoutException occurs while trying to transition multiple restored
active tasks to running, we add the timed-out task back to the state
updater so that it is retried.
However, if we run into a task timeout (failing to make progress for a
long time), we rethrow a StreamsException wrapping the TimeoutException.
If we have drained multiple tasks from the state updater at that point,
the remaining tasks are lost: they are not added back to the state
updater and are therefore not closed correctly. Their task directories
remain locked, causing issues when trying to replace the stream thread.
Reviewers: Matthias J. Sax <[email protected]>
---
.../streams/processor/internals/TaskManager.java | 24 +++++++++--
.../processor/internals/TaskManagerTest.java | 46 ++++++++++++++++++++++
2 files changed, 67 insertions(+), 3 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 859d62906ab..13efca5b200 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -57,6 +57,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -978,9 +979,12 @@ public class TaskManager {
}
}
+ /**
+ * @throws StreamsException if fetching committed offsets timed out often
enough to exceed task timeout
+ */
private void transitRestoredTaskToRunning(final Task task,
final long now,
- final
java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
+ final
java.util.function.Consumer<Set<TopicPartition>> offsetResetter) throws
StreamsException {
try {
task.completeRestoration(offsetResetter);
tasks.addTask(task);
@@ -1057,8 +1061,22 @@ public class TaskManager {
private void handleRestoredTasksFromStateUpdater(final long now,
final
java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
final Duration timeout = Duration.ZERO;
- for (final Task task : stateUpdater.drainRestoredActiveTasks(timeout))
{
- transitRestoredTaskToRunning(task, now, offsetResetter);
+ // Create a mutable copy to support iterator.remove()
+ final Set<StreamTask> restoredTasks = new
LinkedHashSet<>(stateUpdater.drainRestoredActiveTasks(timeout));
+ final Iterator<StreamTask> iterator = restoredTasks.iterator();
+
+ try {
+ while (iterator.hasNext()) {
+ final Task task = iterator.next();
+ transitRestoredTaskToRunning(task, now, offsetResetter);
+ iterator.remove(); // Remove successfully transitioned tasks
+ }
+ } finally {
+ // Add back any tasks that we drained but didn't successfully
transition
+ // from the state updater, so that they are closed during shutdown.
+ for (final Task task : restoredTasks) {
+ stateUpdater.add(task);
+ }
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 25cf8fdc401..b740c8c9194 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -1643,6 +1643,52 @@ public class TaskManagerTest {
verifyNoInteractions(consumer);
}
+ @Test
+ public void shouldAddFailedRestoredTasksBackToStateUpdaterOnException() {
+ final StreamTask task1 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RESTORING)
+ .withInputPartitions(taskId00Partitions).build();
+ final StreamTask task2 = statefulTask(taskId01,
taskId01ChangelogPartitions)
+ .inState(State.RESTORING)
+ .withInputPartitions(taskId01Partitions).build();
+ final StreamTask task3 = statefulTask(taskId02,
taskId02ChangelogPartitions)
+ .inState(State.RESTORING)
+ .withInputPartitions(taskId02Partitions).build();
+
+ // Use LinkedHashSet to ensure predictable iteration order
+ final Set<StreamTask> restoredTasks = new java.util.LinkedHashSet<>();
+ restoredTasks.add(task1);
+ restoredTasks.add(task2);
+ restoredTasks.add(task3);
+
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTransitionToRunningOfRestoredTask(restoredTasks, tasks);
+
+ // task1 completes successfully, task2 throws StreamsException from
maybeInitTaskTimeoutOrThrow
+ // task3 is never processed because task2 throws
+ final TimeoutException timeoutException = new TimeoutException();
+
doThrow(timeoutException).when(task2).completeRestoration(noOpResetter);
+ doThrow(new StreamsException("Task timeout exceeded",
task2.id())).when(task2).maybeInitTaskTimeoutOrThrow(anyLong(),
eq(timeoutException));
+
+ assertThrows(StreamsException.class, () ->
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter));
+
+ // task1 should be successfully transitioned
+ verify(tasks).addTask(task1);
+ verify(consumer).resume(task1.inputPartitions());
+ verify(task1).clearTaskTimeout();
+
+ // task2 should be added back to state updater once in the finally
block
+ // (the add in the catch block doesn't execute because
maybeInitTaskTimeoutOrThrow throws)
+ verify(stateUpdater).add(task2);
+ verify(tasks, never()).addTask(task2);
+ verify(task2, never()).clearTaskTimeout();
+
+ // task3 should also be added back to state updater in the finally
block
+ verify(stateUpdater).add(task3);
+ verify(tasks, never()).addTask(task3);
+ verify(task3, never()).clearTaskTimeout();
+ }
+
private TaskManager setUpTransitionToRunningOfRestoredTask(final
Set<StreamTask> statefulTasks,
final
TasksRegistry tasks) {
when(stateUpdater.restoresActiveTasks()).thenReturn(true);