This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 22db7b9f4c3 KAFKA-19683: More test replacements [5/N] (#20818)
22db7b9f4c3 is described below

commit 22db7b9f4c3f9e936a0eb6457b91edda01f44099
Author: Shashank <[email protected]>
AuthorDate: Fri Nov 7 03:18:54 2025 -0800

    KAFKA-19683: More test replacements [5/N] (#20818)
    
    Replaced more tests in `TaskManagerTest.java`
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../processor/internals/TaskManagerTest.java       | 381 +++++++++++----------
 1 file changed, 208 insertions(+), 173 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 0156d101ed8..7db9c957fe0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -61,6 +61,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
@@ -2911,49 +2913,50 @@ public class TaskManagerTest {
 
     @Test
     public void shouldNotCompleteRestorationIfTaskCannotCompleteRestoration() {
-        final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
-            mkEntry(taskId00, taskId00Partitions)
-        );
-        final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, 
true, stateManager) {
-            @Override
-            public void completeRestoration(final 
java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
-                throw new TimeoutException("timeout!");
-            }
-        };
-
-        when(activeTaskCreator.createTasks(any(), 
eq(assignment))).thenReturn(singletonList(task00));
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .withInputPartitions(taskId00Partitions)
+            .inState(State.RESTORING)
+            .build();
 
-        taskManager.handleAssignment(assignment, emptyMap());
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
-        assertThat(task00.state(), is(Task.State.CREATED));
+        when(stateUpdater.restoresActiveTasks()).thenReturn(true);
+        
when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(Set.of(task00));
+        final TimeoutException timeoutException = new 
TimeoutException("timeout!");
+        doThrow(timeoutException).when(task00).completeRestoration(any());
 
-        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(false));
+        final boolean restorationComplete = 
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        assertThat(task00.state(), is(Task.State.RESTORING));
-        assertThat(
-            taskManager.activeTaskMap(),
-            Matchers.equalTo(mkMap(mkEntry(taskId00, task00)))
-        );
-        assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
-        verify(changeLogReader).enforceRestoreActive();
+        assertFalse(restorationComplete);
+        verify(task00).completeRestoration(any());
+        verify(stateUpdater).add(task00);
+        verify(tasks, never()).addTask(task00);
         verifyNoInteractions(consumer);
     }
 
     @Test
     public void shouldSuspendActiveTasksDuringRevocation() {
-        final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true, stateManager);
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .withInputPartitions(taskId00Partitions)
+            .inState(State.RUNNING)
+            .build();
+
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.allTasks()).thenReturn(Set.of(task00));
+
         final Map<TopicPartition, OffsetAndMetadata> offsets = 
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
-        task00.setCommittableOffsetsAndMetadata(offsets);
 
-        when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), 
eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        when(task00.commitNeeded()).thenReturn(true);
+        when(task00.prepareCommit(true)).thenReturn(offsets);
 
-        taskManager.handleAssignment(taskId00Assignment, emptyMap());
-        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
-        assertThat(task00.state(), is(Task.State.RUNNING));
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         taskManager.handleRevocation(taskId00Partitions);
-        assertThat(task00.state(), is(Task.State.SUSPENDED));
+
+        verify(task00).prepareCommit(true);
+        verify(task00).postCommit(true);
+        verify(task00).suspend();
     }
 
     @SuppressWarnings("removal")
