This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 560ab2cc319 KAFKA-14133: Migrate ActiveTaskCreator mock in 
TaskManagerTest to Mockito (#13681)
560ab2cc319 is described below

commit 560ab2cc319899d7ce1181408507061e90752d3e
Author: Mehari Beyene <[email protected]>
AuthorDate: Wed May 31 23:47:31 2023 -0700

    KAFKA-14133: Migrate ActiveTaskCreator mock in TaskManagerTest to Mockito 
(#13681)
    
    This pull requests migrates the ActiveTaskCreator mock in TaskManagerTest 
from EasyMock to Mockito
    The change is restricted to a single mock to minimize the scope and make it 
easier for review.
    Please see two examples that follow the same pattern below:
    #13529
    #13621
    
    Reviewers: Divij Vaidya <[email protected]>, Manyanda Chitimbo 
<[email protected]>, Christo Lolov <[email protected]>, Bruno Cadonna 
<[email protected]>
---
 .../processor/internals/TaskManagerTest.java       | 613 ++++++++++-----------
 1 file changed, 276 insertions(+), 337 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 48d046e1b90..3f0023971a5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -68,6 +68,7 @@ import org.junit.runner.RunWith;
 import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnit;
 import org.mockito.junit.MockitoRule;
+import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
 import java.io.File;
@@ -128,9 +129,11 @@ import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.mock;
 
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
 @RunWith(EasyMockRunner.class)
 public class TaskManagerTest {
 
@@ -188,7 +191,7 @@ public class TaskManagerTest {
     private ChangelogReader changeLogReader;
     @Mock(type = MockType.STRICT)
     private Consumer<byte[], byte[]> consumer;
-    @Mock(type = MockType.STRICT)
+    @org.mockito.Mock
     private ActiveTaskCreator activeTaskCreator;
     @Mock(type = MockType.NICE)
     private StandbyTaskCreator standbyTaskCreator;
@@ -291,16 +294,16 @@ public class TaskManagerTest {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToRecycle));
-        expect(activeTaskCreator.createTasks(consumer, 
Collections.emptyMap())).andReturn(emptySet());
         
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(
             Collections.emptyMap(),
             mkMap(mkEntry(activeTaskToRecycle.id(), 
activeTaskToRecycle.inputPartitions()))
         );
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
         
Mockito.verify(tasks).addPendingTaskToRecycle(activeTaskToRecycle.id(), 
activeTaskToRecycle.inputPartitions());
     }
 
@@ -312,16 +315,16 @@ public class TaskManagerTest {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTaskToRecycle));
-        expect(activeTaskCreator.createTasks(consumer, 
Collections.emptyMap())).andReturn(emptySet());
         
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(
             mkMap(mkEntry(standbyTaskToRecycle.id(), 
standbyTaskToRecycle.inputPartitions())),
             Collections.emptyMap()
         );
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
         Mockito.verify(stateUpdater).remove(standbyTaskToRecycle.id());
         
Mockito.verify(tasks).addPendingTaskToRecycle(standbyTaskToRecycle.id(), 
standbyTaskToRecycle.inputPartitions());
     }
@@ -334,13 +337,13 @@ public class TaskManagerTest {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToClose));
-        expect(activeTaskCreator.createTasks(consumer, 
Collections.emptyMap())).andReturn(emptySet());
         
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(Collections.emptyMap(), 
Collections.emptyMap());
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
         Mockito.verify(stateUpdater).remove(activeTaskToClose.id());
         
Mockito.verify(tasks).addPendingTaskToCloseClean(activeTaskToClose.id());
     }
@@ -353,13 +356,13 @@ public class TaskManagerTest {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTaskToClose));
-        expect(activeTaskCreator.createTasks(consumer, 
Collections.emptyMap())).andReturn(emptySet());
         
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(Collections.emptyMap(), 
Collections.emptyMap());
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
         Mockito.verify(stateUpdater).remove(standbyTaskToClose.id());
         
Mockito.verify(tasks).addPendingTaskToCloseClean(standbyTaskToClose.id());
     }
@@ -373,16 +376,16 @@ public class TaskManagerTest {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         
when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToUpdateInputPartitions));
-        expect(activeTaskCreator.createTasks(consumer, 
Collections.emptyMap())).andReturn(emptySet());
         
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(
             mkMap(mkEntry(activeTaskToUpdateInputPartitions.id(), 
newInputPartitions)),
             Collections.emptyMap()
         );
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
         
Mockito.verify(stateUpdater).remove(activeTaskToUpdateInputPartitions.id());
         
Mockito.verify(tasks).addPendingTaskToUpdateInputPartitions(activeTaskToUpdateInputPartitions.id(),
 newInputPartitions);
     }
@@ -395,16 +398,16 @@ public class TaskManagerTest {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(stateUpdater.getTasks()).thenReturn(mkSet(reassignedActiveTask));
-        expect(activeTaskCreator.createTasks(consumer, 
Collections.emptyMap())).andReturn(emptySet());
         
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(
             mkMap(mkEntry(reassignedActiveTask.id(), 
reassignedActiveTask.inputPartitions())),
             Collections.emptyMap()
         );
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
     }
 
     @Test
@@ -415,16 +418,16 @@ public class TaskManagerTest {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         
when(stateUpdater.getTasks()).thenReturn(mkSet(reAssignedRevokedActiveTask));
-        expect(activeTaskCreator.createTasks(consumer, 
Collections.emptyMap())).andReturn(emptySet());
         
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(
             mkMap(mkEntry(reAssignedRevokedActiveTask.id(), 
reAssignedRevokedActiveTask.inputPartitions())),
             Collections.emptyMap()
         );
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
         
Mockito.verify(tasks).removePendingActiveTaskToSuspend(reAssignedRevokedActiveTask.id());
     }
 
@@ -437,16 +440,16 @@ public class TaskManagerTest {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         
when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTaskToUpdateInputPartitions));
-        expect(activeTaskCreator.createTasks(consumer, 
Collections.emptyMap())).andReturn(emptySet());
         
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(
             Collections.emptyMap(),
             mkMap(mkEntry(standbyTaskToUpdateInputPartitions.id(), 
newInputPartitions))
         );
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
         Mockito.verify(stateUpdater, 
never()).remove(standbyTaskToUpdateInputPartitions.id());
         Mockito.verify(tasks, never())
             
.addPendingTaskToUpdateInputPartitions(standbyTaskToUpdateInputPartitions.id(), 
newInputPartitions);
@@ -460,16 +463,16 @@ public class TaskManagerTest {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(stateUpdater.getTasks()).thenReturn(mkSet(reAssignedStandbyTask));
-        expect(activeTaskCreator.createTasks(consumer, 
Collections.emptyMap())).andReturn(emptySet());
         
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(
             Collections.emptyMap(),
             mkMap(mkEntry(reAssignedStandbyTask.id(), 
reAssignedStandbyTask.inputPartitions()))
         );
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
     }
 
     @Test
@@ -483,16 +486,16 @@ public class TaskManagerTest {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToClose, 
standbyTaskToRecycle));
-        expect(activeTaskCreator.createTasks(consumer, 
Collections.emptyMap())).andReturn(emptySet());
         
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(
             mkMap(mkEntry(standbyTaskToRecycle.id(), 
standbyTaskToRecycle.inputPartitions())),
             Collections.emptyMap()
         );
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
         Mockito.verify(stateUpdater).remove(activeTaskToClose.id());
         
Mockito.verify(tasks).addPendingTaskToCloseClean(activeTaskToClose.id());
         Mockito.verify(stateUpdater).remove(standbyTaskToRecycle.id());
