This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 959818f982a KAFKA-19683: More test replacements [6/N] (#20858)
959818f982a is described below

commit 959818f982a9c8736d1a74e32d091e240bd577e0
Author: Shashank <[email protected]>
AuthorDate: Fri Nov 14 04:41:53 2025 -0800

    KAFKA-19683: More test replacements [6/N] (#20858)
    
    Rewrote more tests in `TaskManagerTest.java`
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../processor/internals/TaskManagerTest.java       | 490 +++++++++------------
 1 file changed, 219 insertions(+), 271 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 01aa1d26541..dc323f019b1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -85,7 +85,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
@@ -1928,7 +1927,7 @@ public class TaskManagerTest {
                 mkEntry(t1p1changelog2, OffsetCheckpoint.OFFSET_UNKNOWN)
             ));
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        final TaskManager taskManager = 
setUpTaskManagerWithoutStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, false);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, false);
         when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId01, 
restoringStatefulTask)));
         when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
 
@@ -2180,7 +2179,7 @@ public class TaskManagerTest {
 
     @Test
     public void 
shouldReInitializeStreamsProducerOnHandleLostAllIfEosV2Enabled() {
-        final TaskManager taskManager = 
setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, 
false);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false);
 
         taskManager.handleLostAll();
 
@@ -3323,94 +3322,26 @@ public class TaskManagerTest {
     }
 
     private void 
shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown(final 
ProcessingMode processingMode) {
-        final TaskManager taskManager = 
setUpTaskManagerWithoutStateUpdater(processingMode, null, false);
-
-        final TopicPartition changelog = new TopicPartition("changelog", 0);
-        final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
-            mkEntry(taskId00, taskId00Partitions),
-            mkEntry(taskId01, taskId01Partitions),
-            mkEntry(taskId02, taskId02Partitions),
-            mkEntry(taskId03, taskId03Partitions)
-        );
-        final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, 
true, stateManager) {
-            @Override
-            public Set<TopicPartition> changelogPartitions() {
-                return singleton(changelog);
-            }
-        };
-        final AtomicBoolean closedDirtyTask01 = new AtomicBoolean(false);
-        final AtomicBoolean closedDirtyTask02 = new AtomicBoolean(false);
-        final AtomicBoolean closedDirtyTask03 = new AtomicBoolean(false);
-        final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, 
true, stateManager) {
-            @Override
-            public void suspend() {
-                super.suspend();
-                throw new TaskMigratedException("migrated", new 
RuntimeException("cause"));
-            }
 
-            @Override
-            public void closeDirty() {
-                super.closeDirty();
-                closedDirtyTask01.set(true);
-            }
-        };
-        final Task task02 = new StateMachineTask(taskId02, taskId02Partitions, 
true, stateManager) {
-            @Override
-            public void suspend() {
-                super.suspend();
-                throw new RuntimeException("oops");
-            }
-
-            @Override
-            public void closeDirty() {
-                super.closeDirty();
-                closedDirtyTask02.set(true);
-            }
-        };
-        final Task task03 = new StateMachineTask(taskId03, taskId03Partitions, 
true, stateManager) {
-            @Override
-            public void suspend() {
-                super.suspend();
-                throw new RuntimeException("oops");
-            }
-
-            @Override
-            public void closeDirty() {
-                super.closeDirty();
-                closedDirtyTask03.set(true);
-            }
-        };
-
-        when(activeTaskCreator.createTasks(any(), eq(assignment)))
-            .thenReturn(asList(task00, task01, task02, task03));
-
-        taskManager.handleAssignment(assignment, emptyMap());
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId00Partitions).build();
+        final StreamTask task01 = statefulTask(taskId01, 
taskId01ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId01Partitions).build();
+        final StreamTask task02 = statefulTask(taskId02, 
taskId02ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId02Partitions).build();
 
