This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new cbc9f57  HOTFIX: safely clear all active state in onPartitionsLost 
(#7691)
cbc9f57 is described below

commit cbc9f57c0e6a2f9cd5252c11aeb19abefba4263e
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Tue Nov 19 13:34:38 2019 -0800

    HOTFIX: safely clear all active state in onPartitionsLost (#7691)
    
    After a number of last-minute bugs were found stemming from the incremental 
closing of lost tasks in StreamsRebalanceListener#onPartitionsLost, a safer 
approach to this edge case seems warranted. We initially wanted to be as 
"future-proof" as possible, and avoid baking further protocol assumptions into 
the code that may be broken as the protocol evolves. This meant that rather 
than simply closing all active tasks and clearing all associated state in 
#onPartitionsLost(lostPartitions) w [...]
    
    Therefore, before worrying about it being "future-proof" it seems we should 
make sure it is "present-day-proof" and implement this callback in the safest 
possible way, by blindly clearing and closing all active task state. We log all 
the relevant state (at debug level) before clearing it, so we can at least tell 
from the logs whether/which emptiness checks were being violated.
    
    Reviewers: Guozhang Wang <[email protected]>, Bill Bejeck 
<[email protected]>, Andrew Choi <[email protected]>
---
 .../consumer/ConsumerRebalanceListener.java        |   3 +-
 .../processor/internals/AssignedStandbyTasks.java  |   3 +-
 .../processor/internals/AssignedStreamsTasks.java  | 128 +++++++++++----------
 .../streams/processor/internals/AssignedTasks.java |  64 +++++++----
 .../processor/internals/StoreChangelogReader.java  |   2 +-
 .../internals/StreamsRebalanceListener.java        |   6 +-
 .../streams/processor/internals/TaskManager.java   |  58 +++-------
 .../internals/AssignedStreamsTasksTest.java        |  62 +++-------
 8 files changed, 156 insertions(+), 170 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
index ec2ce39..2f43b60 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
@@ -188,7 +188,8 @@ public interface ConsumerRebalanceListener {
      * necessary to catch these exceptions and re-attempt to wakeup or 
interrupt the consumer thread.
      *
      * @param partitions The list of partitions that were assigned to the 
consumer and now have been reassigned
-     *                   to other consumers (may not include all currently 
assigned partitions, i.e. there may still
+     *                   to other consumers. With the current protocol this 
will always include all of the consumer's
+     *                   previously assigned partitions, but this may change 
in future protocols (ie there would still
      *                   be some partitions left)
      * @throws org.apache.kafka.common.errors.WakeupException If raised from a 
nested call to {@link KafkaConsumer}
      * @throws org.apache.kafka.common.errors.InterruptException If raised 
from a nested call to {@link KafkaConsumer}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
index 0f8896e..299390b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -79,7 +80,7 @@ class AssignedStandbyTasks extends AssignedTasks<StandbyTask> 
{
             } catch (final RuntimeException e) {
                 log.error("Closing the standby task {} failed due to the 
following error:", task.id(), e);
             } finally {
-                removeTaskFromRunning(task);
+                removeTaskFromAllStateMaps(task, Collections.emptyMap());
                 revokedChangelogs.addAll(task.changelogPartitions());
             }
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index c9fe9fd..9440c4c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import java.util.ArrayList;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
@@ -83,6 +84,15 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> 
implements Restorin
     boolean hasRestoringTasks() {
         return !restoring.isEmpty();
     }
+
+    void clearRestoringPartitions() {
+        if (!restoring.isEmpty()) {
+            log.error("Tried to clear restoring partitions but was still 
restoring the stream tasks {}", restoring);
+            throw new IllegalStateException("Should not clear restoring 
partitions while set of restoring tasks is non-empty");
+        }
+        restoredPartitions.clear();
+        restoringByPartition.clear();
+    }
     
     Set<TaskId> suspendedTaskIds() {
         return suspended.keySet();
@@ -152,7 +162,7 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
                         id, f);
                 }
             } finally {
-                removeTaskFromRunning(task);
+                removeTaskFromAllStateMaps(task, suspended);
                 taskChangelogs.addAll(task.changelogPartitions());
             }
         }
@@ -189,10 +199,8 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
     }
 
     private RuntimeException closeRunning(final boolean isZombie,
-                                          final StreamTask task,
-                                          final List<TopicPartition> 
closedTaskChangelogs) {
-        removeTaskFromRunning(task);
-        closedTaskChangelogs.addAll(task.changelogPartitions());
+                                          final StreamTask task) {
+        removeTaskFromAllStateMaps(task, Collections.emptyMap());
 
         try {
             final boolean clean = !isZombie;
@@ -208,7 +216,7 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
     private RuntimeException closeNonRunning(final boolean isZombie,
                                              final StreamTask task,
                                              final List<TopicPartition> 
closedTaskChangelogs) {
-        created.remove(task.id());
+        removeTaskFromAllStateMaps(task, Collections.emptyMap());
         closedTaskChangelogs.addAll(task.changelogPartitions());
 
         try {
@@ -221,10 +229,11 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
         return null;
     }
 
+    // Since a restoring task has not had its topology initialized yet, we 
need only close the state manager
     private RuntimeException closeRestoring(final boolean isZombie,
                                             final StreamTask task,
                                             final List<TopicPartition> 
closedTaskChangelogs) {
-        removeTaskFromRestoring(task);
+        removeTaskFromAllStateMaps(task, Collections.emptyMap());
         closedTaskChangelogs.addAll(task.changelogPartitions());
 
         try {
@@ -240,7 +249,7 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
 
     private RuntimeException closeSuspended(final boolean isZombie,
                                             final StreamTask task) {
-        suspended.remove(task.id());
+        removeTaskFromAllStateMaps(task, Collections.emptyMap());
 
         try {
             final boolean clean = !isZombie;
@@ -269,37 +278,30 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
         return firstException.get();
     }
 
-    RuntimeException closeZombieTasks(final Set<TaskId> lostTasks, final 
List<TopicPartition> lostTaskChangelogs) {
+    RuntimeException closeAllTasksAsZombies() {
+        log.debug("Closing all active tasks as zombies, current state of 
active tasks: {}", toString());
+
         final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(null);
+        final List<TopicPartition> changelogs = new ArrayList<>(); // not 
used, as we clear/unsubscribe all changelogs
 
-        for (final TaskId id : lostTasks) {
-            if (suspended.containsKey(id)) {
-                log.debug("Closing the zombie suspended stream task {}.", id);
-                firstException.compareAndSet(null, closeSuspended(true, 
suspended.get(id)));
+        for (final TaskId id : allAssignedTaskIds()) {
+            if (running.containsKey(id)) {
+                log.debug("Closing the zombie running stream task {}.", id);
+                firstException.compareAndSet(null, closeRunning(true, 
running.get(id)));
             } else if (created.containsKey(id)) {
                 log.debug("Closing the zombie created stream task {}.", id);
-                firstException.compareAndSet(null, closeNonRunning(true, 
created.get(id), lostTaskChangelogs));
+                firstException.compareAndSet(null, closeNonRunning(true, 
created.get(id), changelogs));
             } else if (restoring.containsKey(id)) {
                 log.debug("Closing the zombie restoring stream task {}.", id);
-                firstException.compareAndSet(null, closeRestoring(true, 
restoring.get(id), lostTaskChangelogs));
-            } else if (running.containsKey(id)) {
-                log.debug("Closing the zombie running stream task {}.", id);
-                firstException.compareAndSet(null, closeRunning(true, 
running.get(id), lostTaskChangelogs));
-            } else {
-                log.warn("Skipping closing the zombie stream task {} as it was 
already removed.", id);
+                firstException.compareAndSet(null, closeRestoring(true, 
restoring.get(id), changelogs));
+            } else if (suspended.containsKey(id)) {
+                log.debug("Closing the zombie suspended stream task {}.", id);
+                firstException.compareAndSet(null, closeSuspended(true, 
suspended.get(id)));
             }
         }
 
-        // We always clear the prevActiveTasks and replace with current set of 
running tasks to encode in subscription
-        // We should exclude any tasks that were lost however, they will be 
counted as standbys for assignment purposes
-        prevActiveTasks.clear();
-        prevActiveTasks.addAll(running.keySet());
+        clear();
 
-        // With the current rebalance protocol, there should not be any 
running tasks left as they were all lost
-        if (!prevActiveTasks.isEmpty()) {
-            log.error("Found the still running stream tasks {} after closing 
all tasks lost as zombies", prevActiveTasks);
-            firstException.compareAndSet(null, new IllegalStateException("Not 
all lost tasks were closed as zombies"));
-        }
         return firstException.get();
     }
 
@@ -311,7 +313,7 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
         if (suspended.containsKey(taskId)) {
             final StreamTask task = suspended.get(taskId);
             log.trace("Found suspended stream task {}", taskId);
-            suspended.remove(taskId);
+            removeTaskFromAllStateMaps(task, Collections.emptyMap());
 
             if (task.partitions().equals(partitions)) {
                 task.resume();
@@ -346,8 +348,12 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
             if (restoredPartitions.containsAll(task.changelogPartitions())) {
                 transitionToRunning(task);
                 it.remove();
-                restoringByPartition.keySet().removeAll(task.partitions());
-                
restoringByPartition.keySet().removeAll(task.changelogPartitions());
+                // Note that because we add back all restored partitions at 
the top of this loop, clearing them from
+                // restoredPartitions here doesn't really matter. We do it 
anyway as it is the correct thing to do,
+                // and may matter with future changes.
+                removeFromRestoredPartitions(task);
+                removeFromRestoringByPartition(task);
+
                 log.debug("Stream task {} completed restoration as all its 
changelog partitions {} have been applied to restore state",
                     task.id(),
                     task.changelogPartitions());
@@ -372,6 +378,24 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
         }
     }
 
+    @Override
+     void removeTaskFromAllStateMaps(final StreamTask task, final Map<TaskId, 
StreamTask> currentStateMap) {
+        super.removeTaskFromAllStateMaps(task, currentStateMap);
+
+        final TaskId id = task.id();
+        final Set<TopicPartition> taskPartitions = new 
HashSet<>(task.partitions());
+        taskPartitions.addAll(task.changelogPartitions());
+
+        if (currentStateMap != restoring) {
+            restoring.remove(id);
+            restoringByPartition.keySet().removeAll(taskPartitions);
+            restoredPartitions.removeAll(taskPartitions);
+        }
+        if (currentStateMap != suspended) {
+            suspended.remove(id);
+        }
+    }
+
     void addTaskToRestoring(final StreamTask task) {
         restoring.put(task.id(), task);
         for (final TopicPartition topicPartition : task.partitions()) {
@@ -382,16 +406,14 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
         }
     }
 
-    private void removeTaskFromRestoring(final StreamTask task) {
-        restoring.remove(task.id());
-        for (final TopicPartition topicPartition : task.partitions()) {
-            restoredPartitions.remove(topicPartition);
-            restoringByPartition.remove(topicPartition);
-        }
-        for (final TopicPartition topicPartition : task.changelogPartitions()) 
{
-            restoredPartitions.remove(topicPartition);
-            restoringByPartition.remove(topicPartition);
-        }
+    private void removeFromRestoringByPartition(final StreamTask task) {
+        restoringByPartition.keySet().removeAll(task.partitions());
+        restoringByPartition.keySet().removeAll(task.changelogPartitions());
+    }
+
+    private void removeFromRestoredPartitions(final StreamTask task) {
+        restoredPartitions.removeAll(task.partitions());
+        restoredPartitions.removeAll(task.changelogPartitions());
     }
 
     /**
@@ -497,6 +519,7 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
         restoringByPartition.clear();
         restoredPartitions.clear();
         suspended.clear();
+        prevActiveTasks.clear();
     }
 
     @Override
@@ -511,26 +534,13 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
         super.shutdown(clean);
     }
 
-    @Override
-    public boolean isEmpty() throws IllegalStateException {
-        if (restoring.isEmpty() && !restoringByPartition.isEmpty()) {
-            log.error("Assigned stream tasks in an inconsistent state: the set 
of restoring tasks is empty but the " +
-                      "restoring by partitions map contained {}", 
restoringByPartition);
-            throw new IllegalStateException("Found inconsistent state: no 
tasks restoring but nonempty restoringByPartition");
-        } else {
-            return super.isEmpty()
-                       && restoring.isEmpty()
-                       && restoringByPartition.isEmpty()
-                       && restoredPartitions.isEmpty()
-                       && suspended.isEmpty();
-        }
-    }
-
     public String toString(final String indent) {
         final StringBuilder builder = new StringBuilder();
         builder.append(super.toString(indent));
-        describe(builder, restoring.values(), indent, "Restoring:");
-        describe(builder, suspended.values(), indent, "Suspended:");
+        describeTasks(builder, restoring.values(), indent, "Restoring:");
+        describePartitions(builder, restoringByPartition.keySet(), indent, 
"Restoring Partitions:");
+        describePartitions(builder, restoredPartitions, indent, "Restored 
Partitions:");
+        describeTasks(builder, suspended.values(), indent, "Suspended:");
         return builder.toString();
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index cc4c0f9..45ac26b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -69,12 +69,17 @@ abstract class AssignedTasks<T extends Task> {
             try {
                 final T task = entry.getValue();
                 task.initializeMetadata();
+
+                // don't remove from created until the task has been 
successfully initialized
+                removeTaskFromAllStateMaps(task, created);
+
                 if (!task.initializeStateStores()) {
                     log.debug("Transitioning {} {} to restoring", 
taskTypeName, entry.getKey());
                     ((AssignedStreamsTasks) 
this).addTaskToRestoring((StreamTask) task);
                 } else {
                     transitionToRunning(task);
                 }
+
                 it.remove();
             } catch (final LockException e) {
                 // If this is a permanent error, then we could spam the log 
since this is in the run loop. But, other related
@@ -121,10 +126,25 @@ abstract class AssignedTasks<T extends Task> {
         }
     }
 
-    void removeTaskFromRunning(final T task) {
-        running.remove(task.id());
-        runningByPartition.keySet().removeAll(task.partitions());
-        runningByPartition.keySet().removeAll(task.changelogPartitions());
+    /**
+     * Removes the passed in task (and its corresponding partitions) from all 
state maps and sets,
+     * except for the one it currently resides in.
+     *
+     * @param task the task to be removed
+     * @param currentStateMap the current state map, which the task should not 
be removed from
+     */
+    void removeTaskFromAllStateMaps(final T task, final Map<TaskId, T> 
currentStateMap) {
+        final TaskId id = task.id();
+        final Set<TopicPartition> taskPartitions = new 
HashSet<>(task.partitions());
+        taskPartitions.addAll(task.changelogPartitions());
+
+        if (currentStateMap != running) {
+            running.remove(id);
+            runningByPartition.keySet().removeAll(taskPartitions);
+        }
+        if (currentStateMap != created) {
+            created.remove(id);
+        }
     }
 
     T runningTaskFor(final TopicPartition partition) {
@@ -146,15 +166,16 @@ abstract class AssignedTasks<T extends Task> {
 
     public String toString(final String indent) {
         final StringBuilder builder = new StringBuilder();
-        describe(builder, running.values(), indent, "Running:");
-        describe(builder, created.values(), indent, "New:");
+        describeTasks(builder, running.values(), indent, "Running:");
+        describePartitions(builder, runningByPartition.keySet(), indent, 
"Running Partitions:");
+        describeTasks(builder, created.values(), indent, "New:");
         return builder.toString();
     }
 
-    void describe(final StringBuilder builder,
-                  final Collection<T> tasks,
-                  final String indent,
-                  final String name) {
+    void describeTasks(final StringBuilder builder,
+                       final Collection<T> tasks,
+                       final String indent,
+                       final String name) {
         builder.append(indent).append(name);
         for (final T t : tasks) {
             builder.append(indent).append(t.toString(indent + "\t\t"));
@@ -162,6 +183,17 @@ abstract class AssignedTasks<T extends Task> {
         builder.append("\n");
     }
 
+    void describePartitions(final StringBuilder builder,
+                            final Collection<TopicPartition> partitions,
+                            final String indent,
+                            final String name) {
+        builder.append(indent).append(name);
+        for (final TopicPartition tp : partitions) {
+            builder.append(indent).append(tp.toString());
+        }
+        builder.append("\n");
+    }
+
     List<T> allTasks() {
         final List<T> tasks = new ArrayList<>();
         tasks.addAll(running.values());
@@ -182,18 +214,6 @@ abstract class AssignedTasks<T extends Task> {
         created.clear();
     }
 
-    boolean isEmpty() throws IllegalStateException {
-        if (running.isEmpty() && !runningByPartition.isEmpty()) {
-            log.error("Assigned stream tasks in an inconsistent state: the set 
of running tasks is empty but the " +
-                          "running by partitions map contained {}", 
runningByPartition);
-            throw new IllegalStateException("Found inconsistent state: no 
tasks running but nonempty runningByPartition");
-        } else {
-            return runningByPartition.isEmpty()
-                       && running.isEmpty()
-                       && created.isEmpty();
-        }
-    }
-
     /**
      * @throws TaskMigratedException if committing offsets failed (non-EOS)
      *                               or if the task producer got fenced (EOS)
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index c14f1cc..669eabf 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -207,7 +207,7 @@ public class StoreChangelogReader implements 
ChangelogReader {
                         restoreToOffsets.get(partition));
                 
restorer.setStartingOffset(restoreConsumer.position(partition));
 
-                log.debug("Calling restorer for partition {} of task {}", 
partition, active.restoringTaskFor(partition));
+                log.debug("Calling restorer for partition {}", partition);
                 restorer.restoreStarted();
             } else {
                 log.trace("Did not find checkpoint from changelog {} for store 
{}, rewinding to beginning.", partition, restorer.storeName());
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
index a4f1f6a..9544a5b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
@@ -143,8 +143,8 @@ public class StreamsRebalanceListener implements 
ConsumerRebalanceListener {
         Set<TaskId> lostTasks = new HashSet<>();
         final long start = time.milliseconds();
         try {
-            // close lost active tasks but don't try to commit offsets as we 
no longer own them
-            lostTasks = taskManager.closeLostTasks(lostPartitions);
+            // close all active tasks as lost but don't try to commit offsets 
as we no longer own them
+            lostTasks = taskManager.closeLostTasks();
         } catch (final Throwable t) {
             log.error(
                 "Error caught during partitions lost, " +
@@ -154,7 +154,7 @@ public class StreamsRebalanceListener implements 
ConsumerRebalanceListener {
             streamThread.setRebalanceException(t);
         } finally {
             log.info("partitions lost took {} ms.\n" +
-                    "\tsuspended lost active tasks: {}\n",
+                    "\tclosed lost active tasks: {}\n",
                 time.milliseconds() - start,
                 lostTasks);
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 9c3539e..21126d5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -257,56 +257,32 @@ public class TaskManager {
 
     /**
      * Closes active tasks as zombies, as these partitions have been lost and 
are no longer owned.
+     * NOTE this method assumes that when it is called, EVERY task/partition 
has been lost and must
+     * be closed as a zombie.
      * @return list of lost tasks
      */
-    Set<TaskId> closeLostTasks(final Collection<TopicPartition> 
lostPartitions) {
-        final Set<TaskId> zombieTasks = partitionsToTaskSet(lostPartitions);
-        log.debug("Closing lost tasks as zombies: {}", zombieTasks);
+    Set<TaskId> closeLostTasks() {
+        final Set<TaskId> lostTasks = new 
HashSet<>(assignedActiveTasks.keySet());
+        log.debug("Closing lost active tasks as zombies: {}", lostTasks);
 
-        final List<TopicPartition> lostTaskChangelogs = new ArrayList<>();
+        final RuntimeException exception = active.closeAllTasksAsZombies();
 
-        final RuntimeException exception = 
active.closeZombieTasks(zombieTasks, lostTaskChangelogs);
+        log.debug("Clearing assigned active tasks: {}", assignedActiveTasks);
+        assignedActiveTasks.clear();
 
-        assignedActiveTasks.keySet().removeAll(zombieTasks);
-        changelogReader.remove(lostTaskChangelogs);
-        removeChangelogsFromRestoreConsumer(lostTaskChangelogs, false);
+        log.debug("Clearing the store changelog reader: {}", changelogReader);
+        changelogReader.clear();
 
-        if (exception != null) {
-            throw exception;
-        }
-
-        verifyActiveTaskStateIsEmpty();
-
-        return zombieTasks;
-    }
-
-    private void verifyActiveTaskStateIsEmpty() throws RuntimeException {
-        final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(null);
-
-        // Verify active has no remaining state, and catch if active.isEmpty 
throws so we can log any non-empty state
-        try {
-            if (!(active.isEmpty())) {
-                log.error("The set of active tasks was non-empty: {}", active);
-                firstException.compareAndSet(null, new 
IllegalStateException("TaskManager found leftover active task state after 
closing all zombies"));
-            }
-        } catch (final IllegalStateException e) {
-            firstException.compareAndSet(null, e);
-        }
-
-        if (!(assignedActiveTasks.isEmpty())) {
-            log.error("The set assignedActiveTasks was non-empty: {}", 
assignedActiveTasks);
-            firstException.compareAndSet(null, new 
IllegalStateException("TaskManager found leftover assignedActiveTasks after 
closing all zombies"));
+        if (!restoreConsumerAssignedStandbys) {
+            log.debug("Clearing the restore consumer's assignment: {}", 
restoreConsumer.assignment());
+            restoreConsumer.unsubscribe();
         }
 
-        if (!(changelogReader.isEmpty())) {
-            log.error("The changelog-reader's internal state was non-empty: 
{}", changelogReader);
-            firstException.compareAndSet(null, new 
IllegalStateException("TaskManager found leftover changelog reader state after 
closing all zombies"));
+        if (exception != null) {
+            throw exception;
         }
 
-        final RuntimeException fatalException = firstException.get();
-        if (fatalException != null) {
-            throw fatalException;
-        }
+        return lostTasks;
     }
 
     void shutdown(final boolean clean) {
@@ -413,6 +389,8 @@ public class TaskManager {
             final Collection<TopicPartition> restored = 
changelogReader.restore(active);
             active.updateRestored(restored);
             removeChangelogsFromRestoreConsumer(restored, false);
+        } else {
+            active.clearRestoringPartitions();
         }
 
         if (active.allTasksRunning()) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index 42dc58b..6d98ada 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -84,8 +84,8 @@ public class AssignedStreamsTasksTest {
     public void shouldInitializeNewTasks() {
         t1.initializeMetadata();
         EasyMock.expect(t1.initializeStateStores()).andReturn(false);
-        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
-        
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet());
+        
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
+        
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
         EasyMock.replay(t1);
 
         addAndInitTask();
@@ -99,15 +99,15 @@ public class AssignedStreamsTasksTest {
         EasyMock.expect(t1.initializeStateStores()).andReturn(false);
         t1.initializeTopology();
         EasyMock.expectLastCall().once();
-        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
-        
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet());
+        
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
+        
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
         t2.initializeMetadata();
         EasyMock.expect(t2.initializeStateStores()).andReturn(true);
         t2.initializeTopology();
         EasyMock.expectLastCall().once();
         final Set<TopicPartition> t2partitions = Collections.singleton(tp2);
-        EasyMock.expect(t2.partitions()).andReturn(t2partitions);
-        
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.emptyList());
+        EasyMock.expect(t2.partitions()).andReturn(t2partitions).anyTimes();
+        
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.emptyList()).anyTimes();
 
         EasyMock.replay(t1, t2);
 
@@ -127,8 +127,8 @@ public class AssignedStreamsTasksTest {
         EasyMock.expect(t2.initializeStateStores()).andReturn(true);
         t2.initializeTopology();
         EasyMock.expectLastCall().once();
-        EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
-        
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.emptyList());
+        
EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2)).anyTimes();
+        
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.emptyList()).anyTimes();
 
         EasyMock.replay(t2);
 
@@ -173,8 +173,8 @@ public class AssignedStreamsTasksTest {
     public void shouldCloseRestoringTasks() {
         t1.initializeMetadata();
         EasyMock.expect(t1.initializeStateStores()).andReturn(false);
-        
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).times(2);
-        
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).times(3);
+        
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
+        
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
         t1.closeStateManager(true);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
@@ -186,8 +186,9 @@ public class AssignedStreamsTasksTest {
     }
 
     @Test
-    public void shouldClosedUnInitializedTasksOnSuspend() {
-        
EasyMock.expect(t1.changelogPartitions()).andAnswer(Collections::emptyList);
+    public void shouldCloseUnInitializedTasksOnSuspend() {
+        
EasyMock.expect(t1.partitions()).andAnswer(Collections::emptySet).anyTimes();
+        
EasyMock.expect(t1.changelogPartitions()).andAnswer(Collections::emptyList).anyTimes();
 
         t1.close(false, false);
         EasyMock.expectLastCall();
@@ -213,8 +214,6 @@ public class AssignedStreamsTasksTest {
     @Test
     public void shouldCloseTaskOnSuspendWhenRuntimeException() {
         mockTaskInitialization();
-        
EasyMock.expect(t1.partitions()).andReturn(Collections.emptySet()).anyTimes();
-        
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
 
         t1.suspend();
         EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!"));
@@ -232,8 +231,6 @@ public class AssignedStreamsTasksTest {
     @Test
     public void shouldCloseTaskOnSuspendIfTaskMigratedException() {
         mockTaskInitialization();
-        
EasyMock.expect(t1.partitions()).andReturn(Collections.emptySet()).anyTimes();
-        
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
 
         t1.suspend();
         EasyMock.expectLastCall().andThrow(new TaskMigratedException());
@@ -281,8 +278,8 @@ public class AssignedStreamsTasksTest {
         EasyMock.expect(t1.initializeStateStores()).andReturn(true);
         t1.initializeTopology();
         EasyMock.expectLastCall().once();
-        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
-        
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptyList());
+        
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
+        
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptyList()).anyTimes();
     }
 
     @Test
@@ -537,10 +534,6 @@ public class AssignedStreamsTasksTest {
                 return assignedTasks.created.keySet();
             }
 
-            @Override
-            public List<TopicPartition> expectedLostChangelogs() {
-                return clearingPartitions;
-            }
         }.createTaskAndClear();
     }
 
@@ -549,7 +542,6 @@ public class AssignedStreamsTasksTest {
         new TaskTestSuite() {
             @Override
             public void additionalSetup(final StreamTask task) {
-                
EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
                 task.closeStateManager(false);
             }
 
@@ -563,10 +555,6 @@ public class AssignedStreamsTasksTest {
                 return assignedTasks.restoringTaskIds();
             }
 
-            @Override
-            public List<TopicPartition> expectedLostChangelogs() {
-                return clearingPartitions;
-            }
         }.createTaskAndClear();
     }
 
@@ -576,7 +564,6 @@ public class AssignedStreamsTasksTest {
             @Override
             public void additionalSetup(final StreamTask task) {
                 task.initializeTopology();
-                
EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
                 task.close(false, true);
             }
 
@@ -590,10 +577,6 @@ public class AssignedStreamsTasksTest {
                 return assignedTasks.runningTaskIds();
             }
 
-            @Override
-            public List<TopicPartition> expectedLostChangelogs() {
-                return clearingPartitions;
-            }
         }.createTaskAndClear();
     }
 
@@ -603,7 +586,6 @@ public class AssignedStreamsTasksTest {
             @Override
             public void additionalSetup(final StreamTask task) {
                 task.initializeTopology();
-                
EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
                 task.suspend();
                 task.closeSuspended(false, null);
             }
@@ -621,11 +603,7 @@ public class AssignedStreamsTasksTest {
             public Set<TaskId> taskIds() {
                 return assignedTasks.suspendedTaskIds();
             }
-
-            @Override
-            public List<TopicPartition> expectedLostChangelogs() {
-                return Collections.emptyList();
-            }
+            
         }.createTaskAndClear();
     }
 
@@ -640,23 +618,21 @@ public class AssignedStreamsTasksTest {
 
         abstract Set<TaskId> taskIds();
 
-        abstract List<TopicPartition> expectedLostChangelogs();
-
         void createTaskAndClear() {
             final StreamTask task = EasyMock.createMock(StreamTask.class);
             EasyMock.expect(task.id()).andReturn(clearingTaskId).anyTimes();
+            
EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
             
EasyMock.expect(task.changelogPartitions()).andReturn(clearingPartitions).anyTimes();
+            
EasyMock.expect(task.toString(EasyMock.anyString())).andReturn("task").anyTimes();
             additionalSetup(task);
             EasyMock.replay(task);
 
             action(task);
-            final List<TopicPartition> changelogs = new ArrayList<>();
             final Set<TaskId> ids = new 
HashSet<>(Collections.singleton(task.id()));
             assertEquals(ids, taskIds());
 
-            assignedTasks.closeZombieTasks(ids, changelogs);
+            assignedTasks.closeAllTasksAsZombies();
             assertEquals(Collections.emptySet(), taskIds());
-            assertEquals(expectedLostChangelogs(), changelogs);
         }
     }
 

Reply via email to