This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 08849bc  HOTFIX: move rebalanceInProgress check to skip commit during 
handleCorrupted (#10444)
08849bc is described below

commit 08849bc3909d4fabda965c8ca7f78b0feb5473d2
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Tue Mar 30 18:55:38 2021 -0700

    HOTFIX: move rebalanceInProgress check to skip commit during 
handleCorrupted (#10444)
    
    Minor followup to #10407 -- we need to extract the rebalanceInProgress 
check down into the commitAndFillInConsumedOffsetsAndMetadataPerTaskMap method 
which is invoked during handleCorrupted, otherwise we may attempt to commit 
during a a rebalance which will fail
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../streams/processor/internals/TaskManager.java   | 36 ++++++++++-----
 .../processor/internals/TaskManagerTest.java       | 51 ++++++++++++++++++++++
 2 files changed, 75 insertions(+), 12 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 30e7245..ebacbfe 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -535,6 +535,9 @@ public class TaskManager {
         // as such we just need to skip those dirty tasks in the checkpoint
         final Set<Task> dirtyTasks = new HashSet<>();
         try {
+            // in handleRevocation we must call commitOffsetsOrTransaction() 
directly rather than
+            // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to make 
sure we don't skip the
+            // offset commit because we are in a rebalance
             commitOffsetsOrTransaction(consumedOffsetsPerTask);
         } catch (final TaskCorruptedException e) {
             log.warn("Some tasks were corrupted when trying to commit offsets, 
these will be cleaned and revived: {}",
@@ -1016,28 +1019,34 @@ public class TaskManager {
      */
     int commit(final Collection<Task> tasksToCommit) {
         int committed = 0;
-        if (rebalanceInProgress) {
-            committed = -1;
-        } else {
-            final Map<Task, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsAndMetadataPerTask = new HashMap<>();
-            try {
-                committed = 
commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasksToCommit, 
consumedOffsetsAndMetadataPerTask);
-            } catch (final TimeoutException timeoutException) {
-                consumedOffsetsAndMetadataPerTask
-                    .keySet()
-                    .forEach(t -> 
t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
-            }
+
+        final Map<Task, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsAndMetadataPerTask = new HashMap<>();
+        try {
+            committed = 
commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasksToCommit, 
consumedOffsetsAndMetadataPerTask);
+        } catch (final TimeoutException timeoutException) {
+            consumedOffsetsAndMetadataPerTask
+                .keySet()
+                .forEach(t -> 
t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
         }
+
         return committed;
     }
 
     /**
+     * @throws TaskMigratedException if committing offsets failed (non-EOS)
+     *                               or if the task producer got fenced (EOS)
+     * @throws TimeoutException if committing offsets failed due to 
TimeoutException (non-EOS)
+     * @throws TaskCorruptedException if committing offsets failed due to 
TimeoutException (EOS)
      * @param consumedOffsetsAndMetadataPerTask an empty map that will be 
filled in with the prepared offsets
+     * @return number of committed offsets, or -1 if we are in the middle of a 
rebalance and cannot commit
      */
     private int commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(final 
Collection<Task> tasksToCommit,
                                                                     final 
Map<Task, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsAndMetadataPerTask) {
-        int committed = 0;
+        if (rebalanceInProgress) {
+            return -1;
+        }
 
+        int committed = 0;
         for (final Task task : tasksToCommit) {
             if (task.commitNeeded()) {
                 final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata 
= task.prepareCommit();
@@ -1077,6 +1086,9 @@ public class TaskManager {
     }
 
     /**
+     * Caution: do not invoke this directly if it's possible a rebalance is 
occurring, as the commit will fail. If
+     * this is a possibility, prefer the {@link 
#commitAndFillInConsumedOffsetsAndMetadataPerTaskMap} instead.
+     *
      * @throws TaskMigratedException   if committing offsets failed due to 
CommitFailedException (non-EOS)
      * @throws TimeoutException        if committing offsets failed due to 
TimeoutException (non-EOS)
      * @throws TaskCorruptedException  if committing offsets failed due to 
TimeoutException (EOS)
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index c618cd3..f5fa24e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -820,6 +820,57 @@ public class TaskManagerTest {
     }
 
     @Test
+    public void shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() {
+        final ProcessorStateManager stateManager = 
EasyMock.createNiceMock(ProcessorStateManager.class);
+        expect(stateDirectory.listNonEmptyTaskDirectories()).andStubReturn(new 
File[0]);
+
+        final StateMachineTask corruptedActive = new 
StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
+
+        // make sure this will attempt to be committed and throw
+        final StateMachineTask uncorruptedActive = new 
StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
+        final Map<TopicPartition, OffsetAndMetadata> offsets = 
singletonMap(t1p1, new OffsetAndMetadata(0L, null));
+        uncorruptedActive.setCommitNeeded();
+
+        // handleAssignment
+        final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
+        assignment.putAll(taskId00Assignment);
+        assignment.putAll(taskId01Assignment);
+        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignment))).andStubReturn(asList(corruptedActive, uncorruptedActive));
+        topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), 
anyString());
+        expectLastCall().anyTimes();
+        topologyBuilder.addSubscribedTopicsFromMetadata(eq(singleton(topic1)), 
anyObject());
+        expectLastCall().anyTimes();
+
+        expectRestoreToBeCompleted(consumer, changeLogReader);
+
+        expect(consumer.assignment()).andStubReturn(union(HashSet::new, 
taskId00Partitions, taskId01Partitions));
+
+        replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, 
consumer, changeLogReader, stateDirectory, stateManager);
+
+        uncorruptedActive.setCommittableOffsetsAndMetadata(offsets);
+
+        taskManager.handleAssignment(assignment, emptyMap());
+        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
+
+        assertThat(uncorruptedActive.state(), is(Task.State.RUNNING));
+
+        assertThat(uncorruptedActive.commitPrepared, is(false));
+        assertThat(uncorruptedActive.commitNeeded, is(true));
+        assertThat(uncorruptedActive.commitCompleted, is(false));
+
+        taskManager.handleRebalanceStart(singleton(topic1));
+        assertThat(taskManager.isRebalanceInProgress(), is(true));
+        taskManager.handleCorruption(singleton(taskId00));
+
+        assertThat(uncorruptedActive.commitPrepared, is(false));
+        assertThat(uncorruptedActive.commitNeeded, is(true));
+        assertThat(uncorruptedActive.commitCompleted, is(false));
+
+        assertThat(uncorruptedActive.state(), is(State.RUNNING));
+        verify(consumer);
+    }
+
+    @Test
     public void 
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALOS()
 {
         final ProcessorStateManager stateManager = 
EasyMock.createStrictMock(ProcessorStateManager.class);
         stateManager.markChangelogAsCorrupted(taskId00Partitions);

Reply via email to