This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 59f51fb3cac KAFKA-19683: Remove more dead tests and rewrote 3 tests in 
TaskManagerTest [2/N] (#20544)
59f51fb3cac is described below

commit 59f51fb3cac66d8096e647eb90be6c5e4ba9f485
Author: Shashank <[email protected]>
AuthorDate: Fri Oct 10 06:31:28 2025 -0700

    KAFKA-19683: Remove more dead tests and rewrote 3 tests in TaskManagerTest 
[2/N] (#20544)
    
    Changes made
    - Added an additional `setUpTaskManager()` overload -- created this
    temporarily so that the CI pipelines pass while I work on the failing
    tests incrementally
    - Rewrote 3 tests to use the stateUpdater thread
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../processor/internals/TaskManagerTest.java       | 360 ++++++++++-----------
 1 file changed, 163 insertions(+), 197 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 8d83d1e99fa..f89faffd971 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -61,8 +61,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
@@ -216,25 +214,38 @@ public class TaskManagerTest {
 
     @BeforeEach
     public void setUp() {
-        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, null, false);
+        taskManager = 
setUpTaskManagerWithoutStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
 null, false);
     }
 
-    private TaskManager setUpTaskManager(final ProcessingMode processingMode, 
final TasksRegistry tasks) {
-        return setUpTaskManager(processingMode, tasks, false);
+    private TaskManager setUpTaskManagerWithStateUpdater(final ProcessingMode 
processingMode, final TasksRegistry tasks) {
+        return setUpTaskManagerWithStateUpdater(processingMode, tasks, false);
     }
 
-    private TaskManager setUpTaskManager(final ProcessingMode processingMode, 
final boolean stateUpdaterEnabled) {
-        return setUpTaskManager(processingMode, null, stateUpdaterEnabled, 
false);
-    }
-
-    private TaskManager setUpTaskManager(final ProcessingMode processingMode, 
final TasksRegistry tasks, final boolean stateUpdaterEnabled) {
-        return setUpTaskManager(processingMode, tasks, stateUpdaterEnabled, 
false);
+    private TaskManager setUpTaskManagerWithStateUpdater(final ProcessingMode 
processingMode,
+                                                         final TasksRegistry 
tasks,
+                                                         final boolean 
processingThreadsEnabled) {
+        topologyMetadata = new TopologyMetadata(topologyBuilder, new 
DummyStreamsConfig(processingMode));
+        final TaskManager taskManager = new TaskManager(
+            time,
+            changeLogReader,
+            ProcessId.randomProcessId(),
+            "taskManagerTest",
+            activeTaskCreator,
+            standbyTaskCreator,
+            tasks != null ? tasks : new Tasks(new LogContext()),
+            topologyMetadata,
+            adminClient,
+            stateDirectory,
+            stateUpdater,
+            processingThreadsEnabled ? schedulingTaskManager : null
+        );
+        taskManager.setMainConsumer(consumer);
+        return taskManager;
     }
 
-    private TaskManager setUpTaskManager(final ProcessingMode processingMode,
-                                         final TasksRegistry tasks,
-                                         final boolean stateUpdaterEnabled,
-                                         final boolean 
processingThreadsEnabled) {
+    private TaskManager setUpTaskManagerWithoutStateUpdater(final 
ProcessingMode processingMode,
+                                                            final 
TasksRegistry tasks,
+                                                            final boolean 
processingThreadsEnabled) {
         topologyMetadata = new TopologyMetadata(topologyBuilder, new 
DummyStreamsConfig(processingMode));
         final TaskManager taskManager = new TaskManager(
             time,
@@ -247,7 +258,7 @@ public class TaskManagerTest {
             topologyMetadata,
             adminClient,
             stateDirectory,
-            stateUpdaterEnabled ? stateUpdater : null,
+            null,
             processingThreadsEnabled ? schedulingTaskManager : null
         );
         taskManager.setMainConsumer(consumer);
@@ -261,7 +272,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId00Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
         when(tasks.task(taskId00)).thenReturn(activeTask1);
         final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
@@ -283,7 +294,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId01Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
         when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
 
@@ -296,14 +307,14 @@ public class TaskManagerTest {
     @Test
     public void shouldLockActiveOnHandleAssignmentWithProcessingThreads() {
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTaskIds()).thenReturn(Set.of(taskId00, taskId01));
         final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
         when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
 
         taskManager.handleAssignment(
             mkMap(mkEntry(taskId00, taskId00Partitions)),
-                mkMap(mkEntry(taskId01, taskId01Partitions))
+            mkMap(mkEntry(taskId01, taskId01Partitions))
         );
 
         verify(schedulingTaskManager).lockTasks(Set.of(taskId00, taskId01));
@@ -319,7 +330,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId01Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasks()).thenReturn(Set.of(activeTask1, activeTask2));
         final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
         when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
@@ -339,7 +350,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId01Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasks()).thenReturn(Set.of(activeTask1, activeTask2));
         final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
         when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
@@ -359,7 +370,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId01Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(tasks.activeTasks()).thenReturn(Set.of(activeTask1, activeTask2));
 
         taskManager.resumePollingForPartitionsWithAvailableSpace();
@@ -377,7 +388,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId01Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(tasks.activeTasks()).thenReturn(Set.of(activeTask1, activeTask2));
 
         taskManager.updateLags();
@@ -392,7 +403,7 @@ public class TaskManagerTest {
             .inState(State.RESTORING)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToClose));
         final CompletableFuture<StateUpdater.RemovedTaskResult> future = new 
CompletableFuture<>();
         when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(future);
@@ -412,7 +423,7 @@ public class TaskManagerTest {
             .inState(State.RESTORING)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToClose));
         final CompletableFuture<StateUpdater.RemovedTaskResult> future = new 
CompletableFuture<>();
         when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(future);
@@ -433,7 +444,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToClose));
         final CompletableFuture<StateUpdater.RemovedTaskResult> future = new 
CompletableFuture<>();
         when(stateUpdater.remove(standbyTaskToClose.id())).thenReturn(future);
@@ -453,7 +464,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToClose));
         final CompletableFuture<StateUpdater.RemovedTaskResult> future = new 