@@ -3041,218 +3044,233 @@ public class TaskManagerTest {
 
     @Test
     public void shouldCommitAllNeededTasksOnHandleRevocation() {
-        final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true, stateManager);
+        // revoked task that needs commit
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .withInputPartitions(taskId00Partitions)
+            .inState(State.RUNNING)
+            .build();
         final Map<TopicPartition, OffsetAndMetadata> offsets00 = 
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
-        task00.setCommittableOffsetsAndMetadata(offsets00);
-        task00.setCommitNeeded();
+        when(task00.commitNeeded()).thenReturn(true);
+        when(task00.prepareCommit(true)).thenReturn(offsets00);
 
-        final StateMachineTask task01 = new StateMachineTask(taskId01, 
taskId01Partitions, true, stateManager);
+        // non revoked task that needs commit
+        final StreamTask task01 = statefulTask(taskId01, 
taskId01ChangelogPartitions)
+            .withInputPartitions(taskId01Partitions)
+            .inState(State.RUNNING)
+            .build();
         final Map<TopicPartition, OffsetAndMetadata> offsets01 = 
singletonMap(t1p1, new OffsetAndMetadata(1L, null));
-        task01.setCommittableOffsetsAndMetadata(offsets01);
-        task01.setCommitNeeded();
+        when(task01.commitNeeded()).thenReturn(true);
+        when(task01.prepareCommit(true)).thenReturn(offsets01);
 
-        final StateMachineTask task02 = new StateMachineTask(taskId02, 
taskId02Partitions, true, stateManager);
-        final Map<TopicPartition, OffsetAndMetadata> offsets02 = 
singletonMap(t1p2, new OffsetAndMetadata(2L, null));
-        task02.setCommittableOffsetsAndMetadata(offsets02);
+        // non revoked task that does NOT need commit
+        final StreamTask task02 = statefulTask(taskId02, 
taskId02ChangelogPartitions)
+            .withInputPartitions(taskId02Partitions)
+            .inState(State.RUNNING)
+            .build();
+        when(task02.commitNeeded()).thenReturn(false);
 
-        final StateMachineTask task10 = new StateMachineTask(taskId10, 
taskId10Partitions, false, stateManager);
+        // standby task (not be affected by revocation)
+        final StandbyTask task03 = standbyTask(taskId03, 
taskId03ChangelogPartitions)
+            .withInputPartitions(taskId03Partitions)
+            .inState(State.RUNNING)
+            .build();
 
         final Map<TopicPartition, OffsetAndMetadata> expectedCommittedOffsets 
= new HashMap<>();
         expectedCommittedOffsets.putAll(offsets00);
         expectedCommittedOffsets.putAll(offsets01);
 
-        final Map<TaskId, Set<TopicPartition>> assignmentActive = mkMap(
-            mkEntry(taskId00, taskId00Partitions),
-            mkEntry(taskId01, taskId01Partitions),
-            mkEntry(taskId02, taskId02Partitions)
-        );
-
-        final Map<TaskId, Set<TopicPartition>> assignmentStandby = mkMap(
-            mkEntry(taskId10, taskId10Partitions)
-        );
-        when(consumer.assignment()).thenReturn(assignment);
-
-        when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
-            .thenReturn(asList(task00, task01, task02));
-        when(standbyTaskCreator.createTasks(assignmentStandby))
-            .thenReturn(singletonList(task10));
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02, 
task03));
 
-        taskManager.handleAssignment(assignmentActive, assignmentStandby);
-        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
-        assertThat(task00.state(), is(Task.State.RUNNING));
-        assertThat(task01.state(), is(Task.State.RUNNING));
-        assertThat(task02.state(), is(Task.State.RUNNING));
-        assertThat(task10.state(), is(Task.State.RUNNING));
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         taskManager.handleRevocation(taskId00Partitions);
 
-        assertThat(task00.commitNeeded, is(false));
-        assertThat(task00.commitPrepared, is(true));
-        assertThat(task01.commitNeeded, is(false));
-        assertThat(task01.commitPrepared, is(true));
-        assertThat(task02.commitPrepared, is(false));
-        assertThat(task10.commitPrepared, is(false));
+        // both tasks needing commit had prepareCommit called
+        verify(task00).prepareCommit(true);
+        verify(task01).prepareCommit(true);
+        verify(task02, never()).prepareCommit(anyBoolean());
+        verify(task03, never()).prepareCommit(anyBoolean());
 
         verify(consumer).commitSync(expectedCommittedOffsets);
-    }
-
-    @Test
-    public void shouldNotCommitIfNoRevokedTasksNeedCommitting() {
-        final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true, stateManager);
 
