This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new f0faae2 KAFKA-12523: handle TaskCorruption and TimeoutException
during handleCorruption and handleRevocation (#10407)
f0faae2 is described below
commit f0faae28fa0eea3ee835765afed521f0418c6843
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Mon Mar 29 14:06:22 2021 -0700
KAFKA-12523: handle TaskCorruption and TimeoutException during
handleCorruption and handleRevocation (#10407)
We need to handle the TaskCorruptedException and TimeoutException that can be
thrown from the offset commit during handleRevocation or handleCorruption.
Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang
<[email protected]>
---
.../streams/processor/internals/AbstractTask.java | 1 +
.../streams/processor/internals/StreamTask.java | 9 +-
.../streams/processor/internals/TaskManager.java | 155 +++++++----
.../kafka/streams/processor/internals/Tasks.java | 10 +
.../processor/internals/StreamTaskTest.java | 18 ++
.../processor/internals/TaskManagerTest.java | 293 +++++++++++++++++++++
6 files changed, 433 insertions(+), 53 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 0e22967..771a400 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -132,6 +132,7 @@ public abstract class AbstractTask implements Task {
@Override
public void revive() {
if (state == CLOSED) {
+ clearTaskTimeout();
transitionTo(CREATED);
} else {
throw new IllegalStateException("Illegal state " + state() + "
while reviving task " + id);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 3b21d56..99d6ac6 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -486,8 +486,12 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
throw new IllegalStateException("Unknown state " + state() + "
while post committing active task " + id);
}
- commitRequested = false;
+ clearCommitStatuses();
+ }
+
+ private void clearCommitStatuses() {
commitNeeded = false;
+ commitRequested = false;
hasPendingTxCommit = false;
}
@@ -503,6 +507,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
public void closeClean() {
validateClean();
removeAllSensors();
+ clearCommitStatuses();
close(true);
log.info("Closed clean");
}
@@ -510,6 +515,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
@Override
public void closeDirty() {
removeAllSensors();
+ clearCommitStatuses();
close(false);
log.info("Closed dirty");
}
@@ -524,6 +530,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
public void closeCleanAndRecycleState() {
validateClean();
removeAllSensors();
+ clearCommitStatuses();
switch (state()) {
case SUSPENDED:
stateMgr.recycle();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index c73de3c..30e7245 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -155,40 +155,55 @@ public class TaskManager {
* @throws TaskMigratedException
*/
void handleCorruption(final Set<TaskId> corruptedTasks) {
- final Map<Task, Collection<TopicPartition>> corruptedStandbyTasks =
new HashMap<>();
- final Map<Task, Collection<TopicPartition>> corruptedActiveTasks = new
HashMap<>();
+ final Set<Task> corruptedActiveTasks = new HashSet<>();
+ final Set<Task> corruptedStandbyTasks = new HashSet<>();
for (final TaskId taskId : corruptedTasks) {
final Task task = tasks.task(taskId);
if (task.isActive()) {
- corruptedActiveTasks.put(task, task.changelogPartitions());
+ corruptedActiveTasks.add(task);
} else {
- corruptedStandbyTasks.put(task, task.changelogPartitions());
+ corruptedStandbyTasks.add(task);
}
}
// Make sure to clean up any corrupted standby tasks in their entirety
before committing
// since TaskMigrated can be thrown and the resulting handleLostAll
will only clean up active tasks
- closeAndRevive(corruptedStandbyTasks);
-
- commit(tasks()
- .values()
- .stream()
- .filter(t -> t.state() == Task.State.RUNNING || t.state()
== Task.State.RESTORING)
- .filter(t -> !corruptedTasks.contains(t.id()))
- .collect(Collectors.toSet())
- );
+ closeDirtyAndRevive(corruptedStandbyTasks, true);
+
+ // We need to commit before closing the corrupted active tasks since
this will force the ongoing txn to abort
+ try {
+ commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasks()
+ .values()
+ .stream()
+ .filter(t -> t.state() == Task.State.RUNNING ||
t.state() == Task.State.RESTORING)
+ .filter(t -> !corruptedTasks.contains(t.id()))
+ .collect(Collectors.toSet()),
+ new HashMap<>()
+ );
+ } catch (final TaskCorruptedException e) {
+ log.info("Some additional tasks were found corrupted while trying
to commit, these will be added to the " +
+ "tasks to clean and revive: {}", e.corruptedTasks());
+ corruptedActiveTasks.addAll(tasks.tasks(e.corruptedTasks()));
+ } catch (final TimeoutException e) {
+ log.info("Hit TimeoutException when committing all non-corrupted
tasks, these will be closed and revived");
+ final Collection<Task> uncorruptedTasks = new
HashSet<>(tasks.activeTasks());
+ uncorruptedTasks.removeAll(corruptedActiveTasks);
+ // Those tasks which just timed out can just be closed dirty
without marking changelogs as corrupted
+ closeDirtyAndRevive(uncorruptedTasks, false);
+ }
- closeAndRevive(corruptedActiveTasks);
+ closeDirtyAndRevive(corruptedActiveTasks, true);
}
- private void closeAndRevive(final Map<Task, Collection<TopicPartition>>
taskWithChangelogs) {
- for (final Map.Entry<Task, Collection<TopicPartition>> entry :
taskWithChangelogs.entrySet()) {
- final Task task = entry.getKey();
+ private void closeDirtyAndRevive(final Collection<Task>
taskWithChangelogs, final boolean markAsCorrupted) {
+ for (final Task task : taskWithChangelogs) {
+ final Collection<TopicPartition> corruptedPartitions =
task.changelogPartitions();
// mark corrupted partitions to not be checkpointed, and then
close the task as dirty
- final Collection<TopicPartition> corruptedPartitions =
entry.getValue();
- task.markChangelogAsCorrupted(corruptedPartitions);
+ if (markAsCorrupted) {
+ task.markChangelogAsCorrupted(corruptedPartitions);
+ }
try {
// we do not need to take the returned offsets since we are
not going to commit anyways;
@@ -201,8 +216,11 @@ public class TaskManager {
try {
task.suspend();
+
// we need to enforce a checkpoint that removes the corrupted
partitions
- task.postCommit(true);
+ if (markAsCorrupted) {
+ task.postCommit(true);
+ }
} catch (final RuntimeException swallow) {
log.error("Error suspending corrupted task {} ", task.id(),
swallow);
}
@@ -332,8 +350,8 @@ public class TaskManager {
// write the checkpoint file.
final Map<TopicPartition, OffsetAndMetadata> offsets =
task.prepareCommit();
if (!offsets.isEmpty()) {
- log.error("Task {} should has been committed when it was
suspended, but it reports non-empty " +
- "offsets {} to commit; it means it fails
during last commit and hence should be closed dirty",
+ log.error("Task {} should have been committed when it was
suspended, but it reports non-empty " +
+ "offsets {} to commit; this means it
failed during last commit and hence should be closed dirty",
task.id(), offsets);
tasksToCloseDirty.add(task);
@@ -512,20 +530,35 @@ public class TaskManager {
prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks,
consumedOffsetsPerTask);
}
- // even if commit failed, we should still continue and complete
suspending those tasks,
- // so we would capture any exception and throw
+ // even if commit failed, we should still continue and complete
suspending those tasks, so we would capture
+ // any exception and rethrow it at the end. some exceptions may be
handled immediately and then swallowed,
+ // as such we just need to skip those dirty tasks in the checkpoint
+ final Set<Task> dirtyTasks = new HashSet<>();
try {
commitOffsetsOrTransaction(consumedOffsetsPerTask);
+ } catch (final TaskCorruptedException e) {
+ log.warn("Some tasks were corrupted when trying to commit offsets,
these will be cleaned and revived: {}",
+ e.corruptedTasks());
+
+ // If we hit a TaskCorruptedException it must be EOS, just handle
the cleanup for those corrupted tasks right here
+ dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+ closeDirtyAndRevive(dirtyTasks, true);
+ } catch (final TimeoutException e) {
+ log.warn("Timed out while trying to commit all tasks during
revocation, these will be cleaned and revived");
+
+ // If we hit a TimeoutException it must be ALOS, just close dirty
and revive without wiping the state
+ dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+ closeDirtyAndRevive(dirtyTasks, false);
} catch (final RuntimeException e) {
log.error("Exception caught while committing those revoked tasks "
+ revokedActiveTasks, e);
firstException.compareAndSet(null, e);
+ dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
}
- // only try to complete post-commit if committing succeeded;
- // we enforce checkpointing upon suspending a task: if it is resumed
later we just
- // proceed normally, if it is going to be closed we would checkpoint
by then
- if (firstException.get() == null) {
- for (final Task task : revokedActiveTasks) {
+ // we enforce checkpointing upon suspending a task: if it is resumed
later we just proceed normally, if it is
+ // going to be closed we would checkpoint by then
+ for (final Task task : revokedActiveTasks) {
+ if (!dirtyTasks.contains(task)) {
try {
task.postCommit(true);
} catch (final RuntimeException e) {
@@ -533,9 +566,11 @@ public class TaskManager {
firstException.compareAndSet(null, e);
}
}
+ }
- if (shouldCommitAdditionalTasks) {
- for (final Task task : commitNeededActiveTasks) {
+ if (shouldCommitAdditionalTasks) {
+ for (final Task task : commitNeededActiveTasks) {
+ if (!dirtyTasks.contains(task)) {
try {
// for non-revoking active tasks, we should not
enforce checkpoint
// since if it is EOS enabled, no checkpoint should be
written while
@@ -975,42 +1010,53 @@ public class TaskManager {
/**
* @throws TaskMigratedException if committing offsets failed (non-EOS)
* or if the task producer got fenced (EOS)
+ * @throws TimeoutException if task.timeout.ms has been exceeded (non-EOS)
+ * @throws TaskCorruptedException if committing offsets failed due to
TimeoutException (EOS)
* @return number of committed offsets, or -1 if we are in the middle of a
rebalance and cannot commit
*/
int commit(final Collection<Task> tasksToCommit) {
+ int committed = 0;
if (rebalanceInProgress) {
- return -1;
+ committed = -1;
} else {
- int committed = 0;
final Map<Task, Map<TopicPartition, OffsetAndMetadata>>
consumedOffsetsAndMetadataPerTask = new HashMap<>();
- for (final Task task : tasksToCommit) {
- if (task.commitNeeded()) {
- final Map<TopicPartition, OffsetAndMetadata>
offsetAndMetadata = task.prepareCommit();
- if (task.isActive()) {
- consumedOffsetsAndMetadataPerTask.put(task,
offsetAndMetadata);
- }
- }
- }
-
try {
- commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-
- for (final Task task : tasksToCommit) {
- if (task.commitNeeded()) {
- task.clearTaskTimeout();
- ++committed;
- task.postCommit(false);
- }
- }
+ committed =
commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasksToCommit,
consumedOffsetsAndMetadataPerTask);
} catch (final TimeoutException timeoutException) {
consumedOffsetsAndMetadataPerTask
.keySet()
.forEach(t ->
t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
}
+ }
+ return committed;
+ }
+ /**
+ * @param consumedOffsetsAndMetadataPerTask an empty map that will be
filled in with the prepared offsets
+ */
+ private int commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(final
Collection<Task> tasksToCommit,
+ final
Map<Task, Map<TopicPartition, OffsetAndMetadata>>
consumedOffsetsAndMetadataPerTask) {
+ int committed = 0;
- return committed;
+ for (final Task task : tasksToCommit) {
+ if (task.commitNeeded()) {
+ final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata
= task.prepareCommit();
+ if (task.isActive()) {
+ consumedOffsetsAndMetadataPerTask.put(task,
offsetAndMetadata);
+ }
+ }
}
+
+ commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+
+ for (final Task task : tasksToCommit) {
+ if (task.commitNeeded()) {
+ task.clearTaskTimeout();
+ ++committed;
+ task.postCommit(false);
+ }
+ }
+ return committed;
}
/**
@@ -1030,6 +1076,11 @@ public class TaskManager {
}
}
+ /**
+ * @throws TaskMigratedException if committing offsets failed due to
CommitFailedException (non-EOS)
+ * @throws TimeoutException if committing offsets failed due to
TimeoutException (non-EOS)
+ * @throws TaskCorruptedException if committing offsets failed due to
TimeoutException (EOS)
+ */
private void commitOffsetsOrTransaction(final Map<Task,
Map<TopicPartition, OffsetAndMetadata>> offsetsPerTask) {
log.debug("Committing task offsets {}",
offsetsPerTask.entrySet().stream().collect(Collectors.toMap(t ->
t.getKey().id(), Entry::getValue))); // avoid logging actual Task objects
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index e12290f..4193deb 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -23,6 +23,8 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+
+import java.util.HashSet;
import org.slf4j.Logger;
import java.util.Collection;
@@ -234,6 +236,14 @@ class Tasks {
return allTasksPerId.get(taskId);
}
+ Collection<Task> tasks(final Collection<TaskId> taskIds) {
+ final Set<Task> tasks = new HashSet<>();
+ for (final TaskId taskId : taskIds) {
+ tasks.add(task(taskId));
+ }
+ return tasks;
+ }
+
// TODO: change return type to `StreamTask`
Collection<Task> activeTasks() {
return readOnlyActiveTasks;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 9832f14..ec5d295 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -2123,6 +2123,24 @@ public class StreamTaskTest {
}
@Test
+ public void shouldClearCommitStatusesInCloseDirty() {
+ task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"),
StreamsConfig.METRICS_LATEST);
+ task.initializeIfNeeded();
+ task.completeRestoration(noOpResetter -> { });
+
+ task.addRecords(partition1,
singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, 0)));
+ assertTrue(task.process(0L));
+ task.requestCommit();
+
+ task.suspend();
+ assertThat(task.commitNeeded(), is(true));
+ assertThat(task.commitRequested(), is(true));
+ task.closeDirty();
+ assertThat(task.commitNeeded(), is(false));
+ assertThat(task.commitRequested(), is(false));
+ }
+
+ @Test
public void closeShouldBeIdempotent() {
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 00d4012..c618cd3 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -107,6 +107,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
@RunWith(EasyMockRunner.class)
public class TaskManagerTest {
@@ -116,12 +117,16 @@ public class TaskManagerTest {
private final TaskId taskId00 = new TaskId(0, 0);
private final TopicPartition t1p0 = new TopicPartition(topic1, 0);
+ private final TopicPartition t1p0changelog = new
TopicPartition("changelog", 0);
private final Set<TopicPartition> taskId00Partitions = mkSet(t1p0);
+ private final Set<TopicPartition> taskId00ChangelogPartitions =
mkSet(t1p0changelog);
private final Map<TaskId, Set<TopicPartition>> taskId00Assignment =
singletonMap(taskId00, taskId00Partitions);
private final TaskId taskId01 = new TaskId(0, 1);
private final TopicPartition t1p1 = new TopicPartition(topic1, 1);
+ private final TopicPartition t1p1changelog = new
TopicPartition("changelog", 1);
private final Set<TopicPartition> taskId01Partitions = mkSet(t1p1);
+ private final Set<TopicPartition> taskId01ChangelogPartitions =
mkSet(t1p1changelog);
private final Map<TaskId, Set<TopicPartition>> taskId01Assignment =
singletonMap(taskId01, taskId01Partitions);
private final TaskId taskId02 = new TaskId(0, 2);
@@ -815,6 +820,281 @@ public class TaskManagerTest {
}
@Test
+ public void
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALOS()
{
+ final ProcessorStateManager stateManager =
EasyMock.createStrictMock(ProcessorStateManager.class);
+ stateManager.markChangelogAsCorrupted(taskId00Partitions);
+ replay(stateManager);
+
+ final StateMachineTask corruptedActive = new
StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
+ final StateMachineTask uncorruptedActive = new
StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
+ @Override
+ public void markChangelogAsCorrupted(final
Collection<TopicPartition> partitions) {
+ fail("Should not try to mark changelogs as corrupted for
uncorrupted task");
+ }
+ };
+ final Map<TopicPartition, OffsetAndMetadata> offsets =
singletonMap(t1p1, new OffsetAndMetadata(0L, null));
+ uncorruptedActive.setCommittableOffsetsAndMetadata(offsets);
+
+ // handleAssignment
+ final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
+ assignment.putAll(taskId00Assignment);
+ assignment.putAll(taskId01Assignment);
+ expect(activeTaskCreator.createTasks(anyObject(),
eq(assignment))).andStubReturn(asList(corruptedActive, uncorruptedActive));
+ topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(),
anyString());
+ expectLastCall().anyTimes();
+
+ expectRestoreToBeCompleted(consumer, changeLogReader);
+
+ consumer.commitSync(offsets);
+ expectLastCall().andThrow(new TimeoutException());
+
+ expect(consumer.assignment()).andStubReturn(union(HashSet::new,
taskId00Partitions, taskId01Partitions));
+
+ replay(activeTaskCreator, standbyTaskCreator, topologyBuilder,
consumer, changeLogReader);
+
+ taskManager.handleAssignment(assignment, emptyMap());
+ assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
+
+ assertThat(uncorruptedActive.state(), is(Task.State.RUNNING));
+ assertThat(corruptedActive.state(), is(Task.State.RUNNING));
+
+ // make sure this will be committed and throw
+ uncorruptedActive.setCommitNeeded();
+ corruptedActive.setChangelogOffsets(singletonMap(t1p0, 0L));
+
+ assertThat(uncorruptedActive.commitPrepared, is(false));
+ assertThat(uncorruptedActive.commitNeeded, is(true));
+ assertThat(uncorruptedActive.commitCompleted, is(false));
+ assertThat(corruptedActive.commitPrepared, is(false));
+ assertThat(corruptedActive.commitNeeded, is(false));
+ assertThat(corruptedActive.commitCompleted, is(false));
+
+ taskManager.handleCorruption(singleton(taskId00));
+
+ assertThat(uncorruptedActive.commitPrepared, is(true));
+ assertThat(uncorruptedActive.commitNeeded, is(false));
+ assertThat(uncorruptedActive.commitCompleted, is(false)); //if not
corrupted, we should close dirty without committing
+ assertThat(corruptedActive.commitPrepared, is(true));
+ assertThat(corruptedActive.commitNeeded, is(false));
+ assertThat(corruptedActive.commitCompleted, is(true)); //if corrupted,
should enforce checkpoint with corrupted tasks removed
+
+ assertThat(corruptedActive.state(), is(Task.State.CREATED));
+ assertThat(uncorruptedActive.state(), is(Task.State.CREATED));
+ verify(consumer);
+ }
+
+ @Test
+ public void
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringHandleCorruptedWithEOS()
{
+ setUpTaskManager(ProcessingMode.EXACTLY_ONCE_BETA);
+ final StreamsProducer producer = mock(StreamsProducer.class);
+ expect(activeTaskCreator.threadProducer()).andStubReturn(producer);
+ final ProcessorStateManager stateManager =
EasyMock.createMock(ProcessorStateManager.class);
+
+ final AtomicBoolean corruptedTaskChangelogMarkedAsCorrupted = new
AtomicBoolean(false);
+ final StateMachineTask corruptedActiveTask = new
StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
+ @Override
+ public void markChangelogAsCorrupted(final
Collection<TopicPartition> partitions) {
+ super.markChangelogAsCorrupted(partitions);
+ corruptedTaskChangelogMarkedAsCorrupted.set(true);
+ }
+ };
+ stateManager.markChangelogAsCorrupted(taskId00ChangelogPartitions);
+
+ final AtomicBoolean uncorruptedTaskChangelogMarkedAsCorrupted = new
AtomicBoolean(false);
+ final StateMachineTask uncorruptedActiveTask = new
StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
+ @Override
+ public void markChangelogAsCorrupted(final
Collection<TopicPartition> partitions) {
+ super.markChangelogAsCorrupted(partitions);
+ uncorruptedTaskChangelogMarkedAsCorrupted.set(true);
+ }
+ };
+ final Map<TopicPartition, OffsetAndMetadata> offsets =
singletonMap(t1p1, new OffsetAndMetadata(0L, null));
+ uncorruptedActiveTask.setCommittableOffsetsAndMetadata(offsets);
+ stateManager.markChangelogAsCorrupted(taskId01ChangelogPartitions);
+
+ // handleAssignment
+ final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
+ assignment.putAll(taskId00Assignment);
+ assignment.putAll(taskId01Assignment);
+ expect(activeTaskCreator.createTasks(anyObject(),
eq(assignment))).andStubReturn(asList(corruptedActiveTask,
uncorruptedActiveTask));
+ topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(),
anyString());
+ expectLastCall().anyTimes();
+
+ expectRestoreToBeCompleted(consumer, changeLogReader);
+
+ final ConsumerGroupMetadata groupMetadata = new
ConsumerGroupMetadata("appId");
+ expect(consumer.groupMetadata()).andReturn(groupMetadata);
+ producer.commitTransaction(offsets, groupMetadata);
+ expectLastCall().andThrow(new TimeoutException());
+
+ expect(consumer.assignment()).andStubReturn(union(HashSet::new,
taskId00Partitions, taskId01Partitions));
+
+ replay(activeTaskCreator, standbyTaskCreator, topologyBuilder,
consumer, changeLogReader, stateManager, producer);
+
+ taskManager.handleAssignment(assignment, emptyMap());
+ assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
+
+ assertThat(uncorruptedActiveTask.state(), is(Task.State.RUNNING));
+ assertThat(corruptedActiveTask.state(), is(Task.State.RUNNING));
+
+ // make sure this will be committed and throw
+ uncorruptedActiveTask.setCommitNeeded();
+
+ final Map<TopicPartition, Long> corruptedActiveTaskChangelogOffsets =
singletonMap(t1p0changelog, 0L);
+
corruptedActiveTask.setChangelogOffsets(corruptedActiveTaskChangelogOffsets);
+ final Map<TopicPartition, Long> uncorruptedActiveTaskChangelogOffsets
= singletonMap(t1p1changelog, 0L);
+
uncorruptedActiveTask.setChangelogOffsets(uncorruptedActiveTaskChangelogOffsets);
+
+ assertThat(uncorruptedActiveTask.commitPrepared, is(false));
+ assertThat(uncorruptedActiveTask.commitNeeded, is(true));
+ assertThat(uncorruptedActiveTask.commitCompleted, is(false));
+ assertThat(corruptedActiveTask.commitPrepared, is(false));
+ assertThat(corruptedActiveTask.commitNeeded, is(false));
+ assertThat(corruptedActiveTask.commitCompleted, is(false));
+
+ taskManager.handleCorruption(singleton(taskId00));
+
+ assertThat(uncorruptedActiveTask.commitPrepared, is(true));
+ assertThat(uncorruptedActiveTask.commitNeeded, is(false));
+ assertThat(uncorruptedActiveTask.commitCompleted, is(true)); //if
corrupted due to timeout on commit, should enforce checkpoint with corrupted
tasks removed
+ assertThat(corruptedActiveTask.commitPrepared, is(true));
+ assertThat(corruptedActiveTask.commitNeeded, is(false));
+ assertThat(corruptedActiveTask.commitCompleted, is(true)); //if
corrupted, should enforce checkpoint with corrupted tasks removed
+
+ assertThat(corruptedActiveTask.state(), is(Task.State.CREATED));
+ assertThat(uncorruptedActiveTask.state(), is(Task.State.CREATED));
+ assertThat(corruptedTaskChangelogMarkedAsCorrupted.get(), is(true));
+ assertThat(uncorruptedTaskChangelogMarkedAsCorrupted.get(), is(true));
+ verify(consumer);
+ }
+
+ @Test
+ public void
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithALOS()
{
+ final StateMachineTask revokedActiveTask = new
StateMachineTask(taskId00, taskId00Partitions, true);
+ final Map<TopicPartition, OffsetAndMetadata> offsets00 =
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
+ revokedActiveTask.setCommittableOffsetsAndMetadata(offsets00);
+ revokedActiveTask.setCommitNeeded();
+
+ final StateMachineTask unrevokedActiveTaskWithCommitNeeded = new
StateMachineTask(taskId01, taskId01Partitions, true) {
+ @Override
+ public void markChangelogAsCorrupted(final
Collection<TopicPartition> partitions) {
+ fail("Should not try to mark changelogs as corrupted for
uncorrupted task");
+ }
+ };
+ final Map<TopicPartition, OffsetAndMetadata> offsets01 =
singletonMap(t1p1, new OffsetAndMetadata(1L, null));
+
unrevokedActiveTaskWithCommitNeeded.setCommittableOffsetsAndMetadata(offsets01);
+ unrevokedActiveTaskWithCommitNeeded.setCommitNeeded();
+
+ final StateMachineTask unrevokedActiveTaskWithoutCommitNeeded = new
StateMachineTask(taskId02, taskId02Partitions, true);
+
+ final Map<TopicPartition, OffsetAndMetadata> expectedCommittedOffsets
= new HashMap<>();
+ expectedCommittedOffsets.putAll(offsets00);
+ expectedCommittedOffsets.putAll(offsets01);
+
+ final Map<TaskId, Set<TopicPartition>> assignmentActive = mkMap(
+ mkEntry(taskId00, taskId00Partitions),
+ mkEntry(taskId01, taskId01Partitions),
+ mkEntry(taskId02, taskId02Partitions)
+ );
+
+ expectRestoreToBeCompleted(consumer, changeLogReader);
+
+ expect(activeTaskCreator.createTasks(anyObject(),
eq(assignmentActive))).andReturn(asList(revokedActiveTask,
unrevokedActiveTaskWithCommitNeeded, unrevokedActiveTaskWithoutCommitNeeded));
+ activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
+ expectLastCall();
+ consumer.commitSync(expectedCommittedOffsets);
+ expectLastCall().andThrow(new TimeoutException());
+ expect(consumer.assignment()).andStubReturn(union(HashSet::new,
taskId00Partitions, taskId01Partitions, taskId02Partitions));
+
+ replay(activeTaskCreator, standbyTaskCreator, consumer,
changeLogReader);
+
+ taskManager.handleAssignment(assignmentActive, emptyMap());
+ assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
+ assertThat(revokedActiveTask.state(), is(Task.State.RUNNING));
+ assertThat(unrevokedActiveTaskWithCommitNeeded.state(),
is(State.RUNNING));
+ assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(),
is(Task.State.RUNNING));
+
+ taskManager.handleRevocation(taskId00Partitions);
+
+ assertThat(revokedActiveTask.state(), is(State.SUSPENDED));
+ assertThat(unrevokedActiveTaskWithCommitNeeded.state(),
is(State.CREATED));
+ assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(),
is(State.RUNNING));
+ }
+
+ @Test
+ public void
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithEOS()
{
+ setUpTaskManager(ProcessingMode.EXACTLY_ONCE_BETA);
+ final StreamsProducer producer = mock(StreamsProducer.class);
+ expect(activeTaskCreator.threadProducer()).andStubReturn(producer);
+ final ProcessorStateManager stateManager =
EasyMock.createMock(ProcessorStateManager.class);
+
+ final StateMachineTask revokedActiveTask = new
StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
+ final Map<TopicPartition, OffsetAndMetadata> revokedActiveTaskOffsets
= singletonMap(t1p0, new OffsetAndMetadata(0L, null));
+
revokedActiveTask.setCommittableOffsetsAndMetadata(revokedActiveTaskOffsets);
+ revokedActiveTask.setCommitNeeded();
+
+ final AtomicBoolean unrevokedTaskChangelogMarkedAsCorrupted = new
AtomicBoolean(false);
+ final StateMachineTask unrevokedActiveTask = new
StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
+ @Override
+ public void markChangelogAsCorrupted(final
Collection<TopicPartition> partitions) {
+ super.markChangelogAsCorrupted(partitions);
+ unrevokedTaskChangelogMarkedAsCorrupted.set(true);
+ }
+ };
+ final Map<TopicPartition, OffsetAndMetadata> unrevokedTaskOffsets =
singletonMap(t1p1, new OffsetAndMetadata(1L, null));
+
unrevokedActiveTask.setCommittableOffsetsAndMetadata(unrevokedTaskOffsets);
+ unrevokedActiveTask.setCommitNeeded();
+
+ final StateMachineTask unrevokedActiveTaskWithoutCommitNeeded = new
StateMachineTask(taskId02, taskId02Partitions, true, stateManager);
+
+ final Map<TopicPartition, OffsetAndMetadata> expectedCommittedOffsets
= new HashMap<>();
+ expectedCommittedOffsets.putAll(revokedActiveTaskOffsets);
+ expectedCommittedOffsets.putAll(unrevokedTaskOffsets);
+
+ stateManager.markChangelogAsCorrupted(taskId00ChangelogPartitions);
+ stateManager.markChangelogAsCorrupted(taskId01ChangelogPartitions);
+
+ final Map<TaskId, Set<TopicPartition>> assignmentActive = mkMap(
+ mkEntry(taskId00, taskId00Partitions),
+ mkEntry(taskId01, taskId01Partitions),
+ mkEntry(taskId02, taskId02Partitions)
+ );
+
+ expectRestoreToBeCompleted(consumer, changeLogReader);
+
+ expect(activeTaskCreator.createTasks(anyObject(),
eq(assignmentActive))).andReturn(asList(revokedActiveTask, unrevokedActiveTask,
unrevokedActiveTaskWithoutCommitNeeded));
+ activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
+ expectLastCall();
+
+ final ConsumerGroupMetadata groupMetadata = new
ConsumerGroupMetadata("appId");
+ expect(consumer.groupMetadata()).andReturn(groupMetadata);
+ producer.commitTransaction(expectedCommittedOffsets, groupMetadata);
+ expectLastCall().andThrow(new TimeoutException());
+
+ expect(consumer.assignment()).andStubReturn(union(HashSet::new,
taskId00Partitions, taskId01Partitions, taskId02Partitions));
+
+ replay(activeTaskCreator, standbyTaskCreator, consumer,
changeLogReader, producer, stateManager);
+
+ taskManager.handleAssignment(assignmentActive, emptyMap());
+ assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
+ assertThat(revokedActiveTask.state(), is(Task.State.RUNNING));
+ assertThat(unrevokedActiveTask.state(), is(Task.State.RUNNING));
+ assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(),
is(State.RUNNING));
+
+ final Map<TopicPartition, Long> revokedActiveTaskChangelogOffsets =
singletonMap(t1p0changelog, 0L);
+
revokedActiveTask.setChangelogOffsets(revokedActiveTaskChangelogOffsets);
+ final Map<TopicPartition, Long> unrevokedActiveTaskChangelogOffsets =
singletonMap(t1p1changelog, 0L);
+
unrevokedActiveTask.setChangelogOffsets(unrevokedActiveTaskChangelogOffsets);
+
+ taskManager.handleRevocation(taskId00Partitions);
+
+ assertThat(unrevokedTaskChangelogMarkedAsCorrupted.get(), is(true));
+ assertThat(revokedActiveTask.state(), is(State.SUSPENDED));
+ assertThat(unrevokedActiveTask.state(), is(State.CREATED));
+ assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(),
is(State.RUNNING));
+ }
+
+ @Test
public void shouldCloseStandbyUnassignedTasksWhenCreatingNewTasks() {
final Task task00 = new StateMachineTask(taskId00, taskId00Partitions,
false);
@@ -1056,6 +1336,7 @@ public class TaskManagerTest {
expect(activeTaskCreator.createTasks(anyObject(),
eq(assignmentActive)))
.andReturn(asList(task00, task01, task02));
+
expect(activeTaskCreator.threadProducer()).andReturn(producer);
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
expect(standbyTaskCreator.createTasks(eq(assignmentStandby)))
@@ -2926,9 +3207,12 @@ public class TaskManagerTest {
private static class StateMachineTask extends AbstractTask implements Task
{
private final boolean active;
+
+ // TODO: KAFKA-12569 clean up usage of these flags and use the new
commitCompleted flag where appropriate
private boolean commitNeeded = false;
private boolean commitRequested = false;
private boolean commitPrepared = false;
+ private boolean commitCompleted = false;
private Map<TopicPartition, OffsetAndMetadata> committableOffsets =
Collections.emptyMap();
private Map<TopicPartition, Long> purgeableOffsets;
private Map<TopicPartition, Long> changelogOffsets =
Collections.emptyMap();
@@ -3006,6 +3290,7 @@ public class TaskManagerTest {
@Override
public void postCommit(final boolean enforceCheckpoint) {
commitNeeded = false;
+ commitCompleted = true;
}
@Override
@@ -3027,6 +3312,14 @@ public class TaskManagerTest {
}
@Override
+ public void revive() {
+ //TODO: KAFKA-12569 move clearing of commit-required statuses to
closeDirty/Clean/AndRecycle methods
+ commitNeeded = false;
+ commitRequested = false;
+ super.revive();
+ }
+
+ @Override
public void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
final Exception cause) {
timeout = currentWallClockMs;