This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new cbc9f57 HOTFIX: safely clear all active state in onPartitionsLost
(#7691)
cbc9f57 is described below
commit cbc9f57c0e6a2f9cd5252c11aeb19abefba4263e
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Tue Nov 19 13:34:38 2019 -0800
HOTFIX: safely clear all active state in onPartitionsLost (#7691)
After a number of last-minute bugs were found stemming from the incremental
closing of lost tasks in StreamsRebalanceListener#onPartitionsLost, a safer
approach to this edge case seems warranted. We initially wanted to be as
"future-proof" as possible, and avoid baking further protocol assumptions into
the code that may be broken as the protocol evolves. This meant that rather
than simply closing all active tasks and clearing all associated state in
#onPartitionsLost(lostPartitions) w [...]
Therefore, before worrying about it being "future-proof", it seems we should
make sure it is "present-day-proof" and implement this callback in the safest
possible way: by blindly clearing and closing all active task state. We log all
the relevant state (at debug level) before clearing it, so we can at least tell
from the logs whether (and which) emptiness checks were being violated.
Reviewers: Guozhang Wang <[email protected]>, Bill Bejeck
<[email protected]>, Andrew Choi <[email protected]>
---
.../consumer/ConsumerRebalanceListener.java | 3 +-
.../processor/internals/AssignedStandbyTasks.java | 3 +-
.../processor/internals/AssignedStreamsTasks.java | 128 +++++++++++----------
.../streams/processor/internals/AssignedTasks.java | 64 +++++++----
.../processor/internals/StoreChangelogReader.java | 2 +-
.../internals/StreamsRebalanceListener.java | 6 +-
.../streams/processor/internals/TaskManager.java | 58 +++-------
.../internals/AssignedStreamsTasksTest.java | 62 +++-------
8 files changed, 156 insertions(+), 170 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
index ec2ce39..2f43b60 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
@@ -188,7 +188,8 @@ public interface ConsumerRebalanceListener {
* necessary to catch these exceptions and re-attempt to wakeup or
interrupt the consumer thread.
*
* @param partitions The list of partitions that were assigned to the
consumer and now have been reassigned
- * to other consumers (may not include all currently
assigned partitions, i.e. there may still
+ * to other consumers. With the current protocol this
will always include all of the consumer's
+ * previously assigned partitions, but this may change
in future protocols (ie there would still
* be some partitions left)
* @throws org.apache.kafka.common.errors.WakeupException If raised from a
nested call to {@link KafkaConsumer}
* @throws org.apache.kafka.common.errors.InterruptException If raised
from a nested call to {@link KafkaConsumer}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
index 0f8896e..299390b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -79,7 +80,7 @@ class AssignedStandbyTasks extends AssignedTasks<StandbyTask>
{
} catch (final RuntimeException e) {
log.error("Closing the standby task {} failed due to the
following error:", task.id(), e);
} finally {
- removeTaskFromRunning(task);
+ removeTaskFromAllStateMaps(task, Collections.emptyMap());
revokedChangelogs.addAll(task.changelogPartitions());
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index c9fe9fd..9440c4c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
+import java.util.ArrayList;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
@@ -83,6 +84,15 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask>
implements Restorin
boolean hasRestoringTasks() {
return !restoring.isEmpty();
}
+
+ void clearRestoringPartitions() {
+ if (!restoring.isEmpty()) {
+ log.error("Tried to clear restoring partitions but was still
restoring the stream tasks {}", restoring);
+ throw new IllegalStateException("Should not clear restoring
partitions while set of restoring tasks is non-empty");
+ }
+ restoredPartitions.clear();
+ restoringByPartition.clear();
+ }
Set<TaskId> suspendedTaskIds() {
return suspended.keySet();
@@ -152,7 +162,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
id, f);
}
} finally {
- removeTaskFromRunning(task);
+ removeTaskFromAllStateMaps(task, suspended);
taskChangelogs.addAll(task.changelogPartitions());
}
}
@@ -189,10 +199,8 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
}
private RuntimeException closeRunning(final boolean isZombie,
- final StreamTask task,
- final List<TopicPartition>
closedTaskChangelogs) {
- removeTaskFromRunning(task);
- closedTaskChangelogs.addAll(task.changelogPartitions());
+ final StreamTask task) {
+ removeTaskFromAllStateMaps(task, Collections.emptyMap());
try {
final boolean clean = !isZombie;
@@ -208,7 +216,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
private RuntimeException closeNonRunning(final boolean isZombie,
final StreamTask task,
final List<TopicPartition>
closedTaskChangelogs) {
- created.remove(task.id());
+ removeTaskFromAllStateMaps(task, Collections.emptyMap());
closedTaskChangelogs.addAll(task.changelogPartitions());
try {
@@ -221,10 +229,11 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
return null;
}
+ // Since a restoring task has not had its topology initialized yet, we
need only close the state manager
private RuntimeException closeRestoring(final boolean isZombie,
final StreamTask task,
final List<TopicPartition>
closedTaskChangelogs) {
- removeTaskFromRestoring(task);
+ removeTaskFromAllStateMaps(task, Collections.emptyMap());
closedTaskChangelogs.addAll(task.changelogPartitions());
try {
@@ -240,7 +249,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
private RuntimeException closeSuspended(final boolean isZombie,
final StreamTask task) {
- suspended.remove(task.id());
+ removeTaskFromAllStateMaps(task, Collections.emptyMap());
try {
final boolean clean = !isZombie;
@@ -269,37 +278,30 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
return firstException.get();
}
- RuntimeException closeZombieTasks(final Set<TaskId> lostTasks, final
List<TopicPartition> lostTaskChangelogs) {
+ RuntimeException closeAllTasksAsZombies() {
+ log.debug("Closing all active tasks as zombies, current state of
active tasks: {}", toString());
+
final AtomicReference<RuntimeException> firstException = new
AtomicReference<>(null);
+ final List<TopicPartition> changelogs = new ArrayList<>(); // not
used, as we clear/unsubscribe all changelogs
- for (final TaskId id : lostTasks) {
- if (suspended.containsKey(id)) {
- log.debug("Closing the zombie suspended stream task {}.", id);
- firstException.compareAndSet(null, closeSuspended(true,
suspended.get(id)));
+ for (final TaskId id : allAssignedTaskIds()) {
+ if (running.containsKey(id)) {
+ log.debug("Closing the zombie running stream task {}.", id);
+ firstException.compareAndSet(null, closeRunning(true,
running.get(id)));
} else if (created.containsKey(id)) {
log.debug("Closing the zombie created stream task {}.", id);
- firstException.compareAndSet(null, closeNonRunning(true,
created.get(id), lostTaskChangelogs));
+ firstException.compareAndSet(null, closeNonRunning(true,
created.get(id), changelogs));
} else if (restoring.containsKey(id)) {
log.debug("Closing the zombie restoring stream task {}.", id);
- firstException.compareAndSet(null, closeRestoring(true,
restoring.get(id), lostTaskChangelogs));
- } else if (running.containsKey(id)) {
- log.debug("Closing the zombie running stream task {}.", id);
- firstException.compareAndSet(null, closeRunning(true,
running.get(id), lostTaskChangelogs));
- } else {
- log.warn("Skipping closing the zombie stream task {} as it was
already removed.", id);
+ firstException.compareAndSet(null, closeRestoring(true,
restoring.get(id), changelogs));
+ } else if (suspended.containsKey(id)) {
+ log.debug("Closing the zombie suspended stream task {}.", id);
+ firstException.compareAndSet(null, closeSuspended(true,
suspended.get(id)));
}
}
- // We always clear the prevActiveTasks and replace with current set of
running tasks to encode in subscription
- // We should exclude any tasks that were lost however, they will be
counted as standbys for assignment purposes
- prevActiveTasks.clear();
- prevActiveTasks.addAll(running.keySet());
+ clear();
- // With the current rebalance protocol, there should not be any
running tasks left as they were all lost
- if (!prevActiveTasks.isEmpty()) {
- log.error("Found the still running stream tasks {} after closing
all tasks lost as zombies", prevActiveTasks);
- firstException.compareAndSet(null, new IllegalStateException("Not
all lost tasks were closed as zombies"));
- }
return firstException.get();
}
@@ -311,7 +313,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
if (suspended.containsKey(taskId)) {
final StreamTask task = suspended.get(taskId);
log.trace("Found suspended stream task {}", taskId);
- suspended.remove(taskId);
+ removeTaskFromAllStateMaps(task, Collections.emptyMap());
if (task.partitions().equals(partitions)) {
task.resume();
@@ -346,8 +348,12 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
if (restoredPartitions.containsAll(task.changelogPartitions())) {
transitionToRunning(task);
it.remove();
- restoringByPartition.keySet().removeAll(task.partitions());
-
restoringByPartition.keySet().removeAll(task.changelogPartitions());
+ // Note that because we add back all restored partitions at
the top of this loop, clearing them from
+ // restoredPartitions here doesn't really matter. We do it
anyway as it is the correct thing to do,
+ // and may matter with future changes.
+ removeFromRestoredPartitions(task);
+ removeFromRestoringByPartition(task);
+
log.debug("Stream task {} completed restoration as all its
changelog partitions {} have been applied to restore state",
task.id(),
task.changelogPartitions());
@@ -372,6 +378,24 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
}
}
+ @Override
+ void removeTaskFromAllStateMaps(final StreamTask task, final Map<TaskId,
StreamTask> currentStateMap) {
+ super.removeTaskFromAllStateMaps(task, currentStateMap);
+
+ final TaskId id = task.id();
+ final Set<TopicPartition> taskPartitions = new
HashSet<>(task.partitions());
+ taskPartitions.addAll(task.changelogPartitions());
+
+ if (currentStateMap != restoring) {
+ restoring.remove(id);
+ restoringByPartition.keySet().removeAll(taskPartitions);
+ restoredPartitions.removeAll(taskPartitions);
+ }
+ if (currentStateMap != suspended) {
+ suspended.remove(id);
+ }
+ }
+
void addTaskToRestoring(final StreamTask task) {
restoring.put(task.id(), task);
for (final TopicPartition topicPartition : task.partitions()) {
@@ -382,16 +406,14 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
}
}
- private void removeTaskFromRestoring(final StreamTask task) {
- restoring.remove(task.id());
- for (final TopicPartition topicPartition : task.partitions()) {
- restoredPartitions.remove(topicPartition);
- restoringByPartition.remove(topicPartition);
- }
- for (final TopicPartition topicPartition : task.changelogPartitions())
{
- restoredPartitions.remove(topicPartition);
- restoringByPartition.remove(topicPartition);
- }
+ private void removeFromRestoringByPartition(final StreamTask task) {
+ restoringByPartition.keySet().removeAll(task.partitions());
+ restoringByPartition.keySet().removeAll(task.changelogPartitions());
+ }
+
+ private void removeFromRestoredPartitions(final StreamTask task) {
+ restoredPartitions.removeAll(task.partitions());
+ restoredPartitions.removeAll(task.changelogPartitions());
}
/**
@@ -497,6 +519,7 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
restoringByPartition.clear();
restoredPartitions.clear();
suspended.clear();
+ prevActiveTasks.clear();
}
@Override
@@ -511,26 +534,13 @@ class AssignedStreamsTasks extends
AssignedTasks<StreamTask> implements Restorin
super.shutdown(clean);
}
- @Override
- public boolean isEmpty() throws IllegalStateException {
- if (restoring.isEmpty() && !restoringByPartition.isEmpty()) {
- log.error("Assigned stream tasks in an inconsistent state: the set
of restoring tasks is empty but the " +
- "restoring by partitions map contained {}",
restoringByPartition);
- throw new IllegalStateException("Found inconsistent state: no
tasks restoring but nonempty restoringByPartition");
- } else {
- return super.isEmpty()
- && restoring.isEmpty()
- && restoringByPartition.isEmpty()
- && restoredPartitions.isEmpty()
- && suspended.isEmpty();
- }
- }
-
public String toString(final String indent) {
final StringBuilder builder = new StringBuilder();
builder.append(super.toString(indent));
- describe(builder, restoring.values(), indent, "Restoring:");
- describe(builder, suspended.values(), indent, "Suspended:");
+ describeTasks(builder, restoring.values(), indent, "Restoring:");
+ describePartitions(builder, restoringByPartition.keySet(), indent,
"Restoring Partitions:");
+ describePartitions(builder, restoredPartitions, indent, "Restored
Partitions:");
+ describeTasks(builder, suspended.values(), indent, "Suspended:");
return builder.toString();
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index cc4c0f9..45ac26b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -69,12 +69,17 @@ abstract class AssignedTasks<T extends Task> {
try {
final T task = entry.getValue();
task.initializeMetadata();
+
+ // don't remove from created until the task has been
successfully initialized
+ removeTaskFromAllStateMaps(task, created);
+
if (!task.initializeStateStores()) {
log.debug("Transitioning {} {} to restoring",
taskTypeName, entry.getKey());
((AssignedStreamsTasks)
this).addTaskToRestoring((StreamTask) task);
} else {
transitionToRunning(task);
}
+
it.remove();
} catch (final LockException e) {
// If this is a permanent error, then we could spam the log
since this is in the run loop. But, other related
@@ -121,10 +126,25 @@ abstract class AssignedTasks<T extends Task> {
}
}
- void removeTaskFromRunning(final T task) {
- running.remove(task.id());
- runningByPartition.keySet().removeAll(task.partitions());
- runningByPartition.keySet().removeAll(task.changelogPartitions());
+ /**
+ * Removes the passed in task (and its corresponding partitions) from all
state maps and sets,
+ * except for the one it currently resides in.
+ *
+ * @param task the task to be removed
+ * @param currentStateMap the current state map, which the task should not
be removed from
+ */
+ void removeTaskFromAllStateMaps(final T task, final Map<TaskId, T>
currentStateMap) {
+ final TaskId id = task.id();
+ final Set<TopicPartition> taskPartitions = new
HashSet<>(task.partitions());
+ taskPartitions.addAll(task.changelogPartitions());
+
+ if (currentStateMap != running) {
+ running.remove(id);
+ runningByPartition.keySet().removeAll(taskPartitions);
+ }
+ if (currentStateMap != created) {
+ created.remove(id);
+ }
}
T runningTaskFor(final TopicPartition partition) {
@@ -146,15 +166,16 @@ abstract class AssignedTasks<T extends Task> {
public String toString(final String indent) {
final StringBuilder builder = new StringBuilder();
- describe(builder, running.values(), indent, "Running:");
- describe(builder, created.values(), indent, "New:");
+ describeTasks(builder, running.values(), indent, "Running:");
+ describePartitions(builder, runningByPartition.keySet(), indent,
"Running Partitions:");
+ describeTasks(builder, created.values(), indent, "New:");
return builder.toString();
}
- void describe(final StringBuilder builder,
- final Collection<T> tasks,
- final String indent,
- final String name) {
+ void describeTasks(final StringBuilder builder,
+ final Collection<T> tasks,
+ final String indent,
+ final String name) {
builder.append(indent).append(name);
for (final T t : tasks) {
builder.append(indent).append(t.toString(indent + "\t\t"));
@@ -162,6 +183,17 @@ abstract class AssignedTasks<T extends Task> {
builder.append("\n");
}
+ void describePartitions(final StringBuilder builder,
+ final Collection<TopicPartition> partitions,
+ final String indent,
+ final String name) {
+ builder.append(indent).append(name);
+ for (final TopicPartition tp : partitions) {
+ builder.append(indent).append(tp.toString());
+ }
+ builder.append("\n");
+ }
+
List<T> allTasks() {
final List<T> tasks = new ArrayList<>();
tasks.addAll(running.values());
@@ -182,18 +214,6 @@ abstract class AssignedTasks<T extends Task> {
created.clear();
}
- boolean isEmpty() throws IllegalStateException {
- if (running.isEmpty() && !runningByPartition.isEmpty()) {
- log.error("Assigned stream tasks in an inconsistent state: the set
of running tasks is empty but the " +
- "running by partitions map contained {}",
runningByPartition);
- throw new IllegalStateException("Found inconsistent state: no
tasks running but nonempty runningByPartition");
- } else {
- return runningByPartition.isEmpty()
- && running.isEmpty()
- && created.isEmpty();
- }
- }
-
/**
* @throws TaskMigratedException if committing offsets failed (non-EOS)
* or if the task producer got fenced (EOS)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index c14f1cc..669eabf 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -207,7 +207,7 @@ public class StoreChangelogReader implements
ChangelogReader {
restoreToOffsets.get(partition));
restorer.setStartingOffset(restoreConsumer.position(partition));
- log.debug("Calling restorer for partition {} of task {}",
partition, active.restoringTaskFor(partition));
+ log.debug("Calling restorer for partition {}", partition);
restorer.restoreStarted();
} else {
log.trace("Did not find checkpoint from changelog {} for store
{}, rewinding to beginning.", partition, restorer.storeName());
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
index a4f1f6a..9544a5b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
@@ -143,8 +143,8 @@ public class StreamsRebalanceListener implements
ConsumerRebalanceListener {
Set<TaskId> lostTasks = new HashSet<>();
final long start = time.milliseconds();
try {
- // close lost active tasks but don't try to commit offsets as we
no longer own them
- lostTasks = taskManager.closeLostTasks(lostPartitions);
+ // close all active tasks as lost but don't try to commit offsets
as we no longer own them
+ lostTasks = taskManager.closeLostTasks();
} catch (final Throwable t) {
log.error(
"Error caught during partitions lost, " +
@@ -154,7 +154,7 @@ public class StreamsRebalanceListener implements
ConsumerRebalanceListener {
streamThread.setRebalanceException(t);
} finally {
log.info("partitions lost took {} ms.\n" +
- "\tsuspended lost active tasks: {}\n",
+ "\tclosed lost active tasks: {}\n",
time.milliseconds() - start,
lostTasks);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 9c3539e..21126d5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -257,56 +257,32 @@ public class TaskManager {
/**
* Closes active tasks as zombies, as these partitions have been lost and
are no longer owned.
+ * NOTE this method assumes that when it is called, EVERY task/partition
has been lost and must
+ * be closed as a zombie.
* @return list of lost tasks
*/
- Set<TaskId> closeLostTasks(final Collection<TopicPartition>
lostPartitions) {
- final Set<TaskId> zombieTasks = partitionsToTaskSet(lostPartitions);
- log.debug("Closing lost tasks as zombies: {}", zombieTasks);
+ Set<TaskId> closeLostTasks() {
+ final Set<TaskId> lostTasks = new
HashSet<>(assignedActiveTasks.keySet());
+ log.debug("Closing lost active tasks as zombies: {}", lostTasks);
- final List<TopicPartition> lostTaskChangelogs = new ArrayList<>();
+ final RuntimeException exception = active.closeAllTasksAsZombies();
- final RuntimeException exception =
active.closeZombieTasks(zombieTasks, lostTaskChangelogs);
+ log.debug("Clearing assigned active tasks: {}", assignedActiveTasks);
+ assignedActiveTasks.clear();
- assignedActiveTasks.keySet().removeAll(zombieTasks);
- changelogReader.remove(lostTaskChangelogs);
- removeChangelogsFromRestoreConsumer(lostTaskChangelogs, false);
+ log.debug("Clearing the store changelog reader: {}", changelogReader);
+ changelogReader.clear();
- if (exception != null) {
- throw exception;
- }
-
- verifyActiveTaskStateIsEmpty();
-
- return zombieTasks;
- }
-
- private void verifyActiveTaskStateIsEmpty() throws RuntimeException {
- final AtomicReference<RuntimeException> firstException = new
AtomicReference<>(null);
-
- // Verify active has no remaining state, and catch if active.isEmpty
throws so we can log any non-empty state
- try {
- if (!(active.isEmpty())) {
- log.error("The set of active tasks was non-empty: {}", active);
- firstException.compareAndSet(null, new
IllegalStateException("TaskManager found leftover active task state after
closing all zombies"));
- }
- } catch (final IllegalStateException e) {
- firstException.compareAndSet(null, e);
- }
-
- if (!(assignedActiveTasks.isEmpty())) {
- log.error("The set assignedActiveTasks was non-empty: {}",
assignedActiveTasks);
- firstException.compareAndSet(null, new
IllegalStateException("TaskManager found leftover assignedActiveTasks after
closing all zombies"));
+ if (!restoreConsumerAssignedStandbys) {
+ log.debug("Clearing the restore consumer's assignment: {}",
restoreConsumer.assignment());
+ restoreConsumer.unsubscribe();
}
- if (!(changelogReader.isEmpty())) {
- log.error("The changelog-reader's internal state was non-empty:
{}", changelogReader);
- firstException.compareAndSet(null, new
IllegalStateException("TaskManager found leftover changelog reader state after
closing all zombies"));
+ if (exception != null) {
+ throw exception;
}
- final RuntimeException fatalException = firstException.get();
- if (fatalException != null) {
- throw fatalException;
- }
+ return lostTasks;
}
void shutdown(final boolean clean) {
@@ -413,6 +389,8 @@ public class TaskManager {
final Collection<TopicPartition> restored =
changelogReader.restore(active);
active.updateRestored(restored);
removeChangelogsFromRestoreConsumer(restored, false);
+ } else {
+ active.clearRestoringPartitions();
}
if (active.allTasksRunning()) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index 42dc58b..6d98ada 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -84,8 +84,8 @@ public class AssignedStreamsTasksTest {
public void shouldInitializeNewTasks() {
t1.initializeMetadata();
EasyMock.expect(t1.initializeStateStores()).andReturn(false);
- EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
-
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet());
+
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
+
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.replay(t1);
addAndInitTask();
@@ -99,15 +99,15 @@ public class AssignedStreamsTasksTest {
EasyMock.expect(t1.initializeStateStores()).andReturn(false);
t1.initializeTopology();
EasyMock.expectLastCall().once();
- EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
-
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet());
+
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
+
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
t2.initializeMetadata();
EasyMock.expect(t2.initializeStateStores()).andReturn(true);
t2.initializeTopology();
EasyMock.expectLastCall().once();
final Set<TopicPartition> t2partitions = Collections.singleton(tp2);
- EasyMock.expect(t2.partitions()).andReturn(t2partitions);
-
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.emptyList());
+ EasyMock.expect(t2.partitions()).andReturn(t2partitions).anyTimes();
+
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.emptyList()).anyTimes();
EasyMock.replay(t1, t2);
@@ -127,8 +127,8 @@ public class AssignedStreamsTasksTest {
EasyMock.expect(t2.initializeStateStores()).andReturn(true);
t2.initializeTopology();
EasyMock.expectLastCall().once();
- EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
-
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.emptyList());
+
EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2)).anyTimes();
+
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.emptyList()).anyTimes();
EasyMock.replay(t2);
@@ -173,8 +173,8 @@ public class AssignedStreamsTasksTest {
public void shouldCloseRestoringTasks() {
t1.initializeMetadata();
EasyMock.expect(t1.initializeStateStores()).andReturn(false);
-
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).times(2);
-
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).times(3);
+
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
+
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
t1.closeStateManager(true);
EasyMock.expectLastCall();
EasyMock.replay(t1);
@@ -186,8 +186,9 @@ public class AssignedStreamsTasksTest {
}
@Test
- public void shouldClosedUnInitializedTasksOnSuspend() {
-
EasyMock.expect(t1.changelogPartitions()).andAnswer(Collections::emptyList);
+ public void shouldCloseUnInitializedTasksOnSuspend() {
+
EasyMock.expect(t1.partitions()).andAnswer(Collections::emptySet).anyTimes();
+
EasyMock.expect(t1.changelogPartitions()).andAnswer(Collections::emptyList).anyTimes();
t1.close(false, false);
EasyMock.expectLastCall();
@@ -213,8 +214,6 @@ public class AssignedStreamsTasksTest {
@Test
public void shouldCloseTaskOnSuspendWhenRuntimeException() {
mockTaskInitialization();
-
EasyMock.expect(t1.partitions()).andReturn(Collections.emptySet()).anyTimes();
-
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
t1.suspend();
EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!"));
@@ -232,8 +231,6 @@ public class AssignedStreamsTasksTest {
@Test
public void shouldCloseTaskOnSuspendIfTaskMigratedException() {
mockTaskInitialization();
-
EasyMock.expect(t1.partitions()).andReturn(Collections.emptySet()).anyTimes();
-
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
t1.suspend();
EasyMock.expectLastCall().andThrow(new TaskMigratedException());
@@ -281,8 +278,8 @@ public class AssignedStreamsTasksTest {
EasyMock.expect(t1.initializeStateStores()).andReturn(true);
t1.initializeTopology();
EasyMock.expectLastCall().once();
- EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
-
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptyList());
+
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
+
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptyList()).anyTimes();
}
@Test
@@ -537,10 +534,6 @@ public class AssignedStreamsTasksTest {
return assignedTasks.created.keySet();
}
- @Override
- public List<TopicPartition> expectedLostChangelogs() {
- return clearingPartitions;
- }
}.createTaskAndClear();
}
@@ -549,7 +542,6 @@ public class AssignedStreamsTasksTest {
new TaskTestSuite() {
@Override
public void additionalSetup(final StreamTask task) {
-
EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
task.closeStateManager(false);
}
@@ -563,10 +555,6 @@ public class AssignedStreamsTasksTest {
return assignedTasks.restoringTaskIds();
}
- @Override
- public List<TopicPartition> expectedLostChangelogs() {
- return clearingPartitions;
- }
}.createTaskAndClear();
}
@@ -576,7 +564,6 @@ public class AssignedStreamsTasksTest {
@Override
public void additionalSetup(final StreamTask task) {
task.initializeTopology();
-
EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
task.close(false, true);
}
@@ -590,10 +577,6 @@ public class AssignedStreamsTasksTest {
return assignedTasks.runningTaskIds();
}
- @Override
- public List<TopicPartition> expectedLostChangelogs() {
- return clearingPartitions;
- }
}.createTaskAndClear();
}
@@ -603,7 +586,6 @@ public class AssignedStreamsTasksTest {
@Override
public void additionalSetup(final StreamTask task) {
task.initializeTopology();
-
EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
task.suspend();
task.closeSuspended(false, null);
}
@@ -621,11 +603,7 @@ public class AssignedStreamsTasksTest {
public Set<TaskId> taskIds() {
return assignedTasks.suspendedTaskIds();
}
-
- @Override
- public List<TopicPartition> expectedLostChangelogs() {
- return Collections.emptyList();
- }
+
}.createTaskAndClear();
}
@@ -640,23 +618,21 @@ public class AssignedStreamsTasksTest {
abstract Set<TaskId> taskIds();
- abstract List<TopicPartition> expectedLostChangelogs();
-
void createTaskAndClear() {
final StreamTask task = EasyMock.createMock(StreamTask.class);
EasyMock.expect(task.id()).andReturn(clearingTaskId).anyTimes();
+
EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(task.changelogPartitions()).andReturn(clearingPartitions).anyTimes();
+
EasyMock.expect(task.toString(EasyMock.anyString())).andReturn("task").anyTimes();
additionalSetup(task);
EasyMock.replay(task);
action(task);
- final List<TopicPartition> changelogs = new ArrayList<>();
final Set<TaskId> ids = new
HashSet<>(Collections.singleton(task.id()));
assertEquals(ids, taskIds());
- assignedTasks.closeZombieTasks(ids, changelogs);
+ assignedTasks.closeAllTasksAsZombies();
assertEquals(Collections.emptySet(), taskIds());
- assertEquals(expectedLostChangelogs(), changelogs);
}
}