CompletableFuture<>();
         when(stateUpdater.remove(standbyTaskToClose.id())).thenReturn(future);
@@ -474,7 +485,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(failedStandbyTask));
         final CompletableFuture<StateUpdater.RemovedTaskResult> future = new 
CompletableFuture<>();
         when(stateUpdater.remove(failedStandbyTask.id())).thenReturn(future);
@@ -501,7 +512,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId03Partitions).build();
         final Set<TopicPartition> newInputPartitions = taskId02Partitions;
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         
when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToUpdateInputPartitions));
         final CompletableFuture<StateUpdater.RemovedTaskResult> future = new 
CompletableFuture<>();
         
when(stateUpdater.remove(activeTaskToUpdateInputPartitions.id())).thenReturn(future);
@@ -529,7 +540,7 @@ public class TaskManagerTest {
             .inState(State.CREATED)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToRecycle));
         
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, 
taskId03Partitions))
             .thenReturn(recycledStandbyTask);
@@ -553,7 +564,7 @@ public class TaskManagerTest {
             .inState(State.RESTORING)
             .withInputPartitions(taskId00Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToRecycle));
         
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, 
activeTaskToRecycle.inputPartitions()))
             .thenThrow(new RuntimeException());
@@ -583,7 +594,7 @@ public class TaskManagerTest {
             .inState(State.CREATED)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToRecycle));
         
when(activeTaskCreator.createActiveTaskFromStandby(standbyTaskToRecycle, 
taskId03Partitions, consumer))
             .thenReturn(recycledActiveTask);