-        assertThat(task00.state(), is(Task.State.CREATED));
-        assertThat(task01.state(), is(Task.State.CREATED));
-        assertThat(task02.state(), is(Task.State.CREATED));
-        assertThat(task03.state(), is(Task.State.CREATED));
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(processingMode, tasks);
 
-        taskManager.tryToCompleteRestoration(time.milliseconds(), null);
+        doThrow(new TaskMigratedException("migrated", new 
RuntimeException("cause")))
+            .when(task01).suspend();
+        doThrow(new RuntimeException("oops"))
+            .when(task02).suspend();
 
-        assertThat(task00.state(), is(Task.State.RESTORING));
-        assertThat(task01.state(), is(Task.State.RUNNING));
-        assertThat(task02.state(), is(Task.State.RUNNING));
-        assertThat(task03.state(), is(Task.State.RUNNING));
-        assertThat(
-            taskManager.activeTaskMap(),
-            Matchers.equalTo(
-                mkMap(
-                    mkEntry(taskId00, task00),
-                    mkEntry(taskId01, task01),
-                    mkEntry(taskId02, task02),
-                    mkEntry(taskId03, task03)
-                )
-            )
-        );
-        assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
-        verify(changeLogReader).enforceRestoreActive();
-        verify(changeLogReader).completedChangelogs();
+        when(tasks.activeTasks()).thenReturn(Set.of(task00, task01, task02));
 
         final RuntimeException exception = assertThrows(
             RuntimeException.class,
@@ -3418,62 +3349,51 @@ public class TaskManagerTest {
         );
         assertThat(exception.getCause().getMessage(), is("oops"));
 
-        assertThat(closedDirtyTask01.get(), is(true));
-        assertThat(closedDirtyTask02.get(), is(true));
-        assertThat(closedDirtyTask03.get(), is(true));
-        assertThat(task00.state(), is(Task.State.CLOSED));
-        assertThat(task01.state(), is(Task.State.CLOSED));
-        assertThat(task02.state(), is(Task.State.CLOSED));
-        assertThat(task03.state(), is(Task.State.CLOSED));
+        // Verify tasks that threw exceptions were closed dirty
+        verify(task00).prepareCommit(true);
+        verify(task00).suspend();
+        verify(task00).closeClean();
+        verify(task01).prepareCommit(true);
+        verify(task01, times(2)).suspend();
+        verify(task01).closeDirty();
+        verify(task02).prepareCommit(true);
+        verify(task02, times(2)).suspend();
+        verify(task02).closeDirty();
+
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
-        // the active task creator should also get closed (so that it closes 
the thread producer if applicable)
         verify(activeTaskCreator).close();
+        verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
     }
 
     @Test
     public void 
shouldCloseActiveTasksAndPropagateStreamsProducerExceptionsOnCleanShutdown() {
-        final TopicPartition changelog = new TopicPartition("changelog", 0);
-        final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
-            mkEntry(taskId00, taskId00Partitions)
-        );
-        final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, 
true, stateManager) {
-            @Override
-            public Set<TopicPartition> changelogPartitions() {
-                return singleton(changelog);
-            }
-        };
-
-        when(activeTaskCreator.createTasks(any(), 
eq(assignment))).thenReturn(singletonList(task00));
-        doThrow(new 
RuntimeException("whatever")).when(activeTaskCreator).close();
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId00Partitions)
+            .build();
 
-        taskManager.handleAssignment(assignment, emptyMap());
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
-        assertThat(task00.state(), is(Task.State.CREATED));
+        doThrow(new 
RuntimeException("whatever")).when(activeTaskCreator).close();
 
-        taskManager.tryToCompleteRestoration(time.milliseconds(), null);
+        when(tasks.activeTasks()).thenReturn(Set.of(task00));
 
