This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new f0faae2  KAFKA-12523: handle TaskCorruption and TimeoutException 
during handleCorruption and handleRevocation (#10407)
f0faae2 is described below

commit f0faae28fa0eea3ee835765afed521f0418c6843
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Mon Mar 29 14:06:22 2021 -0700

    KAFKA-12523: handle TaskCorruption and TimeoutException during 
handleCorruption and handleRevocation (#10407)
    
    We need to handle the TaskCorruptedException and TimeoutException that can 
be thrown from the offset commit during handleRevocation or handleCorruption.
    
    Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang 
<[email protected]>
---
 .../streams/processor/internals/AbstractTask.java  |   1 +
 .../streams/processor/internals/StreamTask.java    |   9 +-
 .../streams/processor/internals/TaskManager.java   | 155 +++++++----
 .../kafka/streams/processor/internals/Tasks.java   |  10 +
 .../processor/internals/StreamTaskTest.java        |  18 ++
 .../processor/internals/TaskManagerTest.java       | 293 +++++++++++++++++++++
 6 files changed, 433 insertions(+), 53 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 0e22967..771a400 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -132,6 +132,7 @@ public abstract class AbstractTask implements Task {
     @Override
     public void revive() {
         if (state == CLOSED) {
+            clearTaskTimeout();
             transitionTo(CREATED);
         } else {
             throw new IllegalStateException("Illegal state " + state() + " 
while reviving task " + id);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 3b21d56..99d6ac6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -486,8 +486,12 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
                 throw new IllegalStateException("Unknown state " + state() + " 
while post committing active task " + id);
         }
 
-        commitRequested = false;
+        clearCommitStatuses();
+    }
+
+    private void clearCommitStatuses() {
         commitNeeded = false;
+        commitRequested = false;
         hasPendingTxCommit = false;
     }
 
@@ -503,6 +507,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
     public void closeClean() {
         validateClean();
         removeAllSensors();
+        clearCommitStatuses();
         close(true);
         log.info("Closed clean");
     }
@@ -510,6 +515,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
     @Override
     public void closeDirty() {
         removeAllSensors();
+        clearCommitStatuses();
         close(false);
         log.info("Closed dirty");
     }
@@ -524,6 +530,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
     public void closeCleanAndRecycleState() {
         validateClean();
         removeAllSensors();
+        clearCommitStatuses();
         switch (state()) {
             case SUSPENDED:
                 stateMgr.recycle();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index c73de3c..30e7245 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -155,40 +155,55 @@ public class TaskManager {
      * @throws TaskMigratedException
      */
     void handleCorruption(final Set<TaskId> corruptedTasks) {
-        final Map<Task, Collection<TopicPartition>> corruptedStandbyTasks = 
new HashMap<>();
-        final Map<Task, Collection<TopicPartition>> corruptedActiveTasks = new 
HashMap<>();
+        final Set<Task> corruptedActiveTasks = new HashSet<>();
+        final Set<Task> corruptedStandbyTasks = new HashSet<>();
 
         for (final TaskId taskId : corruptedTasks) {
             final Task task = tasks.task(taskId);
             if (task.isActive()) {
-                corruptedActiveTasks.put(task, task.changelogPartitions());
+                corruptedActiveTasks.add(task);
             } else {
-                corruptedStandbyTasks.put(task, task.changelogPartitions());
+                corruptedStandbyTasks.add(task);
             }
         }
 
         // Make sure to clean up any corrupted standby tasks in their entirety 
before committing
         // since TaskMigrated can be thrown and the resulting handleLostAll 
will only clean up active tasks
-        closeAndRevive(corruptedStandbyTasks);
-
-        commit(tasks()
-                   .values()
-                   .stream()
-                   .filter(t -> t.state() == Task.State.RUNNING || t.state() 
== Task.State.RESTORING)
-                   .filter(t -> !corruptedTasks.contains(t.id()))
-                   .collect(Collectors.toSet())
-        );
+        closeDirtyAndRevive(corruptedStandbyTasks, true);
+
+        // We need to commit before closing the corrupted active tasks since 
this will force the ongoing txn to abort
+        try {
+            commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasks()
+                       .values()
+                       .stream()
+                       .filter(t -> t.state() == Task.State.RUNNING || 
t.state() == Task.State.RESTORING)
+                       .filter(t -> !corruptedTasks.contains(t.id()))
+                       .collect(Collectors.toSet()),
+                                new HashMap<>()
+            );
+        } catch (final TaskCorruptedException e) {
+            log.info("Some additional tasks were found corrupted while trying 
to commit, these will be added to the " +
+                         "tasks to clean and revive: {}", e.corruptedTasks());
+            corruptedActiveTasks.addAll(tasks.tasks(e.corruptedTasks()));
+        } catch (final TimeoutException e) {
+            log.info("Hit TimeoutException when committing all non-corrupted 
tasks, these will be closed and revived");
+            final Collection<Task> uncorruptedTasks = new 
HashSet<>(tasks.activeTasks());
+            uncorruptedTasks.removeAll(corruptedActiveTasks);
+            // Those tasks which just timed out can just be closed dirty 
without marking changelogs as corrupted
+            closeDirtyAndRevive(uncorruptedTasks, false);
+        }
 
-        closeAndRevive(corruptedActiveTasks);
+        closeDirtyAndRevive(corruptedActiveTasks, true);
     }
 
-    private void closeAndRevive(final Map<Task, Collection<TopicPartition>> 
taskWithChangelogs) {
-        for (final Map.Entry<Task, Collection<TopicPartition>> entry : 
taskWithChangelogs.entrySet()) {
-            final Task task = entry.getKey();
+    private void closeDirtyAndRevive(final Collection<Task> 
taskWithChangelogs, final boolean markAsCorrupted) {
+        for (final Task task : taskWithChangelogs) {
+            final Collection<TopicPartition> corruptedPartitions = 
task.changelogPartitions();
 
             // mark corrupted partitions to not be checkpointed, and then 
close the task as dirty
-            final Collection<TopicPartition> corruptedPartitions = 
entry.getValue();
-            task.markChangelogAsCorrupted(corruptedPartitions);
+            if (markAsCorrupted) {
+                task.markChangelogAsCorrupted(corruptedPartitions);
+            }
 
             try {
                 // we do not need to take the returned offsets since we are 
not going to commit anyways;
@@ -201,8 +216,11 @@ public class TaskManager {
 
             try {
                 task.suspend();
+
                 // we need to enforce a checkpoint that removes the corrupted 
partitions
-                task.postCommit(true);
+                if (markAsCorrupted) {
+                    task.postCommit(true);
+                }
             } catch (final RuntimeException swallow) {
                 log.error("Error suspending corrupted task {} ", task.id(), 
swallow);
             }
@@ -332,8 +350,8 @@ public class TaskManager {
                 //    write the checkpoint file.
                 final Map<TopicPartition, OffsetAndMetadata> offsets = 
task.prepareCommit();
                 if (!offsets.isEmpty()) {
-                    log.error("Task {} should has been committed when it was 
suspended, but it reports non-empty " +
-                                    "offsets {} to commit; it means it fails 
during last commit and hence should be closed dirty",
+                    log.error("Task {} should have been committed when it was 
suspended, but it reports non-empty " +
+                                    "offsets {} to commit; this means it 
failed during last commit and hence should be closed dirty",
                             task.id(), offsets);
 
                     tasksToCloseDirty.add(task);
@@ -512,20 +530,35 @@ public class TaskManager {
             prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, 
consumedOffsetsPerTask);
         }
 
-        // even if commit failed, we should still continue and complete 
suspending those tasks,
-        // so we would capture any exception and throw
+        // even if commit failed, we should still continue and complete 
suspending those tasks, so we would capture
+        // any exception and rethrow it at the end. some exceptions may be 
handled immediately and then swallowed,
+        // as such we just need to skip those dirty tasks in the checkpoint
+        final Set<Task> dirtyTasks = new HashSet<>();
         try {
             commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, 
these will be cleaned and revived: {}",
+                     e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException it must be EOS, just handle 
the cleanup for those corrupted tasks right here
+            dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+            closeDirtyAndRevive(dirtyTasks, true);
+        } catch (final TimeoutException e) {
+            log.warn("Timed out while trying to commit all tasks during 
revocation, these will be cleaned and revived");
+
+            // If we hit a TimeoutException it must be ALOS, just close dirty 
and revive without wiping the state
+            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+            closeDirtyAndRevive(dirtyTasks, false);
         } catch (final RuntimeException e) {
             log.error("Exception caught while committing those revoked tasks " 
+ revokedActiveTasks, e);
             firstException.compareAndSet(null, e);
+            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
         }
 
-        // only try to complete post-commit if committing succeeded;
-        // we enforce checkpointing upon suspending a task: if it is resumed 
later we just
-        // proceed normally, if it is going to be closed we would checkpoint 
by then
-        if (firstException.get() == null) {
-            for (final Task task : revokedActiveTasks) {
+        // we enforce checkpointing upon suspending a task: if it is resumed 
later we just proceed normally, if it is
+        // going to be closed we would checkpoint by then
+        for (final Task task : revokedActiveTasks) {
+            if (!dirtyTasks.contains(task)) {
                 try {
                     task.postCommit(true);
                 } catch (final RuntimeException e) {
@@ -533,9 +566,11 @@ public class TaskManager {
                     firstException.compareAndSet(null, e);
                 }
             }
+        }
 
-            if (shouldCommitAdditionalTasks) {
-                for (final Task task : commitNeededActiveTasks) {
+        if (shouldCommitAdditionalTasks) {
+            for (final Task task : commitNeededActiveTasks) {
+                if (!dirtyTasks.contains(task)) {
                     try {
                         // for non-revoking active tasks, we should not 
enforce checkpoint
                         // since if it is EOS enabled, no checkpoint should be 
written while
@@ -975,42 +1010,53 @@ public class TaskManager {
     /**
      * @throws TaskMigratedException if committing offsets failed (non-EOS)
      *                               or if the task producer got fenced (EOS)
+     * @throws TimeoutException if task.timeout.ms has been exceeded (non-EOS)
+     * @throws TaskCorruptedException if committing offsets failed due to 
TimeoutException (EOS)
      * @return number of committed offsets, or -1 if we are in the middle of a 
rebalance and cannot commit
      */
     int commit(final Collection<Task> tasksToCommit) {
+        int committed = 0;
         if (rebalanceInProgress) {
-            return -1;
+            committed = -1;
         } else {
-            int committed = 0;
             final Map<Task, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsAndMetadataPerTask = new HashMap<>();
-            for (final Task task : tasksToCommit) {
-                if (task.commitNeeded()) {
-                    final Map<TopicPartition, OffsetAndMetadata> 
offsetAndMetadata = task.prepareCommit();
-                    if (task.isActive()) {
-                        consumedOffsetsAndMetadataPerTask.put(task, 
offsetAndMetadata);
-                    }
-                }
-            }
-
             try {
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-
-                for (final Task task : tasksToCommit) {
-                    if (task.commitNeeded()) {
-                        task.clearTaskTimeout();
-                        ++committed;
-                        task.postCommit(false);
-                    }
-                }
+                committed = 
commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasksToCommit, 
consumedOffsetsAndMetadataPerTask);
             } catch (final TimeoutException timeoutException) {
                 consumedOffsetsAndMetadataPerTask
                     .keySet()
                     .forEach(t -> 
t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
             }
+        }
+        return committed;
+    }
 
+    /**
+     * @param consumedOffsetsAndMetadataPerTask an empty map that will be 
filled in with the prepared offsets
+     */
+    private int commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(final 
Collection<Task> tasksToCommit,
+                                                                    final 
Map<Task, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsAndMetadataPerTask) {
+        int committed = 0;
 
-            return committed;
+        for (final Task task : tasksToCommit) {
+            if (task.commitNeeded()) {
+                final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata 
= task.prepareCommit();
+                if (task.isActive()) {
+                    consumedOffsetsAndMetadataPerTask.put(task, 
offsetAndMetadata);
+                }
+            }
         }
+
+        commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+
+        for (final Task task : tasksToCommit) {
+            if (task.commitNeeded()) {
+                task.clearTaskTimeout();
+                ++committed;
+                task.postCommit(false);
+            }
+        }
+        return committed;
     }
 
     /**
@@ -1030,6 +1076,11 @@ public class TaskManager {
         }
     }
 
+    /**
+     * @throws TaskMigratedException   if committing offsets failed due to 
CommitFailedException (non-EOS)
+     * @throws TimeoutException        if committing offsets failed due to 
TimeoutException (non-EOS)
+     * @throws TaskCorruptedException  if committing offsets failed due to 
TimeoutException (EOS)
+     */
     private void commitOffsetsOrTransaction(final Map<Task, 
Map<TopicPartition, OffsetAndMetadata>> offsetsPerTask) {
         log.debug("Committing task offsets {}", 
offsetsPerTask.entrySet().stream().collect(Collectors.toMap(t -> 
t.getKey().id(), Entry::getValue))); // avoid logging actual Task objects
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index e12290f..4193deb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -23,6 +23,8 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+
+import java.util.HashSet;
 import org.slf4j.Logger;
 
 import java.util.Collection;
@@ -234,6 +236,14 @@ class Tasks {
         return allTasksPerId.get(taskId);
     }
 
+    Collection<Task> tasks(final Collection<TaskId> taskIds) {
+        final Set<Task> tasks = new HashSet<>();
+        for (final TaskId taskId : taskIds) {
+            tasks.add(task(taskId));
+        }
+        return tasks;
+    }
+
     // TODO: change return type to `StreamTask`
     Collection<Task> activeTasks() {
         return readOnlyActiveTasks;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 9832f14..ec5d295 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -2123,6 +2123,24 @@ public class StreamTaskTest {
     }
 
     @Test
+    public void shouldClearCommitStatusesInCloseDirty() {
+        task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), 
StreamsConfig.METRICS_LATEST);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        task.addRecords(partition1, 
singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, 0)));
+        assertTrue(task.process(0L));
+        task.requestCommit();
+
+        task.suspend();
+        assertThat(task.commitNeeded(), is(true));
+        assertThat(task.commitRequested(), is(true));
+        task.closeDirty();
+        assertThat(task.commitNeeded(), is(false));
+        assertThat(task.commitRequested(), is(false));
+    }
+
+    @Test
     public void closeShouldBeIdempotent() {
         
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
         
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 00d4012..c618cd3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -107,6 +107,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @RunWith(EasyMockRunner.class)
 public class TaskManagerTest {
@@ -116,12 +117,16 @@ public class TaskManagerTest {
 
     private final TaskId taskId00 = new TaskId(0, 0);
     private final TopicPartition t1p0 = new TopicPartition(topic1, 0);
+    private final TopicPartition t1p0changelog = new 
TopicPartition("changelog", 0);
     private final Set<TopicPartition> taskId00Partitions = mkSet(t1p0);
+    private final Set<TopicPartition> taskId00ChangelogPartitions = 
mkSet(t1p0changelog);
     private final Map<TaskId, Set<TopicPartition>> taskId00Assignment = 
singletonMap(taskId00, taskId00Partitions);
 
     private final TaskId taskId01 = new TaskId(0, 1);
     private final TopicPartition t1p1 = new TopicPartition(topic1, 1);
+    private final TopicPartition t1p1changelog = new 
TopicPartition("changelog", 1);
     private final Set<TopicPartition> taskId01Partitions = mkSet(t1p1);
+    private final Set<TopicPartition> taskId01ChangelogPartitions = 
mkSet(t1p1changelog);
     private final Map<TaskId, Set<TopicPartition>> taskId01Assignment = 
singletonMap(taskId01, taskId01Partitions);
 
     private final TaskId taskId02 = new TaskId(0, 2);
@@ -815,6 +820,281 @@ public class TaskManagerTest {
     }
 
     @Test
+    public void 
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALOS()
 {
+        final ProcessorStateManager stateManager = 
EasyMock.createStrictMock(ProcessorStateManager.class);
+        stateManager.markChangelogAsCorrupted(taskId00Partitions);
+        replay(stateManager);
+
+        final StateMachineTask corruptedActive = new 
StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
+        final StateMachineTask uncorruptedActive = new 
StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
+            @Override
+            public void markChangelogAsCorrupted(final 
Collection<TopicPartition> partitions) {
+                fail("Should not try to mark changelogs as corrupted for 
uncorrupted task");
+            }
+        };
+        final Map<TopicPartition, OffsetAndMetadata> offsets = 
singletonMap(t1p1, new OffsetAndMetadata(0L, null));
+        uncorruptedActive.setCommittableOffsetsAndMetadata(offsets);
+
+        // handleAssignment
+        final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
+        assignment.putAll(taskId00Assignment);
+        assignment.putAll(taskId01Assignment);
+        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignment))).andStubReturn(asList(corruptedActive, uncorruptedActive));
+        topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), 
anyString());
+        expectLastCall().anyTimes();
+
+        expectRestoreToBeCompleted(consumer, changeLogReader);
+
+        consumer.commitSync(offsets);
+        expectLastCall().andThrow(new TimeoutException());
+
+        expect(consumer.assignment()).andStubReturn(union(HashSet::new, 
taskId00Partitions, taskId01Partitions));
+
+        replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, 
consumer, changeLogReader);
+
+        taskManager.handleAssignment(assignment, emptyMap());
+        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
+
+        assertThat(uncorruptedActive.state(), is(Task.State.RUNNING));
+        assertThat(corruptedActive.state(), is(Task.State.RUNNING));
+
+        // make sure this will be committed and throw
+        uncorruptedActive.setCommitNeeded();
+        corruptedActive.setChangelogOffsets(singletonMap(t1p0, 0L));
+
+        assertThat(uncorruptedActive.commitPrepared, is(false));
+        assertThat(uncorruptedActive.commitNeeded, is(true));
+        assertThat(uncorruptedActive.commitCompleted, is(false));
+        assertThat(corruptedActive.commitPrepared, is(false));
+        assertThat(corruptedActive.commitNeeded, is(false));
+        assertThat(corruptedActive.commitCompleted, is(false));
+
+        taskManager.handleCorruption(singleton(taskId00));
+
+        assertThat(uncorruptedActive.commitPrepared, is(true));
+        assertThat(uncorruptedActive.commitNeeded, is(false));
+        assertThat(uncorruptedActive.commitCompleted, is(false)); //if not 
corrupted, we should close dirty without committing
+        assertThat(corruptedActive.commitPrepared, is(true));
+        assertThat(corruptedActive.commitNeeded, is(false));
+        assertThat(corruptedActive.commitCompleted, is(true)); //if corrupted, 
should enforce checkpoint with corrupted tasks removed
+
+        assertThat(corruptedActive.state(), is(Task.State.CREATED));
+        assertThat(uncorruptedActive.state(), is(Task.State.CREATED));
+        verify(consumer);
+    }
+
+    @Test
+    public void 
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringHandleCorruptedWithEOS()
 {
+        setUpTaskManager(ProcessingMode.EXACTLY_ONCE_BETA);
+        final StreamsProducer producer = mock(StreamsProducer.class);
+        expect(activeTaskCreator.threadProducer()).andStubReturn(producer);
+        final ProcessorStateManager stateManager = 
EasyMock.createMock(ProcessorStateManager.class);
+
+        final AtomicBoolean corruptedTaskChangelogMarkedAsCorrupted = new 
AtomicBoolean(false);
+        final StateMachineTask corruptedActiveTask = new 
StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
+            @Override
+            public void markChangelogAsCorrupted(final 
Collection<TopicPartition> partitions) {
+                super.markChangelogAsCorrupted(partitions);
+                corruptedTaskChangelogMarkedAsCorrupted.set(true);
+            }
+        };
+        stateManager.markChangelogAsCorrupted(taskId00ChangelogPartitions);
+
+        final AtomicBoolean uncorruptedTaskChangelogMarkedAsCorrupted = new 
AtomicBoolean(false);
+        final StateMachineTask uncorruptedActiveTask = new 
StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
+            @Override
+            public void markChangelogAsCorrupted(final 
Collection<TopicPartition> partitions) {
+                super.markChangelogAsCorrupted(partitions);
+                uncorruptedTaskChangelogMarkedAsCorrupted.set(true);
+            }
+        };
+        final Map<TopicPartition, OffsetAndMetadata> offsets = 
singletonMap(t1p1, new OffsetAndMetadata(0L, null));
+        uncorruptedActiveTask.setCommittableOffsetsAndMetadata(offsets);
+        stateManager.markChangelogAsCorrupted(taskId01ChangelogPartitions);
+
+        // handleAssignment
+        final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
+        assignment.putAll(taskId00Assignment);
+        assignment.putAll(taskId01Assignment);
+        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignment))).andStubReturn(asList(corruptedActiveTask, 
uncorruptedActiveTask));
+        topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), 
anyString());
+        expectLastCall().anyTimes();
+
+        expectRestoreToBeCompleted(consumer, changeLogReader);
+
+        final ConsumerGroupMetadata groupMetadata = new 
ConsumerGroupMetadata("appId");
+        expect(consumer.groupMetadata()).andReturn(groupMetadata);
+        producer.commitTransaction(offsets, groupMetadata);
+        expectLastCall().andThrow(new TimeoutException());
+
+        expect(consumer.assignment()).andStubReturn(union(HashSet::new, 
taskId00Partitions, taskId01Partitions));
+
+        replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, 
consumer, changeLogReader, stateManager, producer);
+
+        taskManager.handleAssignment(assignment, emptyMap());
+        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
+
+        assertThat(uncorruptedActiveTask.state(), is(Task.State.RUNNING));
+        assertThat(corruptedActiveTask.state(), is(Task.State.RUNNING));
+
+        // make sure this will be committed and throw
+        uncorruptedActiveTask.setCommitNeeded();
+
+        final Map<TopicPartition, Long> corruptedActiveTaskChangelogOffsets = 
singletonMap(t1p0changelog, 0L);
+        
corruptedActiveTask.setChangelogOffsets(corruptedActiveTaskChangelogOffsets);
+        final Map<TopicPartition, Long> uncorruptedActiveTaskChangelogOffsets 
= singletonMap(t1p1changelog, 0L);
+        
uncorruptedActiveTask.setChangelogOffsets(uncorruptedActiveTaskChangelogOffsets);
+
+        assertThat(uncorruptedActiveTask.commitPrepared, is(false));
+        assertThat(uncorruptedActiveTask.commitNeeded, is(true));
+        assertThat(uncorruptedActiveTask.commitCompleted, is(false));
+        assertThat(corruptedActiveTask.commitPrepared, is(false));
+        assertThat(corruptedActiveTask.commitNeeded, is(false));
+        assertThat(corruptedActiveTask.commitCompleted, is(false));
+
+        taskManager.handleCorruption(singleton(taskId00));
+
+        assertThat(uncorruptedActiveTask.commitPrepared, is(true));
+        assertThat(uncorruptedActiveTask.commitNeeded, is(false));
+        assertThat(uncorruptedActiveTask.commitCompleted, is(true)); //if 
corrupted due to timeout on commit, should enforce checkpoint with corrupted 
tasks removed
+        assertThat(corruptedActiveTask.commitPrepared, is(true));
+        assertThat(corruptedActiveTask.commitNeeded, is(false));
+        assertThat(corruptedActiveTask.commitCompleted, is(true)); //if 
corrupted, should enforce checkpoint with corrupted tasks removed
+
+        assertThat(corruptedActiveTask.state(), is(Task.State.CREATED));
+        assertThat(uncorruptedActiveTask.state(), is(Task.State.CREATED));
+        assertThat(corruptedTaskChangelogMarkedAsCorrupted.get(), is(true));
+        assertThat(uncorruptedTaskChangelogMarkedAsCorrupted.get(), is(true));
+        verify(consumer);
+    }
+
+    @Test
+    public void 
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithALOS()
 { // Scenario: consumer.commitSync throws a TimeoutException during handleRevocation; the revoked task must end SUSPENDED, the other commit-needed task must end CREATED, and the task with nothing to commit must stay RUNNING
+        final StateMachineTask revokedActiveTask = new 
StateMachineTask(taskId00, taskId00Partitions, true);
+        final Map<TopicPartition, OffsetAndMetadata> offsets00 = 
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
+        revokedActiveTask.setCommittableOffsetsAndMetadata(offsets00);
+        revokedActiveTask.setCommitNeeded();
+        // Second task is not revoked but also has a pending commit; the test fails if its changelogs get marked as corrupted.
+        final StateMachineTask unrevokedActiveTaskWithCommitNeeded = new 
StateMachineTask(taskId01, taskId01Partitions, true) {
+            @Override
+            public void markChangelogAsCorrupted(final 
Collection<TopicPartition> partitions) {
+                fail("Should not try to mark changelogs as corrupted for 
uncorrupted task");
+            }
+        };
+        final Map<TopicPartition, OffsetAndMetadata> offsets01 = 
singletonMap(t1p1, new OffsetAndMetadata(1L, null));
+        
unrevokedActiveTaskWithCommitNeeded.setCommittableOffsetsAndMetadata(offsets01);
+        unrevokedActiveTaskWithCommitNeeded.setCommitNeeded();
+        // Third task is not revoked and never has setCommitNeeded() called on it.
+        final StateMachineTask unrevokedActiveTaskWithoutCommitNeeded = new 
StateMachineTask(taskId02, taskId02Partitions, true);
+        // The single commitSync call is expected to carry the offsets of both commit-needed tasks.
+        final Map<TopicPartition, OffsetAndMetadata> expectedCommittedOffsets 
= new HashMap<>();
+        expectedCommittedOffsets.putAll(offsets00);
+        expectedCommittedOffsets.putAll(offsets01);
+
+        final Map<TaskId, Set<TopicPartition>> assignmentActive = mkMap(
+            mkEntry(taskId00, taskId00Partitions),
+            mkEntry(taskId01, taskId01Partitions),
+            mkEntry(taskId02, taskId02Partitions)
+        );
+
+        expectRestoreToBeCompleted(consumer, changeLogReader);
+        // Mock setup: all three tasks are created, and commitSync with the combined offsets throws a TimeoutException.
+        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignmentActive))).andReturn(asList(revokedActiveTask, 
unrevokedActiveTaskWithCommitNeeded, unrevokedActiveTaskWithoutCommitNeeded));
+        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
+        expectLastCall();
+        consumer.commitSync(expectedCommittedOffsets);
+        expectLastCall().andThrow(new TimeoutException());
+        expect(consumer.assignment()).andStubReturn(union(HashSet::new, 
taskId00Partitions, taskId01Partitions, taskId02Partitions));
+
+        replay(activeTaskCreator, standbyTaskCreator, consumer, 
changeLogReader);
+        // Assign the tasks and finish restoration so that all three are RUNNING before the revocation.
+        taskManager.handleAssignment(assignmentActive, emptyMap());
+        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
+        assertThat(revokedActiveTask.state(), is(Task.State.RUNNING));
+        assertThat(unrevokedActiveTaskWithCommitNeeded.state(), 
is(State.RUNNING));
+        assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), 
is(Task.State.RUNNING));
+        // Revoke only the partitions of taskId00; the mocked commit times out during this call.
+        taskManager.handleRevocation(taskId00Partitions);
+        // Only the commit-needed tasks change state: SUSPENDED for the revoked one, CREATED for the unrevoked one.
+        assertThat(revokedActiveTask.state(), is(State.SUSPENDED));
+        assertThat(unrevokedActiveTaskWithCommitNeeded.state(), 
is(State.CREATED));
+        assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), 
is(State.RUNNING));
+    }
+
+    @Test
+    public void 
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithEOS()
 { // Scenario (EXACTLY_ONCE_BETA): producer.commitTransaction throws a TimeoutException during handleRevocation; the unrevoked commit-needed task must have its changelog marked as corrupted and end CREATED, the revoked task must end SUSPENDED, and the task with nothing to commit must stay RUNNING
+        setUpTaskManager(ProcessingMode.EXACTLY_ONCE_BETA);
+        final StreamsProducer producer = mock(StreamsProducer.class);
+        expect(activeTaskCreator.threadProducer()).andStubReturn(producer);
+        final ProcessorStateManager stateManager = 
EasyMock.createMock(ProcessorStateManager.class);
+        // All three tasks below are built with the same mocked ProcessorStateManager.
+        final StateMachineTask revokedActiveTask = new 
StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
+        final Map<TopicPartition, OffsetAndMetadata> revokedActiveTaskOffsets 
= singletonMap(t1p0, new OffsetAndMetadata(0L, null));
+        
revokedActiveTask.setCommittableOffsetsAndMetadata(revokedActiveTaskOffsets);
+        revokedActiveTask.setCommitNeeded();
+        // The unrevoked commit-needed task records whether markChangelogAsCorrupted was invoked on it.
+        final AtomicBoolean unrevokedTaskChangelogMarkedAsCorrupted = new 
AtomicBoolean(false);
+        final StateMachineTask unrevokedActiveTask = new 
StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
+            @Override
+            public void markChangelogAsCorrupted(final 
Collection<TopicPartition> partitions) {
+                super.markChangelogAsCorrupted(partitions);
+                unrevokedTaskChangelogMarkedAsCorrupted.set(true);
+            }
+        };
+        final Map<TopicPartition, OffsetAndMetadata> unrevokedTaskOffsets = 
singletonMap(t1p1, new OffsetAndMetadata(1L, null));
+        
unrevokedActiveTask.setCommittableOffsetsAndMetadata(unrevokedTaskOffsets);
+        unrevokedActiveTask.setCommitNeeded();
+        // Third task is not revoked and never has setCommitNeeded() called on it.
+        final StateMachineTask unrevokedActiveTaskWithoutCommitNeeded = new 
StateMachineTask(taskId02, taskId02Partitions, true, stateManager);
+        // The transaction commit is expected to carry the offsets of both commit-needed tasks.
+        final Map<TopicPartition, OffsetAndMetadata> expectedCommittedOffsets 
= new HashMap<>();
+        expectedCommittedOffsets.putAll(revokedActiveTaskOffsets);
+        expectedCommittedOffsets.putAll(unrevokedTaskOffsets);
+        // Invoked before replay(), so these two calls are recorded as expected calls on the stateManager mock.
+        stateManager.markChangelogAsCorrupted(taskId00ChangelogPartitions);
+        stateManager.markChangelogAsCorrupted(taskId01ChangelogPartitions);
+
+        final Map<TaskId, Set<TopicPartition>> assignmentActive = mkMap(
+            mkEntry(taskId00, taskId00Partitions),
+            mkEntry(taskId01, taskId01Partitions),
+            mkEntry(taskId02, taskId02Partitions)
+            );
+
+        expectRestoreToBeCompleted(consumer, changeLogReader);
+
+        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignmentActive))).andReturn(asList(revokedActiveTask, unrevokedActiveTask, 
unrevokedActiveTaskWithoutCommitNeeded));
+        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
+        expectLastCall();
+        // The mocked thread producer is set up so that commitTransaction throws a TimeoutException.
+        final ConsumerGroupMetadata groupMetadata = new 
ConsumerGroupMetadata("appId");
+        expect(consumer.groupMetadata()).andReturn(groupMetadata);
+        producer.commitTransaction(expectedCommittedOffsets, groupMetadata);
+        expectLastCall().andThrow(new TimeoutException());
+
+        expect(consumer.assignment()).andStubReturn(union(HashSet::new, 
taskId00Partitions, taskId01Partitions, taskId02Partitions));
+
+        replay(activeTaskCreator, standbyTaskCreator, consumer, 
changeLogReader, producer, stateManager);
+        // Assign the tasks and finish restoration so that all three are RUNNING before the revocation.
+        taskManager.handleAssignment(assignmentActive, emptyMap());
+        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
+        assertThat(revokedActiveTask.state(), is(Task.State.RUNNING));
+        assertThat(unrevokedActiveTask.state(), is(Task.State.RUNNING));
+        assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), 
is(State.RUNNING));
+        // Give both commit-needed tasks a changelog offset (on t1p0changelog and t1p1changelog) before revoking.
+        final Map<TopicPartition, Long> revokedActiveTaskChangelogOffsets = 
singletonMap(t1p0changelog, 0L);
+        
revokedActiveTask.setChangelogOffsets(revokedActiveTaskChangelogOffsets);
+        final Map<TopicPartition, Long> unrevokedActiveTaskChangelogOffsets = 
singletonMap(t1p1changelog, 0L);
+        
unrevokedActiveTask.setChangelogOffsets(unrevokedActiveTaskChangelogOffsets);
+        // Revoke only the partitions of taskId00; the mocked transaction commit times out during this call.
+        taskManager.handleRevocation(taskId00Partitions);
+
+        assertThat(unrevokedTaskChangelogMarkedAsCorrupted.get(), is(true));
+        assertThat(revokedActiveTask.state(), is(State.SUSPENDED));
+        assertThat(unrevokedActiveTask.state(), is(State.CREATED));
+        assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), 
is(State.RUNNING));
+    }
+
+    @Test
     public void shouldCloseStandbyUnassignedTasksWhenCreatingNewTasks() {
         final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, 
false);
 
