This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 9f20f89953 KAFKA-10199: Remove tasks from state updater on partition lost (#12521) 9f20f89953 is described below commit 9f20f8995399d9e03f518f7b9c8be2bffb2fdcfc Author: Bruno Cadonna <cado...@apache.org> AuthorDate: Wed Aug 17 20:12:30 2022 +0200 KAFKA-10199: Remove tasks from state updater on partition lost (#12521) Removes tasks from the state updater when the input partitions of the tasks are lost during a rebalance. Reviewers: Guozhang Wang <wangg...@gmail.com> --- .../streams/processor/internals/TaskManager.java | 26 ++++++++++-- .../kafka/streams/processor/internals/Tasks.java | 19 ++++++--- .../processor/internals/TaskManagerTest.java | 48 ++++++++++++++++++++-- 3 files changed, 81 insertions(+), 12 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 03c36b0daf..4bba28a3f3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -465,7 +465,7 @@ public class TaskManager { standbyTasksToCreate.remove(taskId); } else { stateUpdater.remove(taskId); - tasks.addPendingTaskToClose(taskId); + tasks.addPendingTaskToCloseClean(taskId); } } } @@ -692,7 +692,7 @@ public class TaskManager { taskExceptions.putIfAbsent(taskId, e); } - } else if (tasks.removePendingTaskToClose(task.id())) { + } else if (tasks.removePendingTaskToCloseClean(task.id())) { try { task.suspend(); task.closeClean(); @@ -710,6 +710,8 @@ public class TaskManager { taskExceptions.putIfAbsent(task.id(), e); } + } else if (tasks.removePendingTaskToCloseDirty(task.id())) { + tasksToCloseDirty.add(task); } else if ((inputPartitions = tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) { task.updateInputPartitions(inputPartitions, 
topologyMetadata.nodeToSourceTopics(task.id())); stateUpdater.add(task); @@ -867,6 +869,15 @@ public class TaskManager { void handleLostAll() { log.debug("Closing lost active tasks as zombies."); + closeRunningTasksDirty(); + removeLostTasksFromStateUpdater(); + + if (processingMode == EXACTLY_ONCE_V2) { + activeTaskCreator.reInitializeThreadProducer(); + } + } + + private void closeRunningTasksDirty() { final Set<Task> allTask = tasks.allTasks(); for (final Task task : allTask) { // Even though we've apparently dropped out of the group, we can continue safely to maintain our @@ -875,9 +886,16 @@ public class TaskManager { closeTaskDirty(task); } } + } - if (processingMode == EXACTLY_ONCE_V2) { - activeTaskCreator.reInitializeThreadProducer(); + private void removeLostTasksFromStateUpdater() { + if (stateUpdater != null) { + for (final Task restoringTask : stateUpdater.getTasks()) { + if (restoringTask.isActive()) { + tasks.addPendingTaskToCloseDirty(restoringTask.id()); + stateUpdater.remove(restoringTask.id()); + } + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java index 9628b42d92..8178fe3691 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java @@ -55,7 +55,9 @@ class Tasks { private final Map<TaskId, Set<TopicPartition>> pendingTasksToRecycle = new HashMap<>(); private final Map<TaskId, Set<TopicPartition>> pendingTasksToUpdateInputPartitions = new HashMap<>(); private final Set<Task> pendingTasksToInit = new HashSet<>(); - private final Set<TaskId> pendingTasksToClose = new HashSet<>(); + private final Set<TaskId> pendingTasksToCloseClean = new HashSet<>(); + + private final Set<TaskId> pendingTasksToCloseDirty = new HashSet<>(); // TODO: convert to Stream/StandbyTask when we remove TaskManager#StateMachineTask 
with mocks private final Map<TopicPartition, Task> activeTasksPerPartition = new HashMap<>(); @@ -111,12 +113,19 @@ class Tasks { pendingTasksToUpdateInputPartitions.put(taskId, inputPartitions); } - boolean removePendingTaskToClose(final TaskId taskId) { - return pendingTasksToClose.remove(taskId); + boolean removePendingTaskToCloseDirty(final TaskId taskId) { + return pendingTasksToCloseDirty.remove(taskId); + } + void addPendingTaskToCloseDirty(final TaskId taskId) { + pendingTasksToCloseDirty.add(taskId); + } + + boolean removePendingTaskToCloseClean(final TaskId taskId) { + return pendingTasksToCloseClean.remove(taskId); } - void addPendingTaskToClose(final TaskId taskId) { - pendingTasksToClose.add(taskId); + void addPendingTaskToCloseClean(final TaskId taskId) { + pendingTasksToCloseClean.add(taskId); } Set<Task> drainPendingTaskToInit() { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 12ea6477e5..133541bfac 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -118,6 +118,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.when; import static org.mockito.Mockito.mock; @@ -144,7 +145,9 @@ public class TaskManagerTest { private final TaskId taskId02 = new TaskId(0, 2); private final TopicPartition t1p2 = new TopicPartition(topic1, 2); + private final TopicPartition t1p2changelog = new TopicPartition("changelog", 2); private final Set<TopicPartition> taskId02Partitions = mkSet(t1p2); + private final Set<TopicPartition> taskId02ChangelogPartitions = mkSet(t1p2changelog); private 
final TaskId taskId03 = new TaskId(0, 3); private final TopicPartition t1p3 = new TopicPartition(topic1, 3); @@ -343,8 +346,8 @@ public class TaskManagerTest { expectLastCall().anyTimes(); replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer); - taskManager.tasks().addPendingTaskToClose(taskId00); - taskManager.tasks().addPendingTaskToClose(taskId01); + taskManager.tasks().addPendingTaskToCloseClean(taskId00); + taskManager.tasks().addPendingTaskToCloseClean(taskId01); taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter -> { }); @@ -383,6 +386,45 @@ public class TaskManagerTest { Mockito.verify(stateUpdater).add(task01); } + @Test + public void shouldRemoveAllActiveTasksFromStateUpdaterOnPartitionLost() { + final StreamTask task1 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RESTORING) + .withInputPartitions(taskId00Partitions).build(); + final StandbyTask task2 = standbyTask(taskId01, taskId01ChangelogPartitions) + .inState(State.RESTORING) + .withInputPartitions(taskId01Partitions).build(); + final StreamTask task3 = statefulTask(taskId02, taskId02ChangelogPartitions) + .inState(State.RESTORING) + .withInputPartitions(taskId02Partitions).build(); + final TaskManager taskManager = setupForRevocation(mkSet(task1, task2, task3), mkSet(task1, task3)); + + taskManager.handleLostAll(); + + Mockito.verify(stateUpdater).remove(task1.id()); + Mockito.verify(stateUpdater).remove(task3.id()); + + taskManager.tryToCompleteRestoration(time.milliseconds(), null); + + Mockito.verify(task1).closeDirty(); + Mockito.verify(task3).closeDirty(); + Mockito.verify(task2, never()).closeDirty(); + Mockito.verify(task2, never()).closeClean(); + } + + private TaskManager setupForRevocation(final Set<Task> tasksInStateUpdater, + final Set<Task> removedTasks) { + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, true); + when(stateUpdater.getTasks()).thenReturn(tasksInStateUpdater); + 
when(stateUpdater.drainRemovedTasks()).thenReturn(removedTasks); + expect(consumer.assignment()).andReturn(emptySet()).anyTimes(); + consumer.resume(anyObject()); + expectLastCall().anyTimes(); + replay(consumer); + + return taskManager; + } + @Test public void shouldHandleRemovedTasksFromStateUpdater() { // tasks to recycle @@ -436,7 +478,7 @@ public class TaskManagerTest { stateUpdater ); taskManager.setMainConsumer(consumer); - taskManager.tasks().addPendingTaskToClose(taskId02); + taskManager.tasks().addPendingTaskToCloseClean(taskId02); taskManager.tasks().addPendingTaskToRecycle(taskId00, taskId00Partitions); taskManager.tasks().addPendingTaskToRecycle(taskId01, taskId01Partitions); taskManager.tasks().addPendingTaskToUpdateInputPartitions(taskId03, taskId03Partitions);