@@ -607,7 +618,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToRecycle));
         when(activeTaskCreator.createActiveTaskFromStandby(
             standbyTaskToRecycle,
@@ -637,7 +648,7 @@ public class TaskManagerTest {
             .inState(State.RESTORING)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(reassignedActiveTask));
 
         taskManager.handleAssignment(
@@ -656,7 +667,7 @@ public class TaskManagerTest {
             .inState(State.SUSPENDED)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         
when(tasks.allNonFailedTasks()).thenReturn(Set.of(reassignedActiveTask));
 
         taskManager.handleAssignment(
@@ -676,7 +687,7 @@ public class TaskManagerTest {
             .inState(State.RESTORING)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         
when(stateUpdater.tasks()).thenReturn(Set.of(failedActiveTaskToRecycle));
         final RuntimeException taskException = new RuntimeException("Nobody 
expects the Spanish inquisition!");
         when(stateUpdater.remove(failedActiveTaskToRecycle.id()))
@@ -706,7 +717,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         
when(stateUpdater.tasks()).thenReturn(Set.of(failedStandbyTaskToRecycle));
         final RuntimeException taskException = new RuntimeException("Nobody 
expects the Spanish inquisition!");
         when(stateUpdater.remove(failedStandbyTaskToRecycle.id()))
@@ -736,7 +747,7 @@ public class TaskManagerTest {
             .inState(State.RESTORING)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         
when(stateUpdater.tasks()).thenReturn(Set.of(failedActiveTaskToReassign));
         final RuntimeException taskException = new RuntimeException("Nobody 
expects the Spanish inquisition!");
         when(stateUpdater.remove(failedActiveTaskToReassign.id()))
@@ -769,7 +780,7 @@ public class TaskManagerTest {
             .inState(State.RESTORING)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         
when(tasks.allNonFailedTasks()).thenReturn(Set.of(reassignedActiveTask1));
         when(stateUpdater.tasks()).thenReturn(Set.of(reassignedActiveTask2));
         when(stateUpdater.remove(reassignedActiveTask2.id()))
@@ -795,7 +806,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToUpdateInputPartitions));
 
         taskManager.handleAssignment(
@@ -813,7 +824,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(reassignedStandbyTask));
 
         taskManager.handleAssignment(
@@ -837,7 +848,7 @@ public class TaskManagerTest {
             .inState(State.CREATED)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToClose, 
standbyTaskToRecycle));
         final CompletableFuture<StateUpdater.RemovedTaskResult> 
futureForActiveTaskToClose = new CompletableFuture<>();
         
when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(futureForActiveTaskToClose);
@@ -872,7 +883,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskInStateUpdater));
         when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, 
runningActiveTask)));
@@ -896,7 +907,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, 
activeTask)));
         assertEquals(taskManager.allOwnedTasks(), mkMap(mkEntry(taskId03, 
activeTask)));
@@ -908,7 +919,7 @@ public class TaskManagerTest {
             .inState(State.CREATED)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         final Set<Task> createdTasks = Set.of(activeTaskToBeCreated);
         final Map<TaskId, Set<TopicPartition>> tasksToBeCreated = mkMap(
             mkEntry(activeTaskToBeCreated.id(), 
activeTaskToBeCreated.inputPartitions()));
@@ -926,7 +937,7 @@ public class TaskManagerTest {
             .inState(State.CREATED)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         final Set<Task> createdTasks = Set.of(standbyTaskToBeCreated);
         when(standbyTaskCreator.createTasks(mkMap(
             mkEntry(standbyTaskToBeCreated.id(), 
standbyTaskToBeCreated.inputPartitions())))
@@ -953,7 +964,7 @@ public class TaskManagerTest {
         
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToRecycle));
         
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, 
taskId01Partitions))
             .thenReturn(standbyTask);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, 
taskId01Partitions)));
 
@@ -964,28 +975,6 @@ public class TaskManagerTest {
         verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
-    @Test
-    public void 
shouldAddRecycledStandbyTasksFromActiveToTaskRegistryWithStateUpdaterDisabled() 
{
-        final StreamTask activeTaskToRecycle = statefulTask(taskId01, 
taskId01ChangelogPartitions)
-            .withInputPartitions(taskId01Partitions)
-            .inState(State.RUNNING).build();
-        final StandbyTask standbyTask = standbyTask(taskId01, 
taskId01ChangelogPartitions)
-            .withInputPartitions(taskId01Partitions)
-            .inState(State.CREATED).build();
-        final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(activeTaskToRecycle));
-        
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, 
taskId01Partitions))
-            .thenReturn(standbyTask);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
-
-        taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, 
taskId01Partitions)));
-
-        verify(activeTaskToRecycle).prepareCommit(true);
-        verify(tasks).replaceActiveWithStandby(standbyTask);
-        verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
-        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
-    }
-
     @Test
     public void 