-        assertThat(task00.state(), is(Task.State.RESTORING));
-        assertThat(
-            taskManager.activeTaskMap(),
-            Matchers.equalTo(
-                mkMap(
-                    mkEntry(taskId00, task00)
-                )
-            )
+        final RuntimeException exception = assertThrows(
+            RuntimeException.class,
+            () -> taskManager.shutdown(true)
         );
-        assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
-        verify(changeLogReader).enforceRestoreActive();
-        verify(changeLogReader).completedChangelogs();
 
-        final RuntimeException exception = 
assertThrows(RuntimeException.class, () -> taskManager.shutdown(true));
-
-        assertThat(task00.state(), is(Task.State.CLOSED));
         assertThat(exception.getMessage(), is("whatever"));
+
+        verify(task00).prepareCommit(true);
+        verify(task00).suspend();
+        verify(task00).closeClean();
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
-        // the active task creator should also get closed (so that it closes 
the thread producer if applicable)
         verify(activeTaskCreator).close();
+        verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
     }
 
     @Test
@@ -3509,31 +3429,35 @@ public class TaskManagerTest {
 
     @Test
     public void 
shouldSuspendAllRevokedActiveTasksAndPropagateSuspendException() {
-        final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, 
true, stateManager);
+        // will not be revoked
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId00Partitions).build();
 
-        final StateMachineTask task01 = new StateMachineTask(taskId01, 
taskId01Partitions, true, stateManager) {
-            @Override
-            public void suspend() {
-                super.suspend();
-                throw new RuntimeException("task 0_1 suspend boom!");
-            }
-        };
+        // will be revoked and throws exception during suspend
+        final StreamTask task01 = statefulTask(taskId01, 
taskId01ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId01Partitions).build();
+        doThrow(new RuntimeException("task 0_1 suspend 
boom!")).when(task01).suspend();
 
-        final StateMachineTask task02 = new StateMachineTask(taskId02, 
taskId02Partitions, true, stateManager);
+        // will be revoked with no exception
+        final StreamTask task02 = statefulTask(taskId02, 
taskId02ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId02Partitions).build();
 
-        taskManager.addTask(task00);
-        taskManager.addTask(task01);
-        taskManager.addTask(task02);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
+        when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02));
 
         final RuntimeException thrown = assertThrows(RuntimeException.class,
             () -> taskManager.handleRevocation(union(HashSet::new, 
taskId01Partitions, taskId02Partitions)));
-        assertThat(thrown.getCause().getMessage(), is("task 0_1 suspend 
boom!"));
 
-        assertThat(task00.state(), is(Task.State.CREATED));
-        assertThat(task01.state(), is(Task.State.SUSPENDED));
-        assertThat(task02.state(), is(Task.State.SUSPENDED));
+        assertThat(thrown.getCause().getMessage(), is("task 0_1 suspend 
boom!"));
 
+        verify(task01).suspend();
+        verify(task02).suspend();
+        verify(task00, never()).suspend();
         verifyNoInteractions(activeTaskCreator);
     }
 
@@ -3607,29 +3531,36 @@ public class TaskManagerTest {
 
     @Test
     public void shouldCloseStandbyTasksOnShutdown() {
-        final Map<TaskId, Set<TopicPartition>> assignment = 
singletonMap(taskId00, taskId00Partitions);
-        final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, 
false, stateManager);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final StandbyTask standbyTask00 = standbyTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId00Partitions)
+            .build();
 
-        // `handleAssignment`
-        
when(standbyTaskCreator.createTasks(assignment)).thenReturn(singletonList(task00));
+        when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask00));
+        when(stateUpdater.standbyTasks()).thenReturn(Set.of(standbyTask00));
 
-        taskManager.handleAssignment(emptyMap(), assignment);
-        assertThat(task00.state(), is(Task.State.CREATED));
+        final CompletableFuture<StateUpdater.RemovedTaskResult> 
futureForStandbyTask = new CompletableFuture<>();
+        when(stateUpdater.remove(taskId00)).thenReturn(futureForStandbyTask);
 
-        taskManager.tryToCompleteRestoration(time.milliseconds(), null);
-        assertThat(task00.state(), is(Task.State.RUNNING));
-        assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
-        assertThat(taskManager.standbyTaskMap(), 
Matchers.equalTo(singletonMap(taskId00, task00)));
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+
+        futureForStandbyTask.complete(new 
StateUpdater.RemovedTaskResult(standbyTask00)); // simulate successful removal
 
         taskManager.shutdown(true);
-        assertThat(task00.state(), is(Task.State.CLOSED));
-        assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
-        assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
+
+        verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
+
+        verify(tasks).addTask(standbyTask00);
+
+        verify(standbyTask00).prepareCommit(true);
+        verify(standbyTask00).postCommit(true);
+        verify(standbyTask00).suspend();
+        verify(standbyTask00).closeClean();
+
         // the active task creator should also get closed (so that it closes 
the thread producer if applicable)
         verify(activeTaskCreator).close();