@@ -539,18 +542,15 @@ public class TaskManagerTest {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         final Set<Task> createdTasks = mkSet(activeTaskToBeCreated);
-        expect(activeTaskCreator.createTasks(consumer, mkMap(
-            mkEntry(activeTaskToBeCreated.id(), 
activeTaskToBeCreated.inputPartitions())))
-        ).andReturn(createdTasks);
+        final Map<TaskId, Set<TopicPartition>> tasksToBeCreated = mkMap(
+            mkEntry(activeTaskToBeCreated.id(), 
activeTaskToBeCreated.inputPartitions()));
+        when(activeTaskCreator.createTasks(consumer, 
tasksToBeCreated)).thenReturn(createdTasks);
         
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
-        taskManager.handleAssignment(
-            mkMap(mkEntry(activeTaskToBeCreated.id(), 
activeTaskToBeCreated.inputPartitions())),
-            Collections.emptyMap()
-        );
+        taskManager.handleAssignment(tasksToBeCreated, Collections.emptyMap());
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
         Mockito.verify(tasks).addPendingTaskToInit(createdTasks);
     }
 
@@ -562,18 +562,18 @@ public class TaskManagerTest {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         final Set<Task> createdTasks = mkSet(standbyTaskToBeCreated);
-        expect(activeTaskCreator.createTasks(consumer, 
Collections.emptyMap())).andReturn(emptySet());
         expect(standbyTaskCreator.createTasks(mkMap(
             mkEntry(standbyTaskToBeCreated.id(), 
standbyTaskToBeCreated.inputPartitions())))
         ).andReturn(createdTasks);
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(
             Collections.emptyMap(),
             mkMap(mkEntry(standbyTaskToBeCreated.id(), 
standbyTaskToBeCreated.inputPartitions()))
         );
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
         Mockito.verify(tasks).addPendingTaskToInit(createdTasks);
     }
 
@@ -590,17 +590,17 @@ public class TaskManagerTest {
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         
expect(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, 
activeTaskToRecycle.inputPartitions()))
             .andReturn(recycledStandbyTask);
-        
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTaskToRecycle.id());
-        expect(activeTaskCreator.createTasks(consumer, 
Collections.emptyMap())).andReturn(emptySet());
         
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(
             Collections.emptyMap(),
             mkMap(mkEntry(activeTaskToRecycle.id(), 
activeTaskToRecycle.inputPartitions()))
         );
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToRecycle.id());
+        Mockito.verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
         Mockito.verify(activeTaskToRecycle).prepareCommit();
         Mockito.verify(tasks).replaceActiveWithStandby(recycledStandbyTask);
     }
@@ -613,7 +613,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         when(tasks.allTasks()).thenReturn(mkSet(standbyTaskToRecycle));
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         final IllegalStateException illegalStateException = assertThrows(
             IllegalStateException.class,
@@ -624,7 +624,8 @@ public class TaskManagerTest {
         );
 
         assertEquals(illegalStateException.getMessage(), "Standby tasks should 
only be managed by the state updater");
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verifyNoInteractions(activeTaskCreator);
     }
 
     @Test
@@ -635,14 +636,14 @@ public class TaskManagerTest {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasks()).thenReturn(mkSet(activeTaskToClose));
-        
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
-        expect(activeTaskCreator.createTasks(consumer, 
Collections.emptyMap())).andReturn(emptySet());
         
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(Collections.emptyMap(), 
Collections.emptyMap());
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
+        
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
         Mockito.verify(activeTaskToClose).prepareCommit();
         Mockito.verify(activeTaskToClose).closeClean();
         Mockito.verify(tasks).removeTask(activeTaskToClose);
@@ -656,7 +657,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasks()).thenReturn(mkSet(standbyTaskToClose));
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         final IllegalStateException illegalStateException = assertThrows(
             IllegalStateException.class,
@@ -664,7 +665,8 @@ public class TaskManagerTest {
         );
 
         assertEquals(illegalStateException.getMessage(), "Standby tasks should 
only be managed by the state updater");
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verifyNoInteractions(activeTaskCreator);
     }
 
     @Test
@@ -677,16 +679,16 @@ public class TaskManagerTest {
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         
when(tasks.allTasks()).thenReturn(mkSet(activeTaskToUpdateInputPartitions));
         
when(tasks.updateActiveTaskInputPartitions(activeTaskToUpdateInputPartitions, 
newInputPartitions)).thenReturn(true);
-        expect(activeTaskCreator.createTasks(consumer, 
Collections.emptyMap())).andReturn(emptySet());
         
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(
             mkMap(mkEntry(activeTaskToUpdateInputPartitions.id(), 
newInputPartitions)),
             Collections.emptyMap()
         );
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
         
Mockito.verify(activeTaskToUpdateInputPartitions).updateInputPartitions(Mockito.eq(newInputPartitions),
 any());
     }
 
@@ -698,16 +700,16 @@ public class TaskManagerTest {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasks()).thenReturn(mkSet(activeTaskToResume));
-        expect(activeTaskCreator.createTasks(consumer, 
Collections.emptyMap())).andReturn(emptySet());
         
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(
             mkMap(mkEntry(activeTaskToResume.id(), 
activeTaskToResume.inputPartitions())),
             Collections.emptyMap()
         );
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
     }
 
     @Test
@@ -718,16 +720,16 @@ public class TaskManagerTest {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasks()).thenReturn(mkSet(activeTaskToResume));
-        expect(activeTaskCreator.createTasks(consumer, 
Collections.emptyMap())).andReturn(emptySet());
         
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(
             mkMap(mkEntry(activeTaskToResume.id(), 
activeTaskToResume.inputPartitions())),
             Collections.emptyMap()
         );
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
         Mockito.verify(activeTaskToResume).resume();
         Mockito.verify(stateUpdater).add(activeTaskToResume);
         Mockito.verify(tasks).removeTask(activeTaskToResume);
@@ -742,7 +744,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         
when(tasks.allTasks()).thenReturn(mkSet(standbyTaskToUpdateInputPartitions));
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         final IllegalStateException illegalStateException = assertThrows(
             IllegalStateException.class,
@@ -753,7 +755,8 @@ public class TaskManagerTest {
         );
 
         assertEquals(illegalStateException.getMessage(), "Standby tasks should 
only be managed by the state updater");
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verifyNoInteractions(activeTaskCreator);
     }
 
     @Test
@@ -767,20 +770,19 @@ public class TaskManagerTest {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasks()).thenReturn(mkSet(activeTaskToClose));
-        
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
-        expect(activeTaskCreator.createTasks(
-            consumer,
-            mkMap(mkEntry(activeTaskToCreate.id(), 
activeTaskToCreate.inputPartitions()))
-        )).andReturn(emptySet());
         
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(
             mkMap(mkEntry(activeTaskToCreate.id(), 
activeTaskToCreate.inputPartitions())),
             Collections.emptyMap()
         );
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verify(activeTaskCreator).createTasks(
+            consumer,
+            mkMap(mkEntry(activeTaskToCreate.id(), 
activeTaskToCreate.inputPartitions()))
+        );
         Mockito.verify(activeTaskToClose).closeClean();
     }
 
@@ -845,17 +847,16 @@ public class TaskManagerTest {
         
when(tasks.removePendingTaskToRecycle(task00.id())).thenReturn(taskId00Partitions);
         
when(tasks.removePendingTaskToRecycle(task01.id())).thenReturn(taskId01Partitions);
         taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), 
eq(taskId01Partitions), eq(consumer)))
-            .andStubReturn(task01Converted);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().once();
+        when(activeTaskCreator.createActiveTaskFromStandby(task01, 
taskId01Partitions,
+            consumer)).thenReturn(task01Converted);
         expect(standbyTaskCreator.createStandbyTaskFromActive(eq(task00), 
eq(taskId00Partitions)))
             .andStubReturn(task00Converted);
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());
         Mockito.verify(task00).suspend();
         Mockito.verify(task01).suspend();
         Mockito.verify(task00Converted).initializeIfNeeded();
@@ -879,13 +880,10 @@ public class TaskManagerTest {
         
when(tasks.removePendingTaskToCloseClean(task00.id())).thenReturn(true);
         
when(tasks.removePendingTaskToCloseClean(task01.id())).thenReturn(true);
         taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().once();
-        replay(activeTaskCreator);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        verify(activeTaskCreator);
+        
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());
         Mockito.verify(task00).suspend();
         Mockito.verify(task00).closeClean();
         Mockito.verify(task01).suspend();
@@ -960,10 +958,8 @@ public class TaskManagerTest {
         when(stateUpdater.drainRemovedTasks())
             .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, 
taskToUpdateInputPartitions));
         when(stateUpdater.restoresActiveTasks()).thenReturn(true);
-        
expect(activeTaskCreator.createActiveTaskFromStandby(eq(taskToRecycle1), 
eq(taskId01Partitions), eq(consumer)))
-            .andStubReturn(convertedTask1);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().times(2);
+        when(activeTaskCreator.createActiveTaskFromStandby(taskToRecycle1, 
taskId01Partitions, consumer))
+            .thenReturn(convertedTask1);
         
expect(standbyTaskCreator.createStandbyTaskFromActive(eq(taskToRecycle0), 
eq(taskId00Partitions)))
             .andStubReturn(convertedTask0);
         expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
@@ -980,11 +976,12 @@ public class TaskManagerTest {
         
when(tasks.removePendingTaskToUpdateInputPartitions(taskToUpdateInputPartitions.id())).thenReturn(taskId04Partitions);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         taskManager.setMainConsumer(consumer);
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter -> { 
});
 
-        verify(activeTaskCreator, standbyTaskCreator, consumer);
+        verify(standbyTaskCreator, consumer);
+        Mockito.verify(activeTaskCreator, 
times(2)).closeAndRemoveTaskProducerIfNeeded(any());
         Mockito.verify(convertedTask0).initializeIfNeeded();
         Mockito.verify(convertedTask1).initializeIfNeeded();
         Mockito.verify(stateUpdater).add(convertedTask0);
@@ -1155,12 +1152,12 @@ public class TaskManagerTest {
         final TaskManager taskManager = setUpRecycleRestoredTask(statefulTask);
         expect(standbyTaskCreator.createStandbyTaskFromActive(statefulTask, 
statefulTask.inputPartitions()))
             .andStubReturn(standbyTask);
-        
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
         Mockito.verify(statefulTask).suspend();
         Mockito.verify(standbyTask).initializeIfNeeded();
         Mockito.verify(stateUpdater).add(standbyTask);
@@ -1226,12 +1223,10 @@ public class TaskManagerTest {
             .withInputPartitions(taskId00Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpCloseCleanRestoredTask(statefulTask, tasks);
-        
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
-        replay(activeTaskCreator);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        verify(activeTaskCreator);
+        
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
         Mockito.verify(statefulTask).suspend();
         Mockito.verify(statefulTask).closeClean();
         Mockito.verify(statefulTask, never()).closeDirty();
@@ -1246,15 +1241,13 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpCloseCleanRestoredTask(statefulTask, tasks);
         doThrow(RuntimeException.class).when(statefulTask).closeClean();
-        
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
-        replay(activeTaskCreator);
 
         assertThrows(
             RuntimeException.class,
             () -> taskManager.checkStateUpdater(time.milliseconds(), 
noOpResetter)
         );
 
-        verify(activeTaskCreator);
+        
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
         Mockito.verify(statefulTask).closeDirty();
         Mockito.verify(tasks, never()).removeTask(statefulTask);
     }
@@ -1266,16 +1259,15 @@ public class TaskManagerTest {
             .withInputPartitions(taskId00Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpCloseCleanRestoredTask(statefulTask, tasks);
-        
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
-        expectLastCall().andThrow(new RuntimeException("Something happened"));
-        replay(activeTaskCreator);
+        final TaskId taskId = statefulTask.id();
+        doThrow(new RuntimeException("Something happened"))
+            
.when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId);
 
         assertThrows(
             RuntimeException.class,
             () -> taskManager.checkStateUpdater(time.milliseconds(), 
noOpResetter)
         );
 
-        verify(activeTaskCreator);
         Mockito.verify(statefulTask, never()).closeDirty();
         Mockito.verify(tasks, never()).removeTask(statefulTask);
     }
@@ -1300,13 +1292,11 @@ public class TaskManagerTest {
         
when(tasks.removePendingTaskToCloseDirty(statefulTask.id())).thenReturn(true);
         
when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(mkSet(statefulTask));
         when(stateUpdater.restoresActiveTasks()).thenReturn(true);
-        
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        replay(activeTaskCreator);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        verify(activeTaskCreator);
+        
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
         Mockito.verify(statefulTask).prepareCommit();
         Mockito.verify(statefulTask).suspend();
         Mockito.verify(statefulTask).closeDirty();
@@ -1539,15 +1529,15 @@ public class TaskManagerTest {
             mkEntry(taskId03, mkSet(t1p3)),
             mkEntry(taskId04, mkSet(t1p4))
         );
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(activeTasksAssignment))).andStubReturn(emptyList());
         
expect(standbyTaskCreator.createTasks(eq(standbyTasksAssignment))).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(activeTasksAssignment, 
standbyTasksAssignment);
 
         
Mockito.verify(topologyBuilder).addSubscribedTopicsFromAssignment(Mockito.eq(mkSet(t1p1,
 t1p2, t2p2)), Mockito.anyString());
         Mockito.verify(topologyBuilder, 
never()).addSubscribedTopicsFromAssignment(Mockito.eq(mkSet(t1p3, t1p4)), 
Mockito.anyString());
-        verify(activeTaskCreator, standbyTaskCreator);
+        Mockito.verify(activeTaskCreator).createTasks(any(), 
Mockito.eq(activeTasksAssignment));
+        verify(standbyTaskCreator);
     }
 
     @Test
@@ -1739,9 +1729,9 @@ public class TaskManagerTest {
 
         taskManager.handleRebalanceStart(singleton("topic"));
         final StateMachineTask uninitializedTask = new 
StateMachineTask(taskId00, taskId00Partitions, true);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment))).andStubReturn(singleton(uninitializedTask));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singleton(uninitializedTask));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
 
         assertThat(uninitializedTask.state(), is(State.CREATED));
@@ -1765,9 +1755,9 @@ public class TaskManagerTest {
         final StateMachineTask closedTask = new StateMachineTask(taskId00, 
taskId00Partitions, true);
 
         taskManager.handleRebalanceStart(singleton("topic"));
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment))).andStubReturn(singleton(closedTask));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singleton(closedTask));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
 
         closedTask.suspend();
@@ -1827,9 +1817,7 @@ public class TaskManagerTest {
 
         // first `handleAssignment`
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(emptyMap()))).andStubReturn(emptyList());
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
         expectLastCall();
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(emptyList());
 
@@ -1841,7 +1829,7 @@ public class TaskManagerTest {
         consumer.commitSync(offsets);
         expectLastCall();
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -1854,6 +1842,7 @@ public class TaskManagerTest {
         assertThat(task00.state(), is(Task.State.CLOSED));
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
+        
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
     }
 
     @Test
@@ -1867,13 +1856,11 @@ public class TaskManagerTest {
 
         // first `handleAssignment`
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(emptyMap()))).andStubReturn(emptyList());
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
         expectLastCall();
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(emptyList());
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         taskManager.handleRevocation(taskId00Partitions);
@@ -1889,6 +1876,7 @@ public class TaskManagerTest {
             is("Encounter unexpected fatal error for task 0_0")
         );
         assertThat(thrown.getCause().getMessage(), is("KABOOM!"));
+        
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
     }
 
     @Test
@@ -1898,7 +1886,7 @@ public class TaskManagerTest {
 
         // `handleAssignment`
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
         
expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andStubReturn(singletonList(task01));
 
         makeTaskFolders(taskId00.toString(), taskId01.toString());
@@ -1912,17 +1900,14 @@ public class TaskManagerTest {
         taskManager.handleRebalanceStart(emptySet());
         assertThat(taskManager.lockedTaskDirectories(), 
Matchers.is(mkSet(taskId00, taskId01)));
 
-        // `handleLostAll`
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
-        expectLastCall();
-
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
         assertThat(task00.state(), is(Task.State.RUNNING));
         assertThat(task01.state(), is(Task.State.RUNNING));
 
+        // `handleLostAll`
         taskManager.handleLostAll();
         assertThat(task00.commitPrepared, is(true));
         assertThat(task00.state(), is(Task.State.CLOSED));
@@ -1936,20 +1921,16 @@ public class TaskManagerTest {
         taskManager.handleRebalanceStart(emptySet());
 
         assertThat(taskManager.lockedTaskDirectories(), is(emptySet()));
+        
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
     }
 
     @Test
     public void 
shouldReInitializeThreadProducerOnHandleLostAllIfEosV2Enabled() {
-        activeTaskCreator.reInitializeThreadProducer();
-        expectLastCall();
-
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
 
-        replay(activeTaskCreator);
-
         taskManager.handleLostAll();
 
-        verify(activeTaskCreator);
+        Mockito.verify(activeTaskCreator).reInitializeThreadProducer();
     }
 
     @Test
@@ -1960,16 +1941,15 @@ public class TaskManagerTest {
 
         // `handleAssignment`
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(emptyList());
 
         // `handleAssignment`
         consumer.commitSync(offsets);
         expectLastCall();
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
-        expectLastCall().andThrow(new RuntimeException("KABOOM!"));
+        doThrow(new 
RuntimeException("KABOOM!")).when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -2010,10 +1990,10 @@ public class TaskManagerTest {
 
         // `handleAssignment`
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
         expect(consumer.assignment()).andReturn(taskId00Partitions);
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
tp -> assertThat(tp, is(empty()))), is(true));
@@ -2048,10 +2028,10 @@ public class TaskManagerTest {
         };
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
         expect(consumer.assignment()).andReturn(taskId00Partitions);
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
tp -> assertThat(tp, is(empty()))), is(true));
@@ -2082,8 +2062,8 @@ public class TaskManagerTest {
         assignment.putAll(taskId01Assignment);
 
         // `handleAssignment`
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignment)))
-            .andStubReturn(asList(corruptedTask, nonCorruptedTask));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+            .thenReturn(asList(corruptedTask, nonCorruptedTask));
         expect(standbyTaskCreator.createTasks(anyObject()))
             .andStubReturn(Collections.emptySet());
         expectRestoreToBeCompleted(consumer);
@@ -2091,8 +2071,7 @@ public class TaskManagerTest {
         // check that we should not commit empty map either
         consumer.commitSync(eq(emptyMap()));
         expectLastCall().andStubThrow(new AssertionError("should not invoke 
commitSync when offset map is empty"));
-
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
tp -> assertThat(tp, is(empty()))), is(true));
@@ -2125,12 +2104,12 @@ public class TaskManagerTest {
         assignment.putAll(taskId01Assignment);
 
         // `handleAssignment`
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignment)))
-            .andStubReturn(asList(corruptedTask, nonRunningNonCorruptedTask));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+            .thenReturn(asList(corruptedTask, nonRunningNonCorruptedTask));
         expect(standbyTaskCreator.createTasks(anyObject()))
             .andStubReturn(Collections.emptySet());
         expect(consumer.assignment()).andReturn(taskId00Partitions);
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(assignment, emptyMap());
 
@@ -2141,7 +2120,6 @@ public class TaskManagerTest {
         assertThat(nonRunningNonCorruptedTask.partitionsForOffsetReset, 
equalTo(Collections.emptySet()));
         assertThat(corruptedTask.partitionsForOffsetReset, 
equalTo(taskId00Partitions));
 
-        verify(activeTaskCreator);
         assertFalse(nonRunningNonCorruptedTask.commitPrepared);
         verify(consumer);
     }
@@ -2162,11 +2140,12 @@ public class TaskManagerTest {
 
         // handleAssignment
         
expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andStubReturn(singleton(corruptedStandby));
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId01Assignment))).andStubReturn(singleton(runningNonCorruptedActive));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId01Assignment)))
+            .thenReturn(singleton(runningNonCorruptedActive));
 
         expectRestoreToBeCompleted(consumer);
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId01Assignment, taskId00Assignment);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -2202,14 +2181,15 @@ public class TaskManagerTest {
         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
         assignment.putAll(taskId00Assignment);
         assignment.putAll(taskId01Assignment);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignment))).andStubReturn(asList(corruptedActive, uncorruptedActive));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+            .thenReturn(asList(corruptedActive, uncorruptedActive));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
 
         expectRestoreToBeCompleted(consumer);
 
         expect(consumer.assignment()).andStubReturn(union(HashSet::new, 
taskId00Partitions, taskId01Partitions));
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer, 
stateDirectory, stateManager);
+        replay(standbyTaskCreator, consumer, stateDirectory, stateManager);
 
         uncorruptedActive.setCommittableOffsetsAndMetadata(offsets);
 
@@ -2254,7 +2234,8 @@ public class TaskManagerTest {
         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
         assignment.putAll(taskId00Assignment);
         assignment.putAll(taskId01Assignment);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignment))).andStubReturn(asList(corruptedActive, uncorruptedActive));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+            .thenReturn(asList(corruptedActive, uncorruptedActive));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
 
         expectRestoreToBeCompleted(consumer);
@@ -2264,7 +2245,7 @@ public class TaskManagerTest {
 
         expect(consumer.assignment()).andStubReturn(union(HashSet::new, 
taskId00Partitions, taskId01Partitions));
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -2301,7 +2282,7 @@ public class TaskManagerTest {
     public void 
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringHandleCorruptedWithEOS()
 {
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
         final StreamsProducer producer = mock(StreamsProducer.class);
-        expect(activeTaskCreator.threadProducer()).andStubReturn(producer);
+        when(activeTaskCreator.threadProducer()).thenReturn(producer);
         final ProcessorStateManager stateManager = 
EasyMock.createMock(ProcessorStateManager.class);
 
         final AtomicBoolean corruptedTaskChangelogMarkedAsCorrupted = new 
AtomicBoolean(false);
@@ -2330,7 +2311,8 @@ public class TaskManagerTest {
         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
         assignment.putAll(taskId00Assignment);
         assignment.putAll(taskId01Assignment);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignment))).andStubReturn(asList(corruptedActiveTask, 
uncorruptedActiveTask));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+            .thenReturn(asList(corruptedActiveTask, uncorruptedActiveTask));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
 
         expectRestoreToBeCompleted(consumer);
@@ -2342,7 +2324,7 @@ public class TaskManagerTest {
 
         expect(consumer.assignment()).andStubReturn(union(HashSet::new, 
taskId00Partitions, taskId01Partitions));
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer, stateManager);
+        replay(standbyTaskCreator, consumer, stateManager);
 
         taskManager.handleAssignment(assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -2412,15 +2394,15 @@ public class TaskManagerTest {
 
         expectRestoreToBeCompleted(consumer);
 
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignmentActive))).andReturn(asList(revokedActiveTask, 
unrevokedActiveTaskWithCommitNeeded, unrevokedActiveTaskWithoutCommitNeeded));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignmentActive)))
+            .thenReturn(asList(revokedActiveTask, 
unrevokedActiveTaskWithCommitNeeded, unrevokedActiveTaskWithoutCommitNeeded));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
         expectLastCall();
         consumer.commitSync(expectedCommittedOffsets);
         expectLastCall().andThrow(new TimeoutException());
         expect(consumer.assignment()).andStubReturn(union(HashSet::new, 
taskId00Partitions, taskId01Partitions, taskId02Partitions));
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(assignmentActive, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -2439,7 +2421,7 @@ public class TaskManagerTest {
     public void 
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithEOS()
 {
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
         final StreamsProducer producer = mock(StreamsProducer.class);
-        expect(activeTaskCreator.threadProducer()).andStubReturn(producer);
+        when(activeTaskCreator.threadProducer()).thenReturn(producer);
         final ProcessorStateManager stateManager = 
EasyMock.createMock(ProcessorStateManager.class);
 
         final StateMachineTask revokedActiveTask = new 
StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
@@ -2476,10 +2458,9 @@ public class TaskManagerTest {
 
         expectRestoreToBeCompleted(consumer);
 
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignmentActive))).andReturn(asList(revokedActiveTask, unrevokedActiveTask, 
unrevokedActiveTaskWithoutCommitNeeded));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignmentActive)))
+            .thenReturn(asList(revokedActiveTask, unrevokedActiveTask, 
unrevokedActiveTaskWithoutCommitNeeded));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
-        expectLastCall();
 
         final ConsumerGroupMetadata groupMetadata = new 
ConsumerGroupMetadata("appId");
         expect(consumer.groupMetadata()).andReturn(groupMetadata);
@@ -2488,7 +2469,7 @@ public class TaskManagerTest {
 
         expect(consumer.assignment()).andStubReturn(union(HashSet::new, 
taskId00Partitions, taskId01Partitions, taskId02Partitions));
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer, stateManager);
+        replay(standbyTaskCreator, consumer, stateManager);
 
         taskManager.handleAssignment(assignmentActive, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -2515,11 +2496,10 @@ public class TaskManagerTest {
 
         expectRestoreToBeCompleted(consumer);
         
expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andStubReturn(singletonList(task00));
-        expect(activeTaskCreator.createTasks(anyObject(), 
anyObject())).andStubReturn(Collections.emptySet());
         
expect(standbyTaskCreator.createTasks(eq(Collections.emptyMap()))).andStubReturn(Collections.emptySet());
         consumer.commitSync(Collections.emptyMap());
         expectLastCall();
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(emptyMap(), taskId00Assignment);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -2539,11 +2519,10 @@ public class TaskManagerTest {
         expectRestoreToBeCompleted(consumer);
         // expect these calls twice (because we're going to 
tryToCompleteRestoration twice)
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment))).andReturn(singletonList(task00));
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
         
expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andReturn(singletonList(task01)).anyTimes();
         
expect(standbyTaskCreator.createTasks(eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -2555,7 +2534,7 @@ public class TaskManagerTest {
         assertThat(task00.state(), is(Task.State.RUNNING));
         assertThat(task01.state(), is(Task.State.RUNNING));
 
-        verify(activeTaskCreator);
+        Mockito.verify(activeTaskCreator).createTasks(any(), 
Mockito.eq(emptyMap()));
     }
 
     @Test
@@ -2565,10 +2544,9 @@ public class TaskManagerTest {
         expectRestoreToBeCompleted(consumer);
         // expect these calls twice (because we're going to 
tryToCompleteRestoration twice)
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment))).andReturn(singletonList(task00));
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -2580,7 +2558,8 @@ public class TaskManagerTest {
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
         assertThat(task00.state(), is(Task.State.RUNNING));
         assertEquals(newPartitionsSet, task00.inputPartitions());
-        verify(activeTaskCreator, consumer);
+        verify(consumer);
+        Mockito.verify(activeTaskCreator).createTasks(any(), 
Mockito.eq(emptyMap()));
     }
 
     @Test
@@ -2591,9 +2570,9 @@ public class TaskManagerTest {
         expect(consumer.assignment()).andReturn(emptySet());
         consumer.resume(eq(emptySet()));
         expectLastCall();
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignment))).andStubReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignment))).thenReturn(singletonList(task00));
         
expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());
-        replay(consumer, activeTaskCreator, standbyTaskCreator);
+        replay(consumer, standbyTaskCreator);
 
         taskManager.handleAssignment(assignment, emptyMap());
 
@@ -2604,7 +2583,6 @@ public class TaskManagerTest {
         assertThat(task00.state(), is(Task.State.RUNNING));
         assertThat(taskManager.activeTaskMap(), 
Matchers.equalTo(singletonMap(taskId00, task00)));
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
-        verify(activeTaskCreator);
         Mockito.verify(changeLogReader).enforceRestoreActive();
     }
 
@@ -2632,9 +2610,9 @@ public class TaskManagerTest {
         expect(consumer.assignment()).andReturn(emptySet());
         consumer.resume(eq(emptySet()));
         expectLastCall();
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignment))).andStubReturn(asList(task00, task01));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignment))).thenReturn(asList(task00, task01));
         
expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());
-        replay(consumer, activeTaskCreator, standbyTaskCreator);
+        replay(consumer, standbyTaskCreator);
 
         taskManager.handleAssignment(assignment, emptyMap());
 
@@ -2650,7 +2628,6 @@ public class TaskManagerTest {
             Matchers.equalTo(mkMap(mkEntry(taskId00, task00), 
mkEntry(taskId01, task01)))
         );
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
-        verify(activeTaskCreator);
         Mockito.verify(changeLogReader).enforceRestoreActive();
     }
 
@@ -2672,9 +2649,9 @@ public class TaskManagerTest {
         consumer.resume(eq(emptySet()));
         expectLastCall();
         expectLastCall();
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignment))).andStubReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignment))).thenReturn(singletonList(task00));
         
expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());
-        replay(consumer, activeTaskCreator, standbyTaskCreator);
+        replay(consumer, standbyTaskCreator);
 
         taskManager.handleAssignment(assignment, emptyMap());
 
@@ -2688,7 +2665,6 @@ public class TaskManagerTest {
             Matchers.equalTo(mkMap(mkEntry(taskId00, task00)))
         );
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
-        verify(activeTaskCreator);
         Mockito.verify(changeLogReader).enforceRestoreActive();
     }
 
@@ -2699,12 +2675,12 @@ public class TaskManagerTest {
         task00.setCommittableOffsetsAndMetadata(offsets);
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment))).andReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
         consumer.commitSync(offsets);
         expectLastCall();
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -2750,11 +2726,10 @@ public class TaskManagerTest {
         );
         expectRestoreToBeCompleted(consumer);
 
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignmentActive)))
-            .andReturn(asList(task00, task01, task02));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignmentActive)))
+            .thenReturn(asList(task00, task01, task02));
 
-        expect(activeTaskCreator.threadProducer()).andReturn(producer);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
+        when(activeTaskCreator.threadProducer()).thenReturn(producer);
         expect(standbyTaskCreator.createTasks(eq(assignmentStandby)))
             .andReturn(singletonList(task10));
 
@@ -2772,7 +2747,7 @@ public class TaskManagerTest {
         task10.committedOffsets();
         EasyMock.expectLastCall();
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(assignmentActive, assignmentStandby);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -2822,16 +2797,14 @@ public class TaskManagerTest {
         );
         expectRestoreToBeCompleted(consumer);
 
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignmentActive)))
-            .andReturn(asList(task00, task01, task02));
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
-        expectLastCall();
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignmentActive)))
+            .thenReturn(asList(task00, task01, task02));
         expect(standbyTaskCreator.createTasks(eq(assignmentStandby)))
             .andReturn(singletonList(task10));
         consumer.commitSync(expectedCommittedOffsets);
         expectLastCall();
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(assignmentActive, assignmentStandby);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -2864,12 +2837,11 @@ public class TaskManagerTest {
 
         expectRestoreToBeCompleted(consumer);
 
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignmentActive))).andReturn(singleton(task00));
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignmentActive))).thenReturn(singleton(task00));
         
expect(standbyTaskCreator.createTasks(eq(assignmentStandby))).andReturn(singletonList(task10));
         
expect(standbyTaskCreator.createTasks(eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(assignmentActive, assignmentStandby);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -2896,12 +2868,11 @@ public class TaskManagerTest {
 
         expectRestoreToBeCompleted(consumer);
 
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignmentActive))).andReturn(singleton(task00));
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignmentActive))).thenReturn(singleton(task00));
         
expect(standbyTaskCreator.createTasks(eq(assignmentStandby))).andReturn(singletonList(task10));
         
expect(standbyTaskCreator.createTasks(eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(assignmentActive, assignmentStandby);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -2917,12 +2888,9 @@ public class TaskManagerTest {
     public void shouldNotCommitCreatedTasksOnRevocationOrClosure() {
         final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true);
 
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment))).andReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(eq(taskId00));
-        expectLastCall().once();
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(task00.state(), is(Task.State.CREATED));
@@ -2932,6 +2900,7 @@ public class TaskManagerTest {
 
         taskManager.handleAssignment(emptyMap(), emptyMap());
         assertThat(task00.state(), is(Task.State.CLOSED));
+        
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(eq(taskId00));
     }
 
     @Test
@@ -2945,9 +2914,9 @@ public class TaskManagerTest {
         };
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment))).andReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
         assertThat(task00.state(), is(Task.State.RUNNING));
@@ -3016,14 +2985,10 @@ public class TaskManagerTest {
             }
         };
 
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignment)))
-            .andStubReturn(asList(task00, task01, task02, task03));
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().times(4);
-        activeTaskCreator.closeThreadProducerIfNeeded();
-        expectLastCall();
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+            .thenReturn(asList(task00, task01, task02, task03));
         
expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(assignment, emptyMap());
 
@@ -3068,8 +3033,9 @@ public class TaskManagerTest {
         assertThat(task03.state(), is(Task.State.CLOSED));
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
+        Mockito.verify(activeTaskCreator, 
times(4)).closeAndRemoveTaskProducerIfNeeded(any());
         // the active task creator should also get closed (so that it closes 
the thread producer if applicable)
-        verify(activeTaskCreator);
+        Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
     }
 
     @Test
@@ -3087,13 +3053,11 @@ public class TaskManagerTest {
         final Map<TopicPartition, OffsetAndMetadata> offsets = 
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
         task00.setCommittableOffsetsAndMetadata(offsets);
 
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignment))).andStubReturn(singletonList(task00));
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(eq(taskId00));
-        expectLastCall().andThrow(new RuntimeException("whatever"));
-        activeTaskCreator.closeThreadProducerIfNeeded();
-        expectLastCall();
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignment))).thenReturn(singletonList(task00));
+        doThrow(new RuntimeException("whatever"))
+            
.when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
         
expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(assignment, emptyMap());
 
@@ -3120,8 +3084,9 @@ public class TaskManagerTest {
         assertThat(exception.getCause().getMessage(), is("whatever"));
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
+        
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
         // the active task creator should also get closed (so that it closes 
the thread producer if applicable)
-        verify(activeTaskCreator);
+        Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
     }
 
     @Test
@@ -3137,13 +3102,10 @@ public class TaskManagerTest {
             }
         };
 
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignment))).andStubReturn(singletonList(task00));
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(eq(taskId00));
-        expectLastCall();
-        activeTaskCreator.closeThreadProducerIfNeeded();
-        expectLastCall().andThrow(new RuntimeException("whatever"));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignment))).thenReturn(singletonList(task00));
+        doThrow(new 
RuntimeException("whatever")).when(activeTaskCreator).closeThreadProducerIfNeeded();
         
expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(assignment, emptyMap());
 
@@ -3171,7 +3133,7 @@ public class TaskManagerTest {
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         // the active task creator should also get closed (so that it closes 
the thread producer if applicable)
-        verify(activeTaskCreator);
+        
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
     }
 
     @Test
@@ -3223,7 +3185,6 @@ public class TaskManagerTest {
         taskManager.addTask(task01);
         taskManager.addTask(task02);
 
-        replay(activeTaskCreator);
 
         final RuntimeException thrown = assertThrows(RuntimeException.class,
             () -> taskManager.handleRevocation(union(HashSet::new, 
taskId01Partitions, taskId02Partitions)));
@@ -3233,7 +3194,7 @@ public class TaskManagerTest {
         assertThat(task01.state(), is(Task.State.SUSPENDED));
         assertThat(task02.state(), is(Task.State.SUSPENDED));
 
-        verify(activeTaskCreator);
+        Mockito.verifyNoInteractions(activeTaskCreator);
     }
 
     @Test
@@ -3265,13 +3226,11 @@ public class TaskManagerTest {
             }
         };
 
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignment))).andStubReturn(asList(task00, task01, task02));
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().andThrow(new RuntimeException("whatever")).times(3);
-        activeTaskCreator.closeThreadProducerIfNeeded();
-        expectLastCall().andThrow(new RuntimeException("whatever all"));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignment))).thenReturn(asList(task00, task01, task02));
+        doThrow(new 
RuntimeException("whatever")).when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(Mockito.any());
+        doThrow(new RuntimeException("whatever 
all")).when(activeTaskCreator).closeThreadProducerIfNeeded();
         
expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(assignment, emptyMap());
 
@@ -3305,8 +3264,9 @@ public class TaskManagerTest {
         assertThat(task02.state(), is(Task.State.CLOSED));
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
+        Mockito.verify(activeTaskCreator, 
times(3)).closeAndRemoveTaskProducerIfNeeded(any());
         // the active task creator should also get closed (so that it closes 
the thread producer if applicable)
-        verify(activeTaskCreator);
+        Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
     }
 
     @Test
@@ -3315,7 +3275,6 @@ public class TaskManagerTest {
         final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, 
false);
 
         // `handleAssignment`
-        expect(activeTaskCreator.createTasks(anyObject(), 
anyObject())).andStubReturn(Collections.emptySet());
         
expect(standbyTaskCreator.createTasks(eq(assignment))).andStubReturn(singletonList(task00));
 
         // `tryToCompleteRestoration`
@@ -3326,10 +3285,8 @@ public class TaskManagerTest {
         // `shutdown`
         consumer.commitSync(Collections.emptyMap());
         expectLastCall();
-        activeTaskCreator.closeThreadProducerIfNeeded();
-        expectLastCall();
 
-        replay(consumer, activeTaskCreator, standbyTaskCreator);
+        replay(consumer, standbyTaskCreator);
 
         taskManager.handleAssignment(emptyMap(), assignment);
         assertThat(task00.state(), is(Task.State.CREATED));
@@ -3344,7 +3301,7 @@ public class TaskManagerTest {
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         // the active task creator should also get closed (so that it closes 
the thread producer if applicable)
-        verify(activeTaskCreator);
+        Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
     }
 
     @Test
@@ -3359,14 +3316,12 @@ public class TaskManagerTest {
                 new ExceptionAndTasks(mkSet(failedStatefulTask), new 
RuntimeException()),
                 new ExceptionAndTasks(mkSet(failedStandbyTask), new 
RuntimeException()))
             );
-        
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(failedStatefulTask.id());
-        activeTaskCreator.closeThreadProducerIfNeeded();
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        replay(activeTaskCreator);
 
         taskManager.shutdown(true);
 
-        verify(activeTaskCreator);
+        
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(failedStatefulTask.id());
+        Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
         
Mockito.verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
         Mockito.verify(failedStatefulTask).prepareCommit();
         Mockito.verify(failedStatefulTask).suspend();
@@ -3384,15 +3339,13 @@ public class TaskManagerTest {
         final Set<Task> restoredTasks = restoredActiveTasks.stream().map(t -> 
(Task) t).collect(Collectors.toSet());
         
when(stateUpdater.drainRestoredActiveTasks(Duration.ZERO)).thenReturn(restoredActiveTasks);
         when(tasks.activeTasks()).thenReturn(restoredTasks);
-        
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask1.id());
-        
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask2.id());
-        activeTaskCreator.closeThreadProducerIfNeeded();
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        replay(activeTaskCreator);
 
         taskManager.shutdown(true);
 
-        verify(activeTaskCreator);
+        
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask1.id());
+        
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask2.id());
+        Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
         
Mockito.verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
         Mockito.verify(tasks).addActiveTasks(restoredTasks);
         Mockito.verify(statefulTask1).closeClean();
@@ -3409,14 +3362,12 @@ public class TaskManagerTest {
         
when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(removedStandbyTask, 
removedStatefulTask));
         when(tasks.activeTasks()).thenReturn(mkSet(removedStatefulTask));
         when(tasks.allTasks()).thenReturn(mkSet(removedStatefulTask, 
removedStandbyTask));
-        
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(removedStatefulTask.id());
-        activeTaskCreator.closeThreadProducerIfNeeded();
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        replay(activeTaskCreator);
 
         taskManager.shutdown(true);
 
-        verify(activeTaskCreator);
+        
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(removedStatefulTask.id());
+        Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
         
Mockito.verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
         Mockito.verify(tasks).addActiveTasks(mkSet(removedStatefulTask));
         Mockito.verify(tasks).addStandbyTasks(mkSet(removedStandbyTask));
@@ -3428,10 +3379,10 @@ public class TaskManagerTest {
     public void shouldInitializeNewActiveTasks() {
         final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true);
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment)))
-            .andStubReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment)))
+            .thenReturn(singletonList(task00));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -3448,10 +3399,9 @@ public class TaskManagerTest {
         final StateMachineTask task01 = new StateMachineTask(taskId01, 
taskId01Partitions, false);
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
anyObject())).andStubReturn(Collections.emptySet());
         
expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andStubReturn(singletonList(task01));
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(emptyMap(), taskId01Assignment);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -3484,14 +3434,14 @@ public class TaskManagerTest {
         final StateMachineTask task01 = new StateMachineTask(taskId01, 
taskId01Partitions, false);
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment)))
-            .andStubReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment)))
+            .thenReturn(singletonList(task00));
         expect(standbyTaskCreator.createTasks(eq(taskId01Assignment)))
             .andStubReturn(singletonList(task01));
         consumer.commitSync(offsets);
         expectLastCall();
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -3528,14 +3478,14 @@ public class TaskManagerTest {
         );
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignmentActive)))
-            .andStubReturn(Arrays.asList(task00, task01, task02));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignmentActive)))
+            .thenReturn(Arrays.asList(task00, task01, task02));
         expect(standbyTaskCreator.createTasks(eq(assignmentStandby)))
             .andStubReturn(Arrays.asList(task03, task04, task05));
 
         consumer.commitSync(eq(emptyMap()));
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(assignmentActive, assignmentStandby);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -3562,11 +3512,10 @@ public class TaskManagerTest {
         final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, false);
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
anyObject())).andStubReturn(Collections.emptySet());
         
expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andStubReturn(singletonList(task00));
         expectLastCall();
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(Collections.emptyMap(), 
taskId00Assignment);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -3587,12 +3536,12 @@ public class TaskManagerTest {
         makeTaskFolders(taskId00.toString(), task01.toString());
         expectLockObtainedFor(taskId00, taskId01);
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment)))
-            .andStubReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment)))
+            .thenReturn(singletonList(task00));
         expect(standbyTaskCreator.createTasks(eq(taskId01Assignment)))
             .andStubReturn(singletonList(task01));
 
-        replay(activeTaskCreator, standbyTaskCreator, stateDirectory, 
consumer);
+        replay(standbyTaskCreator, stateDirectory, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -3636,9 +3585,8 @@ public class TaskManagerTest {
     @Test
     public void shouldCommitViaProducerIfEosAlphaEnabled() {
         final StreamsProducer producer = EasyMock.mock(StreamsProducer.class);
-        
expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class)))
-            .andReturn(producer)
-            .andReturn(producer);
+        when(activeTaskCreator.streamsProducerForTask(any(TaskId.class)))
+            .thenReturn(producer);
 
         final Map<TopicPartition, OffsetAndMetadata> offsetsT01 = 
singletonMap(t1p1, new OffsetAndMetadata(0L, null));
         final Map<TopicPartition, OffsetAndMetadata> offsetsT02 = 
singletonMap(t1p2, new OffsetAndMetadata(1L, null));
@@ -3654,7 +3602,7 @@ public class TaskManagerTest {
     @Test
     public void shouldCommitViaProducerIfEosV2Enabled() {
         final StreamsProducer producer = EasyMock.mock(StreamsProducer.class);
-        expect(activeTaskCreator.threadProducer()).andReturn(producer);
+        when(activeTaskCreator.threadProducer()).thenReturn(producer);
 
         final Map<TopicPartition, OffsetAndMetadata> offsetsT01 = 
singletonMap(t1p1, new OffsetAndMetadata(0L, null));
         final Map<TopicPartition, OffsetAndMetadata> offsetsT02 = 
singletonMap(t1p2, new OffsetAndMetadata(1L, null));
@@ -3685,7 +3633,7 @@ public class TaskManagerTest {
 
         reset(consumer);
         expect(consumer.groupMetadata()).andStubReturn(new 
ConsumerGroupMetadata("appId"));
-        replay(activeTaskCreator, consumer, producer);
+        replay(consumer, producer);
 
         taskManager.commitAll();
 
@@ -3702,9 +3650,9 @@ public class TaskManagerTest {
         };
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -3728,10 +3676,9 @@ public class TaskManagerTest {
         };
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
anyObject())).andStubReturn(Collections.emptySet());
         
expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andStubReturn(singletonList(task01));
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(emptyMap(), taskId01Assignment);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -3763,10 +3710,10 @@ public class TaskManagerTest {
         };
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -3799,9 +3746,9 @@ public class TaskManagerTest {
         };
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -3831,7 +3778,7 @@ public class TaskManagerTest {
         futureDeletedRecords.completeExceptionally(new Exception("KABOOM!"));
         
expect(adminClient.deleteRecords(anyObject())).andReturn(deleteRecordsResult).times(2);
 
-        replay(activeTaskCreator, adminClient, consumer);
+        replay(adminClient, consumer);
 
         taskManager.addTask(task00);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -3876,14 +3823,14 @@ public class TaskManagerTest {
         );
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignmentActive)))
-            .andStubReturn(asList(task00, task01, task02, task03));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignmentActive)))
+            .thenReturn(asList(task00, task01, task02, task03));
         expect(standbyTaskCreator.createTasks(eq(assignmentStandby)))
             .andStubReturn(singletonList(task04));
         consumer.commitSync(expectedCommittedOffsets);
         expectLastCall();
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(assignmentActive, assignmentStandby);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -3920,9 +3867,10 @@ public class TaskManagerTest {
         assignment.put(taskId01, taskId01Partitions);
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignment))).andStubReturn(Arrays.asList(task00, task01));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+            .thenReturn(Arrays.asList(task00, task01));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -3955,7 +3903,7 @@ public class TaskManagerTest {
         // check that we should be processing at most max num records
         assertThat(taskManager.process(3, time), is(6));
 
-        // check that if there's no records proccssible, we would stop early
+        // check that if there's no records processable, we would stop early
         assertThat(taskManager.process(3, time), is(5));
         assertThat(taskManager.process(3, time), is(0));
     }
@@ -4033,9 +3981,9 @@ public class TaskManagerTest {
         };
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -4058,10 +4006,10 @@ public class TaskManagerTest {
         };
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment)))
-            .andStubReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment)))
+            .thenReturn(singletonList(task00));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -4087,9 +4035,9 @@ public class TaskManagerTest {
         };
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -4109,9 +4057,9 @@ public class TaskManagerTest {
         };
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -4136,11 +4084,10 @@ public class TaskManagerTest {
         };
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment)))
-            .andStubReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
         expect(standbyTaskCreator.createTasks(anyObject()))
             .andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
@@ -4160,9 +4107,9 @@ public class TaskManagerTest {
             }
         };
 
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(false));
@@ -4179,12 +4126,12 @@ public class TaskManagerTest {
         task00.setCommittableOffsetsAndMetadata(offsets);
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment))).andReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
         consumer.commitSync(offsets);
         expectLastCall();
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(TaskManager.class)) {
             LogCaptureAppender.setClassLoggerToDebug(TaskManager.class);
@@ -4311,8 +4258,7 @@ public class TaskManagerTest {
             new MockTime());
         final Map<MetricName, Metric> dummyProducerMetrics = 
singletonMap(testMetricName, testMetric);
 
-        
expect(activeTaskCreator.producerMetrics()).andReturn(dummyProducerMetrics);
-        replay(activeTaskCreator);
+        
when(activeTaskCreator.producerMetrics()).thenReturn(dummyProducerMetrics);
 
         assertThat(taskManager.producerMetrics(), is(dummyProducerMetrics));
     }
@@ -4339,10 +4285,10 @@ public class TaskManagerTest {
         allActiveTasks.addAll(restoringTasks);
 
         
expect(standbyTaskCreator.createTasks(eq(standbyAssignment))).andStubReturn(standbyTasks);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(allActiveTasksAssignment))).andStubReturn(allActiveTasks);
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(allActiveTasksAssignment))).thenReturn(allActiveTasks);
 
         expectRestoreToBeCompleted(consumer);
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(allActiveTasksAssignment, 
standbyAssignment);
         taskManager.tryToCompleteRestoration(time.milliseconds(), null);
@@ -4448,10 +4394,7 @@ public class TaskManagerTest {
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_ALPHA, tasks, false);
 
         final StreamsProducer producer = mock(StreamsProducer.class);
-        
expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class)))
-            .andReturn(producer)
-            .andReturn(producer)
-            .andReturn(producer);
+        
when(activeTaskCreator.streamsProducerForTask(any(TaskId.class))).thenReturn(producer);
 
         final Map<TopicPartition, OffsetAndMetadata> offsetsT00 = 
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
         final Map<TopicPartition, OffsetAndMetadata> offsetsT01 = 
singletonMap(t1p1, new OffsetAndMetadata(1L, null));
@@ -4473,7 +4416,7 @@ public class TaskManagerTest {
         when(tasks.allTasks()).thenReturn(mkSet(task00, task01, task02));
         
         expect(consumer.groupMetadata()).andStubReturn(null);
-        replay(activeTaskCreator, consumer);
+        replay(consumer);
 
         task00.setCommitNeeded();
         task01.setCommitNeeded();
@@ -4493,9 +4436,7 @@ public class TaskManagerTest {
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
 
         final StreamsProducer producer = mock(StreamsProducer.class);
-        expect(activeTaskCreator.threadProducer())
-            .andReturn(producer)
-            .andReturn(producer);
+        when(activeTaskCreator.threadProducer()).thenReturn(producer);
 
         final Map<TopicPartition, OffsetAndMetadata> offsetsT00 = 
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
         final Map<TopicPartition, OffsetAndMetadata> offsetsT01 = 
singletonMap(t1p1, new OffsetAndMetadata(1L, null));
@@ -4511,7 +4452,7 @@ public class TaskManagerTest {
         final StateMachineTask task02 = new StateMachineTask(taskId02, 
taskId02Partitions, true);
 
         expect(consumer.groupMetadata()).andStubReturn(null);
-        replay(activeTaskCreator, consumer);
+        replay(consumer);
 
         task00.setCommitNeeded();
         task01.setCommitNeeded();
@@ -4582,9 +4523,9 @@ public class TaskManagerTest {
 
         final Map<TaskId, Set<TopicPartition>> assignment = new 
HashMap<>(taskId00Assignment);
         assignment.putAll(taskId01Assignment);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignment))).andReturn(asList(task00, task01));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignment))).thenReturn(asList(task00, task01));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(assignment, Collections.emptyMap());
 
@@ -4608,21 +4549,20 @@ public class TaskManagerTest {
         final StandbyTask standbyTask = EasyMock.mock(StandbyTask.class);
         expect(standbyTask.id()).andStubReturn(taskId00);
 
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment))).andReturn(singletonList(activeTask));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(activeTask));
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
         activeTask.prepareRecycle();
         expectLastCall().once();
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
-        expectLastCall().anyTimes();
         expect(standbyTaskCreator.createStandbyTaskFromActive(anyObject(), 
eq(taskId00Partitions))).andReturn(standbyTask);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
 
-        replay(activeTask, standbyTask, activeTaskCreator, standbyTaskCreator, 
consumer);
+        replay(activeTask, standbyTask, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, 
Collections.emptyMap());
         taskManager.handleAssignment(Collections.emptyMap(), 
taskId00Assignment);
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
+        Mockito.verify(activeTaskCreator).createTasks(any(), 
Mockito.eq(emptyMap()));
     }
 
     @Test
@@ -4635,19 +4575,18 @@ public class TaskManagerTest {
         final StreamTask activeTask = mock(StreamTask.class);
         when(activeTask.id()).thenReturn(taskId00);
         when(activeTask.inputPartitions()).thenReturn(taskId00Partitions);
-
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
         
expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andReturn(singletonList(standbyTask));
-        expect(activeTaskCreator.createActiveTaskFromStandby(eq(standbyTask), 
eq(taskId00Partitions), anyObject())).andReturn(activeTask);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
+        
when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(standbyTask), 
Mockito.eq(taskId00Partitions), any()))
+            .thenReturn(activeTask);
         
expect(standbyTaskCreator.createTasks(eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
 
-        replay(standbyTaskCreator, activeTaskCreator, consumer);
+        replay(standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(Collections.emptyMap(), 
taskId00Assignment);
         taskManager.handleAssignment(taskId00Assignment, 
Collections.emptyMap());
 
-        verify(standbyTaskCreator, activeTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verify(activeTaskCreator, times(2)).createTasks(any(), 
Mockito.eq(emptyMap()));
     }
 
     @Test

Reply via email to