shouldThrowDuringAssignmentIfStandbyTaskToRecycleIsFoundInTasksRegistryWithStateUpdaterEnabled()
 {
         final StandbyTask standbyTaskToRecycle = standbyTask(taskId03, 
taskId03ChangelogPartitions)
@@ -993,7 +982,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         
when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToRecycle));
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         final IllegalStateException illegalStateException = assertThrows(
             IllegalStateException.class,
@@ -1014,7 +1003,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToClose));
 
         taskManager.handleAssignment(Collections.emptyMap(), 
Collections.emptyMap());
@@ -1032,7 +1021,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToClose));
 
         final IllegalStateException illegalStateException = assertThrows(
@@ -1052,7 +1041,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId03Partitions).build();
         final Set<TopicPartition> newInputPartitions = taskId02Partitions;
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToUpdateInputPartitions));
         
when(tasks.updateActiveTaskInputPartitions(activeTaskToUpdateInputPartitions, 
newInputPartitions)).thenReturn(true);
 
@@ -1072,7 +1061,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToResume));
 
         taskManager.handleAssignment(
@@ -1090,7 +1079,7 @@ public class TaskManagerTest {
             .inState(State.SUSPENDED)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToResume));
 
         taskManager.handleAssignment(
@@ -1112,7 +1101,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId02Partitions).build();
         final Set<TopicPartition> newInputPartitions = taskId03Partitions;
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         
when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToUpdateInputPartitions));
 
         final IllegalStateException illegalStateException = assertThrows(
@@ -1137,7 +1126,7 @@ public class TaskManagerTest {
             .inState(State.CREATED)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToClose));
 
         taskManager.handleAssignment(
@@ -1163,7 +1152,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00, 
task01));
-        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        taskManager = 
setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
 tasks, false);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
@@ -1185,7 +1174,7 @@ public class TaskManagerTest {
         when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00, 
task01));
         final LockException lockException = new LockException("Where are my keys??");
         doThrow(lockException).when(task00).initializeIfNeeded();
-        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        taskManager = 
setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
 tasks, false);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
@@ -1209,7 +1198,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00, 
task01));
         doThrow(new LockException("Lock Exception!")).when(task00).initializeIfNeeded();
-        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        taskManager = 
setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
 tasks, false);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
@@ -1255,7 +1244,7 @@ public class TaskManagerTest {
         when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00));
         final RuntimeException runtimeException = new 
RuntimeException("KABOOM!");
         doThrow(runtimeException).when(task00).initializeIfNeeded();
-        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        taskManager = 
setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
 tasks, false);
 
         final StreamsException streamsException = assertThrows(
             StreamsException.class,
@@ -1281,7 +1270,7 @@ public class TaskManagerTest {
             .inState(State.CREATED)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks, false);
         when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(statefulTask0, 
statefulTask1, statefulTask2));
         doThrow(new 
TaskCorruptedException(Collections.singleton(statefulTask0.id))).when(statefulTask0).initializeIfNeeded();
         doThrow(new 
TaskCorruptedException(Collections.singleton(statefulTask1.id))).when(statefulTask1).initializeIfNeeded();
@@ -1302,7 +1291,7 @@ public class TaskManagerTest {
     public void 
shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreRestoring() {
         when(stateUpdater.restoresActiveTasks()).thenReturn(true);
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         assertFalse(taskManager.checkStateUpdater(time.milliseconds(), 
noOpResetter));
     }
@@ -1311,7 +1300,7 @@ public class TaskManagerTest {
     public void 
shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingTaskToRecycleButPendingTasksToInit()
 {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.hasPendingTasksToInit()).thenReturn(true);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         assertFalse(taskManager.checkStateUpdater(time.milliseconds(), 
noOpResetter));
     }
@@ -1319,7 +1308,7 @@ public class TaskManagerTest {
     @Test
     public void 
shouldReturnTrueFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingInit()
 {
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         assertTrue(taskManager.checkStateUpdater(time.milliseconds(), 
noOpResetter));
     }
@@ -1539,7 +1528,7 @@ public class TaskManagerTest {
 
     private TaskManager setupForRevocationAndLost(final Set<Task> 
tasksInStateUpdater,
                                                   final TasksRegistry tasks) {
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(tasksInStateUpdater);
 
         return taskManager;
@@ -1608,12 +1597,12 @@ public class TaskManagerTest {
         when(stateUpdater.restoresActiveTasks()).thenReturn(true);
         
when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(statefulTasks);
 
-        return setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        return setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, 
tasks);
     }
 
     @Test
     public void 
shouldReturnCorrectBooleanWhenTryingToCompleteRestorationWithStateUpdater() {
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, null, false);
         when(stateUpdater.restoresActiveTasks()).thenReturn(false);
         assertTrue(taskManager.checkStateUpdater(time.milliseconds(), 
noOpResetter));
         when(stateUpdater.restoresActiveTasks()).thenReturn(true);
@@ -1631,7 +1620,7 @@ public class TaskManagerTest {
         
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(exceptionAndTasks));
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         final StreamsException thrown = assertThrows(
             StreamsException.class,
@@ -1658,7 +1647,7 @@ public class TaskManagerTest {
         
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Arrays.asList(exceptionAndTasks0,
 exceptionAndTasks1));
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         final TaskCorruptedException thrown = assertThrows(
             TaskCorruptedException.class,
@@ -1742,7 +1731,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId00Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(tasks.allNonFailedTasks()).thenReturn(Set.of(statefulTask0));
         final Set<TopicPartition> assigned = Set.of(t1p0, t1p1);
         when(consumer.assignment()).thenReturn(assigned);
@@ -1787,7 +1776,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, 
runningStatefulTask)));
         when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask, 
restoringStatefulTask));
         