-        // `tryToCompleteRestoration`
-        verify(consumer).assignment();
-        verify(consumer).resume(eq(emptySet()));
+        verifyNoInteractions(consumer);
     }
 
     @Test
@@ -3739,36 +3670,29 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void shouldInitializeNewActiveTasks() {
-        final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true, stateManager);
-        when(consumer.assignment()).thenReturn(assignment);
+    public void shouldInitializeNewStandbyTasks() {
+        final StandbyTask task01 = standbyTask(taskId01, 
taskId01ChangelogPartitions)
+            .inState(State.CREATED)
+            .withInputPartitions(taskId01Partitions)
+            .build();
 
-        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment)))
-            .thenReturn(singletonList(task00));
+        final Map<TaskId, Set<TopicPartition>> assignment = taskId01Assignment;
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
-        taskManager.handleAssignment(taskId00Assignment, emptyMap());
-        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
+        
when(standbyTaskCreator.createTasks(assignment)).thenReturn(singletonList(task01));
 
-        assertThat(task00.state(), is(Task.State.RUNNING));
-        assertThat(taskManager.activeTaskMap(), 
Matchers.equalTo(singletonMap(taskId00, task00)));
-        assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
-        // verifies that we actually resume the assignment at the end of 
restoration.
-        verify(consumer).resume(assignment);
-    }
+        taskManager.handleAssignment(emptyMap(), assignment);
 
-    @Test
-    public void shouldInitialiseNewStandbyTasks() {
-        final StateMachineTask task01 = new StateMachineTask(taskId01, 
taskId01Partitions, false, stateManager);
+        verify(tasks).addPendingTasksToInit(singletonList(task01));
 
-        when(consumer.assignment()).thenReturn(assignment);
-        
when(standbyTaskCreator.createTasks(taskId01Assignment)).thenReturn(singletonList(task01));
+        when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task01));
 
-        taskManager.handleAssignment(emptyMap(), taskId01Assignment);
-        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
+        taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        assertThat(task01.state(), is(Task.State.RUNNING));
-        assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
-        assertThat(taskManager.standbyTaskMap(), 
Matchers.equalTo(singletonMap(taskId01, task01)));
+        verify(task01).initializeIfNeeded();
+        verify(stateUpdater).add(task01);
+        verifyNoInteractions(consumer);
     }
 
     @Test
@@ -3945,14 +3869,25 @@ public class TaskManagerTest {
 
     @Test
     public void shouldCommitViaConsumerIfEosDisabled() {
-        final StateMachineTask task01 = new StateMachineTask(taskId01, 
taskId01Partitions, true, stateManager);
+        final StreamTask task01 = statefulTask(taskId01, 
taskId01ChangelogPartitions)
+            .withInputPartitions(taskId01Partitions)
+            .inState(State.RUNNING)
+            .build();
         final Map<TopicPartition, OffsetAndMetadata> offsets = 
singletonMap(t1p1, new OffsetAndMetadata(0L, null));
-        task01.setCommittableOffsetsAndMetadata(offsets);
-        task01.setCommitNeeded();
-        taskManager.addTask(task01);
 
-        taskManager.commitAll();
+        when(task01.commitNeeded()).thenReturn(true);
+        when(task01.prepareCommit(true)).thenReturn(offsets);
+
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.allTasks()).thenReturn(Set.of(task01));
 
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+
+        assertThat(taskManager.commitAll(), equalTo(1));
+
+        verify(task01, times(2)).commitNeeded();
+        verify(task01).prepareCommit(true);
+        verify(task01).postCommit(false);
         verify(consumer).commitSync(offsets);
     }
 
