This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 712ac52 KAFKA-9793: Expand the try-catch for task commit in
HandleAssignment (#8402)
712ac52 is described below
commit 712ac5203e6ce199bed8804f6173acf33d79a173
Author: Boyang Chen <[email protected]>
AuthorDate: Sun Apr 5 22:50:26 2020 -0700
KAFKA-9793: Expand the try-catch for task commit in HandleAssignment (#8402)
As the title suggests, we would like to broaden this try-catch so that we don't
fail to close a task that is doomed to be cleaned up.
Reviewers: Guozhang Wang <[email protected]>
---
.../streams/processor/internals/TaskManager.java | 20 ++---
.../processor/internals/TaskManagerTest.java | 90 ++++++++++++++--------
2 files changed, 69 insertions(+), 41 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index a0a238c..22222cd 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -232,24 +232,26 @@ public class TaskManager {
}
if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
- for (final Task task : additionalTasksForCommitting) {
- task.prepareCommit();
- final Map<TopicPartition, OffsetAndMetadata>
committableOffsets = task.committableOffsetsAndMetadata();
- if (!committableOffsets.isEmpty()) {
- consumedOffsetsAndMetadataPerTask.put(task.id(),
committableOffsets);
+ try {
+ for (final Task task : additionalTasksForCommitting) {
+ task.prepareCommit();
+ final Map<TopicPartition, OffsetAndMetadata>
committableOffsets = task.committableOffsetsAndMetadata();
+ if (!committableOffsets.isEmpty()) {
+ consumedOffsetsAndMetadataPerTask.put(task.id(),
committableOffsets);
+ }
}
- }
- try {
commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
for (final Task task : additionalTasksForCommitting) {
task.postCommit();
}
} catch (final RuntimeException e) {
- log.error("Failed to commit tasks that are " +
- "prepared to close clean, will close them as dirty
instead", e);
+ log.error("Failed to batch commit tasks, " +
+ "will close all tasks involved in this commit as dirty by
the end", e);
+ dirtyTasks.addAll(additionalTasksForCommitting);
dirtyTasks.addAll(checkpointPerTask.keySet());
+
checkpointPerTask.clear();
// Just add first taskId to re-throw by the end.
taskCloseExceptions.put(consumedOffsetsAndMetadataPerTask.keySet().iterator().next(),
e);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 67b2ec8..d4769d4 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -154,7 +154,12 @@ public class TaskManagerTest {
@Before
public void setUp() {
- final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new
Metrics(), "clientId", StreamsConfig.METRICS_LATEST);
+ setUpTaskManager(StreamThread.ProcessingMode.AT_LEAST_ONCE);
+ }
+
+ private void setUpTaskManager(final StreamThread.ProcessingMode
processingMode) {
+ final StreamsMetricsImpl streamsMetrics =
+ new StreamsMetricsImpl(new Metrics(), "clientId",
StreamsConfig.METRICS_LATEST);
taskManager = new TaskManager(
changeLogReader,
UUID.randomUUID(),
@@ -165,7 +170,7 @@ public class TaskManagerTest {
topologyBuilder,
adminClient,
stateDirectory,
- StreamThread.ProcessingMode.AT_LEAST_ONCE
+ processingMode
);
taskManager.setMainConsumer(consumer);
}
@@ -1229,22 +1234,56 @@ public class TaskManagerTest {
}
@Test
+ public void
shouldCloseActiveTasksDirtyAndPropagatePrepareCommitException() {
+ setUpTaskManager(StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA);
+
+ final Task task00 = new StateMachineTask(taskId00, taskId00Partitions,
true);
+
+ final StateMachineTask task01 = new StateMachineTask(taskId01,
taskId01Partitions, true) {
+ @Override
+ public void prepareCommit() {
+ throw new RuntimeException("task 0_1 prepare commit boom!");
+ }
+ };
+
+ task01.setCommittableOffsetsAndMetadata(singletonMap(t1p1, new
OffsetAndMetadata(0L, null)));
+ task01.setCommitNeeded();
+
+ final StateMachineTask task02 = new StateMachineTask(taskId02,
taskId02Partitions, true);
+ final Map<TopicPartition, OffsetAndMetadata> offsetsT02 =
singletonMap(t1p2, new OffsetAndMetadata(1L, null));
+
+ task02.setCommittableOffsetsAndMetadata(offsetsT02);
+ task02.setCommitNeeded();
+
+ taskManager.tasks().put(taskId00, task00);
+ taskManager.tasks().put(taskId01, task01);
+ taskManager.tasks().put(taskId02, task02);
+
+ checkOrder(activeTaskCreator, false);
+
+ activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId01);
+ expectLastCall();
+
+ activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId02);
+ expectLastCall();
+
+ replay(activeTaskCreator);
+
+ final RuntimeException thrown = assertThrows(RuntimeException.class,
+ () -> taskManager.handleAssignment(mkMap(mkEntry(taskId00,
taskId00Partitions),
+ mkEntry(taskId01, taskId01Partitions)),
Collections.emptyMap()));
+ assertThat(thrown.getCause().getMessage(), is("task 0_1 prepare commit
boom!"));
+
+ assertThat(task00.state(), is(Task.State.CREATED));
+ assertThat(task01.state(), is(Task.State.CLOSED));
+ assertThat(task02.state(), is(Task.State.CLOSED));
+
+ verify(activeTaskCreator);
+ }
+
+ @Test
public void shouldCloseActiveTasksDirtyAndPropagateCommitException() {
- final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
- new Metrics(), "clientId", StreamsConfig.METRICS_LATEST);
- taskManager = new TaskManager(
- changeLogReader,
- UUID.randomUUID(),
- "taskManagerTest",
- streamsMetrics,
- activeTaskCreator,
- standbyTaskCreator,
- topologyBuilder,
- adminClient,
- stateDirectory,
- StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA
- );
- taskManager.setMainConsumer(consumer);
+ setUpTaskManager(StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA);
final Task task00 = new StateMachineTask(taskId00, taskId00Partitions,
true);
@@ -1252,7 +1291,6 @@ public class TaskManagerTest {
task01.setCommittableOffsetsAndMetadata(singletonMap(t1p1, new
OffsetAndMetadata(0L, null)));
task01.setCommitNeeded();
-
final StateMachineTask task02 = new StateMachineTask(taskId02,
taskId02Partitions, true);
final Map<TopicPartition, OffsetAndMetadata> offsetsT02 =
singletonMap(t1p2, new OffsetAndMetadata(1L, null));
@@ -1266,6 +1304,7 @@ public class TaskManagerTest {
expect(activeTaskCreator.streamsProducerForTask(taskId01)).andThrow(new
RuntimeException("task 0_1 producer boom!"));
checkOrder(activeTaskCreator, false);
+
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId01);
expectLastCall();
@@ -1575,20 +1614,7 @@ public class TaskManagerTest {
final StreamsProducer
producer,
final Map<TopicPartition,
OffsetAndMetadata> offsetsT01,
final Map<TopicPartition,
OffsetAndMetadata> offsetsT02) {
- final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new
Metrics(), "clientId", StreamsConfig.METRICS_LATEST);
- taskManager = new TaskManager(
- changeLogReader,
- UUID.randomUUID(),
- "taskManagerTest",
- streamsMetrics,
- activeTaskCreator,
- standbyTaskCreator,
- topologyBuilder,
- adminClient,
- stateDirectory,
- processingMode
- );
- taskManager.setMainConsumer(consumer);
+ setUpTaskManager(processingMode);
final StateMachineTask task01 = new StateMachineTask(taskId01,
taskId01Partitions, true);
task01.setCommittableOffsetsAndMetadata(offsetsT01);