This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new c77417f  KAFKA-8972 (2.4 blocker): clear all state for zombie task on 
TaskMigratedException (#7608)
c77417f is described below

commit c77417f01df03c1248ddc72088d3396a69ad1b40
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Wed Oct 30 00:20:01 2019 -0400

    KAFKA-8972 (2.4 blocker): clear all state for zombie task on 
TaskMigratedException (#7608)
    
    Third bugfix for the failing broker bounce system test with cooperative 
rebalancing:
    
    tl;dr We need to remove everything associated with a task when it is 
closed, but in some cases (eg AssignedTasks#commit) on a 
TaskMigratedException we would close it as a zombie and then (only) remove the 
taskId from the `running` map. This left its partitions, restorers, state stores, 
etc around and in an undefined state, causing exceptions when closing and/or 
opening the stores again.
    
    Longer explanation:
    In AssignedTasks (the abstract class from which the standby and active task 
variations extend) a commit failure (even due to broker down/unavailable) is 
treated as a TaskMigratedException after which the failed task is closed as a 
zombie and removed from running -- the remaining tasks (ie those still in 
running) are then also closed as zombies in the subsequent onPartitionsLost.
    
    However we do not remove the closed task from runningByPartition nor do we 
remove the corresponding changelogs, if restoring, from the 
StoreChangelogReader since that applies only to active tasks, and AssignedTasks 
is generic/abstract. The changelog reader then retains a mapping from the 
closed task's changelog partition to its CompositeRestoreListener (and does not 
replace this when the new one comes along after the rebalance). The restore 
listener has a reference to a specific Rocks [...]
    
    Although technically this bug existed before KIP-429, it was only uncovered 
now that we remove tasks and clear their state/partitions/etc one at a time. We 
don't technically need to cherrypick the fix back earlier, as previously we just 
blindly cleared all data structures entirely during an eager rebalance.
    
    Reviewers: Boyang Chen <[email protected]>, Guozhang Wang 
<[email protected]>
---
 .../processor/internals/AssignedStandbyTasks.java  | 10 +--
 .../processor/internals/AssignedStreamsTasks.java  | 96 ++++++++++------------
 .../streams/processor/internals/AssignedTasks.java | 24 ++++--
 .../processor/internals/ChangelogReader.java       |  4 +
 .../processor/internals/StoreChangelogReader.java  | 12 +++
 .../streams/processor/internals/TaskManager.java   |  4 +-
 .../internals/AssignedStreamsTasksTest.java        | 87 ++++++++------------
 .../processor/internals/MockChangelogReader.java   |  5 ++
 .../processor/internals/StreamThreadTest.java      | 12 +--
 9 files changed, 126 insertions(+), 128 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
index 0c9a70d..f217a55 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
@@ -34,8 +34,8 @@ class AssignedStandbyTasks extends AssignedTasks<StandbyTask> 
{
     public void shutdown(final boolean clean) {
         final String shutdownType = clean ? "Clean" : "Unclean";
         log.debug(shutdownType + " shutdown of all standby tasks" + "\n" +
-                      "created tasks to close: {}" + "\n" +
-                      "running tasks to close: {}",
+                      "non-initialized standby tasks to close: {}" + "\n" +
+                      "running standby tasks to close: {}",
             clean, created.keySet(), running.keySet());
         super.shutdown(clean);
     }
@@ -63,7 +63,7 @@ class AssignedStandbyTasks extends AssignedTasks<StandbyTask> 
{
         final List<TopicPartition> revokedChangelogs = new ArrayList<>();
         for (final Map.Entry<TaskId, Set<TopicPartition>> entry : 
revokedTasks.entrySet()) {
             final TaskId taskId = entry.getKey();
-            final Task task;
+            final StandbyTask task;
 
             if (running.containsKey(taskId)) {
                 task = running.get(taskId);
@@ -77,9 +77,9 @@ class AssignedStandbyTasks extends AssignedTasks<StandbyTask> 
{
             try {
                 task.close(true, false);
             } catch (final RuntimeException e) {
-                log.error("Closing the {} {} failed due to the following 
error:", taskTypeName, task.id(), e);
+                log.error("Closing the standby task {} failed due to the 
following error:", task.id(), e);
             } finally {
-                running.remove(taskId);
+                removeTaskFromRunning(task);
                 revokedChangelogs.addAll(task.changelogPartitions());
             }
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index 3515824..63dac25 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -136,12 +136,12 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
                 suspended.put(id, task);
             } catch (final TaskMigratedException closeAsZombieAndSwallow) {
                 // swallow and move on since we are rebalancing
-                log.info("Failed to suspend the stream task {} since it got 
migrated to another thread already. " +
+                log.info("Failed to suspend stream task {} since it got 
migrated to another thread already. " +
                     "Closing it as zombie and moving on.", id);
                 firstException.compareAndSet(null, closeZombieTask(task));
                 prevActiveTasks.remove(id);
             } catch (final RuntimeException e) {
-                log.error("Suspending the stream task {} failed due to the 
following error:", id, e);
+                log.error("Suspending stream task {} failed due to the 
following error:", id, e);
                 firstException.compareAndSet(null, e);
                 try {
                     prevActiveTasks.remove(id);
@@ -152,9 +152,7 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
                         id, f);
                 }
             } finally {
-                running.remove(id);
-                runningByPartition.keySet().removeAll(task.partitions());
-                
runningByPartition.keySet().removeAll(task.changelogPartitions());
+                removeTaskFromRunning(task);
                 taskChangelogs.addAll(task.changelogPartitions());
             }
         }
@@ -193,9 +191,7 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
     private RuntimeException closeRunning(final boolean isZombie,
                                           final StreamTask task,
                                           final List<TopicPartition> 
closedTaskChangelogs) {
-        running.remove(task.id());
-        runningByPartition.keySet().removeAll(task.partitions());
-        runningByPartition.keySet().removeAll(task.changelogPartitions());
+        removeTaskFromRunning(task);
         closedTaskChangelogs.addAll(task.changelogPartitions());
 
         try {
@@ -228,12 +224,8 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
     private RuntimeException closeRestoring(final boolean isZombie,
                                             final StreamTask task,
                                             final List<TopicPartition> 
closedTaskChangelogs) {
-        restoring.remove(task.id());
+        removeTaskFromRestoring(task);
         closedTaskChangelogs.addAll(task.changelogPartitions());
-        for (final TopicPartition tp : task.partitions()) {
-            restoredPartitions.remove(tp);
-            restoringByPartition.remove(tp);
-        }
 
         try {
             final boolean clean = !isZombie;
@@ -282,15 +274,19 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
 
         for (final TaskId id : lostTasks) {
             if (suspended.containsKey(id)) {
+                log.debug("Closing the zombie suspended stream task {}.", id);
                 firstException.compareAndSet(null, closeSuspended(true, 
suspended.get(id)));
             } else if (created.containsKey(id)) {
+                log.debug("Closing the zombie created stream task {}.", id);
                 firstException.compareAndSet(null, closeNonRunning(true, 
created.get(id), lostTaskChangelogs));
             } else if (restoring.containsKey(id)) {
+                log.debug("Closing the zombie restoring stream task {}.", id);
                 firstException.compareAndSet(null, closeRestoring(true, 
created.get(id), lostTaskChangelogs));
             } else if (running.containsKey(id)) {
+                log.debug("Closing the zombie running stream task {}.", id);
                 firstException.compareAndSet(null, closeRunning(true, 
running.get(id), lostTaskChangelogs));
             } else {
-                // task may have already been closed as a zombie and removed 
from all task maps
+                log.warn("Skipping closing the zombie stream task {} as it was 
already removed.", id);
             }
         }
 
@@ -324,14 +320,8 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
                 } catch (final TaskMigratedException e) {
                     // we need to catch migration exception internally since 
this function
                     // is triggered in the rebalance callback
-                    log.info("Failed to resume the stream task {} since it got 
migrated to another thread already. " +
-                        "Closing it as zombie before triggering a new 
rebalance.", task.id());
-                    final RuntimeException fatalException = 
closeZombieTask(task);
-                    running.remove(taskId);
-
-                    if (fatalException != null) {
-                        throw fatalException;
-                    }
+                    log.info("Failed to resume stream task {} since it got 
migrated to another thread already. " +
+                             "Will trigger a new rebalance and close all tasks 
as zombies together.", task.id());
                     throw e;
                 }
                 log.trace("Resuming the suspended stream task {}", task.id());
@@ -374,7 +364,7 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
         }
     }
 
-    void addToRestoring(final StreamTask task) {
+    void addTaskToRestoring(final StreamTask task) {
         restoring.put(task.id(), task);
         for (final TopicPartition topicPartition : task.partitions()) {
             restoringByPartition.put(topicPartition, task);
@@ -384,6 +374,18 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
         }
     }
 
+    private void removeTaskFromRestoring(final StreamTask task) {
+        restoring.remove(task.id());
+        for (final TopicPartition topicPartition : task.partitions()) {
+            restoredPartitions.remove(topicPartition);
+            restoringByPartition.remove(topicPartition);
+        }
+        for (final TopicPartition topicPartition : task.changelogPartitions()) 
{
+            restoredPartitions.remove(topicPartition);
+            restoringByPartition.remove(topicPartition);
+        }
+    }
+
     /**
      * @throws TaskMigratedException if committing offsets failed (non-EOS)
      *                               or if the task producer got fenced (EOS)
@@ -392,8 +394,7 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
         int committed = 0;
         RuntimeException firstException = null;
 
-        for (final Iterator<StreamTask> it = running().iterator(); 
it.hasNext(); ) {
-            final StreamTask task = it.next();
+        for (final StreamTask task : running.values()) {
             try {
                 if (task.commitRequested() && task.commitNeeded()) {
                     task.commit();
@@ -402,12 +403,7 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
                 }
             } catch (final TaskMigratedException e) {
                 log.info("Failed to commit stream task {} since it got 
migrated to another thread already. " +
-                        "Closing it as zombie before triggering a new 
rebalance.", task.id());
-                final RuntimeException fatalException = closeZombieTask(task);
-                if (fatalException != null) {
-                    throw fatalException;
-                }
-                it.remove();
+                         "Will trigger a new rebalance and close all tasks as 
zombies together.", task.id());
                 throw e;
             } catch (final RuntimeException t) {
                 log.error("Failed to commit stream task {} due to the 
following error:", task.id(), t);
@@ -443,21 +439,14 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
     int process(final long now) {
         int processed = 0;
 
-        final Iterator<Map.Entry<TaskId, StreamTask>> it = 
running.entrySet().iterator();
-        while (it.hasNext()) {
-            final StreamTask task = it.next().getValue();
+        for (final StreamTask task : running.values()) {
             try {
                 if (task.isProcessable(now) && task.process()) {
                     processed++;
                 }
             } catch (final TaskMigratedException e) {
                 log.info("Failed to process stream task {} since it got 
migrated to another thread already. " +
-                        "Closing it as zombie before triggering a new 
rebalance.", task.id());
-                final RuntimeException fatalException = closeZombieTask(task);
-                if (fatalException != null) {
-                    throw fatalException;
-                }
-                it.remove();
+                        "Will trigger a new rebalance and close all tasks as 
zombies together.", task.id());
                 throw e;
             } catch (final RuntimeException e) {
                 log.error("Failed to process stream task {} due to the 
following error:", task.id(), e);
@@ -473,9 +462,8 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
      */
     int punctuate() {
         int punctuated = 0;
-        final Iterator<Map.Entry<TaskId, StreamTask>> it = 
running.entrySet().iterator();
-        while (it.hasNext()) {
-            final StreamTask task = it.next().getValue();
+
+        for (final StreamTask task : running.values()) {
             try {
                 if (task.maybePunctuateStreamTime()) {
                     punctuated++;
@@ -485,12 +473,7 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
                 }
             } catch (final TaskMigratedException e) {
                 log.info("Failed to punctuate stream task {} since it got 
migrated to another thread already. " +
-                        "Closing it as zombie before triggering a new 
rebalance.", task.id());
-                final RuntimeException fatalException = closeZombieTask(task);
-                if (fatalException != null) {
-                    throw fatalException;
-                }
-                it.remove();
+                        "Will trigger a new rebalance and close all tasks as 
zombies together.", task.id());
                 throw e;
             } catch (final KafkaException e) {
                 log.error("Failed to punctuate stream task {} due to the 
following error:", task.id(), e);
@@ -512,14 +495,23 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
     public void shutdown(final boolean clean) {
         final String shutdownType = clean ? "Clean" : "Unclean";
         log.debug(shutdownType + " shutdown of all active tasks" + "\n" +
-                      "non-initialized tasks to close: {}" + "\n" +
+                      "non-initialized stream tasks to close: {}" + "\n" +
                       "restoring tasks to close: {}" + "\n" +
-                      "running tasks to close: {}" + "\n" +
-                      "suspended tasks to close: {}",
+                      "running stream tasks to close: {}" + "\n" +
+                      "suspended stream tasks to close: {}",
             clean, created.keySet(), restoring.keySet(), running.keySet(), 
suspended.keySet());
         super.shutdown(clean);
     }
 
+    @Override
+    public boolean isEmpty() {
+        return super.isEmpty()
+            && restoring.isEmpty()
+            && restoringByPartition.isEmpty()
+            && restoredPartitions.isEmpty()
+            && suspended.isEmpty();
+    }
+
     public String toString(final String indent) {
         final StringBuilder builder = new StringBuilder();
         builder.append(super.toString(indent));
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 56fefe0..6d64d40 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -71,7 +71,7 @@ abstract class AssignedTasks<T extends Task> {
                 task.initializeMetadata();
                 if (!task.initializeStateStores()) {
                     log.debug("Transitioning {} {} to restoring", 
taskTypeName, entry.getKey());
-                    ((AssignedStreamsTasks) this).addToRestoring((StreamTask) 
task);
+                    ((AssignedStreamsTasks) 
this).addTaskToRestoring((StreamTask) task);
                 } else {
                     transitionToRunning(task);
                 }
@@ -121,6 +121,12 @@ abstract class AssignedTasks<T extends Task> {
         }
     }
 
+    void removeTaskFromRunning(final T task) {
+        running.remove(task.id());
+        runningByPartition.keySet().removeAll(task.partitions());
+        runningByPartition.keySet().removeAll(task.changelogPartitions());
+    }
+
     T runningTaskFor(final TopicPartition partition) {
         return runningByPartition.get(partition);
     }
@@ -176,6 +182,12 @@ abstract class AssignedTasks<T extends Task> {
         created.clear();
     }
 
+    boolean isEmpty() {
+        return runningByPartition.isEmpty()
+                   && running.isEmpty()
+                   && created.isEmpty();
+    }
+
     /**
      * @throws TaskMigratedException if committing offsets failed (non-EOS)
      *                               or if the task producer got fenced (EOS)
@@ -184,8 +196,7 @@ abstract class AssignedTasks<T extends Task> {
         int committed = 0;
         RuntimeException firstException = null;
 
-        for (final Iterator<T> it = running().iterator(); it.hasNext(); ) {
-            final T task = it.next();
+        for (final T task : running.values()) {
             try {
                 if (task.commitNeeded()) {
                     task.commit();
@@ -193,12 +204,7 @@ abstract class AssignedTasks<T extends Task> {
                 }
             } catch (final TaskMigratedException e) {
                 log.info("Failed to commit {} {} since it got migrated to 
another thread already. " +
-                        "Closing it as zombie before triggering a new 
rebalance.", taskTypeName, task.id());
-                final RuntimeException fatalException = closeZombieTask(task);
-                if (fatalException != null) {
-                    throw fatalException;
-                }
-                it.remove();
+                        "Will trigger a new rebalance and close all tasks as 
zombies together.", taskTypeName, task.id());
                 throw e;
             } catch (final RuntimeException t) {
                 log.error("Failed to commit {} {} due to the following error:",
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
index 782be15..df01e7c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
@@ -56,4 +56,8 @@ public interface ChangelogReader {
      */
     void clear();
 
+    /**
+     * @return whether the changelog reader has just been cleared or is 
uninitialized.
+     */
+    boolean isEmpty();
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 81f76a3..4e5ae57 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -69,6 +69,8 @@ public class StoreChangelogReader implements ChangelogReader {
             stateRestorers.put(restorer.partition(), restorer);
 
             log.trace("Added restorer for changelog {}", restorer.partition());
+        } else {
+            log.debug("Skip re-adding restorer for changelog {}", 
restorer.partition());
         }
 
         needsInitializing.add(restorer.partition());
@@ -296,6 +298,16 @@ public class StoreChangelogReader implements 
ChangelogReader {
         completedRestorers.clear();
     }
 
+    @Override
+    public boolean isEmpty() {
+        return partitionInfo.isEmpty()
+            && stateRestorers.isEmpty()
+            && needsRestoring.isEmpty()
+            && restoreToOffsets.isEmpty()
+            && needsInitializing.isEmpty()
+            && completedRestorers.isEmpty();
+    }
+
     private long processNext(final List<ConsumerRecord<byte[], byte[]>> 
records,
                              final StateRestorer restorer,
                              final Long endOffset) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 82496df..72cff77 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -269,8 +269,8 @@ public class TaskManager {
 
         if (exception != null) {
             throw exception;
-        } else if (!assignedActiveTasks.isEmpty()) {
-            throw new IllegalStateException("TaskManager had leftover tasks 
after removing all zombies");
+        } else if (!(active.isEmpty() && assignedActiveTasks.isEmpty() && 
changelogReader.isEmpty())) {
+            throw new IllegalStateException("TaskManager found leftover active 
task state after closing all zombies");
         }
 
         return zombieTasks;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index a8f96e4..68ca9bd 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -43,6 +43,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import org.junit.function.ThrowingRunnable;
 
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.nullValue;
@@ -50,6 +51,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -170,7 +172,7 @@ public class AssignedStreamsTasksTest {
         t1.initializeMetadata();
         EasyMock.expect(t1.initializeStateStores()).andReturn(false);
         
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).times(2);
-        
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).times(2);
+        
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).times(3);
         t1.closeStateManager(true);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
@@ -259,24 +261,17 @@ public class AssignedStreamsTasksTest {
     }
 
     @Test
-    public void shouldCloseTaskOnResumeSuspendedIfTaskMigratedException() {
+    public void 
shouldNotCloseTaskWithinResumeSuspendedIfTaskMigratedException() {
         mockRunningTaskSuspension();
         t1.resume();
         t1.initializeTopology();
         EasyMock.expectLastCall().andThrow(new TaskMigratedException());
-        t1.close(false, true);
-        EasyMock.expectLastCall();
         EasyMock.replay(t1);
 
         assertThat(suspendTask(), nullValue());
 
-        try {
-            assignedTasks.maybeResumeSuspendedTask(taskId1, 
Collections.singleton(tp1));
-            fail("Should have thrown TaskMigratedException.");
-        } catch (final TaskMigratedException expected) { /* ignore */ }
-
-        assertThat(assignedTasks.runningTaskIds(), 
equalTo(Collections.EMPTY_SET));
-        EasyMock.verify(t1);
+        verifyTaskMigratedExceptionDoesNotCloseTask(
+            () -> assignedTasks.maybeResumeSuspendedTask(taskId1, 
Collections.singleton(tp1)));
     }
 
     private void mockTaskInitialization() {
@@ -303,23 +298,16 @@ public class AssignedStreamsTasksTest {
     }
 
     @Test
-    public void shouldCloseTaskOnCommitIfTaskMigratedException() {
+    public void shouldNotCloseTaskWithinCommitIfTaskMigratedException() {
         mockTaskInitialization();
         EasyMock.expect(t1.commitNeeded()).andReturn(true);
         t1.commit();
         EasyMock.expectLastCall().andThrow(new TaskMigratedException());
-        t1.close(false, true);
-        EasyMock.expectLastCall();
         EasyMock.replay(t1);
         addAndInitTask();
 
-        try {
-            assignedTasks.commit();
-            fail("Should have thrown TaskMigratedException.");
-        } catch (final TaskMigratedException expected) { /* ignore */ }
-
-        assertThat(assignedTasks.runningTaskIds(), 
equalTo(Collections.EMPTY_SET));
-        EasyMock.verify(t1);
+        verifyTaskMigratedExceptionDoesNotCloseTask(
+            () -> assignedTasks.commit());
     }
 
     @Test
@@ -357,44 +345,30 @@ public class AssignedStreamsTasksTest {
     }
 
     @Test
-    public void shouldCloseTaskOnMaybeCommitIfTaskMigratedException() {
+    public void shouldNotCloseTaskWithinMaybeCommitIfTaskMigratedException() {
         mockTaskInitialization();
         EasyMock.expect(t1.commitRequested()).andReturn(true);
         EasyMock.expect(t1.commitNeeded()).andReturn(true);
         t1.commit();
         EasyMock.expectLastCall().andThrow(new TaskMigratedException());
-        t1.close(false, true);
-        EasyMock.expectLastCall();
         EasyMock.replay(t1);
         addAndInitTask();
 
-        try {
-            assignedTasks.maybeCommitPerUserRequested();
-            fail("Should have thrown TaskMigratedException.");
-        } catch (final TaskMigratedException expected) { /* ignore */ }
-
-        assertThat(assignedTasks.runningTaskIds(), 
equalTo(Collections.EMPTY_SET));
-        EasyMock.verify(t1);
+        verifyTaskMigratedExceptionDoesNotCloseTask(
+            () -> assignedTasks.maybeCommitPerUserRequested());
     }
 
     @Test
-    public void shouldCloseTaskOnProcessesIfTaskMigratedException() {
+    public void shouldNotCloseTaskWithinProcessIfTaskMigratedException() {
         mockTaskInitialization();
         EasyMock.expect(t1.isProcessable(0L)).andReturn(true);
         t1.process();
         EasyMock.expectLastCall().andThrow(new TaskMigratedException());
-        t1.close(false, true);
-        EasyMock.expectLastCall();
         EasyMock.replay(t1);
         addAndInitTask();
 
-        try {
-            assignedTasks.process(0L);
-            fail("Should have thrown TaskMigratedException.");
-        } catch (final TaskMigratedException expected) { /* ignore */ }
-
-        assertThat(assignedTasks.runningTaskIds(), 
equalTo(Collections.EMPTY_SET));
-        EasyMock.verify(t1);
+        verifyTaskMigratedExceptionDoesNotCloseTask(
+            () -> assignedTasks.process(0L));
     }
 
     @Test
@@ -438,39 +412,33 @@ public class AssignedStreamsTasksTest {
     }
 
     @Test
-    public void 
shouldCloseTaskOnMaybePunctuateStreamTimeIfTaskMigratedException() {
+    public void 
shouldNotCloseTaskWithinMaybePunctuateStreamTimeIfTaskMigratedException() {
         mockTaskInitialization();
         t1.maybePunctuateStreamTime();
         EasyMock.expectLastCall().andThrow(new TaskMigratedException());
-        t1.close(false, true);
-        EasyMock.expectLastCall();
         EasyMock.replay(t1);
         addAndInitTask();
 
-        try {
-            assignedTasks.punctuate();
-            fail("Should have thrown TaskMigratedException.");
-        } catch (final TaskMigratedException expected) { /* ignore */ }
 
-        assertThat(assignedTasks.runningTaskIds(), 
equalTo(Collections.EMPTY_SET));
-        EasyMock.verify(t1);
+        verifyTaskMigratedExceptionDoesNotCloseTask(
+            () -> assignedTasks.punctuate());
     }
 
     @Test
-    public void 
shouldCloseTaskOnMaybePunctuateSystemTimeIfTaskMigratedException() {
+    public void 
shouldNotloseTaskWithinMaybePunctuateSystemTimeIfTaskMigratedException() {
         mockTaskInitialization();
         EasyMock.expect(t1.maybePunctuateStreamTime()).andReturn(true);
         t1.maybePunctuateSystemTime();
         EasyMock.expectLastCall().andThrow(new TaskMigratedException());
-        t1.close(false, true);
-        EasyMock.expectLastCall();
         EasyMock.replay(t1);
         addAndInitTask();
 
         try {
             assignedTasks.punctuate();
             fail("Should have thrown TaskMigratedException.");
-        } catch (final TaskMigratedException expected) { /* ignore */ }
+        } catch (final TaskMigratedException expected) {
+            assertThat(assignedTasks.runningTaskIds(), 
equalTo(Collections.singleton(taskId1)));
+        }
         EasyMock.verify(t1);
     }
 
@@ -571,5 +539,16 @@ public class AssignedStreamsTasksTest {
         EasyMock.expectLastCall();
     }
 
+    private void verifyTaskMigratedExceptionDoesNotCloseTask(final 
ThrowingRunnable action) {
+        final Set<TaskId> expectedRunningTaskIds = 
Collections.singleton(taskId1);
+
+        // This action is expected to throw a TaskMigratedException
+        assertThrows(TaskMigratedException.class, action);
+
+        // This task should be closed as a zombie with all the other tasks 
during onPartitionsLost
+        assertThat(assignedTasks.runningTaskIds(), 
equalTo(expectedRunningTaskIds));
+
+        EasyMock.verify(t1);
+    }
 
 }
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
index 9cc51ce..25c5a13 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
@@ -60,6 +60,11 @@ public class MockChangelogReader implements ChangelogReader {
         }
     }
 
+    @Override
+    public boolean isEmpty() {
+        return restoredOffsets.isEmpty() && registered.isEmpty();
+    }
+
     public boolean wasRegistered(final TopicPartition partition) {
         return registered.contains(partition);
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 5b56eb8..2c9e948 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -842,7 +842,7 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void 
shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing()
 throws Exception {
+    public void 
shouldNotCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing()
 throws Exception {
         internalTopologyBuilder.addSource(null, "source", null, null, null, 
topic1);
         internalTopologyBuilder.addSink("sink", "dummyTopic", null, null, 
null, "source");
 
@@ -898,16 +898,16 @@ public class StreamThreadTest {
         try {
             thread.runOnce();
             fail("Should have thrown TaskMigratedException");
-        } catch (final TaskMigratedException expected) { /* ignore */ }
-        TestUtils.waitForCondition(
-            () -> thread.tasks().isEmpty(),
-            "StreamsThread did not remove fenced zombie task.");
+        } catch (final TaskMigratedException expected) {
+            assertTrue("StreamsThread removed the fenced zombie task already, 
should wait for rebalance to close all zombies together.",
+                        thread.tasks().containsKey(task1));
+        }
 
         assertThat(producer.commitCount(), equalTo(1L));
     }
 
     @Test
-    public void 
shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedInCommitTransactionWhenSuspendingTaks()
 {
+    public void 
shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedInCommitTransactionWhenSuspendingTasks()
 {
         final StreamThread thread = createStreamThread(CLIENT_ID, new 
StreamsConfig(configProps(true)), true);
 
         internalTopologyBuilder.addSource(null, "name", null, null, null, 
topic1);

Reply via email to