@@ -4007,50 +3942,48 @@ public class TaskManagerTest {
 
     @Test
     public void shouldPropagateExceptionFromActiveCommit() {
-        final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true, stateManager) {
-            @Override
-            public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final 
boolean clean) {
-                throw new RuntimeException("opsh.");
-            }
-        };
-
-        when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), 
eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .withInputPartitions(taskId00Partitions)
+            .inState(State.RUNNING)
+            .build();
 
-        taskManager.handleAssignment(taskId00Assignment, emptyMap());
-        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
+        when(task00.commitNeeded()).thenReturn(true);
+        when(task00.prepareCommit(true)).thenThrow(new 
RuntimeException("opsh."));
 
-        assertThat(task00.state(), is(Task.State.RUNNING));
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.allTasks()).thenReturn(Set.of(task00));
 
-        task00.setCommitNeeded();
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         final RuntimeException thrown =
-            assertThrows(RuntimeException.class, () -> 
taskManager.commitAll());
+            assertThrows(RuntimeException.class, taskManager::commitAll);
         assertThat(thrown.getMessage(), equalTo("opsh."));
+
+        verify(task00).commitNeeded();
+        verify(task00).prepareCommit(true);
     }
 
     @Test
     public void shouldPropagateExceptionFromStandbyCommit() {
-        final StateMachineTask task01 = new StateMachineTask(taskId01, 
taskId01Partitions, false, stateManager) {
-            @Override
-            public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final 
boolean clean) {
-                throw new RuntimeException("opsh.");
-            }
-        };
-
-        when(consumer.assignment()).thenReturn(assignment);
-        
when(standbyTaskCreator.createTasks(taskId01Assignment)).thenReturn(singletonList(task01));
+        final StandbyTask task01 = standbyTask(taskId01, 
taskId01ChangelogPartitions)
+            .withInputPartitions(taskId01Partitions)
+            .inState(State.RUNNING)
+            .build();
 
-        taskManager.handleAssignment(emptyMap(), taskId01Assignment);
-        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
+        when(task01.commitNeeded()).thenReturn(true);
+        when(task01.prepareCommit(true)).thenThrow(new 
RuntimeException("opsh."));
 
-        assertThat(task01.state(), is(Task.State.RUNNING));
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.allTasks()).thenReturn(Set.of(task01));
 
-        task01.setCommitNeeded();
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         final RuntimeException thrown =
             assertThrows(RuntimeException.class, () -> 
taskManager.commitAll());
         assertThat(thrown.getMessage(), equalTo("opsh."));
+
+        verify(task01).commitNeeded();
+        verify(task01).prepareCommit(true);
     }
 
     @Test
@@ -4062,27 +3995,24 @@ public class TaskManagerTest {
 
         final InOrder inOrder = inOrder(adminClient);
 
-        final Map<TopicPartition, Long> purgableOffsets = new HashMap<>();
-        final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true, stateManager) {
-            @Override
-            public Map<TopicPartition, Long> purgeableOffsets() {
-                return purgableOffsets;
-            }
-        };
-
-        when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), 
eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .withInputPartitions(taskId00Partitions)
+            .inState(State.RUNNING)
+            .build();
 
-        taskManager.handleAssignment(taskId00Assignment, emptyMap());
-        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
+        when(task00.purgeableOffsets())
+            .thenReturn(new HashMap<>())
+            .thenReturn(singletonMap(t1p1, 5L))
+            .thenReturn(singletonMap(t1p1, 17L));
 
