This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9f20f89953 KAFKA-10199: Remove tasks from state updater on partition lost (#12521)
9f20f89953 is described below

commit 9f20f8995399d9e03f518f7b9c8be2bffb2fdcfc
Author: Bruno Cadonna <cado...@apache.org>
AuthorDate: Wed Aug 17 20:12:30 2022 +0200

    KAFKA-10199: Remove tasks from state updater on partition lost (#12521)
    
    Removes tasks from the state updater when the input partitions of the tasks are lost during a rebalance.
    
    Reviewers: Guozhang Wang <wangg...@gmail.com>
---
 .../streams/processor/internals/TaskManager.java   | 26 ++++++++++--
 .../kafka/streams/processor/internals/Tasks.java   | 19 ++++++---
 .../processor/internals/TaskManagerTest.java       | 48 ++++++++++++++++++++--
 3 files changed, 81 insertions(+), 12 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 03c36b0daf..4bba28a3f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -465,7 +465,7 @@ public class TaskManager {
                 standbyTasksToCreate.remove(taskId);
             } else {
                 stateUpdater.remove(taskId);
-                tasks.addPendingTaskToClose(taskId);
+                tasks.addPendingTaskToCloseClean(taskId);
             }
         }
     }
@@ -692,7 +692,7 @@ public class TaskManager {
 
                     taskExceptions.putIfAbsent(taskId, e);
                 }