when(tasks.allNonFailedTasks()).thenReturn(Set.of(runningStatefulTask));
@@ -1819,7 +1808,7 @@ public class TaskManagerTest {
         when(runningStatefulTask.changelogOffsets())
             .thenReturn(mkMap(mkEntry(t1p0changelog, 
changelogOffsetOfRunningTask)));
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManagerWithoutStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, false);
         when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, 
runningStatefulTask)));
 
         assertThat(
@@ -1850,7 +1839,7 @@ public class TaskManagerTest {
         final Map<TopicPartition, Long> changelogOffsetInCheckpoint = 
mkMap(mkEntry(t1p0changelog, 24L));
         writeCheckpointFile(taskId00, changelogOffsetInCheckpoint);
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
         taskManager.handleRebalanceStart(singleton("topic"));
 
@@ -1868,7 +1857,7 @@ public class TaskManagerTest {
         final Map<TopicPartition, Long> changelogOffsetInCheckpoint = 
mkMap(mkEntry(t1p0changelog, 24L));
         writeCheckpointFile(taskId00, changelogOffsetInCheckpoint);
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(restoringStandbyTask));
         taskManager.handleRebalanceStart(singleton("topic"));
 
@@ -1893,7 +1882,7 @@ public class TaskManagerTest {
         when(restoringStandbyTask.changelogOffsets())
             .thenReturn(mkMap(mkEntry(t1p2changelog, 
changelogOffsetOfRestoringStandbyTask)));
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, 
runningStatefulTask)));
         when(stateUpdater.tasks()).thenReturn(Set.of(restoringStandbyTask, 
restoringStatefulTask));
 
@@ -1918,7 +1907,7 @@ public class TaskManagerTest {
                 mkEntry(t1p1changelog2, OffsetCheckpoint.OFFSET_UNKNOWN)
             ));
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
+        final TaskManager taskManager = 
setUpTaskManagerWithoutStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, false);
         when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId01, 
restoringStatefulTask)));
         when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
 
@@ -1955,17 +1944,23 @@ public class TaskManagerTest {
         );
         final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 
15L));
 
+        final StandbyTask standbyTask = standbyTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId00Partitions)
+            .build();
+        when(standbyTask.changelogOffsets()).thenReturn(changelogOffsets);
+
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        taskManager = 
setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
 tasks, false);
+
+        when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask));
+
         expectLockObtainedFor(taskId00);
         expectDirectoryNotEmpty(taskId00);
         makeTaskFolders(taskId00.toString());
 
         taskManager.handleRebalanceStart(singleton("topic"));
-        final StateMachineTask restoringTask = handleAssignment(
-            emptyMap(),
-            taskId00Assignment,
-            emptyMap()
-        ).get(taskId00);
-        restoringTask.setChangelogOffsets(changelogOffsets);
+        taskManager.handleAssignment(emptyMap(), taskId00Assignment);
 
         assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums));
     }