@@ -1056,6 +1336,7 @@ public class TaskManagerTest {
 
         expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignmentActive)))
             .andReturn(asList(task00, task01, task02));
+
         expect(activeTaskCreator.threadProducer()).andReturn(producer);
         activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
         expect(standbyTaskCreator.createTasks(eq(assignmentStandby)))
@@ -2926,9 +3207,12 @@ public class TaskManagerTest {
 
     private static class StateMachineTask extends AbstractTask implements Task 
{
         private final boolean active;
+
+        // TODO: KAFKA-12569 clean up usage of these flags and use the new 
commitCompleted flag where appropriate
         private boolean commitNeeded = false;
         private boolean commitRequested = false;
         private boolean commitPrepared = false;
+        private boolean commitCompleted = false;
         private Map<TopicPartition, OffsetAndMetadata> committableOffsets = 
Collections.emptyMap();
         private Map<TopicPartition, Long> purgeableOffsets;
         private Map<TopicPartition, Long> changelogOffsets = 
Collections.emptyMap();
@@ -3006,6 +3290,7 @@ public class TaskManagerTest {
         @Override
         public void postCommit(final boolean enforceCheckpoint) { // test stub: only updates the commit flags; enforceCheckpoint is not read
             commitNeeded = false;
+            commitCompleted = true; // records that postCommit was invoked on this task
         }
 
         @Override
@@ -3027,6 +3312,14 @@ public class TaskManagerTest {
         }
 
         @Override
+        public void revive() { // resets commitNeeded and commitRequested, then delegates to AbstractTask.revive()
+            //TODO: KAFKA-12569 move clearing of commit-required statuses to 
closeDirty/Clean/AndRecycle methods
+            commitNeeded = false;
+            commitRequested = false;
+            super.revive(); // AbstractTask.revive() clears the task timeout and moves CLOSED -> CREATED; any other state throws IllegalStateException
+        }
+
+        @Override
         public void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
                                                 final Exception cause) {
             timeout = currentWallClockMs;

Reply via email to