This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 36f4c7c KAFKA-10616: Always call prepare-commit before suspending for
active tasks (#9464)
36f4c7c is described below
commit 36f4c7c96ed6421af35ef40daee337a04d562947
Author: Guozhang Wang <[email protected]>
AuthorDate: Mon Oct 26 14:24:05 2020 -0700
KAFKA-10616: Always call prepare-commit before suspending for active tasks
(#9464)
Today for active tasks we have the following active task suspensions:
1. closeAndRevive in handleTaskCorruption.
2. closeClean in assignor#onAssignment.
3. closeClean in shutdown.
4. closeDirty in assignor#onAssignment.
5. closeDirty in listener#onPartitionsLost.
6. closeDirty in shutdown.
7. suspend in listener#onPartitionsRevoked.
Among those, 1/4/5/6 do not call prepareCommit, which would trigger
stateManager#flushCache; skipping it may leave the state manager in an illegal
state. This PR requires prepareCommit to be triggered before suspend.
Reviewers: A. Sophie Blee-Goldman <[email protected]>
---
.../kafka/streams/processor/internals/TaskManager.java | 17 +++++++++++++++++
.../streams/processor/internals/TaskManagerTest.java | 17 ++++++++++++-----
2 files changed, 29 insertions(+), 5 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 8f5c9a8..85dc41e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -192,6 +192,15 @@ public class TaskManager {
task.markChangelogAsCorrupted(corruptedPartitions);
try {
+ // we do not need to take the returned offsets since we are
not going to commit anyways;
+ // this call is only used for active tasks to flush the cache
before suspending and
+ // closing the topology
+ task.prepareCommit();
+ } catch (final RuntimeException swallow) {
+ log.error("Error flushing cache for corrupted task {} ",
task.id(), swallow);
+ }
+
+ try {
task.suspend();
// we need to enforce a checkpoint that removes the corrupted
partitions
task.postCommit(true);
@@ -761,6 +770,14 @@ public class TaskManager {
private void closeTaskDirty(final Task task) {
try {
+ // we call this function only to flush the cache if necessary
+ // before suspending and closing the topology
+ task.prepareCommit();
+ } catch (final RuntimeException swallow) {
+ log.error("Error flushing caches of dirty task {} ", task.id(),
swallow);
+ }
+
+ try {
task.suspend();
} catch (final RuntimeException swallow) {
log.error("Error suspending dirty task {} ", task.id(), swallow);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 6018dbb..e2a3049 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -458,8 +458,8 @@ public class TaskManagerTest {
@Test
public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception
{
- final Task task00 = new StateMachineTask(taskId00, taskId00Partitions,
true);
- final Task task01 = new StateMachineTask(taskId01, taskId01Partitions,
false);
+ final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true);
+ final StateMachineTask task01 = new StateMachineTask(taskId01,
taskId01Partitions, false);
// `handleAssignment`
expectRestoreToBeCompleted(consumer, changeLogReader);
@@ -491,6 +491,7 @@ public class TaskManagerTest {
assertThat(task01.state(), is(Task.State.RUNNING));
taskManager.handleLostAll();
+ assertThat(task00.commitPrepared, is(true));
assertThat(task00.state(), is(Task.State.CLOSED));
assertThat(task01.state(), is(Task.State.RUNNING));
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
@@ -565,7 +566,7 @@ public class TaskManagerTest {
replay(stateManager);
final AtomicBoolean enforcedCheckpoint = new AtomicBoolean(false);
- final Task task00 = new StateMachineTask(taskId00, taskId00Partitions,
true, stateManager) {
+ final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager) {
@Override
public void postCommit(final boolean enforceCheckpoint) {
if (enforceCheckpoint) {
@@ -595,6 +596,8 @@ public class TaskManagerTest {
assertThat(task00.state(), is(Task.State.RUNNING));
taskManager.handleCorruption(singletonMap(taskId00,
taskId00Partitions));
+
+ assertThat(task00.commitPrepared, is(true));
assertThat(task00.state(), is(Task.State.CREATED));
assertThat(enforcedCheckpoint.get(), is(true));
assertThat(taskManager.activeTaskMap(), is(singletonMap(taskId00,
task00)));
@@ -610,7 +613,7 @@ public class TaskManagerTest {
stateManager.markChangelogAsCorrupted(taskId00Partitions);
replay(stateManager);
- final Task task00 = new StateMachineTask(taskId00, taskId00Partitions,
true, stateManager) {
+ final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager) {
@Override
public void suspend() {
super.suspend();
@@ -637,6 +640,7 @@ public class TaskManagerTest {
assertThat(task00.state(), is(Task.State.RUNNING));
taskManager.handleCorruption(singletonMap(taskId00,
taskId00Partitions));
+ assertThat(task00.commitPrepared, is(true));
assertThat(task00.state(), is(Task.State.CREATED));
assertThat(taskManager.activeTaskMap(), is(singletonMap(taskId00,
task00)));
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
@@ -761,6 +765,8 @@ public class TaskManagerTest {
assertThrows(TaskMigratedException.class, () ->
taskManager.handleCorruption(singletonMap(taskId00, taskId00Partitions)));
+
+ assertThat(corruptedStandby.commitPrepared, is(true));
assertThat(corruptedStandby.state(), is(Task.State.CREATED));
verify(consumer);
}
@@ -2734,8 +2740,9 @@ public class TaskManagerTest {
@Override
public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+ commitPrepared = true;
+
if (commitNeeded) {
- commitPrepared = true;
return committableOffsets;
} else {
return Collections.emptyMap();