@@ -2174,7 +2169,7 @@ public class TaskManagerTest {
 
     @Test
     public void 
shouldReInitializeStreamsProducerOnHandleLostAllIfEosV2Enabled() {
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
+        final TaskManager taskManager = 
setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, 
false);
 
         taskManager.handleLostAll();
 
@@ -2190,7 +2185,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(tasks.task(taskId03)).thenReturn(corruptedActiveTask);
         when(tasks.task(taskId02)).thenReturn(corruptedStandbyTask);
 
@@ -2281,38 +2276,43 @@ public class TaskManagerTest {
 
     @Test
     public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
-        final ProcessorStateManager stateManager = 
mock(ProcessorStateManager.class);
-
-        final StateMachineTask corruptedTask = new StateMachineTask(taskId00, 
taskId00Partitions, true, stateManager);
-        final StateMachineTask nonCorruptedTask = new 
StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
+        final StreamTask corruptedTask = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .withInputPartitions(taskId00Partitions)
+            .inState(State.RUNNING)
+            .build();
 
-        final Map<TaskId, Set<TopicPartition>> firstAssignment = new 
HashMap<>(taskId00Assignment);
-        firstAssignment.putAll(taskId01Assignment);
+        final StreamTask nonCorruptedTask = statefulTask(taskId01, 
taskId01ChangelogPartitions)
+            .withInputPartitions(taskId01Partitions)
+            .inState(State.RUNNING)
+            .build();
 
-        // `handleAssignment`
-        when(activeTaskCreator.createTasks(any(), eq(firstAssignment)))
-            .thenReturn(asList(corruptedTask, nonCorruptedTask));
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.task(taskId00)).thenReturn(corruptedTask);
+        when(tasks.allTasksPerId()).thenReturn(mkMap(
+            mkEntry(taskId00, corruptedTask),
+            mkEntry(taskId01, nonCorruptedTask)
+        ));
+        when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
 
-        when(consumer.assignment())
-            .thenReturn(assignment)
-            .thenReturn(taskId00Partitions);
+        when(nonCorruptedTask.commitNeeded()).thenReturn(true);
+        when(nonCorruptedTask.prepareCommit(true)).thenReturn(emptyMap());
+        when(corruptedTask.prepareCommit(false)).thenReturn(emptyMap());
+        doNothing().when(corruptedTask).postCommit(anyBoolean());
 
-        taskManager.handleAssignment(firstAssignment, emptyMap());
-        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
tp -> assertThat(tp, is(empty()))), is(true));
+        when(consumer.assignment()).thenReturn(taskId00Partitions);
 
-        assertThat(nonCorruptedTask.state(), is(Task.State.RUNNING));
-        nonCorruptedTask.setCommitNeeded();
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
-        corruptedTask.setChangelogOffsets(singletonMap(t1p0, 0L));
-        taskManager.handleCorruption(singleton(taskId00));
+        taskManager.handleCorruption(Set.of(taskId00));
 
-        assertTrue(nonCorruptedTask.commitPrepared);
-        assertThat(nonCorruptedTask.partitionsForOffsetReset, 
equalTo(Collections.emptySet()));
-        assertThat(corruptedTask.partitionsForOffsetReset, 
equalTo(taskId00Partitions));
+        verify(nonCorruptedTask).prepareCommit(true);
+        verify(nonCorruptedTask, never()).addPartitionsForOffsetReset(any());
+        verify(corruptedTask).addPartitionsForOffsetReset(taskId00Partitions);
+        verify(corruptedTask).changelogPartitions();
+        verify(corruptedTask).postCommit(true);
 
         // check that we should not commit empty map either
         verify(consumer, never()).commitSync(emptyMap());
-        verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
     }
 
     @Test
@@ -2359,7 +2359,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId02, 
corruptedTask)));
         when(tasks.task(taskId02)).thenReturn(corruptedTask);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(consumer.assignment()).thenReturn(intersection(HashSet::new, 
taskId00Partitions, taskId01Partitions, taskId02Partitions));
 
         taskManager.handleCorruption(Set.of(taskId02));
@@ -2372,37 +2372,6 @@ public class TaskManagerTest {
         verify(standbyTask, never()).postCommit(anyBoolean());
     }
 