-        assertThat(task00.state(), is(Task.State.RUNNING));
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.allTasks()).thenReturn(Set.of(task00));
 
-        purgableOffsets.put(t1p1, 5L);
-        taskManager.maybePurgeCommittedRecords();
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
-        purgableOffsets.put(t1p1, 17L);
-        taskManager.maybePurgeCommittedRecords();
+        taskManager.maybePurgeCommittedRecords(); // no-op
+        taskManager.maybePurgeCommittedRecords(); // sends purge for offset 5L
+        taskManager.maybePurgeCommittedRecords(); // sends purge for offset 17L
 
         inOrder.verify(adminClient).deleteRecords(singletonMap(t1p1, 
RecordsToDelete.beforeOffset(5L)));
         inOrder.verify(adminClient).deleteRecords(singletonMap(t1p1, 
RecordsToDelete.beforeOffset(17L)));
@@ -4095,29 +4025,27 @@ public class TaskManagerTest {
         when(adminClient.deleteRecords(singletonMap(t1p1, 
RecordsToDelete.beforeOffset(5L))))
             .thenReturn(new DeleteRecordsResult(singletonMap(t1p1, 
futureDeletedRecords)));
 
-        final Map<TopicPartition, Long> purgableOffsets = new HashMap<>();
-        final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true, stateManager) {
-            @Override
-            public Map<TopicPartition, Long> purgeableOffsets() {
-                return purgableOffsets;
-            }
-        };
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .withInputPartitions(taskId00Partitions)
+            .inState(State.RUNNING)
+            .build();
 
-        when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), 
eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        when(task00.purgeableOffsets())
+            .thenReturn(new HashMap<>())
+            .thenReturn(singletonMap(t1p1, 5L))
+            .thenReturn(singletonMap(t1p1, 17L));
 
-        taskManager.handleAssignment(taskId00Assignment, emptyMap());
-        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.allTasks()).thenReturn(Set.of(task00));
 
-        assertThat(task00.state(), is(Task.State.RUNNING));
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
-        purgableOffsets.put(t1p1, 5L);
+        taskManager.maybePurgeCommittedRecords();
         taskManager.maybePurgeCommittedRecords();
 
         // this call should be a no-op.
-        // this is verified, as there is no expectation on adminClient for 
this second call,
-        // so it would fail verification if we invoke the admin client again.
-        purgableOffsets.put(t1p1, 17L);
+        // because the previous deleteRecords request
+        // has not completed yet, so no new request is sent.
         taskManager.maybePurgeCommittedRecords();
     }
 
@@ -4425,7 +4353,6 @@ public class TaskManagerTest {
 
     @Test
     public void shouldPunctuateActiveTasks() {
-
         final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
             .withInputPartitions(taskId00Partitions)
             .inState(State.RUNNING)
@@ -4818,45 +4745,66 @@ public class TaskManagerTest {
 
     @Test
     public void shouldConvertActiveTaskToStandbyTask() {
-        final StreamTask activeTask = mock(StreamTask.class);
-        when(activeTask.id()).thenReturn(taskId00);
-        when(activeTask.inputPartitions()).thenReturn(taskId00Partitions);
-        when(activeTask.isActive()).thenReturn(true);
-
-        final StandbyTask standbyTask = mock(StandbyTask.class);
-        when(standbyTask.id()).thenReturn(taskId00);
+        final StreamTask activeTaskToRecycle = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId00Partitions).build();
+        final StandbyTask recycledStandbyTask = standbyTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.CREATED)
+            .withInputPartitions(taskId00Partitions).build();
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
-        when(activeTaskCreator.createTasks(any(), 
eq(taskId00Assignment))).thenReturn(singletonList(activeTask));
-        when(standbyTaskCreator.createStandbyTaskFromActive(any(), 
eq(taskId00Partitions))).thenReturn(standbyTask);
+        when(activeTaskCreator.createTasks(consumer, 
taskId00Assignment)).thenReturn(singletonList(activeTaskToRecycle));
+        
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, 
taskId00Partitions))
+            .thenReturn(recycledStandbyTask);
 
