This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 36f4c7c  KAFKA-10616: Always call prepare-commit before suspending for 
active tasks (#9464)
36f4c7c is described below

commit 36f4c7c96ed6421af35ef40daee337a04d562947
Author: Guozhang Wang <[email protected]>
AuthorDate: Mon Oct 26 14:24:05 2020 -0700

    KAFKA-10616: Always call prepare-commit before suspending for active tasks 
(#9464)
    
    Today we have the following suspension paths for active tasks:
    
    1. closeAndRevive in handleTaskCorruption.
    2. closeClean in assignor#onAssignment.
    3. closeClean in shutdown.
    4. closeDirty in assignor#onAssignment.
    5. closeDirty in listener#onPartitionsLost.
    6. closeDirty in shutdown.
    7. suspend in listener#onPartitionsRevoked.
    
    Among those, 1/4/5/6 do not call prepareCommit, which would trigger 
stateManager#flushCache; skipping it may leave the state manager in an illegal 
state. This PR ensures that prepareCommit is always called before suspend.
    
    Reviewers: A. Sophie Blee-Goldman <[email protected]>
---
 .../kafka/streams/processor/internals/TaskManager.java  | 17 +++++++++++++++++
 .../streams/processor/internals/TaskManagerTest.java    | 17 ++++++++++++-----
 2 files changed, 29 insertions(+), 5 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 8f5c9a8..85dc41e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -192,6 +192,15 @@ public class TaskManager {
             task.markChangelogAsCorrupted(corruptedPartitions);
 
             try {
+                // we do not need to take the returned offsets since we are 
not going to commit anyways;
+                // this call is only used for active tasks to flush the cache 
before suspending and
+                // closing the topology
+                task.prepareCommit();
+            } catch (final RuntimeException swallow) {
+                log.error("Error flushing cache for corrupted task {} ", 
task.id(), swallow);
+            }
+
+            try {
                 task.suspend();
                 // we need to enforce a checkpoint that removes the corrupted 
partitions
                 task.postCommit(true);
@@ -761,6 +770,14 @@ public class TaskManager {
 
     private void closeTaskDirty(final Task task) {
         try {
+            // we call this function only to flush the cache if necessary
+            // before suspending and closing the topology
+            task.prepareCommit();
+        } catch (final RuntimeException swallow) {
+            log.error("Error flushing caches of dirty task {} ", task.id(), 
swallow);
+        }
+
+        try {
             task.suspend();
         } catch (final RuntimeException swallow) {
             log.error("Error suspending dirty task {} ", task.id(), swallow);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 6018dbb..e2a3049 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -458,8 +458,8 @@ public class TaskManagerTest {
 
     @Test
     public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception 
{
-        final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, 
true);
-        final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, 
false);
+        final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true);
+        final StateMachineTask task01 = new StateMachineTask(taskId01, 
taskId01Partitions, false);
 
         // `handleAssignment`
         expectRestoreToBeCompleted(consumer, changeLogReader);
@@ -491,6 +491,7 @@ public class TaskManagerTest {
         assertThat(task01.state(), is(Task.State.RUNNING));
 
         taskManager.handleLostAll();
+        assertThat(task00.commitPrepared, is(true));
         assertThat(task00.state(), is(Task.State.CLOSED));
         assertThat(task01.state(), is(Task.State.RUNNING));
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
@@ -565,7 +566,7 @@ public class TaskManagerTest {
         replay(stateManager);
 
         final AtomicBoolean enforcedCheckpoint = new AtomicBoolean(false);
-        final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, 
true, stateManager) {
+        final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true, stateManager) {
             @Override
             public void postCommit(final boolean enforceCheckpoint) {
                 if (enforceCheckpoint) {
@@ -595,6 +596,8 @@ public class TaskManagerTest {
         assertThat(task00.state(), is(Task.State.RUNNING));
 
         taskManager.handleCorruption(singletonMap(taskId00, 
taskId00Partitions));
+
+        assertThat(task00.commitPrepared, is(true));
         assertThat(task00.state(), is(Task.State.CREATED));
         assertThat(enforcedCheckpoint.get(), is(true));
         assertThat(taskManager.activeTaskMap(), is(singletonMap(taskId00, 
task00)));
@@ -610,7 +613,7 @@ public class TaskManagerTest {
         stateManager.markChangelogAsCorrupted(taskId00Partitions);
         replay(stateManager);
 
-        final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, 
true, stateManager) {
+        final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true, stateManager) {
             @Override
             public void suspend() {
                 super.suspend();
@@ -637,6 +640,7 @@ public class TaskManagerTest {
         assertThat(task00.state(), is(Task.State.RUNNING));
 
         taskManager.handleCorruption(singletonMap(taskId00, 
taskId00Partitions));
+        assertThat(task00.commitPrepared, is(true));
         assertThat(task00.state(), is(Task.State.CREATED));
         assertThat(taskManager.activeTaskMap(), is(singletonMap(taskId00, 
task00)));
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
@@ -761,6 +765,8 @@ public class TaskManagerTest {
 
         assertThrows(TaskMigratedException.class, () -> 
taskManager.handleCorruption(singletonMap(taskId00, taskId00Partitions)));
 
+
+        assertThat(corruptedStandby.commitPrepared, is(true));
         assertThat(corruptedStandby.state(), is(Task.State.CREATED));
         verify(consumer);
     }
@@ -2734,8 +2740,9 @@ public class TaskManagerTest {
 
         @Override
         public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+            commitPrepared = true;
+
             if (commitNeeded) {
-                commitPrepared = true;
                 return committableOffsets;
             } else {
                 return Collections.emptyMap();

Reply via email to