-            } else if (tasks.removePendingTaskToClose(task.id())) {
+            } else if (tasks.removePendingTaskToCloseClean(task.id())) {
                 try {
                     task.suspend();
                     task.closeClean();
@@ -710,6 +710,8 @@ public class TaskManager {
 
                     taskExceptions.putIfAbsent(task.id(), e);
                 }
+            } else if (tasks.removePendingTaskToCloseDirty(task.id())) {
+                tasksToCloseDirty.add(task);
             } else if ((inputPartitions = 
tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) {
                 task.updateInputPartitions(inputPartitions, 
topologyMetadata.nodeToSourceTopics(task.id()));
                 stateUpdater.add(task);
@@ -867,6 +869,15 @@ public class TaskManager {
     void handleLostAll() {
         log.debug("Closing lost active tasks as zombies.");
 
+        closeRunningTasksDirty();
+        removeLostTasksFromStateUpdater();
+
+        if (processingMode == EXACTLY_ONCE_V2) {
+            activeTaskCreator.reInitializeThreadProducer();
+        }
+    }
+
+    private void closeRunningTasksDirty() {
         final Set<Task> allTask = tasks.allTasks();
         for (final Task task : allTask) {
             // Even though we've apparently dropped out of the group, we can 
continue safely to maintain our
@@ -875,9 +886,16 @@ public class TaskManager {
                 closeTaskDirty(task);
             }
         }
+    }
 
-        if (processingMode == EXACTLY_ONCE_V2) {
-            activeTaskCreator.reInitializeThreadProducer();
+    private void removeLostTasksFromStateUpdater() {
+        if (stateUpdater != null) {
+            for (final Task restoringTask : stateUpdater.getTasks()) {
+                if (restoringTask.isActive()) {
+                    tasks.addPendingTaskToCloseDirty(restoringTask.id());
+                    stateUpdater.remove(restoringTask.id());
+                }
+            }
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index 9628b42d92..8178fe3691 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -55,7 +55,9 @@ class Tasks {
     private final Map<TaskId, Set<TopicPartition>> pendingTasksToRecycle = new 
HashMap<>();
     private final Map<TaskId, Set<TopicPartition>> 
pendingTasksToUpdateInputPartitions = new HashMap<>();
     private final Set<Task> pendingTasksToInit = new HashSet<>();
-    private final Set<TaskId> pendingTasksToClose = new HashSet<>();
+    private final Set<TaskId> pendingTasksToCloseClean = new HashSet<>();
+
+    private final Set<TaskId> pendingTasksToCloseDirty = new HashSet<>();
 
     // TODO: convert to Stream/StandbyTask when we remove 
TaskManager#StateMachineTask with mocks
     private final Map<TopicPartition, Task> activeTasksPerPartition = new 
HashMap<>();
@@ -111,12 +113,19 @@ class Tasks {
         pendingTasksToUpdateInputPartitions.put(taskId, inputPartitions);
     }
 
-    boolean removePendingTaskToClose(final TaskId taskId) {
-        return pendingTasksToClose.remove(taskId);
+    boolean removePendingTaskToCloseDirty(final TaskId taskId) {
+        return pendingTasksToCloseDirty.remove(taskId);
+    }
+    void addPendingTaskToCloseDirty(final TaskId taskId) {
+        pendingTasksToCloseDirty.add(taskId);
+    }
+
+    boolean removePendingTaskToCloseClean(final TaskId taskId) {
+        return pendingTasksToCloseClean.remove(taskId);
     }
 
-    void addPendingTaskToClose(final TaskId taskId) {
-        pendingTasksToClose.add(taskId);
+    void addPendingTaskToCloseClean(final TaskId taskId) {
+        pendingTasksToCloseClean.add(taskId);
     }
 
     Set<Task> drainPendingTaskToInit() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 12ea6477e5..133541bfac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -118,6 +118,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.mock;
 
@@ -144,7 +145,9 @@ public class TaskManagerTest {
 
     private final TaskId taskId02 = new TaskId(0, 2);
     private final TopicPartition t1p2 = new TopicPartition(topic1, 2);
+    private final TopicPartition t1p2changelog = new 
TopicPartition("changelog", 2);
     private final Set<TopicPartition> taskId02Partitions = mkSet(t1p2);
+    private final Set<TopicPartition> taskId02ChangelogPartitions = 
mkSet(t1p2changelog);
 
     private final TaskId taskId03 = new TaskId(0, 3);
     private final TopicPartition t1p3 = new TopicPartition(topic1, 3);
@@ -343,8 +346,8 @@ public class TaskManagerTest {
         expectLastCall().anyTimes();
         replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, 
consumer);
 
-        taskManager.tasks().addPendingTaskToClose(taskId00);
-        taskManager.tasks().addPendingTaskToClose(taskId01);
+        taskManager.tasks().addPendingTaskToCloseClean(taskId00);
+        taskManager.tasks().addPendingTaskToCloseClean(taskId01);
 
         taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter 
-> { });
 
@@ -383,6 +386,45 @@ public class TaskManagerTest {
         Mockito.verify(stateUpdater).add(task01);
     }
 
+    @Test
+    public void shouldRemoveAllActiveTasksFromStateUpdaterOnPartitionLost() {
+        final StreamTask task1 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        final StandbyTask task2 = standbyTask(taskId01, 
taskId01ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId01Partitions).build();
+        final StreamTask task3 = statefulTask(taskId02, 
taskId02ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId02Partitions).build();
+        final TaskManager taskManager = setupForRevocation(mkSet(task1, task2, 
task3), mkSet(task1, task3));
+
+        taskManager.handleLostAll();
+
+        Mockito.verify(stateUpdater).remove(task1.id());
+        Mockito.verify(stateUpdater).remove(task3.id());
+
+        taskManager.tryToCompleteRestoration(time.milliseconds(), null);
+
+        Mockito.verify(task1).closeDirty();
+        Mockito.verify(task3).closeDirty();
+        Mockito.verify(task2, never()).closeDirty();
+        Mockito.verify(task2, never()).closeClean();
+    }
+
+    private TaskManager setupForRevocation(final Set<Task> tasksInStateUpdater,
+                                           final Set<Task> removedTasks) {
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, true);
+        when(stateUpdater.getTasks()).thenReturn(tasksInStateUpdater);
+        when(stateUpdater.drainRemovedTasks()).thenReturn(removedTasks);
+        expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
+        consumer.resume(anyObject());
+        expectLastCall().anyTimes();
+        replay(consumer);
+
+        return taskManager;
+    }
+
     @Test
     public void shouldHandleRemovedTasksFromStateUpdater() {
         // tasks to recycle
@@ -436,7 +478,7 @@ public class TaskManagerTest {
             stateUpdater
         );
         taskManager.setMainConsumer(consumer);
-        taskManager.tasks().addPendingTaskToClose(taskId02);
+        taskManager.tasks().addPendingTaskToCloseClean(taskId02);
         taskManager.tasks().addPendingTaskToRecycle(taskId00, 
taskId00Partitions);
         taskManager.tasks().addPendingTaskToRecycle(taskId01, 
taskId01Partitions);
         taskManager.tasks().addPendingTaskToUpdateInputPartitions(taskId03, 
taskId03Partitions);

Reply via email to