-    @Test
-    public void 
shouldNotCommitNonCorruptedRestoringActiveTasksAndCommitRunningStandbyTasksWithStateUpdaterDisabled()
 {
-        final StreamTask activeRestoringTask = statefulTask(taskId00, 
taskId00ChangelogPartitions)
-            .withInputPartitions(taskId00Partitions)
-            .inState(State.RESTORING).build();
-        final StandbyTask standbyTask = standbyTask(taskId01, 
taskId01ChangelogPartitions)
-            .withInputPartitions(taskId01Partitions)
-            .inState(State.RUNNING).build();
-        when(standbyTask.commitNeeded()).thenReturn(true);
-        final StreamTask corruptedTask = statefulTask(taskId02, 
taskId02ChangelogPartitions)
-            .withInputPartitions(taskId02Partitions)
-            .inState(State.RUNNING).build();
-        final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasksPerId()).thenReturn(mkMap(
-            mkEntry(taskId00, activeRestoringTask),
-            mkEntry(taskId01, standbyTask),
-            mkEntry(taskId02, corruptedTask)
-        ));
-        when(tasks.task(taskId02)).thenReturn(corruptedTask);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
-        when(consumer.assignment()).thenReturn(intersection(HashSet::new, 
taskId00Partitions, taskId01Partitions, taskId02Partitions));
-
-        taskManager.handleCorruption(Set.of(taskId02));
-
-        verify(activeRestoringTask, never()).commitNeeded();
-        verify(activeRestoringTask, never()).prepareCommit(true);
-        verify(activeRestoringTask, never()).postCommit(anyBoolean());
-        verify(standbyTask).prepareCommit(true);
-        verify(standbyTask).postCommit(anyBoolean());
-    }
-
     @Test
     public void 
