This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ea88a8ec3f0 KAFKA-19683: Clean up TaskManagerTest (#20994)
ea88a8ec3f0 is described below
commit ea88a8ec3f06c89b2c4d72221b6530eae5ab2f79
Author: Shashank <[email protected]>
AuthorDate: Mon Dec 1 04:52:47 2025 -0800
KAFKA-19683: Clean up TaskManagerTest (#20994)
Final cleanup of `TaskManagerTest.java`
- Renamed `setUpTaskManagerWithStateUpdater` to `setUpTaskManager`
- Removed `WithStateUpdaterEnabled` from test names
- Removed the method `setUpTaskManagerWithoutStateUpdater`
- Rewrote the newly added test
`shouldCloseTasksIfStateUpdaterTimesOutOnRemove` to use new mocking
patterns
- Completely removed the utility class `StateMachineTask` and also
`handleAssignment` method
Reviewers: Lucas Brutschy <[email protected]>
---
.../processor/internals/TaskManagerTest.java | 645 ++++++---------------
1 file changed, 162 insertions(+), 483 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 7cfa428d337..63638d12c9b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
@@ -38,14 +37,12 @@ import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
-import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import
org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory;
@@ -76,13 +73,10 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
@@ -123,7 +117,6 @@ import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
@@ -212,16 +205,16 @@ public class TaskManagerTest {
@BeforeEach
public void setUp() {
- taskManager =
setUpTaskManagerWithoutStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
null, false);
+ taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, null, false);
}
- private TaskManager setUpTaskManagerWithStateUpdater(final ProcessingMode
processingMode, final TasksRegistry tasks) {
- return setUpTaskManagerWithStateUpdater(processingMode, tasks, false);
+ private TaskManager setUpTaskManager(final ProcessingMode processingMode,
final TasksRegistry tasks) {
+ return setUpTaskManager(processingMode, tasks, false);
}
- private TaskManager setUpTaskManagerWithStateUpdater(final ProcessingMode
processingMode,
- final TasksRegistry
tasks,
- final boolean
processingThreadsEnabled) {
+ private TaskManager setUpTaskManager(final ProcessingMode processingMode,
+ final TasksRegistry tasks,
+ final boolean
processingThreadsEnabled) {
topologyMetadata = new TopologyMetadata(topologyBuilder, new
DummyStreamsConfig(processingMode));
final TaskManager taskManager = new TaskManager(
time,
@@ -241,35 +234,13 @@ public class TaskManagerTest {
return taskManager;
}
- private TaskManager setUpTaskManagerWithoutStateUpdater(final
ProcessingMode processingMode,
- final
TasksRegistry tasks,
- final boolean
processingThreadsEnabled) {
- topologyMetadata = new TopologyMetadata(topologyBuilder, new
DummyStreamsConfig(processingMode));
- final TaskManager taskManager = new TaskManager(
- time,
- changeLogReader,
- ProcessId.randomProcessId(),
- "taskManagerTest",
- activeTaskCreator,
- standbyTaskCreator,
- tasks != null ? tasks : new Tasks(new LogContext()),
- topologyMetadata,
- adminClient,
- stateDirectory,
- null,
- processingThreadsEnabled ? schedulingTaskManager : null
- );
- taskManager.setMainConsumer(consumer);
- return taskManager;
- }
-
@Test
public void shouldLockAllTasksOnCorruptionWithProcessingThreads() {
final StreamTask activeTask1 = statefulTask(taskId00,
taskId00ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
when(tasks.task(taskId00)).thenReturn(activeTask1);
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
@@ -291,7 +262,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
@@ -304,7 +275,7 @@ public class TaskManagerTest {
@Test
public void shouldLockActiveOnHandleAssignmentWithProcessingThreads() {
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTaskIds()).thenReturn(Set.of(taskId00, taskId01));
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
@@ -327,7 +298,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasks()).thenReturn(Set.of(activeTask1, activeTask2));
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
@@ -347,7 +318,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasks()).thenReturn(Set.of(activeTask1, activeTask2));
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
@@ -367,7 +338,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.activeTasks()).thenReturn(Set.of(activeTask1, activeTask2));
taskManager.resumePollingForPartitionsWithAvailableSpace();
@@ -385,7 +356,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.activeTasks()).thenReturn(Set.of(activeTask1, activeTask2));
taskManager.updateLags();
@@ -400,7 +371,7 @@ public class TaskManagerTest {
.inState(State.RESTORING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToClose));
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new
CompletableFuture<>();
when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(future);
@@ -420,7 +391,7 @@ public class TaskManagerTest {
.inState(State.RESTORING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToClose));
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new
CompletableFuture<>();
when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(future);
@@ -441,7 +412,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToClose));
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new
CompletableFuture<>();
when(stateUpdater.remove(standbyTaskToClose.id())).thenReturn(future);
@@ -461,7 +432,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToClose));
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new
CompletableFuture<>();
when(stateUpdater.remove(standbyTaskToClose.id())).thenReturn(future);
@@ -482,7 +453,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(failedStandbyTask));
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new
CompletableFuture<>();
when(stateUpdater.remove(failedStandbyTask.id())).thenReturn(future);
@@ -509,7 +480,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final Set<TopicPartition> newInputPartitions = taskId02Partitions;
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToUpdateInputPartitions));
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new
CompletableFuture<>();
when(stateUpdater.remove(activeTaskToUpdateInputPartitions.id())).thenReturn(future);
@@ -537,7 +508,7 @@ public class TaskManagerTest {
.inState(State.CREATED)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToRecycle));
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle,
taskId03Partitions))
.thenReturn(recycledStandbyTask);
@@ -561,7 +532,7 @@ public class TaskManagerTest {
.inState(State.RESTORING)
.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToRecycle));
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle,
activeTaskToRecycle.inputPartitions()))
.thenThrow(new RuntimeException());
@@ -591,7 +562,7 @@ public class TaskManagerTest {
.inState(State.CREATED)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToRecycle));
when(activeTaskCreator.createActiveTaskFromStandby(standbyTaskToRecycle,
taskId03Partitions, consumer))
.thenReturn(recycledActiveTask);
@@ -615,7 +586,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToRecycle));
when(activeTaskCreator.createActiveTaskFromStandby(
standbyTaskToRecycle,
@@ -645,7 +616,7 @@ public class TaskManagerTest {
.inState(State.RESTORING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(reassignedActiveTask));
taskManager.handleAssignment(
@@ -664,7 +635,7 @@ public class TaskManagerTest {
.inState(State.SUSPENDED)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(reassignedActiveTask));
taskManager.handleAssignment(
@@ -684,7 +655,7 @@ public class TaskManagerTest {
.inState(State.RESTORING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(failedActiveTaskToRecycle));
final RuntimeException taskException = new RuntimeException("Nobody
expects the Spanish inquisition!");
when(stateUpdater.remove(failedActiveTaskToRecycle.id()))
@@ -714,7 +685,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(failedStandbyTaskToRecycle));
final RuntimeException taskException = new RuntimeException("Nobody
expects the Spanish inquisition!");
when(stateUpdater.remove(failedStandbyTaskToRecycle.id()))
@@ -744,7 +715,7 @@ public class TaskManagerTest {
.inState(State.RESTORING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(failedActiveTaskToReassign));
final RuntimeException taskException = new RuntimeException("Nobody
expects the Spanish inquisition!");
when(stateUpdater.remove(failedActiveTaskToReassign.id()))
@@ -777,7 +748,7 @@ public class TaskManagerTest {
.inState(State.RESTORING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(reassignedActiveTask1));
when(stateUpdater.tasks()).thenReturn(Set.of(reassignedActiveTask2));
when(stateUpdater.remove(reassignedActiveTask2.id()))
@@ -803,7 +774,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToUpdateInputPartitions));
taskManager.handleAssignment(
@@ -821,7 +792,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(reassignedStandbyTask));
taskManager.handleAssignment(
@@ -845,7 +816,7 @@ public class TaskManagerTest {
.inState(State.CREATED)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToClose,
standbyTaskToRecycle));
final CompletableFuture<StateUpdater.RemovedTaskResult>
futureForActiveTaskToClose = new CompletableFuture<>();
when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(futureForActiveTaskToClose);
@@ -880,7 +851,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskInStateUpdater));
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03,
runningActiveTask)));
@@ -904,7 +875,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03,
activeTask)));
assertEquals(taskManager.allOwnedTasks(), mkMap(mkEntry(taskId03,
activeTask)));
@@ -916,7 +887,7 @@ public class TaskManagerTest {
.inState(State.CREATED)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
final Set<Task> createdTasks = Set.of(activeTaskToBeCreated);
final Map<TaskId, Set<TopicPartition>> tasksToBeCreated = mkMap(
mkEntry(activeTaskToBeCreated.id(),
activeTaskToBeCreated.inputPartitions()));
@@ -934,7 +905,7 @@ public class TaskManagerTest {
.inState(State.CREATED)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
final Set<Task> createdTasks = Set.of(standbyTaskToBeCreated);
when(standbyTaskCreator.createTasks(mkMap(
mkEntry(standbyTaskToBeCreated.id(),
standbyTaskToBeCreated.inputPartitions())))
@@ -950,7 +921,7 @@ public class TaskManagerTest {
}
@Test
- public void
shouldAddRecycledStandbyTasksFromActiveToPendingTasksToInitWithStateUpdaterEnabled()
{
+ public void shouldAddRecycledStandbyTasksFromActiveToPendingTasksToInit() {
final StreamTask activeTaskToRecycle = statefulTask(taskId01,
taskId01ChangelogPartitions)
.withInputPartitions(taskId01Partitions)
.inState(State.RUNNING).build();
@@ -961,7 +932,7 @@ public class TaskManagerTest {
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToRecycle));
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle,
taskId01Partitions))
.thenReturn(standbyTask);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01,
taskId01Partitions)));
@@ -973,13 +944,13 @@ public class TaskManagerTest {
}
@Test
- public void
shouldThrowDuringAssignmentIfStandbyTaskToRecycleIsFoundInTasksRegistryWithStateUpdaterEnabled()
{
+ public void
shouldThrowDuringAssignmentIfStandbyTaskToRecycleIsFoundInTasksRegistry() {
final StandbyTask standbyTaskToRecycle = standbyTask(taskId03,
taskId03ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToRecycle));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
final IllegalStateException illegalStateException = assertThrows(
IllegalStateException.class,
@@ -995,12 +966,12 @@ public class TaskManagerTest {
}
@Test
- public void
shouldAssignActiveTaskInTasksRegistryToBeClosedCleanlyWithStateUpdaterEnabled()
{
+ public void shouldAssignActiveTaskInTasksRegistryToBeClosedCleanly() {
final StreamTask activeTaskToClose = statefulTask(taskId03,
taskId03ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToClose));
taskManager.handleAssignment(Collections.emptyMap(),
Collections.emptyMap());
@@ -1013,12 +984,12 @@ public class TaskManagerTest {
}
@Test
- public void
shouldThrowDuringAssignmentIfStandbyTaskToCloseIsFoundInTasksRegistryWithStateUpdaterEnabled()
{
+ public void
shouldThrowDuringAssignmentIfStandbyTaskToCloseIsFoundInTasksRegistry() {
final StandbyTask standbyTaskToClose = standbyTask(taskId03,
taskId03ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToClose));
final IllegalStateException illegalStateException = assertThrows(
@@ -1032,13 +1003,13 @@ public class TaskManagerTest {
}
@Test
- public void
shouldAssignActiveTaskInTasksRegistryToUpdateInputPartitionsWithStateUpdaterEnabled()
{
+ public void shouldAssignActiveTaskInTasksRegistryToUpdateInputPartitions()
{
final StreamTask activeTaskToUpdateInputPartitions =
statefulTask(taskId03, taskId03ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
final Set<TopicPartition> newInputPartitions = taskId02Partitions;
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToUpdateInputPartitions));
when(tasks.updateActiveTaskInputPartitions(activeTaskToUpdateInputPartitions,
newInputPartitions)).thenReturn(true);
@@ -1053,12 +1024,12 @@ public class TaskManagerTest {
}
@Test
- public void
shouldResumeActiveRunningTaskInTasksRegistryWithStateUpdaterEnabled() {
+ public void shouldResumeActiveRunningTaskInTasksRegistry() {
final StreamTask activeTaskToResume = statefulTask(taskId03,
taskId03ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToResume));
taskManager.handleAssignment(
@@ -1076,7 +1047,7 @@ public class TaskManagerTest {
.inState(State.SUSPENDED)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToResume));
taskManager.handleAssignment(
@@ -1092,13 +1063,13 @@ public class TaskManagerTest {
}
@Test
- public void
shouldThrowDuringAssignmentIfStandbyTaskToUpdateInputPartitionsIsFoundInTasksRegistryWithStateUpdaterEnabled()
{
+ public void
shouldThrowDuringAssignmentIfStandbyTaskToUpdateInputPartitionsIsFoundInTasksRegistry()
{
final StandbyTask standbyTaskToUpdateInputPartitions =
standbyTask(taskId02, taskId02ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final Set<TopicPartition> newInputPartitions = taskId03Partitions;
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToUpdateInputPartitions));
final IllegalStateException illegalStateException = assertThrows(
@@ -1115,7 +1086,7 @@ public class TaskManagerTest {
}
@Test
- public void
shouldAssignMultipleTasksInTasksRegistryWithStateUpdaterEnabled() {
+ public void shouldAssignMultipleTasksInTasksRegistry() {
final StreamTask activeTaskToClose = statefulTask(taskId03,
taskId03ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
@@ -1123,7 +1094,7 @@ public class TaskManagerTest {
.inState(State.CREATED)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToClose));
taskManager.handleAssignment(
@@ -1149,7 +1120,7 @@ public class TaskManagerTest {
.inState(State.RUNNING).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00,
task01));
- taskManager =
setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
tasks, false);
+ taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
@@ -1171,7 +1142,7 @@ public class TaskManagerTest {
when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00,
task01));
final LockException lockException = new LockException("Where are my
keys??");
doThrow(lockException).when(task00).initializeIfNeeded();
- taskManager =
setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
tasks, false);
+ taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
@@ -1198,7 +1169,7 @@ public class TaskManagerTest {
when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00,
task01));
final TimeoutException timeoutException = new TimeoutException("Timed
out!");
doThrow(timeoutException).when(task00).initializeIfNeeded();
- taskManager =
setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
tasks, false);
+ taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
@@ -1225,7 +1196,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00,
task01));
doThrow(new LockException("Lock
Exception!")).when(task00).initializeIfNeeded();
- taskManager =
setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
tasks, false);
+ taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
@@ -1263,7 +1234,7 @@ public class TaskManagerTest {
}
@Test
- public void shouldRethrowRuntimeExceptionInInitTaskWithStateUpdater() {
+ public void shouldRethrowRuntimeExceptionInInitTask() {
final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
.withInputPartitions(taskId00Partitions)
.inState(State.CREATED).build();
@@ -1271,7 +1242,7 @@ public class TaskManagerTest {
when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00));
final RuntimeException runtimeException = new
RuntimeException("KABOOM!");
doThrow(runtimeException).when(task00).initializeIfNeeded();
- taskManager =
setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
tasks, false);
+ taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false);
final StreamsException streamsException = assertThrows(
StreamsException.class,
@@ -1297,7 +1268,7 @@ public class TaskManagerTest {
.inState(State.CREATED)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks, false);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks, false);
when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(statefulTask0,
statefulTask1, statefulTask2));
doThrow(new
TaskCorruptedException(Collections.singleton(statefulTask0.id))).when(statefulTask0).initializeIfNeeded();
doThrow(new
TaskCorruptedException(Collections.singleton(statefulTask1.id))).when(statefulTask1).initializeIfNeeded();
@@ -1318,7 +1289,7 @@ public class TaskManagerTest {
public void
shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreRestoring() {
when(stateUpdater.restoresActiveTasks()).thenReturn(true);
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
assertFalse(taskManager.checkStateUpdater(time.milliseconds(),
noOpResetter));
}
@@ -1327,7 +1298,7 @@ public class TaskManagerTest {
public void
shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingTaskToRecycleButPendingTasksToInit()
{
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.hasPendingTasksToInit()).thenReturn(true);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
assertFalse(taskManager.checkStateUpdater(time.milliseconds(),
noOpResetter));
}
@@ -1335,7 +1306,7 @@ public class TaskManagerTest {
@Test
public void
shouldReturnTrueFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingInit()
{
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
assertTrue(taskManager.checkStateUpdater(time.milliseconds(),
noOpResetter));
}
@@ -1555,7 +1526,7 @@ public class TaskManagerTest {
private TaskManager setupForRevocationAndLost(final Set<Task>
tasksInStateUpdater,
final TasksRegistry tasks) {
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(tasksInStateUpdater);
return taskManager;
@@ -1624,12 +1595,12 @@ public class TaskManagerTest {
when(stateUpdater.restoresActiveTasks()).thenReturn(true);
when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(statefulTasks);
- return setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE,
tasks);
+ return setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
}
@Test
- public void
shouldReturnCorrectBooleanWhenTryingToCompleteRestorationWithStateUpdater() {
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, null, false);
+ public void shouldReturnCorrectBooleanWhenTryingToCompleteRestoration() {
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, null, false);
when(stateUpdater.restoresActiveTasks()).thenReturn(false);
assertTrue(taskManager.checkStateUpdater(time.milliseconds(),
noOpResetter));
when(stateUpdater.restoresActiveTasks()).thenReturn(true);
@@ -1647,7 +1618,7 @@ public class TaskManagerTest {
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(exceptionAndTasks));
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
final StreamsException thrown = assertThrows(
StreamsException.class,
@@ -1674,7 +1645,7 @@ public class TaskManagerTest {
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Arrays.asList(exceptionAndTasks0,
exceptionAndTasks1));
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
final TaskCorruptedException thrown = assertThrows(
TaskCorruptedException.class,
@@ -1746,7 +1717,7 @@ public class TaskManagerTest {
public void shouldPauseAllTopicsOnRebalanceComplete() {
final Set<TopicPartition> assigned = Set.of(t1p0, t1p1);
when(consumer.assignment()).thenReturn(assigned);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, null);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, null);
taskManager.handleRebalanceComplete();
verify(consumer).pause(assigned);
@@ -1758,7 +1729,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(statefulTask0));
final Set<TopicPartition> assigned = Set.of(t1p0, t1p1);
when(consumer.assignment()).thenReturn(assigned);
@@ -1780,7 +1751,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
runningStatefulTask)));
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask,
restoringStatefulTask));
when(tasks.allNonFailedTasks()).thenReturn(Set.of(runningStatefulTask));
@@ -1814,7 +1785,7 @@ public class TaskManagerTest {
);
when(runningStatefulTask.changelogOffsets()).thenReturn(changelogOffsets);
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
runningStatefulTask)));
assertThat(
@@ -1837,7 +1808,7 @@ public class TaskManagerTest {
when(restoringStatefulTask.changelogOffsets())
.thenReturn(changelogOffsets);
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums));
@@ -1857,7 +1828,7 @@ public class TaskManagerTest {
final Map<TopicPartition, Long> changelogOffsetInCheckpoint =
mkMap(mkEntry(t1p0changelog, 24L));
writeCheckpointFile(taskId00, changelogOffsetInCheckpoint);
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
taskManager.handleRebalanceStart(singleton("topic"));
@@ -1875,7 +1846,7 @@ public class TaskManagerTest {
final Map<TopicPartition, Long> changelogOffsetInCheckpoint =
mkMap(mkEntry(t1p0changelog, 24L));
writeCheckpointFile(taskId00, changelogOffsetInCheckpoint);
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStandbyTask));
taskManager.handleRebalanceStart(singleton("topic"));
@@ -1900,7 +1871,7 @@ public class TaskManagerTest {
when(restoringStandbyTask.changelogOffsets())
.thenReturn(mkMap(mkEntry(t1p2changelog,
changelogOffsetOfRestoringStandbyTask)));
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
runningStatefulTask)));
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStandbyTask,
restoringStatefulTask));
@@ -1925,7 +1896,7 @@ public class TaskManagerTest {
mkEntry(t1p1changelog2, OffsetCheckpoint.OFFSET_UNKNOWN)
));
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, false);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId01,
restoringStatefulTask)));
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
@@ -1952,7 +1923,7 @@ public class TaskManagerTest {
when(standbyTask.changelogOffsets()).thenReturn(changelogOffsets);
final TasksRegistry tasks = mock(TasksRegistry.class);
- taskManager =
setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
tasks, false);
+ taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false);
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask));
@@ -2000,7 +1971,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, task)));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
expectLockObtainedFor(taskId00);
makeTaskFolders(taskId00.toString());
@@ -2060,7 +2031,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.handleAssignment(emptyMap(), emptyMap());
@@ -2081,7 +2052,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
final RuntimeException thrown = assertThrows(
RuntimeException.class,
@@ -2124,7 +2095,7 @@ public class TaskManagerTest {
expectLockObtainedFor(taskId00, taskId01);
expectDirectoryNotEmpty(taskId00, taskId01);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.handleRebalanceStart(emptySet());
assertThat(taskManager.lockedTaskDirectories(), is(Set.of(taskId00,
taskId01)));
@@ -2155,7 +2126,7 @@ public class TaskManagerTest {
@Test
public void
shouldReInitializeStreamsProducerOnHandleLostAllIfEosV2Enabled() {
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, null, false);
taskManager.handleLostAll();
@@ -2171,7 +2142,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.task(taskId03)).thenReturn(corruptedActiveTask);
when(tasks.task(taskId02)).thenReturn(corruptedStandbyTask);
@@ -2208,7 +2179,7 @@ public class TaskManagerTest {
when(consumer.assignment()).thenReturn(taskId00Partitions);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.handleCorruption(singleton(taskId00));
@@ -2239,7 +2210,7 @@ public class TaskManagerTest {
when(task00.changelogPartitions()).thenReturn(taskId00ChangelogPartitions);
doThrow(new RuntimeException("oops")).when(task00).suspend();
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.handleCorruption(singleton(taskId00));
@@ -2280,7 +2251,7 @@ public class TaskManagerTest {
when(consumer.assignment()).thenReturn(taskId00Partitions);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.handleCorruption(Set.of(taskId00));
@@ -2308,7 +2279,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId02,
corruptedTask)));
when(tasks.task(taskId02)).thenReturn(corruptedTask);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(consumer.assignment()).thenReturn(intersection(HashSet::new,
taskId00Partitions, taskId01Partitions, taskId02Partitions));
taskManager.handleCorruption(Set.of(taskId02));
@@ -2347,7 +2318,7 @@ public class TaskManagerTest {
doNothing().when(corruptedStandby).suspend();
doNothing().when(corruptedStandby).postCommit(anyBoolean());
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
assertThrows(TaskMigratedException.class, () ->
taskManager.handleCorruption(singleton(taskId00)));
@@ -2392,7 +2363,7 @@ public class TaskManagerTest {
when(consumer.assignment()).thenReturn(taskId00Partitions);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.handleRebalanceStart(singleton(topic1));
assertThat(taskManager.rebalanceInProgress(), is(true));
@@ -2459,7 +2430,7 @@ public class TaskManagerTest {
doThrow(new
TimeoutException()).when(producer).commitTransaction(offsets, groupMetadata);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks);
taskManager.handleCorruption(singleton(taskId00));
@@ -2532,7 +2503,7 @@ public class TaskManagerTest {
doThrow(new TimeoutException()).when(consumer).commitSync(offsets);
when(consumer.assignment()).thenReturn(union(HashSet::new,
taskId00Partitions, taskId01Partitions));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.handleCorruption(singleton(taskId00));
@@ -2614,7 +2585,7 @@ public class TaskManagerTest {
expectedCommittedOffsets.putAll(unrevokedTaskOffsets);
doThrow(new
TimeoutException()).when(consumer).commitSync(expectedCommittedOffsets);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.handleRevocation(taskId00Partitions);
@@ -2707,7 +2678,7 @@ public class TaskManagerTest {
expectedCommittedOffsets.putAll(unrevokedTaskOffsets);
doThrow(new
TimeoutException()).when(producer).commitTransaction(expectedCommittedOffsets,
groupMetadata);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks);
taskManager.handleRevocation(taskId00Partitions);
@@ -2751,7 +2722,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.drainPendingTasksToInit()).thenReturn(emptySet());
- taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(task00));
@@ -2788,7 +2759,7 @@ public class TaskManagerTest {
when(tasks.drainPendingTasksToInit()).thenReturn(emptySet());
when(tasks.hasPendingTasksToInit()).thenReturn(false);
- taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(task01));
when(stateUpdater.restoresActiveTasks()).thenReturn(false);
@@ -2824,7 +2795,7 @@ public class TaskManagerTest {
when(tasks.hasPendingTasksToInit()).thenReturn(false);
when(tasks.updateActiveTaskInputPartitions(task00,
newPartitionsSet)).thenReturn(true);
- taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(emptySet());
when(stateUpdater.restoresActiveTasks()).thenReturn(false);
@@ -2850,7 +2821,7 @@ public class TaskManagerTest {
final Map<TaskId, Set<TopicPartition>> assignment = taskId00Assignment;
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
// first, we need to handle assignment -- creates tasks and adds to
pending initialization
when(activeTaskCreator.createTasks(any(),
eq(assignment))).thenReturn(singletonList(task00));
@@ -2888,7 +2859,7 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
mkEntry(taskId00, taskId00Partitions),
mkEntry(taskId01, taskId01Partitions)
@@ -2931,7 +2902,7 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.restoresActiveTasks()).thenReturn(true);
when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(Set.of(task00));
@@ -2962,7 +2933,7 @@ public class TaskManagerTest {
when(task00.commitNeeded()).thenReturn(true);
when(task00.prepareCommit(true)).thenReturn(offsets);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.handleRevocation(taskId00Partitions);
@@ -3028,7 +2999,7 @@ public class TaskManagerTest {
expectedCommittedOffsets.putAll(offsets00);
expectedCommittedOffsets.putAll(offsets01);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks);
taskManager.handleRevocation(taskId00Partitions);
@@ -3093,7 +3064,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02,
task03));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.handleRevocation(taskId00Partitions);
@@ -3148,7 +3119,7 @@ public class TaskManagerTest {
when(task01.commitNeeded()).thenReturn(true); // only task01 needs
commit
when(task02.commitNeeded()).thenReturn(false);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(processingMode, tasks);
+ final TaskManager taskManager = setUpTaskManager(processingMode,
tasks);
taskManager.handleRevocation(taskId00Partitions);
@@ -3171,7 +3142,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
when(stateUpdater.tasks()).thenReturn(Set.of(task01));
@@ -3205,7 +3176,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
when(stateUpdater.tasks()).thenReturn(Set.of(task01));
@@ -3238,7 +3209,7 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(activeTaskCreator.createTasks(consumer, taskId00Assignment))
.thenReturn(singletonList(task00));
@@ -3277,7 +3248,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasks()).thenReturn(Set.of(task00));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
assertThrows(RuntimeException.class, () ->
taskManager.handleRevocation(taskId00Partitions));
@@ -3308,7 +3279,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(processingMode, tasks);
+ final TaskManager taskManager = setUpTaskManager(processingMode,
tasks);
doThrow(new TaskMigratedException("migrated", new
RuntimeException("cause")))
.when(task01).suspend();
@@ -3348,7 +3319,7 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
doThrow(new
RuntimeException("whatever")).when(activeTaskCreator).close();
@@ -3373,14 +3344,14 @@ public class TaskManagerTest {
@SuppressWarnings("unchecked")
@Test
public void shouldCloseTasksIfStateUpdaterTimesOutOnRemove() throws
Exception {
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, null, false);
- final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
- mkEntry(taskId00, taskId00Partitions)
- );
- final Task task00 = spy(new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager));
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId00Partitions)
+ .build();
- when(activeTaskCreator.createTasks(any(),
eq(assignment))).thenReturn(singletonList(task00));
- taskManager.handleAssignment(assignment, emptyMap());
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
when(stateUpdater.tasks()).thenReturn(singleton(task00));
final CompletableFuture<StateUpdater.RemovedTaskResult> future =
mock(CompletableFuture.class);
@@ -3406,7 +3377,7 @@ public class TaskManagerTest {
doThrow(new RuntimeException("task 0_1 suspend
boom!")).when(task01).suspend();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(task00, task01));
@@ -3450,7 +3421,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02));
@@ -3481,7 +3452,7 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
doThrow(new TaskMigratedException("migrated", new
RuntimeException("cause")))
.when(task01).suspend();
@@ -3527,7 +3498,7 @@ public class TaskManagerTest {
final CompletableFuture<StateUpdater.RemovedTaskResult>
futureForStandbyTask = new CompletableFuture<>();
when(stateUpdater.remove(taskId00)).thenReturn(futureForStandbyTask);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
futureForStandbyTask.complete(new
StateUpdater.RemovedTaskResult(standbyTask00)); // simulate successful removal
@@ -3560,7 +3531,7 @@ public class TaskManagerTest {
new ExceptionAndTask(new RuntimeException(),
failedStandbyTask))
)
.thenReturn(Collections.emptyList());
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.shutdown(true);
@@ -3574,7 +3545,7 @@ public class TaskManagerTest {
@Test
public void shouldShutdownSchedulingTaskManager() {
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
taskManager.shutdown(true);
@@ -3624,7 +3595,7 @@ public class TaskManagerTest {
new ExceptionAndTask(new StreamsException("KABOOM!"),
removedFailedStatefulTaskDuringRemoval),
new ExceptionAndTask(new StreamsException("KABOOM!"),
removedFailedStandbyTaskDuringRemoval))
).thenReturn(Collections.emptyList());
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
futureForRemovedStatefulTask.complete(new
StateUpdater.RemovedTaskResult(removedStatefulTask));
futureForRemovedStandbyTask.complete(new
StateUpdater.RemovedTaskResult(removedStandbyTask));
futureForRemovedFailedStatefulTask
@@ -3664,7 +3635,7 @@ public class TaskManagerTest {
final Map<TaskId, Set<TopicPartition>> assignment = taskId01Assignment;
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(standbyTaskCreator.createTasks(assignment)).thenReturn(singletonList(task01));
@@ -3714,7 +3685,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
assertThat(taskManager.commitAll(), equalTo(2));
@@ -3774,7 +3745,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
assertThat(taskManager.commit(Set.of(task00, task02, task03, task05)),
equalTo(2));
@@ -3810,7 +3781,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasks()).thenReturn(Set.of(task00));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
assertThat(taskManager.commitAll(), equalTo(1));
@@ -3838,7 +3809,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.handleRebalanceStart(emptySet());
@@ -3867,7 +3838,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasks()).thenReturn(Set.of(task01));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
assertThat(taskManager.commitAll(), equalTo(1));
@@ -3912,7 +3883,7 @@ public class TaskManagerTest {
final ConsumerGroupMetadata groupMetadata =
mock(ConsumerGroupMetadata.class);
when(consumer.groupMetadata()).thenReturn(groupMetadata);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks);
taskManager.commitAll();
@@ -3939,7 +3910,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasks()).thenReturn(Set.of(task00));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
final RuntimeException thrown =
assertThrows(RuntimeException.class, taskManager::commitAll);
@@ -3962,7 +3933,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasks()).thenReturn(Set.of(task01));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
final RuntimeException thrown =
assertThrows(RuntimeException.class, () ->
taskManager.commitAll());
@@ -3994,7 +3965,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasks()).thenReturn(Set.of(task00));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.maybePurgeCommittedRecords(); // no-op
taskManager.maybePurgeCommittedRecords(); // sends purge for offset 5L
@@ -4024,7 +3995,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasks()).thenReturn(Set.of(task00));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.maybePurgeCommittedRecords();
taskManager.maybePurgeCommittedRecords();
@@ -4052,7 +4023,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasks()).thenReturn(Set.of(task00));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.maybePurgeCommittedRecords();
taskManager.maybePurgeCommittedRecords();
@@ -4109,7 +4080,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02,
task03));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
// maybeCommitActiveTasksPerUserRequested checks if any task has both
commitRequested AND commitNeeded
// If found, commits all active running tasks that have commitNeeded
@@ -4169,7 +4140,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.activeTasks()).thenReturn(Set.of(task00, task01));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
// check that we should be processing at most max num records
assertThat(taskManager.process(3, time), is(6));
@@ -4214,7 +4185,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.activeTasks()).thenReturn(Set.of(task00, task01, task02));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
// should only process 2 records, because task01 throws
TimeoutException
assertThat(taskManager.process(1, time), is(2));
@@ -4241,7 +4212,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.activeTasks()).thenReturn(Set.of(task00));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
assertThrows(TaskMigratedException.class, () -> taskManager.process(1,
time));
}
@@ -4258,7 +4229,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.activeTasks()).thenReturn(Set.of(task00));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
final StreamsException exception =
assertThrows(StreamsException.class, () -> taskManager.process(1, time));
assertThat(exception.taskId().isPresent(), is(true));
@@ -4279,7 +4250,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.activeTasks()).thenReturn(Set.of(task00));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
assertThrows(TaskMigratedException.class, taskManager::punctuate);
}
@@ -4296,7 +4267,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.activeTasks()).thenReturn(Set.of(task00));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
assertThrows(KafkaException.class, taskManager::punctuate);
}
@@ -4314,7 +4285,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.activeTasks()).thenReturn(Set.of(task00));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
// one for stream and one for system time
assertThat(taskManager.punctuate(), equalTo(2));
@@ -4326,7 +4297,7 @@ public class TaskManagerTest {
@Test
public void shouldReturnFalseWhenThereAreStillNonRunningTasks() {
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
// mock that the state updater is still restoring active tasks
when(stateUpdater.restoresActiveTasks()).thenReturn(true);
@@ -4347,7 +4318,7 @@ public class TaskManagerTest {
when(task00.prepareCommit(false)).thenReturn(offsets);
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allTasks()).thenReturn(Set.of(task00));
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(TaskManager.class)) {
@@ -4385,7 +4356,7 @@ public class TaskManagerTest {
.when(migratedTask02).suspend();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(migratedTask01,
migratedTask02));
@@ -4431,7 +4402,7 @@ public class TaskManagerTest {
.when(migratedTask02).suspend();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(migratedTask01,
migratedTask02));
@@ -4476,7 +4447,7 @@ public class TaskManagerTest {
.when(migratedTask02).suspend();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(migratedTask01,
migratedTask02));
@@ -4522,53 +4493,6 @@ public class TaskManagerTest {
assertThat(taskManager.producerMetrics(), is(dummyProducerMetrics));
}
- private Map<TaskId, StateMachineTask> handleAssignment(final Map<TaskId,
Set<TopicPartition>> runningActiveAssignment,
- final Map<TaskId,
Set<TopicPartition>> standbyAssignment,
- final Map<TaskId,
Set<TopicPartition>> restoringActiveAssignment) {
- final Set<Task> runningTasks =
runningActiveAssignment.entrySet().stream()
- .map(t -> new
StateMachineTask(t.getKey(), t.getValue(), true, stateManager))
- .collect(Collectors.toSet());
- final Set<Task> standbyTasks = standbyAssignment.entrySet().stream()
- .map(t -> new
StateMachineTask(t.getKey(), t.getValue(), false, stateManager))
- .collect(Collectors.toSet());
- final Set<Task> restoringTasks =
restoringActiveAssignment.entrySet().stream()
- .map(t -> new
StateMachineTask(t.getKey(), t.getValue(), true, stateManager))
- .collect(Collectors.toSet());
- // give the restoring tasks some uncompleted changelog partitions so
they'll stay in restoring
- restoringTasks.forEach(t -> ((StateMachineTask)
t).setChangelogOffsets(singletonMap(new TopicPartition("changelog", 0), 0L)));
-
- // Initially assign only the active tasks we want to complete
restoration
- final Map<TaskId, Set<TopicPartition>> allActiveTasksAssignment = new
HashMap<>(runningActiveAssignment);
- allActiveTasksAssignment.putAll(restoringActiveAssignment);
- final Set<Task> allActiveTasks = new HashSet<>(runningTasks);
- allActiveTasks.addAll(restoringTasks);
-
-
when(standbyTaskCreator.createTasks(standbyAssignment)).thenReturn(standbyTasks);
- when(activeTaskCreator.createTasks(any(),
eq(allActiveTasksAssignment))).thenReturn(allActiveTasks);
-
- lenient().when(consumer.assignment()).thenReturn(assignment);
-
- taskManager.handleAssignment(allActiveTasksAssignment,
standbyAssignment);
- taskManager.tryToCompleteRestoration(time.milliseconds(), null);
-
- final Map<TaskId, StateMachineTask> allTasks = new HashMap<>();
-
- // Just make sure all tasks ended up in the expected state
- for (final Task task : runningTasks) {
- assertThat(task.state(), is(Task.State.RUNNING));
- allTasks.put(task.id(), (StateMachineTask) task);
- }
- for (final Task task : restoringTasks) {
- assertThat(task.state(), is(Task.State.RESTORING));
- allTasks.put(task.id(), (StateMachineTask) task);
- }
- for (final Task task : standbyTasks) {
- assertThat(task.state(), is(Task.State.RUNNING));
- allTasks.put(task.id(), (StateMachineTask) task);
- }
- return allTasks;
- }
-
private void expectLockObtainedFor(final TaskId... tasks) {
for (final TaskId task : tasks) {
when(stateDirectory.lock(task)).thenReturn(true);
@@ -4602,7 +4526,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasks()).thenReturn(Set.of(task01));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
doThrow(new
CommitFailedException()).when(consumer).commitSync(offsets);
@@ -4640,7 +4564,7 @@ public class TaskManagerTest {
when(task01.commitNeeded()).thenReturn(false);
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
doThrow(new
TimeoutException("KABOOM!")).doNothing().when(consumer).commitSync(any(Map.class));
@@ -4688,7 +4612,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks);
final TaskCorruptedException exception = assertThrows(
TaskCorruptedException.class,
@@ -4717,7 +4641,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasks()).thenReturn(Set.of(task01));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
doThrow(new KafkaException()).when(consumer).commitSync(offsets);
@@ -4748,7 +4672,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasks()).thenReturn(Set.of(task01));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
doThrow(new
RuntimeException("KABOOM")).when(consumer).commitSync(offsets);
@@ -4780,7 +4704,7 @@ public class TaskManagerTest {
when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
final RuntimeException thrown = assertThrows(
RuntimeException.class,
@@ -4804,7 +4728,7 @@ public class TaskManagerTest {
.inState(State.CREATED)
.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(activeTaskCreator.createTasks(consumer,
taskId00Assignment)).thenReturn(singletonList(activeTaskToRecycle));
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle,
taskId00Partitions))
@@ -4836,7 +4760,7 @@ public class TaskManagerTest {
.inState(State.CREATED)
.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(standbyTaskCreator.createTasks(taskId00Assignment)).thenReturn(singletonList(standbyTaskToRecycle));
when(activeTaskCreator.createActiveTaskFromStandby(standbyTaskToRecycle,
taskId00Partitions, consumer))
@@ -4874,7 +4798,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Collections.emptySet());
@@ -4890,7 +4814,7 @@ public class TaskManagerTest {
@Test
public void shouldRecycleStartupTasksFromStateDirectoryAsActive() {
final Tasks taskRegistry = new Tasks(new LogContext());
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, taskRegistry);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, taskRegistry);
final StandbyTask startupTask = standbyTask(taskId00,
taskId00ChangelogPartitions).build();
final StreamTask activeTask = statefulTask(taskId00,
taskId00ChangelogPartitions).build();
@@ -4928,7 +4852,7 @@ public class TaskManagerTest {
@Test
public void shouldUseStartupTasksFromStateDirectoryAsStandby() {
final Tasks taskRegistry = new Tasks(new LogContext());
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, taskRegistry);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, taskRegistry);
final StandbyTask startupTask = standbyTask(taskId00,
taskId00ChangelogPartitions).build();
when(stateDirectory.hasStartupTasks()).thenReturn(true, true, false);
@@ -4960,7 +4884,7 @@ public class TaskManagerTest {
@Test
public void shouldStartStateUpdaterOnInit() {
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, null);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, null);
taskManager.init();
verify(stateUpdater).start();
}
@@ -4996,249 +4920,4 @@ public class TaskManagerTest {
private File getCheckpointFile(final TaskId task) {
return new File(new File(testFolder.toAbsolutePath().toString(),
task.toString()), StateManagerUtil.CHECKPOINT_FILE_NAME);
}
-
- private static ConsumerRecord<byte[], byte[]> getConsumerRecord(final
TopicPartition topicPartition, final long offset) {
- return new ConsumerRecord<>(topicPartition.topic(),
topicPartition.partition(), offset, null, null);
- }
-
- private static class StateMachineTask extends AbstractTask implements Task
{
- private final boolean active;
-
- // TODO: KAFKA-12569 clean up usage of these flags and use the new
commitCompleted flag where appropriate
- private boolean commitNeeded = false;
- private boolean commitRequested = false;
- private boolean commitPrepared = false;
- private boolean commitCompleted = false;
- private Map<TopicPartition, OffsetAndMetadata> committableOffsets =
Collections.emptyMap();
- private Map<TopicPartition, Long> purgeableOffsets;
- private Map<TopicPartition, Long> changelogOffsets =
Collections.emptyMap();
- private Set<TopicPartition> partitionsForOffsetReset =
Collections.emptySet();
- private Long timeout = null;
-
- private final Map<TopicPartition, LinkedList<ConsumerRecord<byte[],
byte[]>>> queue = new HashMap<>();
-
- StateMachineTask(final TaskId id,
- final Set<TopicPartition> partitions,
- final boolean active,
- final ProcessorStateManager processorStateManager) {
- super(id, null, null, processorStateManager, partitions, (new
TopologyConfig(new DummyStreamsConfig())).getTaskConfig(), "test-task",
StateMachineTask.class);
- this.active = active;
- }
-
- @Override
- public void initializeIfNeeded() {
- if (state() == State.CREATED) {
- transitionTo(State.RESTORING);
- if (!active) {
- transitionTo(State.RUNNING);
- }
- }
- }
-
- @Override
- public void addPartitionsForOffsetReset(final Set<TopicPartition>
partitionsForOffsetReset) {
- this.partitionsForOffsetReset = partitionsForOffsetReset;
- }
-
- @Override
- public void completeRestoration(final
java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
- if (state() == State.RUNNING) {
- return;
- }
- transitionTo(State.RUNNING);
- }
-
- public void setCommitNeeded() {
- commitNeeded = true;
- }
-
- @Override
- public boolean commitNeeded() {
- return commitNeeded;
- }
-
- public void setCommitRequested() {
- commitRequested = true;
- }
-
- @Override
- public boolean commitRequested() {
- return commitRequested;
- }
-
- @Override
- public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final
boolean clean) {
- commitPrepared = true;
-
- if (commitNeeded) {
- if (!clean) {
- return null;
- }
- return committableOffsets;
- } else {
- return Collections.emptyMap();
- }
- }
-
- @Override
- public void postCommit(final boolean enforceCheckpoint) {
- commitNeeded = false;
- commitCompleted = true;
- }
-
- @Override
- public void suspend() {
- if (state() == State.CLOSED) {
- throw new IllegalStateException("Illegal state " + state() + "
while suspending active task " + id);
- } else if (state() == State.SUSPENDED) {
- // do nothing
- } else {
- transitionTo(State.SUSPENDED);
- }
- }
-
- @Override
- public void resume() {
- if (state() == State.SUSPENDED) {
- transitionTo(State.RUNNING);
- }
- }
-
- @Override
- public void revive() {
- //TODO: KAFKA-12569 move clearing of commit-required statuses to
closeDirty/Clean/AndRecycle methods
- commitNeeded = false;
- commitRequested = false;
- super.revive();
- }
-
- @Override
- public void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
- final Exception cause) {
- timeout = currentWallClockMs;
- }
-
- @Override
- public void clearTaskTimeout() {
- timeout = null;
- }
-
- @Override
- public void recordRestoration(final Time time, final long numRecords,
final boolean initRemaining) {
- // do nothing
- }
-
- @Override
- public void closeClean() {
- transitionTo(State.CLOSED);
- }
-
- @Override
- public void closeDirty() {
- transitionTo(State.CLOSED);
- }
-
- @Override
- public void prepareRecycle() {
- transitionTo(State.CLOSED);
- }
-
- @Override
- public void resumePollingForPartitionsWithAvailableSpace() {
- // noop
- }
-
- @Override
- public void updateLags() {
- // noop
- }
-
- @Override
- public void updateInputPartitions(final Set<TopicPartition>
topicPartitions, final Map<String, List<String>>
allTopologyNodesToSourceTopics) {
- inputPartitions = topicPartitions;
- }
-
- void setCommittableOffsetsAndMetadata(final Map<TopicPartition,
OffsetAndMetadata> committableOffsets) {
- if (!active) {
- throw new IllegalStateException("Cannot set
CommittableOffsetsAndMetadate for StandbyTasks");
- }
- this.committableOffsets = committableOffsets;
- }
-
- @Override
- public StateStore store(final String name) {
- return null;
- }
-
- @Override
- public Set<TopicPartition> changelogPartitions() {
- return changelogOffsets.keySet();
- }
-
- public boolean isActive() {
- return active;
- }
-
- void setPurgeableOffsets(final Map<TopicPartition, Long>
purgeableOffsets) {
- this.purgeableOffsets = purgeableOffsets;
- }
-
- @Override
- public Map<TopicPartition, Long> purgeableOffsets() {
- return purgeableOffsets;
- }
-
- void setChangelogOffsets(final Map<TopicPartition, Long>
changelogOffsets) {
- this.changelogOffsets = changelogOffsets;
- }
-
- @Override
- public Map<TopicPartition, Long> changelogOffsets() {
- return changelogOffsets;
- }
-
- @Override
- public Map<TopicPartition, Long> committedOffsets() {
- return Collections.emptyMap();
- }
-
- @Override
- public Map<TopicPartition, Long> highWaterMark() {
- return Collections.emptyMap();
- }
-
- @Override
- public Optional<Long> timeCurrentIdlingStarted() {
- return Optional.empty();
- }
-
- @Override
- public void addRecords(final TopicPartition partition, final
Iterable<ConsumerRecord<byte[], byte[]>> records) {
- if (isActive()) {
- final Deque<ConsumerRecord<byte[], byte[]>> partitionQueue =
- queue.computeIfAbsent(partition, k -> new LinkedList<>());
-
- for (final ConsumerRecord<byte[], byte[]> record : records) {
- partitionQueue.add(record);
- }
- } else {
- throw new IllegalStateException("Can't add records to an
inactive task.");
- }
- }
-
- @Override
- public boolean process(final long wallClockTime) {
- if (isActive() && state() == State.RUNNING) {
- for (final LinkedList<ConsumerRecord<byte[], byte[]>> records
: queue.values()) {
- final ConsumerRecord<byte[], byte[]> record =
records.poll();
- if (record != null) {
- return true;
- }
- }
- return false;
- } else {
- throw new IllegalStateException("Can't process an inactive or
non-running task.");
- }
- }
- }
}