This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 959818f982a KAFKA-19683: More test replacements [6/N] (#20858)
959818f982a is described below
commit 959818f982a9c8736d1a74e32d091e240bd577e0
Author: Shashank <[email protected]>
AuthorDate: Fri Nov 14 04:41:53 2025 -0800
KAFKA-19683: More test replacements [6/N] (#20858)
Rewrote more tests in `TaskManagerTest.java`
Reviewers: Lucas Brutschy <[email protected]>
---
.../processor/internals/TaskManagerTest.java | 490 +++++++++------------
1 file changed, 219 insertions(+), 271 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 01aa1d26541..dc323f019b1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -85,7 +85,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -1928,7 +1927,7 @@ public class TaskManagerTest {
mkEntry(t1p1changelog2, OffsetCheckpoint.OFFSET_UNKNOWN)
));
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithoutStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, false);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, false);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId01,
restoringStatefulTask)));
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
@@ -2180,7 +2179,7 @@ public class TaskManagerTest {
@Test
public void
shouldReInitializeStreamsProducerOnHandleLostAllIfEosV2Enabled() {
- final TaskManager taskManager =
setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null,
false);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false);
taskManager.handleLostAll();
@@ -3323,94 +3322,26 @@ public class TaskManagerTest {
}
private void
shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown(final
ProcessingMode processingMode) {
- final TaskManager taskManager =
setUpTaskManagerWithoutStateUpdater(processingMode, null, false);
-
- final TopicPartition changelog = new TopicPartition("changelog", 0);
- final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
- mkEntry(taskId00, taskId00Partitions),
- mkEntry(taskId01, taskId01Partitions),
- mkEntry(taskId02, taskId02Partitions),
- mkEntry(taskId03, taskId03Partitions)
- );
- final Task task00 = new StateMachineTask(taskId00, taskId00Partitions,
true, stateManager) {
- @Override
- public Set<TopicPartition> changelogPartitions() {
- return singleton(changelog);
- }
- };
- final AtomicBoolean closedDirtyTask01 = new AtomicBoolean(false);
- final AtomicBoolean closedDirtyTask02 = new AtomicBoolean(false);
- final AtomicBoolean closedDirtyTask03 = new AtomicBoolean(false);
- final Task task01 = new StateMachineTask(taskId01, taskId01Partitions,
true, stateManager) {
- @Override
- public void suspend() {
- super.suspend();
- throw new TaskMigratedException("migrated", new
RuntimeException("cause"));
- }
- @Override
- public void closeDirty() {
- super.closeDirty();
- closedDirtyTask01.set(true);
- }
- };
- final Task task02 = new StateMachineTask(taskId02, taskId02Partitions,
true, stateManager) {
- @Override
- public void suspend() {
- super.suspend();
- throw new RuntimeException("oops");
- }
-
- @Override
- public void closeDirty() {
- super.closeDirty();
- closedDirtyTask02.set(true);
- }
- };
- final Task task03 = new StateMachineTask(taskId03, taskId03Partitions,
true, stateManager) {
- @Override
- public void suspend() {
- super.suspend();
- throw new RuntimeException("oops");
- }
-
- @Override
- public void closeDirty() {
- super.closeDirty();
- closedDirtyTask03.set(true);
- }
- };
-
- when(activeTaskCreator.createTasks(any(), eq(assignment)))
- .thenReturn(asList(task00, task01, task02, task03));
-
- taskManager.handleAssignment(assignment, emptyMap());
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId00Partitions).build();
+ final StreamTask task01 = statefulTask(taskId01,
taskId01ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId01Partitions).build();
+ final StreamTask task02 = statefulTask(taskId02,
taskId02ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId02Partitions).build();
- assertThat(task00.state(), is(Task.State.CREATED));
- assertThat(task01.state(), is(Task.State.CREATED));
- assertThat(task02.state(), is(Task.State.CREATED));
- assertThat(task03.state(), is(Task.State.CREATED));
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(processingMode, tasks);
- taskManager.tryToCompleteRestoration(time.milliseconds(), null);
+ doThrow(new TaskMigratedException("migrated", new
RuntimeException("cause")))
+ .when(task01).suspend();
+ doThrow(new RuntimeException("oops"))
+ .when(task02).suspend();
- assertThat(task00.state(), is(Task.State.RESTORING));
- assertThat(task01.state(), is(Task.State.RUNNING));
- assertThat(task02.state(), is(Task.State.RUNNING));
- assertThat(task03.state(), is(Task.State.RUNNING));
- assertThat(
- taskManager.activeTaskMap(),
- Matchers.equalTo(
- mkMap(
- mkEntry(taskId00, task00),
- mkEntry(taskId01, task01),
- mkEntry(taskId02, task02),
- mkEntry(taskId03, task03)
- )
- )
- );
- assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- verify(changeLogReader).enforceRestoreActive();
- verify(changeLogReader).completedChangelogs();
+ when(tasks.activeTasks()).thenReturn(Set.of(task00, task01, task02));
final RuntimeException exception = assertThrows(
RuntimeException.class,
@@ -3418,62 +3349,51 @@ public class TaskManagerTest {
);
assertThat(exception.getCause().getMessage(), is("oops"));
- assertThat(closedDirtyTask01.get(), is(true));
- assertThat(closedDirtyTask02.get(), is(true));
- assertThat(closedDirtyTask03.get(), is(true));
- assertThat(task00.state(), is(Task.State.CLOSED));
- assertThat(task01.state(), is(Task.State.CLOSED));
- assertThat(task02.state(), is(Task.State.CLOSED));
- assertThat(task03.state(), is(Task.State.CLOSED));
+ // Verify tasks that threw exceptions were closed dirty
+ verify(task00).prepareCommit(true);
+ verify(task00).suspend();
+ verify(task00).closeClean();
+ verify(task01).prepareCommit(true);
+ verify(task01, times(2)).suspend();
+ verify(task01).closeDirty();
+ verify(task02).prepareCommit(true);
+ verify(task02, times(2)).suspend();
+ verify(task02).closeDirty();
+
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- // the active task creator should also get closed (so that it closes
the thread producer if applicable)
verify(activeTaskCreator).close();
+ verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
}
@Test
public void
shouldCloseActiveTasksAndPropagateStreamsProducerExceptionsOnCleanShutdown() {
- final TopicPartition changelog = new TopicPartition("changelog", 0);
- final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
- mkEntry(taskId00, taskId00Partitions)
- );
- final Task task00 = new StateMachineTask(taskId00, taskId00Partitions,
true, stateManager) {
- @Override
- public Set<TopicPartition> changelogPartitions() {
- return singleton(changelog);
- }
- };
-
- when(activeTaskCreator.createTasks(any(),
eq(assignment))).thenReturn(singletonList(task00));
- doThrow(new
RuntimeException("whatever")).when(activeTaskCreator).close();
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId00Partitions)
+ .build();
- taskManager.handleAssignment(assignment, emptyMap());
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- assertThat(task00.state(), is(Task.State.CREATED));
+ doThrow(new
RuntimeException("whatever")).when(activeTaskCreator).close();
- taskManager.tryToCompleteRestoration(time.milliseconds(), null);
+ when(tasks.activeTasks()).thenReturn(Set.of(task00));
- assertThat(task00.state(), is(Task.State.RESTORING));
- assertThat(
- taskManager.activeTaskMap(),
- Matchers.equalTo(
- mkMap(
- mkEntry(taskId00, task00)
- )
- )
+ final RuntimeException exception = assertThrows(
+ RuntimeException.class,
+ () -> taskManager.shutdown(true)
);
- assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- verify(changeLogReader).enforceRestoreActive();
- verify(changeLogReader).completedChangelogs();
- final RuntimeException exception =
assertThrows(RuntimeException.class, () -> taskManager.shutdown(true));
-
- assertThat(task00.state(), is(Task.State.CLOSED));
assertThat(exception.getMessage(), is("whatever"));
+
+ verify(task00).prepareCommit(true);
+ verify(task00).suspend();
+ verify(task00).closeClean();
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- // the active task creator should also get closed (so that it closes
the thread producer if applicable)
verify(activeTaskCreator).close();
+ verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
}
@Test
@@ -3509,31 +3429,35 @@ public class TaskManagerTest {
@Test
public void
shouldSuspendAllRevokedActiveTasksAndPropagateSuspendException() {
- final Task task00 = new StateMachineTask(taskId00, taskId00Partitions,
true, stateManager);
+ // will not be revoked
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId00Partitions).build();
- final StateMachineTask task01 = new StateMachineTask(taskId01,
taskId01Partitions, true, stateManager) {
- @Override
- public void suspend() {
- super.suspend();
- throw new RuntimeException("task 0_1 suspend boom!");
- }
- };
+ // will be revoked and throws exception during suspend
+ final StreamTask task01 = statefulTask(taskId01,
taskId01ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId01Partitions).build();
+ doThrow(new RuntimeException("task 0_1 suspend
boom!")).when(task01).suspend();
- final StateMachineTask task02 = new StateMachineTask(taskId02,
taskId02Partitions, true, stateManager);
+ // will be revoked with no exception
+ final StreamTask task02 = statefulTask(taskId02,
taskId02ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId02Partitions).build();
- taskManager.addTask(task00);
- taskManager.addTask(task01);
- taskManager.addTask(task02);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02));
final RuntimeException thrown = assertThrows(RuntimeException.class,
() -> taskManager.handleRevocation(union(HashSet::new,
taskId01Partitions, taskId02Partitions)));
- assertThat(thrown.getCause().getMessage(), is("task 0_1 suspend
boom!"));
- assertThat(task00.state(), is(Task.State.CREATED));
- assertThat(task01.state(), is(Task.State.SUSPENDED));
- assertThat(task02.state(), is(Task.State.SUSPENDED));
+ assertThat(thrown.getCause().getMessage(), is("task 0_1 suspend
boom!"));
+ verify(task01).suspend();
+ verify(task02).suspend();
+ verify(task00, never()).suspend();
verifyNoInteractions(activeTaskCreator);
}
@@ -3607,29 +3531,36 @@ public class TaskManagerTest {
@Test
public void shouldCloseStandbyTasksOnShutdown() {
- final Map<TaskId, Set<TopicPartition>> assignment =
singletonMap(taskId00, taskId00Partitions);
- final Task task00 = new StateMachineTask(taskId00, taskId00Partitions,
false, stateManager);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final StandbyTask standbyTask00 = standbyTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId00Partitions)
+ .build();
- // `handleAssignment`
-
when(standbyTaskCreator.createTasks(assignment)).thenReturn(singletonList(task00));
+ when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask00));
+ when(stateUpdater.standbyTasks()).thenReturn(Set.of(standbyTask00));
- taskManager.handleAssignment(emptyMap(), assignment);
- assertThat(task00.state(), is(Task.State.CREATED));
+ final CompletableFuture<StateUpdater.RemovedTaskResult>
futureForStandbyTask = new CompletableFuture<>();
+ when(stateUpdater.remove(taskId00)).thenReturn(futureForStandbyTask);
- taskManager.tryToCompleteRestoration(time.milliseconds(), null);
- assertThat(task00.state(), is(Task.State.RUNNING));
- assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
- assertThat(taskManager.standbyTaskMap(),
Matchers.equalTo(singletonMap(taskId00, task00)));
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+
+ futureForStandbyTask.complete(new
StateUpdater.RemovedTaskResult(standbyTask00)); // simulate successful removal
taskManager.shutdown(true);
- assertThat(task00.state(), is(Task.State.CLOSED));
- assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
- assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
+
+ verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
+
+ verify(tasks).addTask(standbyTask00);
+
+ verify(standbyTask00).prepareCommit(true);
+ verify(standbyTask00).postCommit(true);
+ verify(standbyTask00).suspend();
+ verify(standbyTask00).closeClean();
+
// the active task creator should also get closed (so that it closes
the thread producer if applicable)
verify(activeTaskCreator).close();
- // `tryToCompleteRestoration`
- verify(consumer).assignment();
- verify(consumer).resume(eq(emptySet()));
+ verifyNoInteractions(consumer);
}
@Test
@@ -3739,36 +3670,29 @@ public class TaskManagerTest {
}
@Test
- public void shouldInitializeNewActiveTasks() {
- final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager);
- when(consumer.assignment()).thenReturn(assignment);
+ public void shouldInitializeNewStandbyTasks() {
+ final StandbyTask task01 = standbyTask(taskId01,
taskId01ChangelogPartitions)
+ .inState(State.CREATED)
+ .withInputPartitions(taskId01Partitions)
+ .build();
- when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment)))
- .thenReturn(singletonList(task00));
+ final Map<TaskId, Set<TopicPartition>> assignment = taskId01Assignment;
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- taskManager.handleAssignment(taskId00Assignment, emptyMap());
- assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
+
when(standbyTaskCreator.createTasks(assignment)).thenReturn(singletonList(task01));
- assertThat(task00.state(), is(Task.State.RUNNING));
- assertThat(taskManager.activeTaskMap(),
Matchers.equalTo(singletonMap(taskId00, task00)));
- assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- // verifies that we actually resume the assignment at the end of
restoration.
- verify(consumer).resume(assignment);
- }
+ taskManager.handleAssignment(emptyMap(), assignment);
- @Test
- public void shouldInitialiseNewStandbyTasks() {
- final StateMachineTask task01 = new StateMachineTask(taskId01,
taskId01Partitions, false, stateManager);
+ verify(tasks).addPendingTasksToInit(singletonList(task01));
- when(consumer.assignment()).thenReturn(assignment);
-
when(standbyTaskCreator.createTasks(taskId01Assignment)).thenReturn(singletonList(task01));
+ when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task01));
- taskManager.handleAssignment(emptyMap(), taskId01Assignment);
- assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
+ taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- assertThat(task01.state(), is(Task.State.RUNNING));
- assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
- assertThat(taskManager.standbyTaskMap(),
Matchers.equalTo(singletonMap(taskId01, task01)));
+ verify(task01).initializeIfNeeded();
+ verify(stateUpdater).add(task01);
+ verifyNoInteractions(consumer);
}
@Test
@@ -3945,14 +3869,25 @@ public class TaskManagerTest {
@Test
public void shouldCommitViaConsumerIfEosDisabled() {
- final StateMachineTask task01 = new StateMachineTask(taskId01,
taskId01Partitions, true, stateManager);
+ final StreamTask task01 = statefulTask(taskId01,
taskId01ChangelogPartitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.RUNNING)
+ .build();
final Map<TopicPartition, OffsetAndMetadata> offsets =
singletonMap(t1p1, new OffsetAndMetadata(0L, null));
- task01.setCommittableOffsetsAndMetadata(offsets);
- task01.setCommitNeeded();
- taskManager.addTask(task01);
- taskManager.commitAll();
+ when(task01.commitNeeded()).thenReturn(true);
+ when(task01.prepareCommit(true)).thenReturn(offsets);
+
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.allTasks()).thenReturn(Set.of(task01));
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+
+ assertThat(taskManager.commitAll(), equalTo(1));
+
+ verify(task01, times(2)).commitNeeded();
+ verify(task01).prepareCommit(true);
+ verify(task01).postCommit(false);
verify(consumer).commitSync(offsets);
}
@@ -4007,50 +3942,48 @@ public class TaskManagerTest {
@Test
public void shouldPropagateExceptionFromActiveCommit() {
- final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager) {
- @Override
- public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final
boolean clean) {
- throw new RuntimeException("opsh.");
- }
- };
-
- when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(),
eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RUNNING)
+ .build();
- taskManager.handleAssignment(taskId00Assignment, emptyMap());
- assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
+ when(task00.commitNeeded()).thenReturn(true);
+ when(task00.prepareCommit(true)).thenThrow(new
RuntimeException("opsh."));
- assertThat(task00.state(), is(Task.State.RUNNING));
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.allTasks()).thenReturn(Set.of(task00));
- task00.setCommitNeeded();
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
final RuntimeException thrown =
- assertThrows(RuntimeException.class, () ->
taskManager.commitAll());
+ assertThrows(RuntimeException.class, taskManager::commitAll);
assertThat(thrown.getMessage(), equalTo("opsh."));
+
+ verify(task00).commitNeeded();
+ verify(task00).prepareCommit(true);
}
@Test
public void shouldPropagateExceptionFromStandbyCommit() {
- final StateMachineTask task01 = new StateMachineTask(taskId01,
taskId01Partitions, false, stateManager) {
- @Override
- public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final
boolean clean) {
- throw new RuntimeException("opsh.");
- }
- };
-
- when(consumer.assignment()).thenReturn(assignment);
-
when(standbyTaskCreator.createTasks(taskId01Assignment)).thenReturn(singletonList(task01));
+ final StandbyTask task01 = standbyTask(taskId01,
taskId01ChangelogPartitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.RUNNING)
+ .build();
- taskManager.handleAssignment(emptyMap(), taskId01Assignment);
- assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
+ when(task01.commitNeeded()).thenReturn(true);
+ when(task01.prepareCommit(true)).thenThrow(new
RuntimeException("opsh."));
- assertThat(task01.state(), is(Task.State.RUNNING));
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.allTasks()).thenReturn(Set.of(task01));
- task01.setCommitNeeded();
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
final RuntimeException thrown =
assertThrows(RuntimeException.class, () ->
taskManager.commitAll());
assertThat(thrown.getMessage(), equalTo("opsh."));
+
+ verify(task01).commitNeeded();
+ verify(task01).prepareCommit(true);
}
@Test
@@ -4062,27 +3995,24 @@ public class TaskManagerTest {
final InOrder inOrder = inOrder(adminClient);
- final Map<TopicPartition, Long> purgableOffsets = new HashMap<>();
- final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager) {
- @Override
- public Map<TopicPartition, Long> purgeableOffsets() {
- return purgableOffsets;
- }
- };
-
- when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(),
eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RUNNING)
+ .build();
- taskManager.handleAssignment(taskId00Assignment, emptyMap());
- assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
+ when(task00.purgeableOffsets())
+ .thenReturn(new HashMap<>())
+ .thenReturn(singletonMap(t1p1, 5L))
+ .thenReturn(singletonMap(t1p1, 17L));
- assertThat(task00.state(), is(Task.State.RUNNING));
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.allTasks()).thenReturn(Set.of(task00));
- purgableOffsets.put(t1p1, 5L);
- taskManager.maybePurgeCommittedRecords();
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- purgableOffsets.put(t1p1, 17L);
- taskManager.maybePurgeCommittedRecords();
+ taskManager.maybePurgeCommittedRecords(); // no-op
+ taskManager.maybePurgeCommittedRecords(); // sends purge for offset 5L
+ taskManager.maybePurgeCommittedRecords(); // sends purge for offset 17L
inOrder.verify(adminClient).deleteRecords(singletonMap(t1p1,
RecordsToDelete.beforeOffset(5L)));
inOrder.verify(adminClient).deleteRecords(singletonMap(t1p1,
RecordsToDelete.beforeOffset(17L)));
@@ -4095,29 +4025,27 @@ public class TaskManagerTest {
when(adminClient.deleteRecords(singletonMap(t1p1,
RecordsToDelete.beforeOffset(5L))))
.thenReturn(new DeleteRecordsResult(singletonMap(t1p1,
futureDeletedRecords)));
- final Map<TopicPartition, Long> purgableOffsets = new HashMap<>();
- final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager) {
- @Override
- public Map<TopicPartition, Long> purgeableOffsets() {
- return purgableOffsets;
- }
- };
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RUNNING)
+ .build();
- when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(),
eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ when(task00.purgeableOffsets())
+ .thenReturn(new HashMap<>())
+ .thenReturn(singletonMap(t1p1, 5L))
+ .thenReturn(singletonMap(t1p1, 17L));
- taskManager.handleAssignment(taskId00Assignment, emptyMap());
- assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.allTasks()).thenReturn(Set.of(task00));
- assertThat(task00.state(), is(Task.State.RUNNING));
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- purgableOffsets.put(t1p1, 5L);
+ taskManager.maybePurgeCommittedRecords();
taskManager.maybePurgeCommittedRecords();
// this call should be a no-op.
- // this is verified, as there is no expectation on adminClient for
this second call,
- // so it would fail verification if we invoke the admin client again.
- purgableOffsets.put(t1p1, 17L);
+ // because the previous deleteRecords request
+ // has not completed yet, so no new request is sent.
taskManager.maybePurgeCommittedRecords();
}
@@ -4425,7 +4353,6 @@ public class TaskManagerTest {
@Test
public void shouldPunctuateActiveTasks() {
-
final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
.withInputPartitions(taskId00Partitions)
.inState(State.RUNNING)
@@ -4818,45 +4745,66 @@ public class TaskManagerTest {
@Test
public void shouldConvertActiveTaskToStandbyTask() {
- final StreamTask activeTask = mock(StreamTask.class);
- when(activeTask.id()).thenReturn(taskId00);
- when(activeTask.inputPartitions()).thenReturn(taskId00Partitions);
- when(activeTask.isActive()).thenReturn(true);
-
- final StandbyTask standbyTask = mock(StandbyTask.class);
- when(standbyTask.id()).thenReturn(taskId00);
+ final StreamTask activeTaskToRecycle = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId00Partitions).build();
+ final StandbyTask recycledStandbyTask = standbyTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.CREATED)
+ .withInputPartitions(taskId00Partitions).build();
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(activeTaskCreator.createTasks(any(),
eq(taskId00Assignment))).thenReturn(singletonList(activeTask));
- when(standbyTaskCreator.createStandbyTaskFromActive(any(),
eq(taskId00Partitions))).thenReturn(standbyTask);
+ when(activeTaskCreator.createTasks(consumer,
taskId00Assignment)).thenReturn(singletonList(activeTaskToRecycle));
+
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle,
taskId00Partitions))
+ .thenReturn(recycledStandbyTask);
+ // create active task
taskManager.handleAssignment(taskId00Assignment,
Collections.emptyMap());
+
+ // convert active to standby
+ when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToRecycle));
+ final CompletableFuture<StateUpdater.RemovedTaskResult> future = new
CompletableFuture<>();
+ when(stateUpdater.remove(activeTaskToRecycle.id())).thenReturn(future);
+ future.complete(new
StateUpdater.RemovedTaskResult(activeTaskToRecycle));
+
taskManager.handleAssignment(Collections.emptyMap(),
taskId00Assignment);
- verify(activeTaskCreator).createTasks(any(), eq(emptyMap()));
+ verify(activeTaskCreator).createTasks(consumer, emptyMap());
verify(standbyTaskCreator,
times(2)).createTasks(Collections.emptyMap());
- verifyNoInteractions(consumer);
+
verify(standbyTaskCreator).createStandbyTaskFromActive(activeTaskToRecycle,
taskId00Partitions);
+
verify(tasks).addPendingTasksToInit(Collections.singleton(recycledStandbyTask));
}
@Test
public void shouldConvertStandbyTaskToActiveTask() {
- final StandbyTask standbyTask = mock(StandbyTask.class);
- when(standbyTask.id()).thenReturn(taskId00);
- when(standbyTask.isActive()).thenReturn(false);
-
when(standbyTask.prepareCommit(true)).thenReturn(Collections.emptyMap());
-
- final StreamTask activeTask = mock(StreamTask.class);
- when(activeTask.id()).thenReturn(taskId00);
- when(activeTask.inputPartitions()).thenReturn(taskId00Partitions);
-
when(standbyTaskCreator.createTasks(taskId00Assignment)).thenReturn(singletonList(standbyTask));
- when(activeTaskCreator.createActiveTaskFromStandby(eq(standbyTask),
eq(taskId00Partitions), any()))
- .thenReturn(activeTask);
+ final StandbyTask standbyTaskToRecycle = standbyTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId00Partitions).build();
+ final StreamTask recycledActiveTask = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.CREATED)
+ .withInputPartitions(taskId00Partitions).build();
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+
when(standbyTaskCreator.createTasks(taskId00Assignment)).thenReturn(singletonList(standbyTaskToRecycle));
+
when(activeTaskCreator.createActiveTaskFromStandby(standbyTaskToRecycle,
taskId00Partitions, consumer))
+ .thenReturn(recycledActiveTask);
+
+ // create standby task
taskManager.handleAssignment(Collections.emptyMap(),
taskId00Assignment);
+
+ // convert standby to active
+ when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToRecycle));
+ final CompletableFuture<StateUpdater.RemovedTaskResult> future = new
CompletableFuture<>();
+
when(stateUpdater.remove(standbyTaskToRecycle.id())).thenReturn(future);
+ future.complete(new
StateUpdater.RemovedTaskResult(standbyTaskToRecycle));
+
taskManager.handleAssignment(taskId00Assignment,
Collections.emptyMap());
- verify(activeTaskCreator, times(2)).createTasks(any(), eq(emptyMap()));
+ verify(activeTaskCreator, times(2)).createTasks(consumer, emptyMap());
verify(standbyTaskCreator).createTasks(Collections.emptyMap());
- verifyNoInteractions(consumer);
+
verify(activeTaskCreator).createActiveTaskFromStandby(standbyTaskToRecycle,
taskId00Partitions, consumer);
+
verify(tasks).addPendingTasksToInit(Collections.singleton(recycledActiveTask));
}
@Test