This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ea88a8ec3f0 KAFKA-19683: Clean up TaskManagerTest (#20994)
ea88a8ec3f0 is described below

commit ea88a8ec3f06c89b2c4d72221b6530eae5ab2f79
Author: Shashank <[email protected]>
AuthorDate: Mon Dec 1 04:52:47 2025 -0800

    KAFKA-19683: Clean up TaskManagerTest (#20994)
    
    Final cleanup of `TaskManagerTest.java`
    - Renamed `setUpTaskManagerWithStateUpdater` to `setUpTaskManager`
    - Removed `WithStateUpdaterEnabled` from test names
    - Removed the method `setUpTaskManagerWithoutStateUpdater`
    - Rewrote the newly added test
    `shouldCloseTasksIfStateUpdaterTimesOutOnRemove` to use new mocking
    patterns
    - Completely removed the utility class `StateMachineTask` and also
    `handleAssignment` method
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../processor/internals/TaskManagerTest.java       | 645 ++++++---------------
 1 file changed, 162 insertions(+), 483 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 7cfa428d337..63638d12c9b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
@@ -38,14 +37,12 @@ import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.TopologyConfig;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.internals.StreamsConfigUtils;
 import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.assignment.ProcessId;
 import 
org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory;
@@ -76,13 +73,10 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
@@ -123,7 +117,6 @@ import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoInteractions;
@@ -212,16 +205,16 @@ public class TaskManagerTest {
 
     @BeforeEach
     public void setUp() {
-        taskManager = 
setUpTaskManagerWithoutStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
 null, false);
+        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, null, false);
     }
 
-    private TaskManager setUpTaskManagerWithStateUpdater(final ProcessingMode 
processingMode, final TasksRegistry tasks) {
-        return setUpTaskManagerWithStateUpdater(processingMode, tasks, false);
+    private TaskManager setUpTaskManager(final ProcessingMode processingMode, 
final TasksRegistry tasks) {
+        return setUpTaskManager(processingMode, tasks, false);
     }
 
-    private TaskManager setUpTaskManagerWithStateUpdater(final ProcessingMode 
processingMode,
-                                                         final TasksRegistry 
tasks,
-                                                         final boolean 
processingThreadsEnabled) {
+    private TaskManager setUpTaskManager(final ProcessingMode processingMode,
+                                         final TasksRegistry tasks,
+                                         final boolean 
processingThreadsEnabled) {
         topologyMetadata = new TopologyMetadata(topologyBuilder, new 
DummyStreamsConfig(processingMode));
         final TaskManager taskManager = new TaskManager(
             time,
@@ -241,35 +234,13 @@ public class TaskManagerTest {
         return taskManager;
     }
 
-    private TaskManager setUpTaskManagerWithoutStateUpdater(final 
ProcessingMode processingMode,
-                                                            final 
TasksRegistry tasks,
-                                                            final boolean 
processingThreadsEnabled) {
-        topologyMetadata = new TopologyMetadata(topologyBuilder, new 
DummyStreamsConfig(processingMode));
-        final TaskManager taskManager = new TaskManager(
-            time,
-            changeLogReader,
-            ProcessId.randomProcessId(),
-            "taskManagerTest",
-            activeTaskCreator,
-            standbyTaskCreator,
-            tasks != null ? tasks : new Tasks(new LogContext()),
-            topologyMetadata,
-            adminClient,
-            stateDirectory,
-            null,
-            processingThreadsEnabled ? schedulingTaskManager : null
-        );
-        taskManager.setMainConsumer(consumer);
-        return taskManager;
-    }
-
     @Test
     public void shouldLockAllTasksOnCorruptionWithProcessingThreads() {
         final StreamTask activeTask1 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId00Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
         when(tasks.task(taskId00)).thenReturn(activeTask1);
         final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
@@ -291,7 +262,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId01Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
         when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
 
@@ -304,7 +275,7 @@ public class TaskManagerTest {
     @Test
     public void shouldLockActiveOnHandleAssignmentWithProcessingThreads() {
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTaskIds()).thenReturn(Set.of(taskId00, taskId01));
         final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
         when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
@@ -327,7 +298,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId01Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasks()).thenReturn(Set.of(activeTask1, activeTask2));
         final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
         when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
@@ -347,7 +318,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId01Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasks()).thenReturn(Set.of(activeTask1, activeTask2));
         final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
         when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
@@ -367,7 +338,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId01Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(tasks.activeTasks()).thenReturn(Set.of(activeTask1, activeTask2));
 
         taskManager.resumePollingForPartitionsWithAvailableSpace();
@@ -385,7 +356,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId01Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(tasks.activeTasks()).thenReturn(Set.of(activeTask1, activeTask2));
 
         taskManager.updateLags();
@@ -400,7 +371,7 @@ public class TaskManagerTest {
             .inState(State.RESTORING)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToClose));
         final CompletableFuture<StateUpdater.RemovedTaskResult> future = new 
CompletableFuture<>();
         when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(future);
@@ -420,7 +391,7 @@ public class TaskManagerTest {
             .inState(State.RESTORING)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToClose));
         final CompletableFuture<StateUpdater.RemovedTaskResult> future = new 
CompletableFuture<>();
         when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(future);
@@ -441,7 +412,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToClose));
         final CompletableFuture<StateUpdater.RemovedTaskResult> future = new 
CompletableFuture<>();
         when(stateUpdater.remove(standbyTaskToClose.id())).thenReturn(future);
@@ -461,7 +432,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToClose));
         final CompletableFuture<StateUpdater.RemovedTaskResult> future = new 
CompletableFuture<>();
         when(stateUpdater.remove(standbyTaskToClose.id())).thenReturn(future);
@@ -482,7 +453,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(failedStandbyTask));
         final CompletableFuture<StateUpdater.RemovedTaskResult> future = new 
CompletableFuture<>();
         when(stateUpdater.remove(failedStandbyTask.id())).thenReturn(future);
@@ -509,7 +480,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId03Partitions).build();
         final Set<TopicPartition> newInputPartitions = taskId02Partitions;
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         
when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToUpdateInputPartitions));
         final CompletableFuture<StateUpdater.RemovedTaskResult> future = new 
CompletableFuture<>();
         
when(stateUpdater.remove(activeTaskToUpdateInputPartitions.id())).thenReturn(future);
@@ -537,7 +508,7 @@ public class TaskManagerTest {
             .inState(State.CREATED)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToRecycle));
         
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, 
taskId03Partitions))
             .thenReturn(recycledStandbyTask);
@@ -561,7 +532,7 @@ public class TaskManagerTest {
             .inState(State.RESTORING)
             .withInputPartitions(taskId00Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToRecycle));
         
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, 
activeTaskToRecycle.inputPartitions()))
             .thenThrow(new RuntimeException());
@@ -591,7 +562,7 @@ public class TaskManagerTest {
             .inState(State.CREATED)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToRecycle));
         
when(activeTaskCreator.createActiveTaskFromStandby(standbyTaskToRecycle, 
taskId03Partitions, consumer))
             .thenReturn(recycledActiveTask);
@@ -615,7 +586,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToRecycle));
         when(activeTaskCreator.createActiveTaskFromStandby(
             standbyTaskToRecycle,
@@ -645,7 +616,7 @@ public class TaskManagerTest {
             .inState(State.RESTORING)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(reassignedActiveTask));
 
         taskManager.handleAssignment(
@@ -664,7 +635,7 @@ public class TaskManagerTest {
             .inState(State.SUSPENDED)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         
when(tasks.allNonFailedTasks()).thenReturn(Set.of(reassignedActiveTask));
 
         taskManager.handleAssignment(
@@ -684,7 +655,7 @@ public class TaskManagerTest {
             .inState(State.RESTORING)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         
when(stateUpdater.tasks()).thenReturn(Set.of(failedActiveTaskToRecycle));
         final RuntimeException taskException = new RuntimeException("Nobody 
expects the Spanish inquisition!");
         when(stateUpdater.remove(failedActiveTaskToRecycle.id()))
@@ -714,7 +685,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         
when(stateUpdater.tasks()).thenReturn(Set.of(failedStandbyTaskToRecycle));
         final RuntimeException taskException = new RuntimeException("Nobody 
expects the Spanish inquisition!");
         when(stateUpdater.remove(failedStandbyTaskToRecycle.id()))
@@ -744,7 +715,7 @@ public class TaskManagerTest {
             .inState(State.RESTORING)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         
when(stateUpdater.tasks()).thenReturn(Set.of(failedActiveTaskToReassign));
         final RuntimeException taskException = new RuntimeException("Nobody 
expects the Spanish inquisition!");
         when(stateUpdater.remove(failedActiveTaskToReassign.id()))
@@ -777,7 +748,7 @@ public class TaskManagerTest {
             .inState(State.RESTORING)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         
when(tasks.allNonFailedTasks()).thenReturn(Set.of(reassignedActiveTask1));
         when(stateUpdater.tasks()).thenReturn(Set.of(reassignedActiveTask2));
         when(stateUpdater.remove(reassignedActiveTask2.id()))
@@ -803,7 +774,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToUpdateInputPartitions));
 
         taskManager.handleAssignment(
@@ -821,7 +792,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(reassignedStandbyTask));
 
         taskManager.handleAssignment(
@@ -845,7 +816,7 @@ public class TaskManagerTest {
             .inState(State.CREATED)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToClose, 
standbyTaskToRecycle));
         final CompletableFuture<StateUpdater.RemovedTaskResult> 
futureForActiveTaskToClose = new CompletableFuture<>();
         
when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(futureForActiveTaskToClose);
@@ -880,7 +851,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskInStateUpdater));
         when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, 
runningActiveTask)));
@@ -904,7 +875,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, 
activeTask)));
         assertEquals(taskManager.allOwnedTasks(), mkMap(mkEntry(taskId03, 
activeTask)));
@@ -916,7 +887,7 @@ public class TaskManagerTest {
             .inState(State.CREATED)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         final Set<Task> createdTasks = Set.of(activeTaskToBeCreated);
         final Map<TaskId, Set<TopicPartition>> tasksToBeCreated = mkMap(
             mkEntry(activeTaskToBeCreated.id(), 
activeTaskToBeCreated.inputPartitions()));
@@ -934,7 +905,7 @@ public class TaskManagerTest {
             .inState(State.CREATED)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         final Set<Task> createdTasks = Set.of(standbyTaskToBeCreated);
         when(standbyTaskCreator.createTasks(mkMap(
             mkEntry(standbyTaskToBeCreated.id(), 
standbyTaskToBeCreated.inputPartitions())))
@@ -950,7 +921,7 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void 
shouldAddRecycledStandbyTasksFromActiveToPendingTasksToInitWithStateUpdaterEnabled()
 {
+    public void shouldAddRecycledStandbyTasksFromActiveToPendingTasksToInit() {
         final StreamTask activeTaskToRecycle = statefulTask(taskId01, 
taskId01ChangelogPartitions)
             .withInputPartitions(taskId01Partitions)
             .inState(State.RUNNING).build();
@@ -961,7 +932,7 @@ public class TaskManagerTest {
         
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToRecycle));
         
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, 
taskId01Partitions))
             .thenReturn(standbyTask);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, 
taskId01Partitions)));
 
@@ -973,13 +944,13 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void 
shouldThrowDuringAssignmentIfStandbyTaskToRecycleIsFoundInTasksRegistryWithStateUpdaterEnabled()
 {
+    public void 
shouldThrowDuringAssignmentIfStandbyTaskToRecycleIsFoundInTasksRegistry() {
         final StandbyTask standbyTaskToRecycle = standbyTask(taskId03, 
taskId03ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         
when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToRecycle));
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         final IllegalStateException illegalStateException = assertThrows(
             IllegalStateException.class,
@@ -995,12 +966,12 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void 
shouldAssignActiveTaskInTasksRegistryToBeClosedCleanlyWithStateUpdaterEnabled() 
{
+    public void shouldAssignActiveTaskInTasksRegistryToBeClosedCleanly() {
         final StreamTask activeTaskToClose = statefulTask(taskId03, 
taskId03ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToClose));
 
         taskManager.handleAssignment(Collections.emptyMap(), 
Collections.emptyMap());
@@ -1013,12 +984,12 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void 
shouldThrowDuringAssignmentIfStandbyTaskToCloseIsFoundInTasksRegistryWithStateUpdaterEnabled()
 {
+    public void 
shouldThrowDuringAssignmentIfStandbyTaskToCloseIsFoundInTasksRegistry() {
         final StandbyTask standbyTaskToClose = standbyTask(taskId03, 
taskId03ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToClose));
 
         final IllegalStateException illegalStateException = assertThrows(
@@ -1032,13 +1003,13 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void 
shouldAssignActiveTaskInTasksRegistryToUpdateInputPartitionsWithStateUpdaterEnabled()
 {
+    public void shouldAssignActiveTaskInTasksRegistryToUpdateInputPartitions() 
{
         final StreamTask activeTaskToUpdateInputPartitions = 
statefulTask(taskId03, taskId03ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId03Partitions).build();
         final Set<TopicPartition> newInputPartitions = taskId02Partitions;
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToUpdateInputPartitions));
         
when(tasks.updateActiveTaskInputPartitions(activeTaskToUpdateInputPartitions, 
newInputPartitions)).thenReturn(true);
 
@@ -1053,12 +1024,12 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void 
shouldResumeActiveRunningTaskInTasksRegistryWithStateUpdaterEnabled() {
+    public void shouldResumeActiveRunningTaskInTasksRegistry() {
         final StreamTask activeTaskToResume = statefulTask(taskId03, 
taskId03ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToResume));
 
         taskManager.handleAssignment(
@@ -1076,7 +1047,7 @@ public class TaskManagerTest {
             .inState(State.SUSPENDED)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToResume));
 
         taskManager.handleAssignment(
@@ -1092,13 +1063,13 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void 
shouldThrowDuringAssignmentIfStandbyTaskToUpdateInputPartitionsIsFoundInTasksRegistryWithStateUpdaterEnabled()
 {
+    public void 
shouldThrowDuringAssignmentIfStandbyTaskToUpdateInputPartitionsIsFoundInTasksRegistry()
 {
         final StandbyTask standbyTaskToUpdateInputPartitions = 
standbyTask(taskId02, taskId02ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
         final Set<TopicPartition> newInputPartitions = taskId03Partitions;
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         
when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToUpdateInputPartitions));
 
         final IllegalStateException illegalStateException = assertThrows(
@@ -1115,7 +1086,7 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void 
shouldAssignMultipleTasksInTasksRegistryWithStateUpdaterEnabled() {
+    public void shouldAssignMultipleTasksInTasksRegistry() {
         final StreamTask activeTaskToClose = statefulTask(taskId03, 
taskId03ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId03Partitions).build();
@@ -1123,7 +1094,7 @@ public class TaskManagerTest {
             .inState(State.CREATED)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToClose));
 
         taskManager.handleAssignment(
@@ -1149,7 +1120,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00, 
task01));
-        taskManager = 
setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
 tasks, false);
+        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
@@ -1171,7 +1142,7 @@ public class TaskManagerTest {
         when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00, 
task01));
         final LockException lockException = new LockException("Where are my 
keys??");
         doThrow(lockException).when(task00).initializeIfNeeded();
-        taskManager = 
setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
 tasks, false);
+        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
@@ -1198,7 +1169,7 @@ public class TaskManagerTest {
         when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00, 
task01));
         final TimeoutException timeoutException = new TimeoutException("Timed 
out!");
         doThrow(timeoutException).when(task00).initializeIfNeeded();
-        taskManager = 
setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
 tasks, false);
+        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
@@ -1225,7 +1196,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00, 
task01));
         doThrow(new LockException("Lock 
Exception!")).when(task00).initializeIfNeeded();
-        taskManager = 
setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
 tasks, false);
+        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
@@ -1263,7 +1234,7 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void shouldRethrowRuntimeExceptionInInitTaskWithStateUpdater() {
+    public void shouldRethrowRuntimeExceptionInInitTask() {
         final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
             .withInputPartitions(taskId00Partitions)
             .inState(State.CREATED).build();
@@ -1271,7 +1242,7 @@ public class TaskManagerTest {
         when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00));
         final RuntimeException runtimeException = new 
RuntimeException("KABOOM!");
         doThrow(runtimeException).when(task00).initializeIfNeeded();
-        taskManager = 
setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
 tasks, false);
+        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false);
 
         final StreamsException streamsException = assertThrows(
             StreamsException.class,
@@ -1297,7 +1268,7 @@ public class TaskManagerTest {
             .inState(State.CREATED)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks, false);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks, false);
         when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(statefulTask0, 
statefulTask1, statefulTask2));
         doThrow(new 
TaskCorruptedException(Collections.singleton(statefulTask0.id))).when(statefulTask0).initializeIfNeeded();
         doThrow(new 
TaskCorruptedException(Collections.singleton(statefulTask1.id))).when(statefulTask1).initializeIfNeeded();
@@ -1318,7 +1289,7 @@ public class TaskManagerTest {
     public void 
shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreRestoring() {
         when(stateUpdater.restoresActiveTasks()).thenReturn(true);
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         assertFalse(taskManager.checkStateUpdater(time.milliseconds(), 
noOpResetter));
     }
@@ -1327,7 +1298,7 @@ public class TaskManagerTest {
     public void 
shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingTaskToRecycleButPendingTasksToInit()
 {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.hasPendingTasksToInit()).thenReturn(true);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         assertFalse(taskManager.checkStateUpdater(time.milliseconds(), 
noOpResetter));
     }
@@ -1335,7 +1306,7 @@ public class TaskManagerTest {
     @Test
     public void 
shouldReturnTrueFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingInit()
 {
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         assertTrue(taskManager.checkStateUpdater(time.milliseconds(), 
noOpResetter));
     }
@@ -1555,7 +1526,7 @@ public class TaskManagerTest {
 
     private TaskManager setupForRevocationAndLost(final Set<Task> 
tasksInStateUpdater,
                                                   final TasksRegistry tasks) {
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(tasksInStateUpdater);
 
         return taskManager;
@@ -1624,12 +1595,12 @@ public class TaskManagerTest {
         when(stateUpdater.restoresActiveTasks()).thenReturn(true);
         
when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(statefulTasks);
 
-        return setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, 
tasks);
+        return setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
     }
 
     @Test
-    public void 
shouldReturnCorrectBooleanWhenTryingToCompleteRestorationWithStateUpdater() {
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, null, false);
+    public void shouldReturnCorrectBooleanWhenTryingToCompleteRestoration() {
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, null, false);
         when(stateUpdater.restoresActiveTasks()).thenReturn(false);
         assertTrue(taskManager.checkStateUpdater(time.milliseconds(), 
noOpResetter));
         when(stateUpdater.restoresActiveTasks()).thenReturn(true);
@@ -1647,7 +1618,7 @@ public class TaskManagerTest {
         
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(exceptionAndTasks));
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         final StreamsException thrown = assertThrows(
             StreamsException.class,
@@ -1674,7 +1645,7 @@ public class TaskManagerTest {
         
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Arrays.asList(exceptionAndTasks0,
 exceptionAndTasks1));
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         final TaskCorruptedException thrown = assertThrows(
             TaskCorruptedException.class,
@@ -1746,7 +1717,7 @@ public class TaskManagerTest {
     public void shouldPauseAllTopicsOnRebalanceComplete() {
         final Set<TopicPartition> assigned = Set.of(t1p0, t1p1);
         when(consumer.assignment()).thenReturn(assigned);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, null);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, null);
         taskManager.handleRebalanceComplete();
 
         verify(consumer).pause(assigned);
@@ -1758,7 +1729,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId00Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(tasks.allNonFailedTasks()).thenReturn(Set.of(statefulTask0));
         final Set<TopicPartition> assigned = Set.of(t1p0, t1p1);
         when(consumer.assignment()).thenReturn(assigned);
@@ -1780,7 +1751,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, 
runningStatefulTask)));
         when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask, 
restoringStatefulTask));
         
when(tasks.allNonFailedTasks()).thenReturn(Set.of(runningStatefulTask));
@@ -1814,7 +1785,7 @@ public class TaskManagerTest {
         );
         
when(runningStatefulTask.changelogOffsets()).thenReturn(changelogOffsets);
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, 
runningStatefulTask)));
 
         assertThat(
@@ -1837,7 +1808,7 @@ public class TaskManagerTest {
         when(restoringStatefulTask.changelogOffsets())
             .thenReturn(changelogOffsets);
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
 
         assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums));
@@ -1857,7 +1828,7 @@ public class TaskManagerTest {
         final Map<TopicPartition, Long> changelogOffsetInCheckpoint = 
mkMap(mkEntry(t1p0changelog, 24L));
         writeCheckpointFile(taskId00, changelogOffsetInCheckpoint);
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
         taskManager.handleRebalanceStart(singleton("topic"));
 
@@ -1875,7 +1846,7 @@ public class TaskManagerTest {
         final Map<TopicPartition, Long> changelogOffsetInCheckpoint = 
mkMap(mkEntry(t1p0changelog, 24L));
         writeCheckpointFile(taskId00, changelogOffsetInCheckpoint);
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(restoringStandbyTask));
         taskManager.handleRebalanceStart(singleton("topic"));
 
@@ -1900,7 +1871,7 @@ public class TaskManagerTest {
         when(restoringStandbyTask.changelogOffsets())
             .thenReturn(mkMap(mkEntry(t1p2changelog, 
changelogOffsetOfRestoringStandbyTask)));
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, 
runningStatefulTask)));
         when(stateUpdater.tasks()).thenReturn(Set.of(restoringStandbyTask, 
restoringStatefulTask));
 
@@ -1925,7 +1896,7 @@ public class TaskManagerTest {
                 mkEntry(t1p1changelog2, OffsetCheckpoint.OFFSET_UNKNOWN)
             ));
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, false);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
         when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId01, 
restoringStatefulTask)));
         when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
 
@@ -1952,7 +1923,7 @@ public class TaskManagerTest {
         when(standbyTask.changelogOffsets()).thenReturn(changelogOffsets);
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        taskManager = 
setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
 tasks, false);
+        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false);
 
         when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask));
 
@@ -2000,7 +1971,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, task)));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         expectLockObtainedFor(taskId00);
         makeTaskFolders(taskId00.toString());
@@ -2060,7 +2031,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         taskManager.handleAssignment(emptyMap(), emptyMap());
 
@@ -2081,7 +2052,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         final RuntimeException thrown = assertThrows(
             RuntimeException.class,
@@ -2124,7 +2095,7 @@ public class TaskManagerTest {
         expectLockObtainedFor(taskId00, taskId01);
         expectDirectoryNotEmpty(taskId00, taskId01);
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         taskManager.handleRebalanceStart(emptySet());
         assertThat(taskManager.lockedTaskDirectories(), is(Set.of(taskId00, 
taskId01)));
@@ -2155,7 +2126,7 @@ public class TaskManagerTest {
 
     @Test
     public void 
shouldReInitializeStreamsProducerOnHandleLostAllIfEosV2Enabled() {
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, null, false);
 
         taskManager.handleLostAll();
 
@@ -2171,7 +2142,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(tasks.task(taskId03)).thenReturn(corruptedActiveTask);
         when(tasks.task(taskId02)).thenReturn(corruptedStandbyTask);
 
@@ -2208,7 +2179,7 @@ public class TaskManagerTest {
 
         when(consumer.assignment()).thenReturn(taskId00Partitions);
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         taskManager.handleCorruption(singleton(taskId00));
 
@@ -2239,7 +2210,7 @@ public class TaskManagerTest {
         
when(task00.changelogPartitions()).thenReturn(taskId00ChangelogPartitions);
         doThrow(new RuntimeException("oops")).when(task00).suspend();
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         taskManager.handleCorruption(singleton(taskId00));
 
@@ -2280,7 +2251,7 @@ public class TaskManagerTest {
 
         when(consumer.assignment()).thenReturn(taskId00Partitions);
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         taskManager.handleCorruption(Set.of(taskId00));
 
@@ -2308,7 +2279,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId02, 
corruptedTask)));
         when(tasks.task(taskId02)).thenReturn(corruptedTask);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(consumer.assignment()).thenReturn(intersection(HashSet::new, 
taskId00Partitions, taskId01Partitions, taskId02Partitions));
 
         taskManager.handleCorruption(Set.of(taskId02));
@@ -2347,7 +2318,7 @@ public class TaskManagerTest {
         doNothing().when(corruptedStandby).suspend();
         doNothing().when(corruptedStandby).postCommit(anyBoolean());
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         assertThrows(TaskMigratedException.class, () -> 
taskManager.handleCorruption(singleton(taskId00)));
 
@@ -2392,7 +2363,7 @@ public class TaskManagerTest {
 
         when(consumer.assignment()).thenReturn(taskId00Partitions);
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         taskManager.handleRebalanceStart(singleton(topic1));
         assertThat(taskManager.rebalanceInProgress(), is(true));
@@ -2459,7 +2430,7 @@ public class TaskManagerTest {
 
         doThrow(new 
TimeoutException()).when(producer).commitTransaction(offsets, groupMetadata);
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks);
 
         taskManager.handleCorruption(singleton(taskId00));
 
@@ -2532,7 +2503,7 @@ public class TaskManagerTest {
         doThrow(new TimeoutException()).when(consumer).commitSync(offsets);
         when(consumer.assignment()).thenReturn(union(HashSet::new, 
taskId00Partitions, taskId01Partitions));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         taskManager.handleCorruption(singleton(taskId00));
 
@@ -2614,7 +2585,7 @@ public class TaskManagerTest {
         expectedCommittedOffsets.putAll(unrevokedTaskOffsets);
         doThrow(new 
TimeoutException()).when(consumer).commitSync(expectedCommittedOffsets);
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         taskManager.handleRevocation(taskId00Partitions);
 
@@ -2707,7 +2678,7 @@ public class TaskManagerTest {
         expectedCommittedOffsets.putAll(unrevokedTaskOffsets);
         doThrow(new 
TimeoutException()).when(producer).commitTransaction(expectedCommittedOffsets, 
groupMetadata);
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks);
 
         taskManager.handleRevocation(taskId00Partitions);
 
@@ -2751,7 +2722,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.drainPendingTasksToInit()).thenReturn(emptySet());
 
-        taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         when(stateUpdater.tasks()).thenReturn(Set.of(task00));
 
@@ -2788,7 +2759,7 @@ public class TaskManagerTest {
         when(tasks.drainPendingTasksToInit()).thenReturn(emptySet());
         when(tasks.hasPendingTasksToInit()).thenReturn(false);
 
-        taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         when(stateUpdater.tasks()).thenReturn(Set.of(task01));
         when(stateUpdater.restoresActiveTasks()).thenReturn(false);
@@ -2824,7 +2795,7 @@ public class TaskManagerTest {
         when(tasks.hasPendingTasksToInit()).thenReturn(false);
         when(tasks.updateActiveTaskInputPartitions(task00, 
newPartitionsSet)).thenReturn(true);
 
-        taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         when(stateUpdater.tasks()).thenReturn(emptySet());
         when(stateUpdater.restoresActiveTasks()).thenReturn(false);
@@ -2850,7 +2821,7 @@ public class TaskManagerTest {
 
         final Map<TaskId, Set<TopicPartition>> assignment = taskId00Assignment;
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         // first, we need to handle assignment -- creates tasks and adds to 
pending initialization
         when(activeTaskCreator.createTasks(any(), 
eq(assignment))).thenReturn(singletonList(task00));
@@ -2888,7 +2859,7 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
             mkEntry(taskId00, taskId00Partitions),
             mkEntry(taskId01, taskId01Partitions)
@@ -2931,7 +2902,7 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         when(stateUpdater.restoresActiveTasks()).thenReturn(true);
         
when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(Set.of(task00));
@@ -2962,7 +2933,7 @@ public class TaskManagerTest {
         when(task00.commitNeeded()).thenReturn(true);
         when(task00.prepareCommit(true)).thenReturn(offsets);
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         taskManager.handleRevocation(taskId00Partitions);
 
@@ -3028,7 +2999,7 @@ public class TaskManagerTest {
         expectedCommittedOffsets.putAll(offsets00);
         expectedCommittedOffsets.putAll(offsets01);
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks);
 
         taskManager.handleRevocation(taskId00Partitions);
 
@@ -3093,7 +3064,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02, 
task03));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         taskManager.handleRevocation(taskId00Partitions);
 
@@ -3148,7 +3119,7 @@ public class TaskManagerTest {
         when(task01.commitNeeded()).thenReturn(true); // only task01 needs 
commit
         when(task02.commitNeeded()).thenReturn(false);
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(processingMode, tasks);
+        final TaskManager taskManager = setUpTaskManager(processingMode, 
tasks);
 
         taskManager.handleRevocation(taskId00Partitions);
 
@@ -3171,7 +3142,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId01Partitions).build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
         when(stateUpdater.tasks()).thenReturn(Set.of(task01));
@@ -3205,7 +3176,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId01Partitions).build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
         when(stateUpdater.tasks()).thenReturn(Set.of(task01));
@@ -3238,7 +3209,7 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         when(activeTaskCreator.createTasks(consumer, taskId00Assignment))
             .thenReturn(singletonList(task00));
@@ -3277,7 +3248,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.allTasks()).thenReturn(Set.of(task00));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         assertThrows(RuntimeException.class, () -> 
taskManager.handleRevocation(taskId00Partitions));
 
@@ -3308,7 +3279,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId02Partitions).build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(processingMode, tasks);
+        final TaskManager taskManager = setUpTaskManager(processingMode, 
tasks);
 
         doThrow(new TaskMigratedException("migrated", new 
RuntimeException("cause")))
             .when(task01).suspend();
@@ -3348,7 +3319,7 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         doThrow(new 
RuntimeException("whatever")).when(activeTaskCreator).close();
 
@@ -3373,14 +3344,14 @@ public class TaskManagerTest {
     @SuppressWarnings("unchecked")
     @Test
     public void shouldCloseTasksIfStateUpdaterTimesOutOnRemove() throws 
Exception {
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, null, false);
-        final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
-                mkEntry(taskId00, taskId00Partitions)
-        );
-        final Task task00 = spy(new StateMachineTask(taskId00, 
taskId00Partitions, true, stateManager));
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId00Partitions)
+            .build();
 
-        when(activeTaskCreator.createTasks(any(), 
eq(assignment))).thenReturn(singletonList(task00));
-        taskManager.handleAssignment(assignment, emptyMap());
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
 
         when(stateUpdater.tasks()).thenReturn(singleton(task00));
         final CompletableFuture<StateUpdater.RemovedTaskResult> future = 
mock(CompletableFuture.class);
@@ -3406,7 +3377,7 @@ public class TaskManagerTest {
         doThrow(new RuntimeException("task 0_1 suspend 
boom!")).when(task01).suspend();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks);
 
         when(stateUpdater.tasks()).thenReturn(Set.of(task00, task01));
 
@@ -3450,7 +3421,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId02Partitions).build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02));
 
@@ -3481,7 +3452,7 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         doThrow(new TaskMigratedException("migrated", new 
RuntimeException("cause")))
             .when(task01).suspend();
@@ -3527,7 +3498,7 @@ public class TaskManagerTest {
         final CompletableFuture<StateUpdater.RemovedTaskResult> 
futureForStandbyTask = new CompletableFuture<>();
         when(stateUpdater.remove(taskId00)).thenReturn(futureForStandbyTask);
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         futureForStandbyTask.complete(new 
StateUpdater.RemovedTaskResult(standbyTask00)); // simulate successful removal
 
@@ -3560,7 +3531,7 @@ public class TaskManagerTest {
                 new ExceptionAndTask(new RuntimeException(), 
failedStandbyTask))
             )
             .thenReturn(Collections.emptyList());
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         taskManager.shutdown(true);
 
@@ -3574,7 +3545,7 @@ public class TaskManagerTest {
     @Test
     public void shouldShutdownSchedulingTaskManager() {
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
 
         taskManager.shutdown(true);
 
@@ -3624,7 +3595,7 @@ public class TaskManagerTest {
                     new ExceptionAndTask(new StreamsException("KABOOM!"), 
removedFailedStatefulTaskDuringRemoval),
                     new ExceptionAndTask(new StreamsException("KABOOM!"), 
removedFailedStandbyTaskDuringRemoval))
                 ).thenReturn(Collections.emptyList());
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         futureForRemovedStatefulTask.complete(new 
StateUpdater.RemovedTaskResult(removedStatefulTask));
         futureForRemovedStandbyTask.complete(new 
StateUpdater.RemovedTaskResult(removedStandbyTask));
         futureForRemovedFailedStatefulTask
@@ -3664,7 +3635,7 @@ public class TaskManagerTest {
 
         final Map<TaskId, Set<TopicPartition>> assignment = taskId01Assignment;
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         
when(standbyTaskCreator.createTasks(assignment)).thenReturn(singletonList(task01));
 
@@ -3714,7 +3685,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         assertThat(taskManager.commitAll(), equalTo(2));
 
@@ -3774,7 +3745,7 @@ public class TaskManagerTest {
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         assertThat(taskManager.commit(Set.of(task00, task02, task03, task05)), 
equalTo(2));
 
@@ -3810,7 +3781,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.allTasks()).thenReturn(Set.of(task00));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         assertThat(taskManager.commitAll(), equalTo(1));
 
@@ -3838,7 +3809,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         taskManager.handleRebalanceStart(emptySet());
 
@@ -3867,7 +3838,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.allTasks()).thenReturn(Set.of(task01));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         assertThat(taskManager.commitAll(), equalTo(1));
 
@@ -3912,7 +3883,7 @@ public class TaskManagerTest {
         final ConsumerGroupMetadata groupMetadata = 
mock(ConsumerGroupMetadata.class);
         when(consumer.groupMetadata()).thenReturn(groupMetadata);
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks);
 
         taskManager.commitAll();
 
@@ -3939,7 +3910,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.allTasks()).thenReturn(Set.of(task00));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         final RuntimeException thrown =
             assertThrows(RuntimeException.class, taskManager::commitAll);
@@ -3962,7 +3933,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.allTasks()).thenReturn(Set.of(task01));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         final RuntimeException thrown =
             assertThrows(RuntimeException.class, () -> 
taskManager.commitAll());
@@ -3994,7 +3965,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.allTasks()).thenReturn(Set.of(task00));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         taskManager.maybePurgeCommittedRecords(); // no-op
         taskManager.maybePurgeCommittedRecords(); // sends purge for offset 5L
@@ -4024,7 +3995,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.allTasks()).thenReturn(Set.of(task00));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         taskManager.maybePurgeCommittedRecords();
         taskManager.maybePurgeCommittedRecords();
@@ -4052,7 +4023,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.allTasks()).thenReturn(Set.of(task00));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         taskManager.maybePurgeCommittedRecords();
         taskManager.maybePurgeCommittedRecords();