-        final StateMachineTask task01 = new StateMachineTask(taskId01, 
taskId01Partitions, true, stateManager);
-        task01.setCommitNeeded();
-
-        final StateMachineTask task02 = new StateMachineTask(taskId02, 
taskId02Partitions, true, stateManager);
-
-        final Map<TaskId, Set<TopicPartition>> assignmentActive = mkMap(
-            mkEntry(taskId00, taskId00Partitions),
-            mkEntry(taskId01, taskId01Partitions),
-            mkEntry(taskId02, taskId02Partitions)
-        );
-
-        when(consumer.assignment()).thenReturn(assignment);
-
-        when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
-            .thenReturn(asList(task00, task01, task02));
-
-        taskManager.handleAssignment(assignmentActive, Collections.emptyMap());
-        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
-        assertThat(task00.state(), is(Task.State.RUNNING));
-        assertThat(task01.state(), is(Task.State.RUNNING));
-        assertThat(task02.state(), is(Task.State.RUNNING));
+        // revoked task suspended
+        verify(task00).suspend();
+        verify(task00).postCommit(true);
 
-        taskManager.handleRevocation(taskId00Partitions);
+        // non-revoked task with commit was also post-committed (but not suspended)
+        verify(task01).postCommit(false);
+        verify(task01, never()).suspend();
 
-        assertThat(task00.commitPrepared, is(false));
-        assertThat(task01.commitPrepared, is(false));
-        assertThat(task02.commitPrepared, is(false));
+        // task02 and task03 should not be affected
+        verify(task02, never()).postCommit(anyBoolean());
+        verify(task02, never()).suspend();
+        verify(task03, never()).postCommit(anyBoolean());
+        verify(task03, never()).suspend();
     }
 
-    @Test
-    public void shouldNotCommitIfNoRevokedTasksNeedCommittingWithEOSv2() {
-        final TaskManager taskManager = 
setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, 
false);
-
-        final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true, stateManager);
-
-        final StateMachineTask task01 = new StateMachineTask(taskId01, 
taskId01Partitions, true, stateManager);
-        task01.setCommitNeeded();
+    @ParameterizedTest
+    @EnumSource(ProcessingMode.class)
+    public void shouldNotCommitIfNoRevokedTasksNeedCommitting(final 
ProcessingMode processingMode) {
+        // task00 being revoked, no commit needed
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .withInputPartitions(taskId00Partitions)
+            .inState(State.RUNNING)
+            .build();
 
-        final StateMachineTask task02 = new StateMachineTask(taskId02, 
taskId02Partitions, true, stateManager);
+        // task01 NOT being revoked, commit needed
+        final StreamTask task01 = statefulTask(taskId01, 
taskId01ChangelogPartitions)
+            .withInputPartitions(taskId01Partitions)
+            .inState(State.RUNNING)
+            .build();
 
-        final Map<TaskId, Set<TopicPartition>> assignmentActive = mkMap(
-            mkEntry(taskId00, taskId00Partitions),
-            mkEntry(taskId01, taskId01Partitions),
-            mkEntry(taskId02, taskId02Partitions)
-        );
+        // task02 NOT being revoked, no commit needed
+        final StreamTask task02 = statefulTask(taskId02, 
taskId02ChangelogPartitions)
+            .withInputPartitions(taskId02Partitions)
+            .inState(State.RUNNING)
+            .build();
 
-        when(consumer.assignment()).thenReturn(assignment);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02));
 
-        when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
-            .thenReturn(asList(task00, task01, task02));
+        when(task00.commitNeeded()).thenReturn(false);
+        when(task01.commitNeeded()).thenReturn(true); // only task01 needs 
commit
+        when(task02.commitNeeded()).thenReturn(false);
 
-        taskManager.handleAssignment(assignmentActive, Collections.emptyMap());
-        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
-        assertThat(task00.state(), is(Task.State.RUNNING));
-        assertThat(task01.state(), is(Task.State.RUNNING));
-        assertThat(task02.state(), is(Task.State.RUNNING));
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(processingMode, tasks);
 
         taskManager.handleRevocation(taskId00Partitions);
 
