This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new 08849bc HOTFIX: move rebalanceInProgress check to skip commit during
handleCorrupted (#10444)
08849bc is described below
commit 08849bc3909d4fabda965c8ca7f78b0feb5473d2
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Tue Mar 30 18:55:38 2021 -0700
HOTFIX: move rebalanceInProgress check to skip commit during
handleCorrupted (#10444)
Minor followup to #10407 -- we need to extract the rebalanceInProgress
check down into the commitAndFillInConsumedOffsetsAndMetadataPerTaskMap method
which is invoked during handleCorrupted; otherwise we may attempt to commit
during a rebalance, which will fail
Reviewers: Matthias J. Sax <[email protected]>
---
.../streams/processor/internals/TaskManager.java | 36 ++++++++++-----
.../processor/internals/TaskManagerTest.java | 51 ++++++++++++++++++++++
2 files changed, 75 insertions(+), 12 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 30e7245..ebacbfe 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -535,6 +535,9 @@ public class TaskManager {
// as such we just need to skip those dirty tasks in the checkpoint
final Set<Task> dirtyTasks = new HashSet<>();
try {
+ // in handleRevocation we must call commitOffsetsOrTransaction()
directly rather than
+ // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to make
sure we don't skip the
+ // offset commit because we are in a rebalance
commitOffsetsOrTransaction(consumedOffsetsPerTask);
} catch (final TaskCorruptedException e) {
log.warn("Some tasks were corrupted when trying to commit offsets,
these will be cleaned and revived: {}",
@@ -1016,28 +1019,34 @@ public class TaskManager {
*/
int commit(final Collection<Task> tasksToCommit) {
int committed = 0;
- if (rebalanceInProgress) {
- committed = -1;
- } else {
- final Map<Task, Map<TopicPartition, OffsetAndMetadata>>
consumedOffsetsAndMetadataPerTask = new HashMap<>();
- try {
- committed =
commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasksToCommit,
consumedOffsetsAndMetadataPerTask);
- } catch (final TimeoutException timeoutException) {
- consumedOffsetsAndMetadataPerTask
- .keySet()
- .forEach(t ->
t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
- }
+
+ final Map<Task, Map<TopicPartition, OffsetAndMetadata>>
consumedOffsetsAndMetadataPerTask = new HashMap<>();
+ try {
+ committed =
commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasksToCommit,
consumedOffsetsAndMetadataPerTask);
+ } catch (final TimeoutException timeoutException) {
+ consumedOffsetsAndMetadataPerTask
+ .keySet()
+ .forEach(t ->
t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
}
+
return committed;
}
/**
+ * @throws TaskMigratedException if committing offsets failed (non-EOS)
+ * or if the task producer got fenced (EOS)
+ * @throws TimeoutException if committing offsets failed due to
TimeoutException (non-EOS)
+ * @throws TaskCorruptedException if committing offsets failed due to
TimeoutException (EOS)
* @param consumedOffsetsAndMetadataPerTask an empty map that will be
filled in with the prepared offsets
+ * @return number of committed offsets, or -1 if we are in the middle of a
rebalance and cannot commit
*/
private int commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(final
Collection<Task> tasksToCommit,
final
Map<Task, Map<TopicPartition, OffsetAndMetadata>>
consumedOffsetsAndMetadataPerTask) {
- int committed = 0;
+ if (rebalanceInProgress) {
+ return -1;
+ }
+ int committed = 0;
for (final Task task : tasksToCommit) {
if (task.commitNeeded()) {
final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata
= task.prepareCommit();
@@ -1077,6 +1086,9 @@ public class TaskManager {
}
/**
+ * Caution: do not invoke this directly if it's possible a rebalance is
occurring, as the commit will fail. If
+ * this is a possibility, prefer the {@link
#commitAndFillInConsumedOffsetsAndMetadataPerTaskMap} instead.
+ *
* @throws TaskMigratedException if committing offsets failed due to
CommitFailedException (non-EOS)
* @throws TimeoutException if committing offsets failed due to
TimeoutException (non-EOS)
* @throws TaskCorruptedException if committing offsets failed due to
TimeoutException (EOS)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index c618cd3..f5fa24e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -820,6 +820,57 @@ public class TaskManagerTest {
}
@Test
+ public void shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() {
+ final ProcessorStateManager stateManager =
EasyMock.createNiceMock(ProcessorStateManager.class);
+ expect(stateDirectory.listNonEmptyTaskDirectories()).andStubReturn(new
File[0]);
+
+ final StateMachineTask corruptedActive = new
StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
+
+ // make sure this will attempt to be committed and throw
+ final StateMachineTask uncorruptedActive = new
StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
+ final Map<TopicPartition, OffsetAndMetadata> offsets =
singletonMap(t1p1, new OffsetAndMetadata(0L, null));
+ uncorruptedActive.setCommitNeeded();
+
+ // handleAssignment
+ final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
+ assignment.putAll(taskId00Assignment);
+ assignment.putAll(taskId01Assignment);
+ expect(activeTaskCreator.createTasks(anyObject(),
eq(assignment))).andStubReturn(asList(corruptedActive, uncorruptedActive));
+ topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(),
anyString());
+ expectLastCall().anyTimes();
+ topologyBuilder.addSubscribedTopicsFromMetadata(eq(singleton(topic1)),
anyObject());
+ expectLastCall().anyTimes();
+
+ expectRestoreToBeCompleted(consumer, changeLogReader);
+
+ expect(consumer.assignment()).andStubReturn(union(HashSet::new,
taskId00Partitions, taskId01Partitions));
+
+ replay(activeTaskCreator, standbyTaskCreator, topologyBuilder,
consumer, changeLogReader, stateDirectory, stateManager);
+
+ uncorruptedActive.setCommittableOffsetsAndMetadata(offsets);
+
+ taskManager.handleAssignment(assignment, emptyMap());
+ assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
+
+ assertThat(uncorruptedActive.state(), is(Task.State.RUNNING));
+
+ assertThat(uncorruptedActive.commitPrepared, is(false));
+ assertThat(uncorruptedActive.commitNeeded, is(true));
+ assertThat(uncorruptedActive.commitCompleted, is(false));
+
+ taskManager.handleRebalanceStart(singleton(topic1));
+ assertThat(taskManager.isRebalanceInProgress(), is(true));
+ taskManager.handleCorruption(singleton(taskId00));
+
+ assertThat(uncorruptedActive.commitPrepared, is(false));
+ assertThat(uncorruptedActive.commitNeeded, is(true));
+ assertThat(uncorruptedActive.commitCompleted, is(false));
+
+ assertThat(uncorruptedActive.state(), is(State.RUNNING));
+ verify(consumer);
+ }
+
+ @Test
public void
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALOS()
{
final ProcessorStateManager stateManager =
EasyMock.createStrictMock(ProcessorStateManager.class);
stateManager.markChangelogAsCorrupted(taskId00Partitions);