This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new 890c87bd3bf KAFKA-19510: clear pendingTasksToInit on tasks clear.
(#20646)
890c87bd3bf is described below
commit 890c87bd3bff6465a97548ee415577caf4594337
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Wed Oct 8 14:42:50 2025 -0700
KAFKA-19510: clear pendingTasksToInit on tasks clear. (#20646)
Clear pendingTasksToInit on tasks clear. It matters in situations when
we shutting down a thread in PARTITIONS_ASSIGNED state. In this case we
may have locked some unassigned task directories (see
TaskManager#tryToLockAllNonEmptyTaskDirectories). Then we may have
gotten assigned to one or multiple of those tasks. In this scenario, we
will not release the locks for the unassigned task directories (see
TaskManager#releaseLockedUnassignedTaskDirectories), because
TaskManager#allTasks includes pendingTasksToInit, but it hasn't been
cleared.
Reviewers: Matthias J. Sax <[email protected]>, Lucas Brutschy
<[email protected]>
---
.../kafka/streams/processor/internals/Tasks.java | 11 +++++++++++
.../streams/processor/internals/TasksTest.java | 23 ++++++++++++++++++++++
2 files changed, 34 insertions(+)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index 92dd07ba974..76d63490683 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -258,6 +258,9 @@ class Tasks implements TasksRegistry {
@Override
public synchronized void clear() {
+ pendingTasksToInit.clear();
+ pendingActiveTasksToCreate.clear();
+ pendingStandbyTasksToCreate.clear();
activeTasksPerId.clear();
standbyTasksPerId.clear();
activeTasksPerPartition.clear();
@@ -346,4 +349,12 @@ class Tasks implements TasksRegistry {
public boolean contains(final TaskId taskId) {
return getTask(taskId) != null;
}
+
+ Map<TaskId, Set<TopicPartition>> pendingActiveTasksToCreate() {
+ return pendingActiveTasksToCreate;
+ }
+
+ Map<TaskId, Set<TopicPartition>> pendingStandbyTasksToCreate() {
+ return pendingStandbyTasksToCreate;
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
index 0620dcfb006..90c254f9726 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
@@ -208,4 +208,27 @@ public class TasksTest {
tasks.addTask(activeTask1);
assertTrue(tasks.allNonFailedTasks().contains(activeTask1));
}
+
+ @Test
+ public void shouldClearAllPendingTasks() {
+ final StandbyTask task = standbyTask(TASK_0_0,
Set.of(TOPIC_PARTITION_B_0))
+ .inState(State.CREATED).build();
+ tasks.addPendingTasksToInit(Collections.singleton(task));
+ final TaskId taskId1 = new TaskId(0, 0, "A");
+ tasks.addPendingActiveTasksToCreate(mkMap(
+ mkEntry(taskId1, Set.of(TOPIC_PARTITION_A_0))
+ ));
+ final TaskId taskId2 = new TaskId(0, 1, "A");
+ tasks.addPendingStandbyTasksToCreate(mkMap(
+ mkEntry(taskId2, Set.of(TOPIC_PARTITION_A_0))
+ ));
+
+ assertTrue(tasks.pendingTasksToInit().contains(task));
+ assertTrue(tasks.pendingActiveTasksToCreate().containsKey(taskId1));
+ assertTrue(tasks.pendingStandbyTasksToCreate().containsKey(taskId2));
+ tasks.clear();
+ assertTrue(tasks.pendingTasksToInit().isEmpty());
+ assertTrue(tasks.pendingActiveTasksToCreate().isEmpty());
+ assertTrue(tasks.pendingStandbyTasksToCreate().isEmpty());
+ }
}