-        assertThat(task00.commitPrepared, is(false));
-        assertThat(task01.commitPrepared, is(false));
-        assertThat(task02.commitPrepared, is(false));
+        verify(task00, never()).prepareCommit(anyBoolean());
+        verify(task01, never()).prepareCommit(anyBoolean());
+        verify(task02, never()).prepareCommit(anyBoolean());
+
+        verify(task00).suspend();
+        verify(task01, never()).suspend();
+        verify(task02, never()).suspend();
     }
 
     @Test
     public void shouldNotCommitOnHandleAssignmentIfNoTaskClosed() {
-        final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true, stateManager);
-        final Map<TopicPartition, OffsetAndMetadata> offsets00 = 
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
-        task00.setCommittableOffsetsAndMetadata(offsets00);
-        task00.setCommitNeeded();
-
-        final StateMachineTask task10 = new StateMachineTask(taskId10, 
taskId10Partitions, false, stateManager);
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId00Partitions).build();
+        final StandbyTask task01 = standbyTask(taskId01, 
taskId01ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId01Partitions).build();
 
-        final Map<TaskId, Set<TopicPartition>> assignmentActive = 
singletonMap(taskId00, taskId00Partitions);
-        final Map<TaskId, Set<TopicPartition>> assignmentStandby = 
singletonMap(taskId10, taskId10Partitions);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
-        when(consumer.assignment()).thenReturn(assignment);
+        when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+        when(stateUpdater.tasks()).thenReturn(Set.of(task01));
 
-        when(activeTaskCreator.createTasks(any(), 
eq(assignmentActive))).thenReturn(singleton(task00));
-        
when(standbyTaskCreator.createTasks(assignmentStandby)).thenReturn(singletonList(task10));
+        final Map<TaskId, Set<TopicPartition>> assignmentActive = 
singletonMap(taskId00, taskId00Partitions);
+        final Map<TaskId, Set<TopicPartition>> assignmentStandby = 
singletonMap(taskId01, taskId01Partitions);
 
         taskManager.handleAssignment(assignmentActive, assignmentStandby);
-        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
-        assertThat(task00.state(), is(Task.State.RUNNING));
-        assertThat(task10.state(), is(Task.State.RUNNING));
 
-        taskManager.handleAssignment(assignmentActive, assignmentStandby);
+        // active task stays in task manager
+        verify(tasks, never()).removeTask(task00);
+        verify(task00, never()).prepareCommit(anyBoolean());
+        verify(task00, never()).postCommit(anyBoolean());
+
+        // standby task not removed from state updater
+        verify(stateUpdater, never()).remove(task01.id());
+        verify(task01, never()).prepareCommit(anyBoolean());
+        verify(task01, never()).postCommit(anyBoolean());
 
-        assertThat(task00.commitNeeded, is(true));
-        assertThat(task10.commitPrepared, is(false));
+        verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
     @Test
     public void shouldNotCommitOnHandleAssignmentIfOnlyStandbyTaskClosed() {
-        final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true, stateManager);
-        final Map<TopicPartition, OffsetAndMetadata> offsets00 = 
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
-        task00.setCommittableOffsetsAndMetadata(offsets00);
-        task00.setCommitNeeded();
-
-        final StateMachineTask task10 = new StateMachineTask(taskId10, 
taskId10Partitions, false, stateManager);
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId00Partitions).build();
+        final StandbyTask task01 = standbyTask(taskId01, 
taskId01ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId01Partitions).build();
 
-        final Map<TaskId, Set<TopicPartition>> assignmentActive = 
singletonMap(taskId00, taskId00Partitions);
-        final Map<TaskId, Set<TopicPartition>> assignmentStandby = 
singletonMap(taskId10, taskId10Partitions);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
-        when(consumer.assignment()).thenReturn(assignment);
+        when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+        when(stateUpdater.tasks()).thenReturn(Set.of(task01));
 
-        when(activeTaskCreator.createTasks(any(), 
eq(assignmentActive))).thenReturn(singleton(task00));
-        
when(standbyTaskCreator.createTasks(assignmentStandby)).thenReturn(singletonList(task10));
+        // mock to remove standby task from state updater
+        final CompletableFuture<StateUpdater.RemovedTaskResult> future = new 
CompletableFuture<>();
+        when(stateUpdater.remove(task01.id())).thenReturn(future);
+        future.complete(new StateUpdater.RemovedTaskResult(task01));
 