+        // create active task
         taskManager.handleAssignment(taskId00Assignment, 
Collections.emptyMap());
+
+        // convert active to standby
+        when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToRecycle));
+        final CompletableFuture<StateUpdater.RemovedTaskResult> future = new 
CompletableFuture<>();
+        when(stateUpdater.remove(activeTaskToRecycle.id())).thenReturn(future);
+        future.complete(new 
StateUpdater.RemovedTaskResult(activeTaskToRecycle));
+
         taskManager.handleAssignment(Collections.emptyMap(), 
taskId00Assignment);
 
-        verify(activeTaskCreator).createTasks(any(), eq(emptyMap()));
+        verify(activeTaskCreator).createTasks(consumer, emptyMap());
         verify(standbyTaskCreator, 
times(2)).createTasks(Collections.emptyMap());
-        verifyNoInteractions(consumer);
+        
verify(standbyTaskCreator).createStandbyTaskFromActive(activeTaskToRecycle, 
taskId00Partitions);
+        
verify(tasks).addPendingTasksToInit(Collections.singleton(recycledStandbyTask));
     }
 
     @Test
     public void shouldConvertStandbyTaskToActiveTask() {
-        final StandbyTask standbyTask = mock(StandbyTask.class);
-        when(standbyTask.id()).thenReturn(taskId00);
-        when(standbyTask.isActive()).thenReturn(false);
-        
when(standbyTask.prepareCommit(true)).thenReturn(Collections.emptyMap());
-
-        final StreamTask activeTask = mock(StreamTask.class);
-        when(activeTask.id()).thenReturn(taskId00);
-        when(activeTask.inputPartitions()).thenReturn(taskId00Partitions);
-        
when(standbyTaskCreator.createTasks(taskId00Assignment)).thenReturn(singletonList(standbyTask));
-        when(activeTaskCreator.createActiveTaskFromStandby(eq(standbyTask), 
eq(taskId00Partitions), any()))
-            .thenReturn(activeTask);
+        final StandbyTask standbyTaskToRecycle = standbyTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId00Partitions).build();
+        final StreamTask recycledActiveTask = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.CREATED)
+            .withInputPartitions(taskId00Partitions).build();
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
+        
when(standbyTaskCreator.createTasks(taskId00Assignment)).thenReturn(singletonList(standbyTaskToRecycle));
+        
when(activeTaskCreator.createActiveTaskFromStandby(standbyTaskToRecycle, 
taskId00Partitions, consumer))
+            .thenReturn(recycledActiveTask);
+
+        // create standby task
         taskManager.handleAssignment(Collections.emptyMap(), 
taskId00Assignment);
+
+        // convert standby to active
+        when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToRecycle));
+        final CompletableFuture<StateUpdater.RemovedTaskResult> future = new 
CompletableFuture<>();
+        
when(stateUpdater.remove(standbyTaskToRecycle.id())).thenReturn(future);
+        future.complete(new 
StateUpdater.RemovedTaskResult(standbyTaskToRecycle));
+
         taskManager.handleAssignment(taskId00Assignment, 
Collections.emptyMap());
 
-        verify(activeTaskCreator, times(2)).createTasks(any(), eq(emptyMap()));
+        verify(activeTaskCreator, times(2)).createTasks(consumer, emptyMap());
         verify(standbyTaskCreator).createTasks(Collections.emptyMap());
-        verifyNoInteractions(consumer);
+        
verify(activeTaskCreator).createActiveTaskFromStandby(standbyTaskToRecycle, 
taskId00Partitions, consumer);
+        
verify(tasks).addPendingTasksToInit(Collections.singleton(recycledActiveTask));
     }
 
     @Test

Reply via email to