@@ -4109,7 +4080,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02, 
task03));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         // maybeCommitActiveTasksPerUserRequested checks if any task has both 
commitRequested AND commitNeeded
         // If found, commits all active running tasks that have commitNeeded
@@ -4169,7 +4140,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.activeTasks()).thenReturn(Set.of(task00, task01));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         // check that we should be processing at most max num records
         assertThat(taskManager.process(3, time), is(6));
@@ -4214,7 +4185,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.activeTasks()).thenReturn(Set.of(task00, task01, task02));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         // should only process 2 records, because task01 throws 
TimeoutException
         assertThat(taskManager.process(1, time), is(2));
@@ -4241,7 +4212,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.activeTasks()).thenReturn(Set.of(task00));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         assertThrows(TaskMigratedException.class, () -> taskManager.process(1, 
time));
     }
@@ -4258,7 +4229,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.activeTasks()).thenReturn(Set.of(task00));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         final StreamsException exception = 
assertThrows(StreamsException.class, () -> taskManager.process(1, time));
         assertThat(exception.taskId().isPresent(), is(true));
@@ -4279,7 +4250,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.activeTasks()).thenReturn(Set.of(task00));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         assertThrows(TaskMigratedException.class, taskManager::punctuate);
     }
@@ -4296,7 +4267,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.activeTasks()).thenReturn(Set.of(task00));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         assertThrows(KafkaException.class, taskManager::punctuate);
     }
@@ -4314,7 +4285,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.activeTasks()).thenReturn(Set.of(task00));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         // one for stream and one for system time
         assertThat(taskManager.punctuate(), equalTo(2));
