This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d1b160b6956 KAFKA-19510: clear pendingTasksToInit on tasks clear. 
(#20646)
d1b160b6956 is described below

commit d1b160b6956e19f07205e0019ff0d8ded34b898f
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Wed Oct 8 14:42:50 2025 -0700

    KAFKA-19510: clear pendingTasksToInit on tasks clear. (#20646)
    
    Clear pendingTasksToInit on tasks clear.  It matters in situations when
    we shutting down a thread in PARTITIONS_ASSIGNED state. In this case we
    may have locked some unassigned task directories (see
    TaskManager#tryToLockAllNonEmptyTaskDirectories). Then we may have
    gotten assigned to one or multiple of those tasks. In this scenario,  we
    will not release the locks for the unassigned task directories (see
    TaskManager#releaseLockedUnassignedTaskDirectories), because
    TaskManager#allTasks includes pendingTasksToInit, but it hasn't been
    cleared.
    
    Reviewers: Matthias J. Sax <[email protected]>, Lucas Brutschy
     <[email protected]>
---
 .../kafka/streams/processor/internals/Tasks.java   | 11 +++++++++++
 .../streams/processor/internals/TasksTest.java     | 23 ++++++++++++++++++++++
 2 files changed, 34 insertions(+)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index 92dd07ba974..76d63490683 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -258,6 +258,9 @@ class Tasks implements TasksRegistry {
 
     @Override
     public synchronized void clear() {
+        pendingTasksToInit.clear();
+        pendingActiveTasksToCreate.clear();
+        pendingStandbyTasksToCreate.clear();
         activeTasksPerId.clear();
         standbyTasksPerId.clear();
         activeTasksPerPartition.clear();
@@ -346,4 +349,12 @@ class Tasks implements TasksRegistry {
     public boolean contains(final TaskId taskId) {
         return getTask(taskId) != null;
     }
+
+    Map<TaskId, Set<TopicPartition>> pendingActiveTasksToCreate() {
+        return pendingActiveTasksToCreate;
+    }
+
+    Map<TaskId, Set<TopicPartition>> pendingStandbyTasksToCreate() {
+        return pendingStandbyTasksToCreate;
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
index e2bd30c20a5..ec4d672f9c2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
@@ -207,4 +207,27 @@ public class TasksTest {
         tasks.addTask(activeTask1);
         assertTrue(tasks.allNonFailedTasks().contains(activeTask1));
     }
+
+    @Test
+    public void shouldClearAllPendingTasks() {
+        final StandbyTask task = standbyTask(TASK_0_0, 
Set.of(TOPIC_PARTITION_B_0))
+            .inState(State.CREATED).build();
+        tasks.addPendingTasksToInit(Collections.singleton(task));
+        final TaskId taskId1 = new TaskId(0, 0, "A");
+        tasks.addPendingActiveTasksToCreate(mkMap(
+            mkEntry(taskId1, Set.of(TOPIC_PARTITION_A_0))
+        ));
+        final TaskId taskId2 = new TaskId(0, 1, "A");
+        tasks.addPendingStandbyTasksToCreate(mkMap(
+            mkEntry(taskId2, Set.of(TOPIC_PARTITION_A_0))
+        ));
+
+        assertTrue(tasks.pendingTasksToInit().contains(task));
+        assertTrue(tasks.pendingActiveTasksToCreate().containsKey(taskId1));
+        assertTrue(tasks.pendingStandbyTasksToCreate().containsKey(taskId2));
+        tasks.clear();
+        assertTrue(tasks.pendingTasksToInit().isEmpty());
+        assertTrue(tasks.pendingActiveTasksToCreate().isEmpty());
+        assertTrue(tasks.pendingStandbyTasksToCreate().isEmpty());
+    }
 }

Reply via email to