shouldCleanAndReviveCorruptedStandbyTasksBeforeCommittingNonCorruptedTasks() {
         final ProcessorStateManager stateManager = 
mock(ProcessorStateManager.class);
@@ -2545,7 +2514,7 @@ public class TaskManagerTest {
 
     @Test
     public void 
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringHandleCorruptedWithEOS()
 {
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
+        final TaskManager taskManager = 
setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, 
false);
         final StreamsProducer producer = mock(StreamsProducer.class);
         when(activeTaskCreator.streamsProducer()).thenReturn(producer);
         final ProcessorStateManager stateManager = 
mock(ProcessorStateManager.class);
@@ -2677,7 +2646,7 @@ public class TaskManagerTest {
 
     @Test
     public void 
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithEOS()
 {
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
+        final TaskManager taskManager = 
setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, 
false);
         final StreamsProducer producer = mock(StreamsProducer.class);
         when(activeTaskCreator.streamsProducer()).thenReturn(producer);
         final ProcessorStateManager stateManager = 
mock(ProcessorStateManager.class);
@@ -2919,7 +2888,7 @@ public class TaskManagerTest {
     @Test
     public void 
shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEosV2() {
         final StreamsProducer producer = mock(StreamsProducer.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
+        final TaskManager taskManager = 
setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, 
false);
 
         final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true, stateManager);
         final Map<TopicPartition, OffsetAndMetadata> offsets00 = 
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
@@ -3076,7 +3045,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldNotCommitIfNoRevokedTasksNeedCommittingWithEOSv2() {
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
+        final TaskManager taskManager = 
setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, 
false);
 
         final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true, stateManager);
 
@@ -3212,7 +3181,7 @@ public class TaskManagerTest {
     }
 
     private void 
shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown(final 
ProcessingMode processingMode) {
-        final TaskManager taskManager = setUpTaskManager(processingMode, null, 
false);
+        final TaskManager taskManager = 
setUpTaskManagerWithoutStateUpdater(processingMode, null, false);
 
         final TopicPartition changelog = new TopicPartition("changelog", 0);
         final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
@@ -3367,7 +3336,7 @@ public class TaskManagerTest {
 
     @Test
     public void 
shouldOnlyCommitRevokedStandbyTaskAndPropagatePrepareCommitException() {
-        setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
+        setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, 
null, false);
 
         final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, 
false, stateManager);
 
@@ -3533,7 +3502,7 @@ public class TaskManagerTest {
                 new ExceptionAndTask(new RuntimeException(), 
failedStatefulTask),
                 new ExceptionAndTask(new RuntimeException(), 
failedStandbyTask))
             );
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         taskManager.shutdown(true);
 
@@ -3547,7 +3516,7 @@ public class TaskManagerTest {
     @Test
     public void shouldShutdownSchedulingTaskManager() {
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
 
         taskManager.shutdown(true);
 
@@ -3596,7 +3565,7 @@ public class TaskManagerTest {
             new ExceptionAndTask(new StreamsException("KABOOM!"), 
removedFailedStatefulTaskDuringRemoval),
             new ExceptionAndTask(new StreamsException("KABOOM!"), 
removedFailedStandbyTaskDuringRemoval)
         ));
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         futureForRemovedStatefulTask.complete(new 
StateUpdater.RemovedTaskResult(removedStatefulTask));
         futureForRemovedStandbyTask.complete(new 
StateUpdater.RemovedTaskResult(removedStandbyTask));
         futureForRemovedFailedStatefulTask
@@ -3825,7 +3794,7 @@ public class TaskManagerTest {
         allOffsets.putAll(offsetsT01);
         allOffsets.putAll(offsetsT02);
 
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
+        final TaskManager taskManager = 
setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, 
false);
 
         final StateMachineTask task01 = new StateMachineTask(taskId01, 
taskId01Partitions, true, stateManager);
         task01.setCommittableOffsetsAndMetadata(offsetsT01);
@@ -4248,28 +4217,30 @@ public class TaskManagerTest {
 
     @Test
     public void shouldPunctuateActiveTasks() {
-        final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true, stateManager) {
-            @Override
-            public boolean maybePunctuateStreamTime() {
-                return true;
-            }
 
-            @Override
-            public boolean maybePunctuateSystemTime() {
-                return true;
-            }
-        };
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .withInputPartitions(taskId00Partitions)
+            .inState(State.RUNNING)
+            .build();
 
-        when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), 
eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        when(task00.maybePunctuateStreamTime()).thenReturn(true);
+        when(task00.maybePunctuateSystemTime()).thenReturn(true);
 
-        taskManager.handleAssignment(taskId00Assignment, emptyMap());
-        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.activeTasks()).thenReturn(Set.of(task00));
 
-        assertThat(task00.state(), is(Task.State.RUNNING));
+        when(stateUpdater.restoresActiveTasks()).thenReturn(false);
+        when(stateUpdater.hasExceptionsAndFailedTasks()).thenReturn(false);
+        
when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(Set.of());
+        
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(List.of());
+
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         // one for stream and one for system time
         assertThat(taskManager.punctuate(), equalTo(2));
+
+        verify(task00).maybePunctuateStreamTime();
+        verify(task00).maybePunctuateSystemTime();
     }
 
     @Test
@@ -4543,7 +4514,7 @@ public class TaskManagerTest {
 
     @Test
     public void 
shouldThrowTaskCorruptedExceptionForTimeoutExceptionOnCommitWithEosV2() {
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
+        final TaskManager taskManager = 
setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, 
false);
 
         final StreamsProducer producer = mock(StreamsProducer.class);
         when(activeTaskCreator.streamsProducer()).thenReturn(producer);
@@ -4748,7 +4719,7 @@ public class TaskManagerTest {
     @Test
     public void 
shouldRecycleStartupTasksFromStateDirectoryAsActiveWithStateUpdater() {
         final Tasks taskRegistry = new Tasks(new LogContext());
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, taskRegistry, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, taskRegistry);
         final StandbyTask startupTask = standbyTask(taskId00, 
taskId00ChangelogPartitions).build();
 
         final StreamTask activeTask = statefulTask(taskId00, 
taskId00ChangelogPartitions).build();
@@ -4786,7 +4757,7 @@ public class TaskManagerTest {
     @Test
     public void 
shouldUseStartupTasksFromStateDirectoryAsStandbyWithStateUpdater() {
         final Tasks taskRegistry = new Tasks(new LogContext());
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, taskRegistry, true);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, taskRegistry);
         final StandbyTask startupTask = standbyTask(taskId00, 
taskId00ChangelogPartitions).build();
 
         when(stateDirectory.hasStartupTasks()).thenReturn(true, true, false);
@@ -4816,16 +4787,11 @@ public class TaskManagerTest {
         assertEquals(Collections.singletonMap(taskId00, startupTask), 
taskManager.standbyTaskMap());
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldStartStateUpdaterOnInit(final boolean 
stateUpdaterEnabled) {
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, stateUpdaterEnabled);
+    @Test
+    public void shouldStartStateUpdaterOnInit() {
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, null);
         taskManager.init();
-        if (stateUpdaterEnabled) {
-            verify(stateUpdater).start();
-        } else {
-            verify(stateUpdater, never()).start();
-        }
+        verify(stateUpdater).start();
     }
 
     private static KafkaFutureImpl<DeletedRecords> completedFuture() {

Reply via email to