@@ -4326,7 +4297,7 @@ public class TaskManagerTest {
     @Test
     public void shouldReturnFalseWhenThereAreStillNonRunningTasks() {
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         // mock that the state updater is still restoring active tasks
         when(stateUpdater.restoresActiveTasks()).thenReturn(true);
@@ -4347,7 +4318,7 @@ public class TaskManagerTest {
         when(task00.prepareCommit(false)).thenReturn(offsets);
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(tasks.allTasks()).thenReturn(Set.of(task00));
 
         try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(TaskManager.class)) {
@@ -4385,7 +4356,7 @@ public class TaskManagerTest {
             .when(migratedTask02).suspend();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         when(stateUpdater.tasks()).thenReturn(Set.of(migratedTask01, 
migratedTask02));
 
@@ -4431,7 +4402,7 @@ public class TaskManagerTest {
             .when(migratedTask02).suspend();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         when(stateUpdater.tasks()).thenReturn(Set.of(migratedTask01, 
migratedTask02));
 
@@ -4476,7 +4447,7 @@ public class TaskManagerTest {
             .when(migratedTask02).suspend();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         when(stateUpdater.tasks()).thenReturn(Set.of(migratedTask01, 
migratedTask02));
 
@@ -4522,53 +4493,6 @@ public class TaskManagerTest {
         assertThat(taskManager.producerMetrics(), is(dummyProducerMetrics));
     }
 
-    private Map<TaskId, StateMachineTask> handleAssignment(final Map<TaskId, 
Set<TopicPartition>> runningActiveAssignment,
-                                                           final Map<TaskId, 
Set<TopicPartition>> standbyAssignment,
-                                                           final Map<TaskId, 
Set<TopicPartition>> restoringActiveAssignment) {
-        final Set<Task> runningTasks = 
runningActiveAssignment.entrySet().stream()
-                                           .map(t -> new 
StateMachineTask(t.getKey(), t.getValue(), true, stateManager))
-                                           .collect(Collectors.toSet());
-        final Set<Task> standbyTasks = standbyAssignment.entrySet().stream()
-                                           .map(t -> new 
StateMachineTask(t.getKey(), t.getValue(), false, stateManager))
-                                           .collect(Collectors.toSet());
-        final Set<Task> restoringTasks = 
restoringActiveAssignment.entrySet().stream()
-                                           .map(t -> new 
StateMachineTask(t.getKey(), t.getValue(), true, stateManager))
-                                           .collect(Collectors.toSet());
-        // give the restoring tasks some uncompleted changelog partitions so 
they'll stay in restoring
-        restoringTasks.forEach(t -> ((StateMachineTask) 
t).setChangelogOffsets(singletonMap(new TopicPartition("changelog", 0), 0L)));
-
-        // Initially assign only the active tasks we want to complete 
restoration
-        final Map<TaskId, Set<TopicPartition>> allActiveTasksAssignment = new 
HashMap<>(runningActiveAssignment);
-        allActiveTasksAssignment.putAll(restoringActiveAssignment);
-        final Set<Task> allActiveTasks = new HashSet<>(runningTasks);
-        allActiveTasks.addAll(restoringTasks);
-
-        
when(standbyTaskCreator.createTasks(standbyAssignment)).thenReturn(standbyTasks);
-        when(activeTaskCreator.createTasks(any(), 
eq(allActiveTasksAssignment))).thenReturn(allActiveTasks);
-
-        lenient().when(consumer.assignment()).thenReturn(assignment);
-
-        taskManager.handleAssignment(allActiveTasksAssignment, 
standbyAssignment);
-        taskManager.tryToCompleteRestoration(time.milliseconds(), null);
-
-        final Map<TaskId, StateMachineTask> allTasks = new HashMap<>();
-
-        // Just make sure all tasks ended up in the expected state
-        for (final Task task : runningTasks) {
-            assertThat(task.state(), is(Task.State.RUNNING));
-            allTasks.put(task.id(), (StateMachineTask) task);
-        }
-        for (final Task task : restoringTasks) {
-            assertThat(task.state(), is(Task.State.RESTORING));
-            allTasks.put(task.id(), (StateMachineTask) task);
-        }
-        for (final Task task : standbyTasks) {
-            assertThat(task.state(), is(Task.State.RUNNING));
-            allTasks.put(task.id(), (StateMachineTask) task);
-        }
-        return allTasks;
-    }
-
     private void expectLockObtainedFor(final TaskId... tasks) {
         for (final TaskId task : tasks) {
             when(stateDirectory.lock(task)).thenReturn(true);
@@ -4602,7 +4526,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.allTasks()).thenReturn(Set.of(task01));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         doThrow(new 
CommitFailedException()).when(consumer).commitSync(offsets);
 
@@ -4640,7 +4564,7 @@ public class TaskManagerTest {
         when(task01.commitNeeded()).thenReturn(false);
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         doThrow(new 
TimeoutException("KABOOM!")).doNothing().when(consumer).commitSync(any(Map.class));
 
@@ -4688,7 +4612,7 @@ public class TaskManagerTest {
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks);
 
         final TaskCorruptedException exception = assertThrows(
             TaskCorruptedException.class,
@@ -4717,7 +4641,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.allTasks()).thenReturn(Set.of(task01));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         doThrow(new KafkaException()).when(consumer).commitSync(offsets);
 
@@ -4748,7 +4672,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.allTasks()).thenReturn(Set.of(task01));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         doThrow(new 
RuntimeException("KABOOM")).when(consumer).commitSync(offsets);
 
@@ -4780,7 +4704,7 @@ public class TaskManagerTest {
 
         when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         final RuntimeException thrown = assertThrows(
             RuntimeException.class,
@@ -4804,7 +4728,7 @@ public class TaskManagerTest {
             .inState(State.CREATED)
             .withInputPartitions(taskId00Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         when(activeTaskCreator.createTasks(consumer, 
taskId00Assignment)).thenReturn(singletonList(activeTaskToRecycle));
         
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, 
taskId00Partitions))
@@ -4836,7 +4760,7 @@ public class TaskManagerTest {
             .inState(State.CREATED)
             .withInputPartitions(taskId00Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         
when(standbyTaskCreator.createTasks(taskId00Assignment)).thenReturn(singletonList(standbyTaskToRecycle));
         
when(activeTaskCreator.createActiveTaskFromStandby(standbyTaskToRecycle, 
taskId00Partitions, consumer))
@@ -4874,7 +4798,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
 
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         when(stateUpdater.tasks()).thenReturn(Collections.emptySet());
 
@@ -4890,7 +4814,7 @@ public class TaskManagerTest {
     @Test
     public void shouldRecycleStartupTasksFromStateDirectoryAsActive() {
         final Tasks taskRegistry = new Tasks(new LogContext());
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, taskRegistry);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, taskRegistry);
         final StandbyTask startupTask = standbyTask(taskId00, 
taskId00ChangelogPartitions).build();
 
         final StreamTask activeTask = statefulTask(taskId00, 
taskId00ChangelogPartitions).build();
@@ -4928,7 +4852,7 @@ public class TaskManagerTest {
     @Test
     public void shouldUseStartupTasksFromStateDirectoryAsStandby() {
         final Tasks taskRegistry = new Tasks(new LogContext());
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, taskRegistry);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, taskRegistry);
         final StandbyTask startupTask = standbyTask(taskId00, 
taskId00ChangelogPartitions).build();
 
         when(stateDirectory.hasStartupTasks()).thenReturn(true, true, false);
@@ -4960,7 +4884,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldStartStateUpdaterOnInit() {
-        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, null);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, null);
         taskManager.init();
         verify(stateUpdater).start();
     }
@@ -4996,249 +4920,4 @@ public class TaskManagerTest {
     private File getCheckpointFile(final TaskId task) {
         return new File(new File(testFolder.toAbsolutePath().toString(), 
task.toString()), StateManagerUtil.CHECKPOINT_FILE_NAME);
     }
-
-    private static ConsumerRecord<byte[], byte[]> getConsumerRecord(final 
TopicPartition topicPartition, final long offset) {
-        return new ConsumerRecord<>(topicPartition.topic(), 
topicPartition.partition(), offset, null, null);
-    }
-
-    private static class StateMachineTask extends AbstractTask implements Task 
{
-        private final boolean active;
-
-        // TODO: KAFKA-12569 clean up usage of these flags and use the new 
commitCompleted flag where appropriate
-        private boolean commitNeeded = false;
-        private boolean commitRequested = false;
-        private boolean commitPrepared = false;
-        private boolean commitCompleted = false;
-        private Map<TopicPartition, OffsetAndMetadata> committableOffsets = 
Collections.emptyMap();
-        private Map<TopicPartition, Long> purgeableOffsets;
-        private Map<TopicPartition, Long> changelogOffsets = 
Collections.emptyMap();
-        private Set<TopicPartition> partitionsForOffsetReset = 
Collections.emptySet();
-        private Long timeout = null;
-
-        private final Map<TopicPartition, LinkedList<ConsumerRecord<byte[], 
byte[]>>> queue = new HashMap<>();
-
-        StateMachineTask(final TaskId id,
-                         final Set<TopicPartition> partitions,
-                         final boolean active,
-                         final ProcessorStateManager processorStateManager) {
-            super(id, null, null, processorStateManager, partitions, (new 
TopologyConfig(new DummyStreamsConfig())).getTaskConfig(), "test-task", 
StateMachineTask.class);
-            this.active = active;
-        }
-
-        @Override
-        public void initializeIfNeeded() {
-            if (state() == State.CREATED) {
-                transitionTo(State.RESTORING);
-                if (!active) {
-                    transitionTo(State.RUNNING);
-                }
-            }
-        }
-
-        @Override
-        public void addPartitionsForOffsetReset(final Set<TopicPartition> 
partitionsForOffsetReset) {
-            this.partitionsForOffsetReset = partitionsForOffsetReset;
-        }
-
-        @Override
-        public void completeRestoration(final 
java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
-            if (state() == State.RUNNING) {
-                return;
-            }
-            transitionTo(State.RUNNING);
-        }
-
-        public void setCommitNeeded() {
-            commitNeeded = true;
-        }
-
-        @Override
-        public boolean commitNeeded() {
-            return commitNeeded;
-        }
-
-        public void setCommitRequested() {
-            commitRequested = true;
-        }
-
-        @Override
-        public boolean commitRequested() {
-            return commitRequested;
-        }
-
-        @Override
-        public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final 
boolean clean) {
-            commitPrepared = true;
-
-            if (commitNeeded) {
-                if (!clean) {
-                    return null;
-                }
-                return committableOffsets;
-            } else {
-                return Collections.emptyMap();
-            }
-        }
-
-        @Override
-        public void postCommit(final boolean enforceCheckpoint) {
-            commitNeeded = false;
-            commitCompleted = true;
-        }
-
-        @Override
-        public void suspend() {
-            if (state() == State.CLOSED) {
-                throw new IllegalStateException("Illegal state " + state() + " 
while suspending active task " + id);
-            } else if (state() == State.SUSPENDED) {
-                // do nothing
-            } else {
-                transitionTo(State.SUSPENDED);
-            }
-        }
-
-        @Override
-        public void resume() {
-            if (state() == State.SUSPENDED) {
-                transitionTo(State.RUNNING);
-            }
-        }
-
-        @Override
-        public void revive() {
-            //TODO: KAFKA-12569 move clearing of commit-required statuses to 
closeDirty/Clean/AndRecycle methods
-            commitNeeded = false;
-            commitRequested = false;
-            super.revive();
-        }
-
-        @Override
-        public void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
-                                                final Exception cause) {
-            timeout = currentWallClockMs;
-        }
-
-        @Override
-        public void clearTaskTimeout() {
-            timeout = null;
-        }
-
-        @Override
-        public void recordRestoration(final Time time, final long numRecords, 
final boolean initRemaining) {
-            // do nothing
-        }
-
-        @Override
-        public void closeClean() {
-            transitionTo(State.CLOSED);
-        }
-
-        @Override
-        public void closeDirty() {
-            transitionTo(State.CLOSED);
-        }
-
-        @Override
-        public void prepareRecycle() {
-            transitionTo(State.CLOSED);
-        }
-
-        @Override
-        public void resumePollingForPartitionsWithAvailableSpace() {
-            // noop
-        }
-
-        @Override
-        public void updateLags() {
-            // noop
-        }
-
-        @Override
-        public void updateInputPartitions(final Set<TopicPartition> 
topicPartitions, final Map<String, List<String>> 
allTopologyNodesToSourceTopics) {
-            inputPartitions = topicPartitions;
-        }
-
-        void setCommittableOffsetsAndMetadata(final Map<TopicPartition, 
OffsetAndMetadata> committableOffsets) {
-            if (!active) {
-                throw new IllegalStateException("Cannot set 
CommittableOffsetsAndMetadate for StandbyTasks");
-            }
-            this.committableOffsets = committableOffsets;
-        }
-
-        @Override
-        public StateStore store(final String name) {
-            return null;
-        }
-
-        @Override
-        public Set<TopicPartition> changelogPartitions() {
-            return changelogOffsets.keySet();
-        }
-
-        public boolean isActive() {
-            return active;
-        }
-
-        void setPurgeableOffsets(final Map<TopicPartition, Long> 
purgeableOffsets) {
-            this.purgeableOffsets = purgeableOffsets;
-        }
-
-        @Override
-        public Map<TopicPartition, Long> purgeableOffsets() {
-            return purgeableOffsets;
-        }
-
-        void setChangelogOffsets(final Map<TopicPartition, Long> 
changelogOffsets) {
-            this.changelogOffsets = changelogOffsets;
-        }
-
-        @Override
-        public Map<TopicPartition, Long> changelogOffsets() {
-            return changelogOffsets;
-        }
-
-        @Override
-        public Map<TopicPartition, Long> committedOffsets() {
-            return Collections.emptyMap();
-        }
-
-        @Override
-        public Map<TopicPartition, Long> highWaterMark() {
-            return Collections.emptyMap();
-        }
-
-        @Override
-        public Optional<Long> timeCurrentIdlingStarted() {
-            return Optional.empty();
-        }
-
-        @Override
-        public void addRecords(final TopicPartition partition, final 
Iterable<ConsumerRecord<byte[], byte[]>> records) {
-            if (isActive()) {
-                final Deque<ConsumerRecord<byte[], byte[]>> partitionQueue =
-                    queue.computeIfAbsent(partition, k -> new LinkedList<>());
-
-                for (final ConsumerRecord<byte[], byte[]> record : records) {
-                    partitionQueue.add(record);
-                }
-            } else {
-                throw new IllegalStateException("Can't add records to an 
inactive task.");
-            }
-        }
-
-        @Override
-        public boolean process(final long wallClockTime) {
-            if (isActive() && state() == State.RUNNING) {
-                for (final LinkedList<ConsumerRecord<byte[], byte[]>> records 
: queue.values()) {
-                    final ConsumerRecord<byte[], byte[]> record = 
records.poll();
-                    if (record != null) {
-                        return true;
-                    }
-                }
-                return false;
-            } else {
-                throw new IllegalStateException("Can't process an inactive or 
non-running task.");
-            }
-        }
-    }
 }

Reply via email to