This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new c77417f KAFKA-8972 (2.4 blocker): clear all state for zombie task on
TaskMigratedException (#7608)
c77417f is described below
commit c77417f01df03c1248ddc72088d3396a69ad1b40
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Wed Oct 30 00:20:01 2019 -0400
KAFKA-8972 (2.4 blocker): clear all state for zombie task on
TaskMigratedException (#7608)
Third bugfix for the failing broker bounce system test with cooperative
rebalancing:
tl;dr We need to remove everything associated with a task when it is
closed, but in some cases (eg AssignedTasks#commit) on a
TaskMigratedException we would close it as a zombie and then (only) remove the
taskId from the running map. This left its partitions, restorers, state stores,
etc around and in an undefined state, causing exceptions when closing and/or
opening the stores again.
Longer explanation:
In AssignedTasks (the abstract class from which the standby and active task
variations extend) a commit failure (even due to broker down/unavailable) is
treated as a TaskMigratedException after which the failed task is closed as a
zombie and removed from running -- the remaining tasks (ie those still in
running) are then also closed as zombies in the subsequent onPartitionsLost.
However we do not remove the closed task from runningByPartition nor do we
remove the corresponding changelogs, if restoring, from the
StoreChangelogReader since that applies only to active tasks, and AssignedTasks
is generic/abstract. The changelog reader then retains a mapping from the
closed task's changelog partition to its CompositeRestoreListener (and does not
replace this when the new one comes along after the rebalance). The restore
listener has a reference to a specific Rocks [...]
Although technically this bug existed before KIP-429, it was only uncovered
now that we remove tasks and clear their state/partitions/etc one at a time. We
don't technically need to cherry-pick the fix back to earlier branches, since
before KIP-429 we just blindly cleared all data structures entirely during an
eager rebalance.
Reviewers: Boyang Chen <[email protected]>, Guozhang Wang
<[email protected]>
---
.../processor/internals/AssignedStandbyTasks.java | 10 +--
.../processor/internals/AssignedStreamsTasks.java | 96 ++++++++++------------
.../streams/processor/internals/AssignedTasks.java | 24 ++++--
.../processor/internals/ChangelogReader.java | 4 +
.../processor/internals/StoreChangelogReader.java | 12 +++
.../streams/processor/internals/TaskManager.java | 4 +-
.../internals/AssignedStreamsTasksTest.java | 87 ++++++++------------
.../processor/internals/MockChangelogReader.java | 5 ++
.../processor/internals/StreamThreadTest.java | 12 +--
9 files changed, 126 insertions(+), 128 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
index 0c9a70d..f217a55 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
@@ -34,8 +34,8 @@ class AssignedStandbyTasks extends AssignedTasks<StandbyTask>
{
public void shutdown(final boolean clean) {
final String shutdownType = clean ? "Clean" : "Unclean";
log.debug(shutdownType + " shutdown of all standby tasks" + "\n" +
- "created tasks to close: {}" + "\n" +
- "running tasks to close: {}",
+ "non-initialized standby tasks to close: {}" + "\n" +
+ "running standby tasks to close: {}",
clean, created.keySet(), running.keySet());
super.shutdown(clean);
}
@@ -63,7 +63,7 @@ class AssignedStandbyTasks extends AssignedTasks<StandbyTask>
{
final List<TopicPartition> revokedChangelogs = new ArrayList<>();
for (final Map.Entry<TaskId, Set<TopicPartition>> entry :
revokedTasks.entrySet()) {
final TaskId taskId = entry.getKey();
- final Task task;
+ final StandbyTask task;
if (running.containsKey(taskId)) {
task = running.get(taskId);
@@ -77,9 +77,9 @@ class AssignedStandbyTasks extends AssignedTasks<StandbyTask>
{
try {
task.close(true, false);
} catch (final RuntimeException e) {
- log.error("Closing the {} {} failed due to the following
error:", taskTypeName, task.id(), e);
+ log.error("Closing the standby task {} failed due to the
following error:", task.id(), e);
} finally {
- running.remove(taskId);
+ removeTaskFromRunning(task);
revokedChangelogs.addAll(task.changelogPartitions());
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index 3515824..63dac25 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -136,12 +136,12 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
suspended.put(id, task);
} catch (final TaskMigratedException closeAsZombieAndSwallow) {
// swallow and move on since we are rebalancing
- log.info("Failed to suspend the stream task {} since it got
migrated to another thread already. " +
+ log.info("Failed to suspend stream task {} since it got
migrated to another thread already. " +
"Closing it as zombie and moving on.", id);
firstException.compareAndSet(null, closeZombieTask(task));
prevActiveTasks.remove(id);
} catch (final RuntimeException e) {
- log.error("Suspending the stream task {} failed due to the
following error:", id, e);
+ log.error("Suspending stream task {} failed due to the
following error:", id, e);
firstException.compareAndSet(null, e);
try {
prevActiveTasks.remove(id);
@@ -152,9 +152,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
id, f);
}
} finally {
- running.remove(id);
- runningByPartition.keySet().removeAll(task.partitions());
-
runningByPartition.keySet().removeAll(task.changelogPartitions());
+ removeTaskFromRunning(task);
taskChangelogs.addAll(task.changelogPartitions());
}
}
@@ -193,9 +191,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
private RuntimeException closeRunning(final boolean isZombie,
final StreamTask task,
final List<TopicPartition>
closedTaskChangelogs) {
- running.remove(task.id());
- runningByPartition.keySet().removeAll(task.partitions());
- runningByPartition.keySet().removeAll(task.changelogPartitions());
+ removeTaskFromRunning(task);
closedTaskChangelogs.addAll(task.changelogPartitions());
try {
@@ -228,12 +224,8 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
private RuntimeException closeRestoring(final boolean isZombie,
final StreamTask task,
final List<TopicPartition>
closedTaskChangelogs) {
- restoring.remove(task.id());
+ removeTaskFromRestoring(task);
closedTaskChangelogs.addAll(task.changelogPartitions());
- for (final TopicPartition tp : task.partitions()) {
- restoredPartitions.remove(tp);
- restoringByPartition.remove(tp);
- }
try {
final boolean clean = !isZombie;
@@ -282,15 +274,19 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
for (final TaskId id : lostTasks) {
if (suspended.containsKey(id)) {
+ log.debug("Closing the zombie suspended stream task {}.", id);
firstException.compareAndSet(null, closeSuspended(true,
suspended.get(id)));
} else if (created.containsKey(id)) {
+ log.debug("Closing the zombie created stream task {}.", id);
firstException.compareAndSet(null, closeNonRunning(true,
created.get(id), lostTaskChangelogs));
} else if (restoring.containsKey(id)) {
+ log.debug("Closing the zombie restoring stream task {}.", id);
firstException.compareAndSet(null, closeRestoring(true,
created.get(id), lostTaskChangelogs));
} else if (running.containsKey(id)) {
+ log.debug("Closing the zombie running stream task {}.", id);
firstException.compareAndSet(null, closeRunning(true,
running.get(id), lostTaskChangelogs));
} else {
- // task may have already been closed as a zombie and removed
from all task maps
+ log.warn("Skipping closing the zombie stream task {} as it was
already removed.", id);
}
}
@@ -324,14 +320,8 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
} catch (final TaskMigratedException e) {
// we need to catch migration exception internally since
this function
// is triggered in the rebalance callback
- log.info("Failed to resume the stream task {} since it got
migrated to another thread already. " +
- "Closing it as zombie before triggering a new
rebalance.", task.id());
- final RuntimeException fatalException =
closeZombieTask(task);
- running.remove(taskId);
-
- if (fatalException != null) {
- throw fatalException;
- }
+ log.info("Failed to resume stream task {} since it got
migrated to another thread already. " +
+ "Will trigger a new rebalance and close all tasks
as zombies together.", task.id());
throw e;
}
log.trace("Resuming the suspended stream task {}", task.id());
@@ -374,7 +364,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
}
}
- void addToRestoring(final StreamTask task) {
+ void addTaskToRestoring(final StreamTask task) {
restoring.put(task.id(), task);
for (final TopicPartition topicPartition : task.partitions()) {
restoringByPartition.put(topicPartition, task);
@@ -384,6 +374,18 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
}
}
+ private void removeTaskFromRestoring(final StreamTask task) {
+ restoring.remove(task.id());
+ for (final TopicPartition topicPartition : task.partitions()) {
+ restoredPartitions.remove(topicPartition);
+ restoringByPartition.remove(topicPartition);
+ }
+ for (final TopicPartition topicPartition : task.changelogPartitions())
{
+ restoredPartitions.remove(topicPartition);
+ restoringByPartition.remove(topicPartition);
+ }
+ }
+
/**
* @throws TaskMigratedException if committing offsets failed (non-EOS)
* or if the task producer got fenced (EOS)
@@ -392,8 +394,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
int committed = 0;
RuntimeException firstException = null;
- for (final Iterator<StreamTask> it = running().iterator();
it.hasNext(); ) {
- final StreamTask task = it.next();
+ for (final StreamTask task : running.values()) {
try {
if (task.commitRequested() && task.commitNeeded()) {
task.commit();
@@ -402,12 +403,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
}
} catch (final TaskMigratedException e) {
log.info("Failed to commit stream task {} since it got
migrated to another thread already. " +
- "Closing it as zombie before triggering a new
rebalance.", task.id());
- final RuntimeException fatalException = closeZombieTask(task);
- if (fatalException != null) {
- throw fatalException;
- }
- it.remove();
+ "Will trigger a new rebalance and close all tasks as
zombies together.", task.id());
throw e;
} catch (final RuntimeException t) {
log.error("Failed to commit stream task {} due to the
following error:", task.id(), t);
@@ -443,21 +439,14 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
int process(final long now) {
int processed = 0;
- final Iterator<Map.Entry<TaskId, StreamTask>> it =
running.entrySet().iterator();
- while (it.hasNext()) {
- final StreamTask task = it.next().getValue();
+ for (final StreamTask task : running.values()) {
try {
if (task.isProcessable(now) && task.process()) {
processed++;
}
} catch (final TaskMigratedException e) {
log.info("Failed to process stream task {} since it got
migrated to another thread already. " +
- "Closing it as zombie before triggering a new
rebalance.", task.id());
- final RuntimeException fatalException = closeZombieTask(task);
- if (fatalException != null) {
- throw fatalException;
- }
- it.remove();
+ "Will trigger a new rebalance and close all tasks as
zombies together.", task.id());
throw e;
} catch (final RuntimeException e) {
log.error("Failed to process stream task {} due to the
following error:", task.id(), e);
@@ -473,9 +462,8 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
*/
int punctuate() {
int punctuated = 0;
- final Iterator<Map.Entry<TaskId, StreamTask>> it =
running.entrySet().iterator();
- while (it.hasNext()) {
- final StreamTask task = it.next().getValue();
+
+ for (final StreamTask task : running.values()) {
try {
if (task.maybePunctuateStreamTime()) {
punctuated++;
@@ -485,12 +473,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
}
} catch (final TaskMigratedException e) {
log.info("Failed to punctuate stream task {} since it got
migrated to another thread already. " +
- "Closing it as zombie before triggering a new
rebalance.", task.id());
- final RuntimeException fatalException = closeZombieTask(task);
- if (fatalException != null) {
- throw fatalException;
- }
- it.remove();
+ "Will trigger a new rebalance and close all tasks as
zombies together.", task.id());
throw e;
} catch (final KafkaException e) {
log.error("Failed to punctuate stream task {} due to the
following error:", task.id(), e);
@@ -512,14 +495,23 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
public void shutdown(final boolean clean) {
final String shutdownType = clean ? "Clean" : "Unclean";
log.debug(shutdownType + " shutdown of all active tasks" + "\n" +
- "non-initialized tasks to close: {}" + "\n" +
+ "non-initialized stream tasks to close: {}" + "\n" +
"restoring tasks to close: {}" + "\n" +
- "running tasks to close: {}" + "\n" +
- "suspended tasks to close: {}",
+ "running stream tasks to close: {}" + "\n" +
+ "suspended stream tasks to close: {}",
clean, created.keySet(), restoring.keySet(), running.keySet(),
suspended.keySet());
super.shutdown(clean);
}
+ @Override
+ public boolean isEmpty() {
+ return super.isEmpty()
+ && restoring.isEmpty()
+ && restoringByPartition.isEmpty()
+ && restoredPartitions.isEmpty()
+ && suspended.isEmpty();
+ }
+
public String toString(final String indent) {
final StringBuilder builder = new StringBuilder();
builder.append(super.toString(indent));
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 56fefe0..6d64d40 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -71,7 +71,7 @@ abstract class AssignedTasks<T extends Task> {
task.initializeMetadata();
if (!task.initializeStateStores()) {
log.debug("Transitioning {} {} to restoring",
taskTypeName, entry.getKey());
- ((AssignedStreamsTasks) this).addToRestoring((StreamTask)
task);
+ ((AssignedStreamsTasks)
this).addTaskToRestoring((StreamTask) task);
} else {
transitionToRunning(task);
}
@@ -121,6 +121,12 @@ abstract class AssignedTasks<T extends Task> {
}
}
+ void removeTaskFromRunning(final T task) {
+ running.remove(task.id());
+ runningByPartition.keySet().removeAll(task.partitions());
+ runningByPartition.keySet().removeAll(task.changelogPartitions());
+ }
+
T runningTaskFor(final TopicPartition partition) {
return runningByPartition.get(partition);
}
@@ -176,6 +182,12 @@ abstract class AssignedTasks<T extends Task> {
created.clear();
}
+ boolean isEmpty() {
+ return runningByPartition.isEmpty()
+ && running.isEmpty()
+ && created.isEmpty();
+ }
+
/**
* @throws TaskMigratedException if committing offsets failed (non-EOS)
* or if the task producer got fenced (EOS)
@@ -184,8 +196,7 @@ abstract class AssignedTasks<T extends Task> {
int committed = 0;
RuntimeException firstException = null;
- for (final Iterator<T> it = running().iterator(); it.hasNext(); ) {
- final T task = it.next();
+ for (final T task : running.values()) {
try {
if (task.commitNeeded()) {
task.commit();
@@ -193,12 +204,7 @@ abstract class AssignedTasks<T extends Task> {
}
} catch (final TaskMigratedException e) {
log.info("Failed to commit {} {} since it got migrated to
another thread already. " +
- "Closing it as zombie before triggering a new
rebalance.", taskTypeName, task.id());
- final RuntimeException fatalException = closeZombieTask(task);
- if (fatalException != null) {
- throw fatalException;
- }
- it.remove();
+ "Will trigger a new rebalance and close all tasks as
zombies together.", taskTypeName, task.id());
throw e;
} catch (final RuntimeException t) {
log.error("Failed to commit {} {} due to the following error:",
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
index 782be15..df01e7c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
@@ -56,4 +56,8 @@ public interface ChangelogReader {
*/
void clear();
+ /**
+ * @return whether the changelog reader has just been cleared or is
uninitialized.
+ */
+ boolean isEmpty();
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 81f76a3..4e5ae57 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -69,6 +69,8 @@ public class StoreChangelogReader implements ChangelogReader {
stateRestorers.put(restorer.partition(), restorer);
log.trace("Added restorer for changelog {}", restorer.partition());
+ } else {
+ log.debug("Skip re-adding restorer for changelog {}",
restorer.partition());
}
needsInitializing.add(restorer.partition());
@@ -296,6 +298,16 @@ public class StoreChangelogReader implements
ChangelogReader {
completedRestorers.clear();
}
+ @Override
+ public boolean isEmpty() {
+ return partitionInfo.isEmpty()
+ && stateRestorers.isEmpty()
+ && needsRestoring.isEmpty()
+ && restoreToOffsets.isEmpty()
+ && needsInitializing.isEmpty()
+ && completedRestorers.isEmpty();
+ }
+
private long processNext(final List<ConsumerRecord<byte[], byte[]>>
records,
final StateRestorer restorer,
final Long endOffset) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 82496df..72cff77 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -269,8 +269,8 @@ public class TaskManager {
if (exception != null) {
throw exception;
- } else if (!assignedActiveTasks.isEmpty()) {
- throw new IllegalStateException("TaskManager had leftover tasks
after removing all zombies");
+ } else if (!(active.isEmpty() && assignedActiveTasks.isEmpty() &&
changelogReader.isEmpty())) {
+ throw new IllegalStateException("TaskManager found leftover active
task state after closing all zombies");
}
return zombieTasks;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index a8f96e4..68ca9bd 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -43,6 +43,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
+import org.junit.function.ThrowingRunnable;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
@@ -50,6 +51,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -170,7 +172,7 @@ public class AssignedStreamsTasksTest {
t1.initializeMetadata();
EasyMock.expect(t1.initializeStateStores()).andReturn(false);
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).times(2);
-
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).times(2);
+
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).times(3);
t1.closeStateManager(true);
EasyMock.expectLastCall();
EasyMock.replay(t1);
@@ -259,24 +261,17 @@ public class AssignedStreamsTasksTest {
}
@Test
- public void shouldCloseTaskOnResumeSuspendedIfTaskMigratedException() {
+ public void
shouldNotCloseTaskWithinResumeSuspendedIfTaskMigratedException() {
mockRunningTaskSuspension();
t1.resume();
t1.initializeTopology();
EasyMock.expectLastCall().andThrow(new TaskMigratedException());
- t1.close(false, true);
- EasyMock.expectLastCall();
EasyMock.replay(t1);
assertThat(suspendTask(), nullValue());
- try {
- assignedTasks.maybeResumeSuspendedTask(taskId1,
Collections.singleton(tp1));
- fail("Should have thrown TaskMigratedException.");
- } catch (final TaskMigratedException expected) { /* ignore */ }
-
- assertThat(assignedTasks.runningTaskIds(),
equalTo(Collections.EMPTY_SET));
- EasyMock.verify(t1);
+ verifyTaskMigratedExceptionDoesNotCloseTask(
+ () -> assignedTasks.maybeResumeSuspendedTask(taskId1,
Collections.singleton(tp1)));
}
private void mockTaskInitialization() {
@@ -303,23 +298,16 @@ public class AssignedStreamsTasksTest {
}
@Test
- public void shouldCloseTaskOnCommitIfTaskMigratedException() {
+ public void shouldNotCloseTaskWithinCommitIfTaskMigratedException() {
mockTaskInitialization();
EasyMock.expect(t1.commitNeeded()).andReturn(true);
t1.commit();
EasyMock.expectLastCall().andThrow(new TaskMigratedException());
- t1.close(false, true);
- EasyMock.expectLastCall();
EasyMock.replay(t1);
addAndInitTask();
- try {
- assignedTasks.commit();
- fail("Should have thrown TaskMigratedException.");
- } catch (final TaskMigratedException expected) { /* ignore */ }
-
- assertThat(assignedTasks.runningTaskIds(),
equalTo(Collections.EMPTY_SET));
- EasyMock.verify(t1);
+ verifyTaskMigratedExceptionDoesNotCloseTask(
+ () -> assignedTasks.commit());
}
@Test
@@ -357,44 +345,30 @@ public class AssignedStreamsTasksTest {
}
@Test
- public void shouldCloseTaskOnMaybeCommitIfTaskMigratedException() {
+ public void shouldNotCloseTaskWithinMaybeCommitIfTaskMigratedException() {
mockTaskInitialization();
EasyMock.expect(t1.commitRequested()).andReturn(true);
EasyMock.expect(t1.commitNeeded()).andReturn(true);
t1.commit();
EasyMock.expectLastCall().andThrow(new TaskMigratedException());
- t1.close(false, true);
- EasyMock.expectLastCall();
EasyMock.replay(t1);
addAndInitTask();
- try {
- assignedTasks.maybeCommitPerUserRequested();
- fail("Should have thrown TaskMigratedException.");
- } catch (final TaskMigratedException expected) { /* ignore */ }
-
- assertThat(assignedTasks.runningTaskIds(),
equalTo(Collections.EMPTY_SET));
- EasyMock.verify(t1);
+ verifyTaskMigratedExceptionDoesNotCloseTask(
+ () -> assignedTasks.maybeCommitPerUserRequested());
}
@Test
- public void shouldCloseTaskOnProcessesIfTaskMigratedException() {
+ public void shouldNotCloseTaskWithinProcessIfTaskMigratedException() {
mockTaskInitialization();
EasyMock.expect(t1.isProcessable(0L)).andReturn(true);
t1.process();
EasyMock.expectLastCall().andThrow(new TaskMigratedException());
- t1.close(false, true);
- EasyMock.expectLastCall();
EasyMock.replay(t1);
addAndInitTask();
- try {
- assignedTasks.process(0L);
- fail("Should have thrown TaskMigratedException.");
- } catch (final TaskMigratedException expected) { /* ignore */ }
-
- assertThat(assignedTasks.runningTaskIds(),
equalTo(Collections.EMPTY_SET));
- EasyMock.verify(t1);
+ verifyTaskMigratedExceptionDoesNotCloseTask(
+ () -> assignedTasks.process(0L));
}
@Test
@@ -438,39 +412,33 @@ public class AssignedStreamsTasksTest {
}
@Test
- public void
shouldCloseTaskOnMaybePunctuateStreamTimeIfTaskMigratedException() {
+ public void
shouldNotCloseTaskWithinMaybePunctuateStreamTimeIfTaskMigratedException() {
mockTaskInitialization();
t1.maybePunctuateStreamTime();
EasyMock.expectLastCall().andThrow(new TaskMigratedException());
- t1.close(false, true);
- EasyMock.expectLastCall();
EasyMock.replay(t1);
addAndInitTask();
- try {
- assignedTasks.punctuate();
- fail("Should have thrown TaskMigratedException.");
- } catch (final TaskMigratedException expected) { /* ignore */ }
- assertThat(assignedTasks.runningTaskIds(),
equalTo(Collections.EMPTY_SET));
- EasyMock.verify(t1);
+ verifyTaskMigratedExceptionDoesNotCloseTask(
+ () -> assignedTasks.punctuate());
}
@Test
- public void
shouldCloseTaskOnMaybePunctuateSystemTimeIfTaskMigratedException() {
+ public void
shouldNotloseTaskWithinMaybePunctuateSystemTimeIfTaskMigratedException() {
mockTaskInitialization();
EasyMock.expect(t1.maybePunctuateStreamTime()).andReturn(true);
t1.maybePunctuateSystemTime();
EasyMock.expectLastCall().andThrow(new TaskMigratedException());
- t1.close(false, true);
- EasyMock.expectLastCall();
EasyMock.replay(t1);
addAndInitTask();
try {
assignedTasks.punctuate();
fail("Should have thrown TaskMigratedException.");
- } catch (final TaskMigratedException expected) { /* ignore */ }
+ } catch (final TaskMigratedException expected) {
+ assertThat(assignedTasks.runningTaskIds(),
equalTo(Collections.singleton(taskId1)));
+ }
EasyMock.verify(t1);
}
@@ -571,5 +539,16 @@ public class AssignedStreamsTasksTest {
EasyMock.expectLastCall();
}
+ private void verifyTaskMigratedExceptionDoesNotCloseTask(final
ThrowingRunnable action) {
+ final Set<TaskId> expectedRunningTaskIds =
Collections.singleton(taskId1);
+
+ // This action is expected to throw a TaskMigratedException
+ assertThrows(TaskMigratedException.class, action);
+
+ // This task should be closed as a zombie with all the other tasks
during onPartitionsLost
+ assertThat(assignedTasks.runningTaskIds(),
equalTo(expectedRunningTaskIds));
+
+ EasyMock.verify(t1);
+ }
}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
index 9cc51ce..25c5a13 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
@@ -60,6 +60,11 @@ public class MockChangelogReader implements ChangelogReader {
}
}
+ @Override
+ public boolean isEmpty() {
+ return restoredOffsets.isEmpty() && registered.isEmpty();
+ }
+
public boolean wasRegistered(final TopicPartition partition) {
return registered.contains(partition);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 5b56eb8..2c9e948 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -842,7 +842,7 @@ public class StreamThreadTest {
}
@Test
- public void
shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing()
throws Exception {
+ public void
shouldNotCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing()
throws Exception {
internalTopologyBuilder.addSource(null, "source", null, null, null,
topic1);
internalTopologyBuilder.addSink("sink", "dummyTopic", null, null,
null, "source");
@@ -898,16 +898,16 @@ public class StreamThreadTest {
try {
thread.runOnce();
fail("Should have thrown TaskMigratedException");
- } catch (final TaskMigratedException expected) { /* ignore */ }
- TestUtils.waitForCondition(
- () -> thread.tasks().isEmpty(),
- "StreamsThread did not remove fenced zombie task.");
+ } catch (final TaskMigratedException expected) {
+ assertTrue("StreamsThread removed the fenced zombie task already,
should wait for rebalance to close all zombies together.",
+ thread.tasks().containsKey(task1));
+ }
assertThat(producer.commitCount(), equalTo(1L));
}
@Test
- public void
shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedInCommitTransactionWhenSuspendingTaks()
{
+ public void
shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedInCommitTransactionWhenSuspendingTasks()
{
final StreamThread thread = createStreamThread(CLIENT_ID, new
StreamsConfig(configProps(true)), true);
internalTopologyBuilder.addSource(null, "name", null, null, null,
topic1);