-        taskManager.handleAssignment(assignmentActive, assignmentStandby);
-        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
-        assertThat(task00.state(), is(Task.State.RUNNING));
-        assertThat(task10.state(), is(Task.State.RUNNING));
+        final Map<TaskId, Set<TopicPartition>> assignmentActive = 
singletonMap(taskId00, taskId00Partitions);
 
         taskManager.handleAssignment(assignmentActive, Collections.emptyMap());
 
-        assertThat(task00.commitNeeded, is(true));
+        verify(task00, never()).prepareCommit(anyBoolean());
+        verify(task00, never()).postCommit(anyBoolean());
+
+        verify(stateUpdater).remove(task01.id());
+        verify(task01).suspend();
+        verify(task01).closeClean();
+
+        verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
     @Test
     public void shouldNotCommitCreatedTasksOnRevocationOrClosure() {
-        final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true, stateManager);
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.CREATED)
+            .withInputPartitions(taskId00Partitions)
+            .build();
 
-        when(activeTaskCreator.createTasks(any(), 
eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+
+        when(activeTaskCreator.createTasks(consumer, taskId00Assignment))
+            .thenReturn(singletonList(task00));
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
-        assertThat(task00.state(), is(Task.State.CREATED));
+        verify(tasks).addPendingTasksToInit(singletonList(task00));
 
+        // when handle revocation is called, the tasks in pendingTasksToInit are NOT affected
+        // by revocation. They remain in the pending queue untouched
         taskManager.handleRevocation(taskId00Partitions);
-        assertThat(task00.state(), is(Task.State.SUSPENDED));
 
+        // tasks in pendingTasksToInit are not managed by handleRevocation
+        verify(task00, never()).suspend();
+        verify(task00, never()).prepareCommit(anyBoolean());
+
+        when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00));
+
+        // this calls handleTasksPendingInitialization()
+        // which drains pendingTasksToInit and closes those tasks
         taskManager.handleAssignment(emptyMap(), emptyMap());
-        assertThat(task00.state(), is(Task.State.CLOSED));
+
+        // close clean without ever being committed
+        verify(task00).closeClean();
+        verify(task00, never()).prepareCommit(anyBoolean());
     }
 
     @Test
     public void shouldPassUpIfExceptionDuringSuspend() {
-        final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, 
true, stateManager) {
-            @Override
-            public void suspend() {
-                super.suspend();
-                throw new RuntimeException("KABOOM!");
-            }
-        };
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .withInputPartitions(taskId00Partitions)
+            .inState(State.RUNNING)
+            .build();
 
-        when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), 
eq(taskId00Assignment))).thenReturn(singletonList(task00));
-        taskManager.handleAssignment(taskId00Assignment, emptyMap());
-        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
-        assertThat(task00.state(), is(Task.State.RUNNING));
+        doThrow(new RuntimeException("KABOOM!")).when(task00).suspend();
+
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.allTasks()).thenReturn(Set.of(task00));
+
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         assertThrows(RuntimeException.class, () -> 
taskManager.handleRevocation(taskId00Partitions));
-        assertThat(task00.state(), is(Task.State.SUSPENDED));
+
+        verify(task00).suspend();
     }
 
     @Test
@@ -4805,9 +4823,26 @@ public class TaskManagerTest {
 
     @Test
     public void shouldListNotPausedTasks() {
-        handleAssignment(taskId00Assignment, taskId01Assignment, emptyMap());
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .withInputPartitions(taskId00Partitions)
+            .inState(State.RUNNING)
+            .build();
+
+        final StreamTask task01 = statefulTask(taskId01, 
taskId01ChangelogPartitions)
+            .withInputPartitions(taskId01Partitions)
+            .inState(State.RUNNING)
+            .build();
+
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
+
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+
+        when(stateUpdater.tasks()).thenReturn(Collections.emptySet());
 
         assertEquals(2, taskManager.notPausedTasks().size());
+        assertTrue(taskManager.notPausedTasks().containsKey(taskId00));
+        assertTrue(taskManager.notPausedTasks().containsKey(taskId01));
 
         topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY);
 

Reply via email to