This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new b47c4d8598 KAFKA-10199: Remove tasks from state updater on revocation (#12520)
b47c4d8598 is described below

commit b47c4d859805068de6a8fe8de3bda5e7a21132e2
Author: Bruno Cadonna <cado...@apache.org>
AuthorDate: Wed Aug 17 20:13:34 2022 +0200

    KAFKA-10199: Remove tasks from state updater on revocation (#12520)

    Removes tasks from the state updater when the input partitions of the tasks are revoked during a rebalance.

    Reviewers: Guozhang Wang <wangg...@gmail.com>
---
 .../streams/processor/internals/TaskManager.java   | 16 +++++
 .../processor/internals/TaskManagerTest.java       | 81 ++++++++++++++++++++++
 2 files changed, 97 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 4bba28a3f3..bab05a5184 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -757,6 +757,8 @@ public class TaskManager {
             }
         }
 
+        removeRevokedTasksFromStateUpdater(remainingRevokedPartitions);
+
         if (!remainingRevokedPartitions.isEmpty()) {
             log.debug("The following revoked partitions {} are missing from the current task partitions. It could "
                           + "potentially be due to race condition of consumer detecting the heartbeat failure, or the tasks " +
@@ -842,6 +844,20 @@ public class TaskManager {
         }
     }
 
+    private void removeRevokedTasksFromStateUpdater(final Set<TopicPartition> remainingRevokedPartitions) {
+        if (stateUpdater != null) {
+            for (final Task restoringTask : stateUpdater.getTasks()) {
+                if (restoringTask.isActive()) {
+                    if (remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
+                        tasks.addPendingTaskToClose(restoringTask.id());
+                        stateUpdater.remove(restoringTask.id());
+                        remainingRevokedPartitions.removeAll(restoringTask.inputPartitions());
+                    }
+                }
+            }
+        }
+    }
+
     private void prepareCommitAndAddOffsetsToMap(final Set<Task> tasksToPrepare,
                                                  final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask) {
         for (final Task task : tasksToPrepare) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 133541bfac..ff52ad5ae9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -751,6 +751,87 @@ public class TaskManagerTest {
         assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
     }
 
+    @Test
+    public void shouldRemoveStatefulTaskWithRevokedInputPartitionsFromStateUpdaterOnRevocation() {
+        final StreamTask task = statefulTask(taskId00, taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        final TaskManager taskManager = setupForRevocation(mkSet(task), mkSet(task));
+
+        taskManager.handleRevocation(taskId00Partitions);
+
+        Mockito.verify(stateUpdater).remove(task.id());
+
+        taskManager.tryToCompleteRestoration(time.milliseconds(), null);
+
+        Mockito.verify(task).closeClean();
+    }
+
+    public void shouldRemoveMultipleStatefulTaskWithRevokedInputPartitionsFromStateUpdaterOnRevocation() {
+        final StreamTask task1 = statefulTask(taskId00, taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        final StreamTask task2 = statefulTask(taskId01, taskId01ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId01Partitions).build();
+        final TaskManager taskManager = setupForRevocation(mkSet(task1, task2), mkSet(task1, task2));
+
+        taskManager.handleRevocation(union(HashSet::new, taskId00Partitions, taskId01Partitions));
+
+        Mockito.verify(stateUpdater).remove(task1.id());
+        Mockito.verify(stateUpdater).remove(task2.id());
+
+        taskManager.tryToCompleteRestoration(time.milliseconds(), null);
+
+        Mockito.verify(task1).closeClean();
+        Mockito.verify(task2).closeClean();
+    }
+
+    @Test
+    public void shouldNotRemoveStatefulTaskWithoutRevokedInputPartitionsFromStateUpdaterOnRevocation() {
+        final StreamTask task = statefulTask(taskId00, taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        final TaskManager taskManager = setupForRevocation(mkSet(task), Collections.emptySet());
+
+        taskManager.handleRevocation(taskId01Partitions);
+
+        Mockito.verify(stateUpdater, never()).remove(task.id());
+
+        taskManager.tryToCompleteRestoration(time.milliseconds(), null);
+
+        Mockito.verify(task, never()).closeClean();
+    }
+
+    @Test
+    public void shouldNotRemoveStandbyTaskFromStateUpdaterOnRevocation() {
+        final StandbyTask task = standbyTask(taskId00, taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        final TaskManager taskManager = setupForRevocation(mkSet(task), Collections.emptySet());
+
+        taskManager.handleRevocation(taskId00Partitions);
+
+        Mockito.verify(stateUpdater, never()).remove(task.id());
+
+        taskManager.tryToCompleteRestoration(time.milliseconds(), null);
+
+        Mockito.verify(task, never()).closeClean();
+    }
+
+    private TaskManager setupForRevocation(final Set<Task> tasksInStateUpdater,
+                                           final Set<Task> removedTasks) {
+        final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, true);
+        when(stateUpdater.getTasks()).thenReturn(tasksInStateUpdater);
+        when(stateUpdater.drainRemovedTasks()).thenReturn(removedTasks);
+        expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
+        consumer.resume(anyObject());
+        expectLastCall().anyTimes();
+        replay(consumer);
+
+        return taskManager;
+    }
+
     @Test
     public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() {
         final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true);