This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c93e379080d KAFKA-19924: Ensure deterministic task order by using
TreeSet in TaskManager (#20992)
c93e379080d is described below
commit c93e379080d680eed6e2b52d96d748c8e2746167
Author: Shashank <[email protected]>
AuthorDate: Wed Nov 26 02:03:22 2025 -0800
KAFKA-19924: Ensure deterministic task order by using TreeSet in
TaskManager (#20992)
Replace the `HashSet`s that collect tasks in `TaskManager` with `TreeSet`s
ordered by `TaskId`, so that tasks are processed in a deterministic order.
Reviewers: Lucas Brutschy <[email protected]>
---
.../streams/processor/internals/TaskManager.java | 35 ++++++++--------
.../processor/internals/TaskManagerTest.java | 46 ++++++++++++++--------
2 files changed, 48 insertions(+), 33 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index c1b1c06379e..4092381fdf4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -225,8 +225,8 @@ public class TaskManager {
// We need to stop all processing, since we need to commit
non-corrupted tasks as well.
maybeLockTasks(activeTasks);
- final Set<Task> corruptedActiveTasks = new HashSet<>();
- final Set<Task> corruptedStandbyTasks = new HashSet<>();
+ final Set<Task> corruptedActiveTasks = new
TreeSet<>(Comparator.comparing(Task::id));
+ final Set<Task> corruptedStandbyTasks = new
TreeSet<>(Comparator.comparing(Task::id));
for (final TaskId taskId : corruptedTasks) {
final Task task = tasks.task(taskId);
@@ -559,7 +559,7 @@ public class TaskManager {
private void handleTasksPendingInitialization() {
// All tasks pending initialization are not part of the usual
bookkeeping
- final Set<Task> tasksToCloseDirty = new HashSet<>();
+ final Set<Task> tasksToCloseDirty = new
TreeSet<>(Comparator.comparing(Task::id));
for (final Task task : tasks.drainPendingTasksToInit()) {
closeTaskClean(task, tasksToCloseDirty, new HashMap<>());
@@ -577,7 +577,7 @@ public class TaskManager {
// recycle the startup standbys to active, and remove them from the
set of actives that need to be created
if (!startupStandbyTasksToRecycle.isEmpty()) {
- final Set<Task> tasksToCloseDirty = new HashSet<>();
+ final Set<Task> tasksToCloseDirty = new
TreeSet<>(Comparator.comparing(Task::id));
for (final Map.Entry<Task, Set<TopicPartition>> entry :
startupStandbyTasksToRecycle.entrySet()) {
final Task task = entry.getKey();
recycleTaskFromStateUpdater(task, entry.getValue(),
tasksToCloseDirty, failedTasks);
@@ -635,8 +635,8 @@ public class TaskManager {
final Map<TaskId,
Set<TopicPartition>> standbyTasksToCreate,
final Map<TaskId,
RuntimeException> failedTasks) {
final Map<Task, Set<TopicPartition>> tasksToRecycleFromStateUpdater =
new HashMap<>();
- final Set<Task> tasksToCloseCleanFromStateUpdater = new HashSet<>();
- final Set<Task> tasksToCloseDirtyFromStateUpdater = new HashSet<>();
+ final Set<Task> tasksToCloseCleanFromStateUpdater = new
TreeSet<>(Comparator.comparing(Task::id));
+ final Set<Task> tasksToCloseDirtyFromStateUpdater = new
TreeSet<>(Comparator.comparing(Task::id));
handleTasksInStateUpdater(
activeTasksToCreate,
standbyTasksToCreate,
@@ -1152,8 +1152,8 @@ public class TaskManager {
void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
final Set<TopicPartition> remainingRevokedPartitions = new
HashSet<>(revokedPartitions);
- final Set<Task> revokedActiveTasks = new HashSet<>();
- final Set<Task> commitNeededActiveTasks = new HashSet<>();
+ final Set<Task> revokedActiveTasks = new
TreeSet<>(Comparator.comparing(Task::id));
+ final Set<Task> commitNeededActiveTasks = new
TreeSet<>(Comparator.comparing(Task::id));
final Map<Task, Map<TopicPartition, OffsetAndMetadata>>
consumedOffsetsPerTask = new HashMap<>();
final AtomicReference<RuntimeException> firstException = new
AtomicReference<>(null);
@@ -1192,7 +1192,7 @@ public class TaskManager {
// even if commit failed, we should still continue and complete
suspending those tasks, so we would capture
// any exception and rethrow it at the end. some exceptions may be
handled immediately and then swallowed,
// as such we just need to skip those dirty tasks in the checkpoint
- final Set<Task> dirtyTasks = new HashSet<>();
+ final Set<Task> dirtyTasks = new
TreeSet<>(Comparator.comparing(Task::id));
try {
if (revokedTasksNeedCommit) {
// in handleRevocation we must call
commitOffsetsOrTransaction() directly rather than
@@ -1337,8 +1337,9 @@ public class TaskManager {
private void removeLostActiveTasksFromStateUpdaterAndPendingTasksToInit() {
if (stateUpdater != null) {
final Map<TaskId,
CompletableFuture<StateUpdater.RemovedTaskResult>> futures = new
LinkedHashMap<>();
- final Set<Task> tasksToCloseClean = new
HashSet<>(tasks.drainPendingActiveTasksToInit());
- final Set<Task> tasksToCloseDirty = new HashSet<>();
+ final Set<Task> tasksToCloseClean = new
TreeSet<>(Comparator.comparing(Task::id));
+ tasksToCloseClean.addAll(tasks.drainPendingActiveTasksToInit());
+ final Set<Task> tasksToCloseDirty = new
TreeSet<>(Comparator.comparing(Task::id));
for (final Task restoringTask : stateUpdater.tasks()) {
if (restoringTask.isActive()) {
futures.put(restoringTask.id(),
stateUpdater.remove(restoringTask.id()));
@@ -1592,8 +1593,8 @@ public class TaskManager {
final CompletableFuture<StateUpdater.RemovedTaskResult> future
= stateUpdater.remove(task.id());
futures.put(task.id(), future);
}
- final Set<Task> tasksToCloseClean = new HashSet<>();
- final Set<Task> tasksToCloseDirty = new HashSet<>();
+ final Set<Task> tasksToCloseClean = new
TreeSet<>(Comparator.comparing(Task::id));
+ final Set<Task> tasksToCloseDirty = new
TreeSet<>(Comparator.comparing(Task::id));
addToTasksToClose(futures, tasksToCloseClean, tasksToCloseDirty);
// at this point we removed all tasks, so the shutdown should not
take a lot of time
stateUpdater.shutdown(Duration.ofMinutes(1L));
@@ -1635,7 +1636,7 @@ public class TaskManager {
.collect(Collectors.toSet());
maybeLockTasks(ids);
- final Set<Task> tasksToCloseDirty = new HashSet<>();
+ final Set<Task> tasksToCloseDirty = new
TreeSet<>(Comparator.comparing(Task::id));
tasksToCloseDirty.addAll(tryCloseCleanActiveTasks(activeTasks, clean,
firstException));
tasksToCloseDirty.addAll(tryCloseCleanStandbyTasks(standbyTasks,
clean, firstException));
@@ -1751,7 +1752,7 @@ public class TaskManager {
if (!clean) {
return standbyTaskIterable();
}
- final Set<Task> tasksToCloseDirty = new HashSet<>();
+ final Set<Task> tasksToCloseDirty = new
TreeSet<>(Comparator.comparing(Task::id));
// first committing and then suspend / close clean
for (final Task task : standbyTasksToClose) {
@@ -2083,8 +2084,8 @@ public class TaskManager {
void maybeCloseTasksFromRemovedTopologies(final Set<String>
currentNamedTopologies) {
try {
- final Set<Task> activeTasksToRemove = new HashSet<>();
- final Set<Task> standbyTasksToRemove = new HashSet<>();
+ final Set<Task> activeTasksToRemove = new
TreeSet<>(Comparator.comparing(Task::id));
+ final Set<Task> standbyTasksToRemove = new
TreeSet<>(Comparator.comparing(Task::id));
for (final Task task : tasks.allTasks()) {
if
(!currentNamedTopologies.contains(task.id().topologyName())) {
if (task.isActive()) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 3e87eebe733..aa2e95623e3 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -4385,23 +4385,33 @@ public class TaskManagerTest {
@Test
public void
shouldThrowTaskMigratedWhenAllTaskCloseExceptionsAreTaskMigrated() {
- final StateMachineTask migratedTask01 = new StateMachineTask(taskId01,
taskId01Partitions, false, stateManager) {
- @Override
- public void suspend() {
- super.suspend();
- throw new TaskMigratedException("t1 close exception", new
RuntimeException());
- }
- };
+ final StandbyTask migratedTask01 = standbyTask(taskId01,
taskId01ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId01Partitions)
+ .build();
+ final StandbyTask migratedTask02 = standbyTask(taskId02,
taskId02ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId02Partitions)
+ .build();
- final StateMachineTask migratedTask02 = new StateMachineTask(taskId02,
taskId02Partitions, false, stateManager) {
- @Override
- public void suspend() {
- super.suspend();
- throw new TaskMigratedException("t2 close exception", new
RuntimeException());
- }
- };
- taskManager.addTask(migratedTask01);
- taskManager.addTask(migratedTask02);
+ doThrow(new TaskMigratedException("t1 close exception", new
RuntimeException()))
+ .when(migratedTask01).suspend();
+ doThrow(new TaskMigratedException("t2 close exception", new
RuntimeException()))
+ .when(migratedTask02).suspend();
+
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+
+ when(stateUpdater.tasks()).thenReturn(Set.of(migratedTask01,
migratedTask02));
+
+ // mock futures for removing tasks from StateUpdater
+ final CompletableFuture<StateUpdater.RemovedTaskResult> future01 = new
CompletableFuture<>();
+ when(stateUpdater.remove(taskId01)).thenReturn(future01);
+ future01.complete(new StateUpdater.RemovedTaskResult(migratedTask01));
+
+ final CompletableFuture<StateUpdater.RemovedTaskResult> future02 = new
CompletableFuture<>();
+ when(stateUpdater.remove(taskId02)).thenReturn(future02);
+ future02.complete(new StateUpdater.RemovedTaskResult(migratedTask02));
final TaskMigratedException thrown = assertThrows(
TaskMigratedException.class,
@@ -4413,6 +4423,10 @@ public class TaskManagerTest {
thrown.getMessage(),
equalTo("t2 close exception; it means all tasks belonging to this
thread should be migrated.")
);
+ verify(migratedTask01, times(2)).suspend();
+ verify(migratedTask02, times(2)).suspend();
+ verify(stateUpdater).remove(taskId01);
+ verify(stateUpdater).remove(taskId02);
}
@Test