This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 22db7b9f4c3 KAFKA-19683: More test replacements [5/N] (#20818)
22db7b9f4c3 is described below
commit 22db7b9f4c3f9e936a0eb6457b91edda01f44099
Author: Shashank <[email protected]>
AuthorDate: Fri Nov 7 03:18:54 2025 -0800
KAFKA-19683: More test replacements [5/N] (#20818)
Replaced more tests in `TaskManagerTest.java`
Reviewers: Lucas Brutschy <[email protected]>
---
.../processor/internals/TaskManagerTest.java | 381 +++++++++++----------
1 file changed, 208 insertions(+), 173 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 0156d101ed8..7db9c957fe0 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -61,6 +61,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -2911,49 +2913,50 @@ public class TaskManagerTest {
@Test
public void shouldNotCompleteRestorationIfTaskCannotCompleteRestoration() {
- final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
- mkEntry(taskId00, taskId00Partitions)
- );
- final Task task00 = new StateMachineTask(taskId00, taskId00Partitions,
true, stateManager) {
- @Override
- public void completeRestoration(final
java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
- throw new TimeoutException("timeout!");
- }
- };
-
- when(activeTaskCreator.createTasks(any(),
eq(assignment))).thenReturn(singletonList(task00));
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RESTORING)
+ .build();
- taskManager.handleAssignment(assignment, emptyMap());
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- assertThat(task00.state(), is(Task.State.CREATED));
+ when(stateUpdater.restoresActiveTasks()).thenReturn(true);
+
when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(Set.of(task00));
+ final TimeoutException timeoutException = new
TimeoutException("timeout!");
+ doThrow(timeoutException).when(task00).completeRestoration(any());
- assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(false));
+ final boolean restorationComplete =
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- assertThat(task00.state(), is(Task.State.RESTORING));
- assertThat(
- taskManager.activeTaskMap(),
- Matchers.equalTo(mkMap(mkEntry(taskId00, task00)))
- );
- assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- verify(changeLogReader).enforceRestoreActive();
+ assertFalse(restorationComplete);
+ verify(task00).completeRestoration(any());
+ verify(stateUpdater).add(task00);
+ verify(tasks, never()).addTask(task00);
verifyNoInteractions(consumer);
}
@Test
public void shouldSuspendActiveTasksDuringRevocation() {
- final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager);
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RUNNING)
+ .build();
+
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.allTasks()).thenReturn(Set.of(task00));
+
final Map<TopicPartition, OffsetAndMetadata> offsets =
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
- task00.setCommittableOffsetsAndMetadata(offsets);
- when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(),
eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ when(task00.commitNeeded()).thenReturn(true);
+ when(task00.prepareCommit(true)).thenReturn(offsets);
- taskManager.handleAssignment(taskId00Assignment, emptyMap());
- assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
- assertThat(task00.state(), is(Task.State.RUNNING));
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.handleRevocation(taskId00Partitions);
- assertThat(task00.state(), is(Task.State.SUSPENDED));
+
+ verify(task00).prepareCommit(true);
+ verify(task00).postCommit(true);
+ verify(task00).suspend();
}
@SuppressWarnings("removal")
@@ -3041,218 +3044,233 @@ public class TaskManagerTest {
@Test
public void shouldCommitAllNeededTasksOnHandleRevocation() {
- final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager);
+ // revoked task that needs commit
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RUNNING)
+ .build();
final Map<TopicPartition, OffsetAndMetadata> offsets00 =
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
- task00.setCommittableOffsetsAndMetadata(offsets00);
- task00.setCommitNeeded();
+ when(task00.commitNeeded()).thenReturn(true);
+ when(task00.prepareCommit(true)).thenReturn(offsets00);
- final StateMachineTask task01 = new StateMachineTask(taskId01,
taskId01Partitions, true, stateManager);
+ // non revoked task that needs commit
+ final StreamTask task01 = statefulTask(taskId01,
taskId01ChangelogPartitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.RUNNING)
+ .build();
final Map<TopicPartition, OffsetAndMetadata> offsets01 =
singletonMap(t1p1, new OffsetAndMetadata(1L, null));
- task01.setCommittableOffsetsAndMetadata(offsets01);
- task01.setCommitNeeded();
+ when(task01.commitNeeded()).thenReturn(true);
+ when(task01.prepareCommit(true)).thenReturn(offsets01);
- final StateMachineTask task02 = new StateMachineTask(taskId02,
taskId02Partitions, true, stateManager);
- final Map<TopicPartition, OffsetAndMetadata> offsets02 =
singletonMap(t1p2, new OffsetAndMetadata(2L, null));
- task02.setCommittableOffsetsAndMetadata(offsets02);
+ // non revoked task that does NOT need commit
+ final StreamTask task02 = statefulTask(taskId02,
taskId02ChangelogPartitions)
+ .withInputPartitions(taskId02Partitions)
+ .inState(State.RUNNING)
+ .build();
+ when(task02.commitNeeded()).thenReturn(false);
- final StateMachineTask task10 = new StateMachineTask(taskId10,
taskId10Partitions, false, stateManager);
+ // standby task (should not be affected by revocation)
+ final StandbyTask task03 = standbyTask(taskId03,
taskId03ChangelogPartitions)
+ .withInputPartitions(taskId03Partitions)
+ .inState(State.RUNNING)
+ .build();
final Map<TopicPartition, OffsetAndMetadata> expectedCommittedOffsets
= new HashMap<>();
expectedCommittedOffsets.putAll(offsets00);
expectedCommittedOffsets.putAll(offsets01);
- final Map<TaskId, Set<TopicPartition>> assignmentActive = mkMap(
- mkEntry(taskId00, taskId00Partitions),
- mkEntry(taskId01, taskId01Partitions),
- mkEntry(taskId02, taskId02Partitions)
- );
-
- final Map<TaskId, Set<TopicPartition>> assignmentStandby = mkMap(
- mkEntry(taskId10, taskId10Partitions)
- );
- when(consumer.assignment()).thenReturn(assignment);
-
- when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
- .thenReturn(asList(task00, task01, task02));
- when(standbyTaskCreator.createTasks(assignmentStandby))
- .thenReturn(singletonList(task10));
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02,
task03));
- taskManager.handleAssignment(assignmentActive, assignmentStandby);
- assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
- assertThat(task00.state(), is(Task.State.RUNNING));
- assertThat(task01.state(), is(Task.State.RUNNING));
- assertThat(task02.state(), is(Task.State.RUNNING));
- assertThat(task10.state(), is(Task.State.RUNNING));
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.handleRevocation(taskId00Partitions);
- assertThat(task00.commitNeeded, is(false));
- assertThat(task00.commitPrepared, is(true));
- assertThat(task01.commitNeeded, is(false));
- assertThat(task01.commitPrepared, is(true));
- assertThat(task02.commitPrepared, is(false));
- assertThat(task10.commitPrepared, is(false));
+ // both tasks needing commit had prepareCommit called
+ verify(task00).prepareCommit(true);
+ verify(task01).prepareCommit(true);
+ verify(task02, never()).prepareCommit(anyBoolean());
+ verify(task03, never()).prepareCommit(anyBoolean());
verify(consumer).commitSync(expectedCommittedOffsets);
- }
-
- @Test
- public void shouldNotCommitIfNoRevokedTasksNeedCommitting() {
- final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager);
- final StateMachineTask task01 = new StateMachineTask(taskId01,
taskId01Partitions, true, stateManager);
- task01.setCommitNeeded();
-
- final StateMachineTask task02 = new StateMachineTask(taskId02,
taskId02Partitions, true, stateManager);
-
- final Map<TaskId, Set<TopicPartition>> assignmentActive = mkMap(
- mkEntry(taskId00, taskId00Partitions),
- mkEntry(taskId01, taskId01Partitions),
- mkEntry(taskId02, taskId02Partitions)
- );
-
- when(consumer.assignment()).thenReturn(assignment);
-
- when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
- .thenReturn(asList(task00, task01, task02));
-
- taskManager.handleAssignment(assignmentActive, Collections.emptyMap());
- assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
- assertThat(task00.state(), is(Task.State.RUNNING));
- assertThat(task01.state(), is(Task.State.RUNNING));
- assertThat(task02.state(), is(Task.State.RUNNING));
+ // revoked task suspended
+ verify(task00).suspend();
+ verify(task00).postCommit(true);
- taskManager.handleRevocation(taskId00Partitions);
+ // non-revoked task with commit was also post-committed (but not
suspended)
+ verify(task01).postCommit(false);
+ verify(task01, never()).suspend();
- assertThat(task00.commitPrepared, is(false));
- assertThat(task01.commitPrepared, is(false));
- assertThat(task02.commitPrepared, is(false));
+ // task02 and task03 should not be affected
+ verify(task02, never()).postCommit(anyBoolean());
+ verify(task02, never()).suspend();
+ verify(task03, never()).postCommit(anyBoolean());
+ verify(task03, never()).suspend();
}
- @Test
- public void shouldNotCommitIfNoRevokedTasksNeedCommittingWithEOSv2() {
- final TaskManager taskManager =
setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null,
false);
-
- final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager);
-
- final StateMachineTask task01 = new StateMachineTask(taskId01,
taskId01Partitions, true, stateManager);
- task01.setCommitNeeded();
+ @ParameterizedTest
+ @EnumSource(ProcessingMode.class)
+ public void shouldNotCommitIfNoRevokedTasksNeedCommitting(final
ProcessingMode processingMode) {
+ // task00 being revoked, no commit needed
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RUNNING)
+ .build();
- final StateMachineTask task02 = new StateMachineTask(taskId02,
taskId02Partitions, true, stateManager);
+ // task01 NOT being revoked, commit needed
+ final StreamTask task01 = statefulTask(taskId01,
taskId01ChangelogPartitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.RUNNING)
+ .build();
- final Map<TaskId, Set<TopicPartition>> assignmentActive = mkMap(
- mkEntry(taskId00, taskId00Partitions),
- mkEntry(taskId01, taskId01Partitions),
- mkEntry(taskId02, taskId02Partitions)
- );
+ // task02 NOT being revoked, no commit needed
+ final StreamTask task02 = statefulTask(taskId02,
taskId02ChangelogPartitions)
+ .withInputPartitions(taskId02Partitions)
+ .inState(State.RUNNING)
+ .build();
- when(consumer.assignment()).thenReturn(assignment);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02));
- when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
- .thenReturn(asList(task00, task01, task02));
+ when(task00.commitNeeded()).thenReturn(false);
+ when(task01.commitNeeded()).thenReturn(true); // only task01 needs
commit
+ when(task02.commitNeeded()).thenReturn(false);
- taskManager.handleAssignment(assignmentActive, Collections.emptyMap());
- assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
- assertThat(task00.state(), is(Task.State.RUNNING));
- assertThat(task01.state(), is(Task.State.RUNNING));
- assertThat(task02.state(), is(Task.State.RUNNING));
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(processingMode, tasks);
taskManager.handleRevocation(taskId00Partitions);
- assertThat(task00.commitPrepared, is(false));
- assertThat(task01.commitPrepared, is(false));
- assertThat(task02.commitPrepared, is(false));
+ verify(task00, never()).prepareCommit(anyBoolean());
+ verify(task01, never()).prepareCommit(anyBoolean());
+ verify(task02, never()).prepareCommit(anyBoolean());
+
+ verify(task00).suspend();
+ verify(task01, never()).suspend();
+ verify(task02, never()).suspend();
}
@Test
public void shouldNotCommitOnHandleAssignmentIfNoTaskClosed() {
- final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager);
- final Map<TopicPartition, OffsetAndMetadata> offsets00 =
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
- task00.setCommittableOffsetsAndMetadata(offsets00);
- task00.setCommitNeeded();
-
- final StateMachineTask task10 = new StateMachineTask(taskId10,
taskId10Partitions, false, stateManager);
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId00Partitions).build();
+ final StandbyTask task01 = standbyTask(taskId01,
taskId01ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId01Partitions).build();
- final Map<TaskId, Set<TopicPartition>> assignmentActive =
singletonMap(taskId00, taskId00Partitions);
- final Map<TaskId, Set<TopicPartition>> assignmentStandby =
singletonMap(taskId10, taskId10Partitions);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(consumer.assignment()).thenReturn(assignment);
+ when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+ when(stateUpdater.tasks()).thenReturn(Set.of(task01));
- when(activeTaskCreator.createTasks(any(),
eq(assignmentActive))).thenReturn(singleton(task00));
-
when(standbyTaskCreator.createTasks(assignmentStandby)).thenReturn(singletonList(task10));
+ final Map<TaskId, Set<TopicPartition>> assignmentActive =
singletonMap(taskId00, taskId00Partitions);
+ final Map<TaskId, Set<TopicPartition>> assignmentStandby =
singletonMap(taskId01, taskId01Partitions);
taskManager.handleAssignment(assignmentActive, assignmentStandby);
- assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
- assertThat(task00.state(), is(Task.State.RUNNING));
- assertThat(task10.state(), is(Task.State.RUNNING));
- taskManager.handleAssignment(assignmentActive, assignmentStandby);
+ // active task stays in task manager
+ verify(tasks, never()).removeTask(task00);
+ verify(task00, never()).prepareCommit(anyBoolean());
+ verify(task00, never()).postCommit(anyBoolean());
+
+ // standby task not removed from state updater
+ verify(stateUpdater, never()).remove(task01.id());
+ verify(task01, never()).prepareCommit(anyBoolean());
+ verify(task01, never()).postCommit(anyBoolean());
- assertThat(task00.commitNeeded, is(true));
- assertThat(task10.commitPrepared, is(false));
+ verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
public void shouldNotCommitOnHandleAssignmentIfOnlyStandbyTaskClosed() {
- final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager);
- final Map<TopicPartition, OffsetAndMetadata> offsets00 =
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
- task00.setCommittableOffsetsAndMetadata(offsets00);
- task00.setCommitNeeded();
-
- final StateMachineTask task10 = new StateMachineTask(taskId10,
taskId10Partitions, false, stateManager);
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId00Partitions).build();
+ final StandbyTask task01 = standbyTask(taskId01,
taskId01ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId01Partitions).build();
- final Map<TaskId, Set<TopicPartition>> assignmentActive =
singletonMap(taskId00, taskId00Partitions);
- final Map<TaskId, Set<TopicPartition>> assignmentStandby =
singletonMap(taskId10, taskId10Partitions);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(consumer.assignment()).thenReturn(assignment);
+ when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+ when(stateUpdater.tasks()).thenReturn(Set.of(task01));
- when(activeTaskCreator.createTasks(any(),
eq(assignmentActive))).thenReturn(singleton(task00));
-
when(standbyTaskCreator.createTasks(assignmentStandby)).thenReturn(singletonList(task10));
+ // mock to remove standby task from state updater
+ final CompletableFuture<StateUpdater.RemovedTaskResult> future = new
CompletableFuture<>();
+ when(stateUpdater.remove(task01.id())).thenReturn(future);
+ future.complete(new StateUpdater.RemovedTaskResult(task01));
- taskManager.handleAssignment(assignmentActive, assignmentStandby);
- assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
- assertThat(task00.state(), is(Task.State.RUNNING));
- assertThat(task10.state(), is(Task.State.RUNNING));
+ final Map<TaskId, Set<TopicPartition>> assignmentActive =
singletonMap(taskId00, taskId00Partitions);
taskManager.handleAssignment(assignmentActive, Collections.emptyMap());
- assertThat(task00.commitNeeded, is(true));
+ verify(task00, never()).prepareCommit(anyBoolean());
+ verify(task00, never()).postCommit(anyBoolean());
+
+ verify(stateUpdater).remove(task01.id());
+ verify(task01).suspend();
+ verify(task01).closeClean();
+
+ verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
public void shouldNotCommitCreatedTasksOnRevocationOrClosure() {
- final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager);
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.CREATED)
+ .withInputPartitions(taskId00Partitions)
+ .build();
- when(activeTaskCreator.createTasks(any(),
eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+
+ when(activeTaskCreator.createTasks(consumer, taskId00Assignment))
+ .thenReturn(singletonList(task00));
taskManager.handleAssignment(taskId00Assignment, emptyMap());
- assertThat(task00.state(), is(Task.State.CREATED));
+ verify(tasks).addPendingTasksToInit(singletonList(task00));
+ // when handle revocation is called, the tasks in pendingTasksToInit
are NOT affected
+ // by revocation. They remain in the pending queue untouched
taskManager.handleRevocation(taskId00Partitions);
- assertThat(task00.state(), is(Task.State.SUSPENDED));
+ // tasks in pendingTasksToInit are not managed by handleRevocation
+ verify(task00, never()).suspend();
+ verify(task00, never()).prepareCommit(anyBoolean());
+
+ when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00));
+
+ // this calls handleTasksPendingInitialization()
+ // which drains pendingTasksToInit and closes those tasks
taskManager.handleAssignment(emptyMap(), emptyMap());
- assertThat(task00.state(), is(Task.State.CLOSED));
+
+ // close clean without ever being committed
+ verify(task00).closeClean();
+ verify(task00, never()).prepareCommit(anyBoolean());
}
@Test
public void shouldPassUpIfExceptionDuringSuspend() {
- final Task task00 = new StateMachineTask(taskId00, taskId00Partitions,
true, stateManager) {
- @Override
- public void suspend() {
- super.suspend();
- throw new RuntimeException("KABOOM!");
- }
- };
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RUNNING)
+ .build();
- when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(),
eq(taskId00Assignment))).thenReturn(singletonList(task00));
- taskManager.handleAssignment(taskId00Assignment, emptyMap());
- assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
- assertThat(task00.state(), is(Task.State.RUNNING));
+ doThrow(new RuntimeException("KABOOM!")).when(task00).suspend();
+
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.allTasks()).thenReturn(Set.of(task00));
+
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
assertThrows(RuntimeException.class, () ->
taskManager.handleRevocation(taskId00Partitions));
- assertThat(task00.state(), is(Task.State.SUSPENDED));
+
+ verify(task00).suspend();
}
@Test
@@ -4805,9 +4823,26 @@ public class TaskManagerTest {
@Test
public void shouldListNotPausedTasks() {
- handleAssignment(taskId00Assignment, taskId01Assignment, emptyMap());
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RUNNING)
+ .build();
+
+ final StreamTask task01 = statefulTask(taskId01,
taskId01ChangelogPartitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.RUNNING)
+ .build();
+
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
+
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+
+ when(stateUpdater.tasks()).thenReturn(Collections.emptySet());
assertEquals(2, taskManager.notPausedTasks().size());
+ assertTrue(taskManager.notPausedTasks().containsKey(taskId00));
+ assertTrue(taskManager.notPausedTasks().containsKey(taskId01));
topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY);