This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c93e379080d KAFKA-19924: Ensure deterministic task order by using TreeSet in TaskManager (#20992)
c93e379080d is described below

commit c93e379080d680eed6e2b52d96d748c8e2746167
Author: Shashank <[email protected]>
AuthorDate: Wed Nov 26 02:03:22 2025 -0800

    KAFKA-19924: Ensure deterministic task order by using TreeSet in TaskManager (#20992)
    
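    Replace `HashSet` with `TreeSet` ordered by `TaskId` in `TaskManager` to
    ensure a deterministic processing order. `HashSet` makes no guarantee
    about iteration order, so tasks collected for committing or closing were
    previously processed in an arbitrary order.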
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../streams/processor/internals/TaskManager.java   | 35 ++++++++--------
 .../processor/internals/TaskManagerTest.java       | 46 ++++++++++++++--------
 2 files changed, 48 insertions(+), 33 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index c1b1c06379e..4092381fdf4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -225,8 +225,8 @@ public class TaskManager {
         // We need to stop all processing, since we need to commit 
non-corrupted tasks as well.
         maybeLockTasks(activeTasks);
 
-        final Set<Task> corruptedActiveTasks = new HashSet<>();
-        final Set<Task> corruptedStandbyTasks = new HashSet<>();
+        final Set<Task> corruptedActiveTasks = new 
TreeSet<>(Comparator.comparing(Task::id));
+        final Set<Task> corruptedStandbyTasks = new 
TreeSet<>(Comparator.comparing(Task::id));
 
         for (final TaskId taskId : corruptedTasks) {
             final Task task = tasks.task(taskId);
@@ -559,7 +559,7 @@ public class TaskManager {
     private void handleTasksPendingInitialization() {
         // All tasks pending initialization are not part of the usual 
bookkeeping
 
-        final Set<Task> tasksToCloseDirty = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new 
TreeSet<>(Comparator.comparing(Task::id));
 
         for (final Task task : tasks.drainPendingTasksToInit()) {
             closeTaskClean(task, tasksToCloseDirty, new HashMap<>());
@@ -577,7 +577,7 @@ public class TaskManager {
 
         // recycle the startup standbys to active, and remove them from the 
set of actives that need to be created
         if (!startupStandbyTasksToRecycle.isEmpty()) {
-            final Set<Task> tasksToCloseDirty = new HashSet<>();
+            final Set<Task> tasksToCloseDirty = new 
TreeSet<>(Comparator.comparing(Task::id));
             for (final Map.Entry<Task, Set<TopicPartition>> entry : 
startupStandbyTasksToRecycle.entrySet()) {
                 final Task task = entry.getKey();
                 recycleTaskFromStateUpdater(task, entry.getValue(), 
tasksToCloseDirty, failedTasks);
@@ -635,8 +635,8 @@ public class TaskManager {
                                                  final Map<TaskId, 
Set<TopicPartition>> standbyTasksToCreate,
                                                  final Map<TaskId, 
RuntimeException> failedTasks) {
         final Map<Task, Set<TopicPartition>> tasksToRecycleFromStateUpdater = 
new HashMap<>();
-        final Set<Task> tasksToCloseCleanFromStateUpdater = new HashSet<>();
-        final Set<Task> tasksToCloseDirtyFromStateUpdater = new HashSet<>();
+        final Set<Task> tasksToCloseCleanFromStateUpdater = new 
TreeSet<>(Comparator.comparing(Task::id));
+        final Set<Task> tasksToCloseDirtyFromStateUpdater = new 
TreeSet<>(Comparator.comparing(Task::id));
         handleTasksInStateUpdater(
             activeTasksToCreate,
             standbyTasksToCreate,
@@ -1152,8 +1152,8 @@ public class TaskManager {
     void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
         final Set<TopicPartition> remainingRevokedPartitions = new 
HashSet<>(revokedPartitions);
 
-        final Set<Task> revokedActiveTasks = new HashSet<>();
-        final Set<Task> commitNeededActiveTasks = new HashSet<>();
+        final Set<Task> revokedActiveTasks = new 
TreeSet<>(Comparator.comparing(Task::id));
+        final Set<Task> commitNeededActiveTasks = new 
TreeSet<>(Comparator.comparing(Task::id));
         final Map<Task, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsPerTask = new HashMap<>();
         final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(null);
 
@@ -1192,7 +1192,7 @@ public class TaskManager {
         // even if commit failed, we should still continue and complete 
suspending those tasks, so we would capture
         // any exception and rethrow it at the end. some exceptions may be 
handled immediately and then swallowed,
         // as such we just need to skip those dirty tasks in the checkpoint
-        final Set<Task> dirtyTasks = new HashSet<>();
+        final Set<Task> dirtyTasks = new 
TreeSet<>(Comparator.comparing(Task::id));
         try {
             if (revokedTasksNeedCommit) {
                 // in handleRevocation we must call 
commitOffsetsOrTransaction() directly rather than
@@ -1337,8 +1337,9 @@ public class TaskManager {
     private void removeLostActiveTasksFromStateUpdaterAndPendingTasksToInit() {
         if (stateUpdater != null) {
             final Map<TaskId, 
CompletableFuture<StateUpdater.RemovedTaskResult>> futures = new 
LinkedHashMap<>();
-            final Set<Task> tasksToCloseClean = new 
HashSet<>(tasks.drainPendingActiveTasksToInit());
-            final Set<Task> tasksToCloseDirty = new HashSet<>();
+            final Set<Task> tasksToCloseClean = new 
TreeSet<>(Comparator.comparing(Task::id));
+            tasksToCloseClean.addAll(tasks.drainPendingActiveTasksToInit());
+            final Set<Task> tasksToCloseDirty = new 
TreeSet<>(Comparator.comparing(Task::id));
             for (final Task restoringTask : stateUpdater.tasks()) {
                 if (restoringTask.isActive()) {
                     futures.put(restoringTask.id(), 
stateUpdater.remove(restoringTask.id()));
@@ -1592,8 +1593,8 @@ public class TaskManager {
                 final CompletableFuture<StateUpdater.RemovedTaskResult> future 
= stateUpdater.remove(task.id());
                 futures.put(task.id(), future);
             }
-            final Set<Task> tasksToCloseClean = new HashSet<>();
-            final Set<Task> tasksToCloseDirty = new HashSet<>();
+            final Set<Task> tasksToCloseClean = new 
TreeSet<>(Comparator.comparing(Task::id));
+            final Set<Task> tasksToCloseDirty = new 
TreeSet<>(Comparator.comparing(Task::id));
             addToTasksToClose(futures, tasksToCloseClean, tasksToCloseDirty);
             // at this point we removed all tasks, so the shutdown should not 
take a lot of time
             stateUpdater.shutdown(Duration.ofMinutes(1L));
@@ -1635,7 +1636,7 @@ public class TaskManager {
                 .collect(Collectors.toSet());
         maybeLockTasks(ids);
 
-        final Set<Task> tasksToCloseDirty = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new 
TreeSet<>(Comparator.comparing(Task::id));
         tasksToCloseDirty.addAll(tryCloseCleanActiveTasks(activeTasks, clean, 
firstException));
         tasksToCloseDirty.addAll(tryCloseCleanStandbyTasks(standbyTasks, 
clean, firstException));
 
@@ -1751,7 +1752,7 @@ public class TaskManager {
         if (!clean) {
             return standbyTaskIterable();
         }
-        final Set<Task> tasksToCloseDirty = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new 
TreeSet<>(Comparator.comparing(Task::id));
 
         // first committing and then suspend / close clean
         for (final Task task : standbyTasksToClose) {
@@ -2083,8 +2084,8 @@ public class TaskManager {
 
     void maybeCloseTasksFromRemovedTopologies(final Set<String> 
currentNamedTopologies) {
         try {
-            final Set<Task> activeTasksToRemove = new HashSet<>();
-            final Set<Task> standbyTasksToRemove = new HashSet<>();
+            final Set<Task> activeTasksToRemove = new 
TreeSet<>(Comparator.comparing(Task::id));
+            final Set<Task> standbyTasksToRemove = new 
TreeSet<>(Comparator.comparing(Task::id));
             for (final Task task : tasks.allTasks()) {
                 if 
(!currentNamedTopologies.contains(task.id().topologyName())) {
                     if (task.isActive()) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 3e87eebe733..aa2e95623e3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -4385,23 +4385,33 @@ public class TaskManagerTest {
 
     @Test
     public void 
shouldThrowTaskMigratedWhenAllTaskCloseExceptionsAreTaskMigrated() {
-        final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, 
taskId01Partitions, false, stateManager) {
-            @Override
-            public void suspend() {
-                super.suspend();
-                throw new TaskMigratedException("t1 close exception", new 
RuntimeException());
-            }
-        };
+        final StandbyTask migratedTask01 = standbyTask(taskId01, 
taskId01ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId01Partitions)
+            .build();
+        final StandbyTask migratedTask02 = standbyTask(taskId02, 
taskId02ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId02Partitions)
+            .build();
 
-        final StateMachineTask migratedTask02 = new StateMachineTask(taskId02, 
taskId02Partitions, false, stateManager) {
-            @Override
-            public void suspend() {
-                super.suspend();
-                throw new TaskMigratedException("t2 close exception", new 
RuntimeException());
-            }
-        };
-        taskManager.addTask(migratedTask01);
-        taskManager.addTask(migratedTask02);
+        doThrow(new TaskMigratedException("t1 close exception", new 
RuntimeException()))
+            .when(migratedTask01).suspend();
+        doThrow(new TaskMigratedException("t2 close exception", new 
RuntimeException()))
+            .when(migratedTask02).suspend();
+
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+
+        when(stateUpdater.tasks()).thenReturn(Set.of(migratedTask01, 
migratedTask02));
+
+        // mock futures for removing tasks from StateUpdater
+        final CompletableFuture<StateUpdater.RemovedTaskResult> future01 = new 
CompletableFuture<>();
+        when(stateUpdater.remove(taskId01)).thenReturn(future01);
+        future01.complete(new StateUpdater.RemovedTaskResult(migratedTask01));
+
+        final CompletableFuture<StateUpdater.RemovedTaskResult> future02 = new 
CompletableFuture<>();
+        when(stateUpdater.remove(taskId02)).thenReturn(future02);
+        future02.complete(new StateUpdater.RemovedTaskResult(migratedTask02));
 
         final TaskMigratedException thrown = assertThrows(
             TaskMigratedException.class,
@@ -4413,6 +4423,10 @@ public class TaskManagerTest {
             thrown.getMessage(),
             equalTo("t2 close exception; it means all tasks belonging to this 
thread should be migrated.")
         );
+        verify(migratedTask01, times(2)).suspend();
+        verify(migratedTask02, times(2)).suspend();
+        verify(stateUpdater).remove(taskId01);
+        verify(stateUpdater).remove(taskId02);
     }
 
     @Test

Reply via email to