This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 59f51fb3cac KAFKA-19683: Remove more dead tests and rewrote 3 tests in
TaskManagerTest [2/N] (#20544)
59f51fb3cac is described below
commit 59f51fb3cac66d8096e647eb90be6c5e4ba9f485
Author: Shashank <[email protected]>
AuthorDate: Fri Oct 10 06:31:28 2025 -0700
KAFKA-19683: Remove more dead tests and rewrote 3 tests in TaskManagerTest
[2/N] (#20544)
Changes made
- Additional `setUpTaskManager()` overloaded method -- Created this
temporarily to pass the CI pipelines so that I can work on the failing
tests incrementally
- Rewrote 3 tests to use stateUpdater thread
Reviewers: Lucas Brutschy <[email protected]>
---
.../processor/internals/TaskManagerTest.java | 360 ++++++++++-----------
1 file changed, 163 insertions(+), 197 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 8d83d1e99fa..f89faffd971 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -61,8 +61,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -216,25 +214,38 @@ public class TaskManagerTest {
@BeforeEach
public void setUp() {
- taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, null, false);
+ taskManager =
setUpTaskManagerWithoutStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
null, false);
}
- private TaskManager setUpTaskManager(final ProcessingMode processingMode,
final TasksRegistry tasks) {
- return setUpTaskManager(processingMode, tasks, false);
+ private TaskManager setUpTaskManagerWithStateUpdater(final ProcessingMode
processingMode, final TasksRegistry tasks) {
+ return setUpTaskManagerWithStateUpdater(processingMode, tasks, false);
}
- private TaskManager setUpTaskManager(final ProcessingMode processingMode,
final boolean stateUpdaterEnabled) {
- return setUpTaskManager(processingMode, null, stateUpdaterEnabled,
false);
- }
-
- private TaskManager setUpTaskManager(final ProcessingMode processingMode,
final TasksRegistry tasks, final boolean stateUpdaterEnabled) {
- return setUpTaskManager(processingMode, tasks, stateUpdaterEnabled,
false);
+ private TaskManager setUpTaskManagerWithStateUpdater(final ProcessingMode
processingMode,
+ final TasksRegistry
tasks,
+ final boolean
processingThreadsEnabled) {
+ topologyMetadata = new TopologyMetadata(topologyBuilder, new
DummyStreamsConfig(processingMode));
+ final TaskManager taskManager = new TaskManager(
+ time,
+ changeLogReader,
+ ProcessId.randomProcessId(),
+ "taskManagerTest",
+ activeTaskCreator,
+ standbyTaskCreator,
+ tasks != null ? tasks : new Tasks(new LogContext()),
+ topologyMetadata,
+ adminClient,
+ stateDirectory,
+ stateUpdater,
+ processingThreadsEnabled ? schedulingTaskManager : null
+ );
+ taskManager.setMainConsumer(consumer);
+ return taskManager;
}
- private TaskManager setUpTaskManager(final ProcessingMode processingMode,
- final TasksRegistry tasks,
- final boolean stateUpdaterEnabled,
- final boolean
processingThreadsEnabled) {
+ private TaskManager setUpTaskManagerWithoutStateUpdater(final
ProcessingMode processingMode,
+ final
TasksRegistry tasks,
+ final boolean
processingThreadsEnabled) {
topologyMetadata = new TopologyMetadata(topologyBuilder, new
DummyStreamsConfig(processingMode));
final TaskManager taskManager = new TaskManager(
time,
@@ -247,7 +258,7 @@ public class TaskManagerTest {
topologyMetadata,
adminClient,
stateDirectory,
- stateUpdaterEnabled ? stateUpdater : null,
+ null,
processingThreadsEnabled ? schedulingTaskManager : null
);
taskManager.setMainConsumer(consumer);
@@ -261,7 +272,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
when(tasks.task(taskId00)).thenReturn(activeTask1);
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
@@ -283,7 +294,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
@@ -296,14 +307,14 @@ public class TaskManagerTest {
@Test
public void shouldLockActiveOnHandleAssignmentWithProcessingThreads() {
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTaskIds()).thenReturn(Set.of(taskId00, taskId01));
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
taskManager.handleAssignment(
mkMap(mkEntry(taskId00, taskId00Partitions)),
- mkMap(mkEntry(taskId01, taskId01Partitions))
+ mkMap(mkEntry(taskId01, taskId01Partitions))
);
verify(schedulingTaskManager).lockTasks(Set.of(taskId00, taskId01));
@@ -319,7 +330,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasks()).thenReturn(Set.of(activeTask1, activeTask2));
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
@@ -339,7 +350,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasks()).thenReturn(Set.of(activeTask1, activeTask2));
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
@@ -359,7 +370,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.activeTasks()).thenReturn(Set.of(activeTask1, activeTask2));
taskManager.resumePollingForPartitionsWithAvailableSpace();
@@ -377,7 +388,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.activeTasks()).thenReturn(Set.of(activeTask1, activeTask2));
taskManager.updateLags();
@@ -392,7 +403,7 @@ public class TaskManagerTest {
.inState(State.RESTORING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToClose));
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new
CompletableFuture<>();
when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(future);
@@ -412,7 +423,7 @@ public class TaskManagerTest {
.inState(State.RESTORING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToClose));
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new
CompletableFuture<>();
when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(future);
@@ -433,7 +444,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToClose));
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new
CompletableFuture<>();
when(stateUpdater.remove(standbyTaskToClose.id())).thenReturn(future);
@@ -453,7 +464,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToClose));
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new
CompletableFuture<>();
when(stateUpdater.remove(standbyTaskToClose.id())).thenReturn(future);
@@ -474,7 +485,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(failedStandbyTask));
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new
CompletableFuture<>();
when(stateUpdater.remove(failedStandbyTask.id())).thenReturn(future);
@@ -501,7 +512,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final Set<TopicPartition> newInputPartitions = taskId02Partitions;
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToUpdateInputPartitions));
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new
CompletableFuture<>();
when(stateUpdater.remove(activeTaskToUpdateInputPartitions.id())).thenReturn(future);
@@ -529,7 +540,7 @@ public class TaskManagerTest {
.inState(State.CREATED)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToRecycle));
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle,
taskId03Partitions))
.thenReturn(recycledStandbyTask);
@@ -553,7 +564,7 @@ public class TaskManagerTest {
.inState(State.RESTORING)
.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToRecycle));
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle,
activeTaskToRecycle.inputPartitions()))
.thenThrow(new RuntimeException());
@@ -583,7 +594,7 @@ public class TaskManagerTest {
.inState(State.CREATED)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToRecycle));
when(activeTaskCreator.createActiveTaskFromStandby(standbyTaskToRecycle,
taskId03Partitions, consumer))
.thenReturn(recycledActiveTask);
@@ -607,7 +618,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToRecycle));
when(activeTaskCreator.createActiveTaskFromStandby(
standbyTaskToRecycle,
@@ -637,7 +648,7 @@ public class TaskManagerTest {
.inState(State.RESTORING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(reassignedActiveTask));
taskManager.handleAssignment(
@@ -656,7 +667,7 @@ public class TaskManagerTest {
.inState(State.SUSPENDED)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(reassignedActiveTask));
taskManager.handleAssignment(
@@ -676,7 +687,7 @@ public class TaskManagerTest {
.inState(State.RESTORING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(failedActiveTaskToRecycle));
final RuntimeException taskException = new RuntimeException("Nobody
expects the Spanish inquisition!");
when(stateUpdater.remove(failedActiveTaskToRecycle.id()))
@@ -706,7 +717,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(failedStandbyTaskToRecycle));
final RuntimeException taskException = new RuntimeException("Nobody
expects the Spanish inquisition!");
when(stateUpdater.remove(failedStandbyTaskToRecycle.id()))
@@ -736,7 +747,7 @@ public class TaskManagerTest {
.inState(State.RESTORING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(failedActiveTaskToReassign));
final RuntimeException taskException = new RuntimeException("Nobody
expects the Spanish inquisition!");
when(stateUpdater.remove(failedActiveTaskToReassign.id()))
@@ -769,7 +780,7 @@ public class TaskManagerTest {
.inState(State.RESTORING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(reassignedActiveTask1));
when(stateUpdater.tasks()).thenReturn(Set.of(reassignedActiveTask2));
when(stateUpdater.remove(reassignedActiveTask2.id()))
@@ -795,7 +806,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToUpdateInputPartitions));
taskManager.handleAssignment(
@@ -813,7 +824,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(reassignedStandbyTask));
taskManager.handleAssignment(
@@ -837,7 +848,7 @@ public class TaskManagerTest {
.inState(State.CREATED)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToClose,
standbyTaskToRecycle));
final CompletableFuture<StateUpdater.RemovedTaskResult>
futureForActiveTaskToClose = new CompletableFuture<>();
when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(futureForActiveTaskToClose);
@@ -872,7 +883,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskInStateUpdater));
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03,
runningActiveTask)));
@@ -896,7 +907,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03,
activeTask)));
assertEquals(taskManager.allOwnedTasks(), mkMap(mkEntry(taskId03,
activeTask)));
@@ -908,7 +919,7 @@ public class TaskManagerTest {
.inState(State.CREATED)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
final Set<Task> createdTasks = Set.of(activeTaskToBeCreated);
final Map<TaskId, Set<TopicPartition>> tasksToBeCreated = mkMap(
mkEntry(activeTaskToBeCreated.id(),
activeTaskToBeCreated.inputPartitions()));
@@ -926,7 +937,7 @@ public class TaskManagerTest {
.inState(State.CREATED)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
final Set<Task> createdTasks = Set.of(standbyTaskToBeCreated);
when(standbyTaskCreator.createTasks(mkMap(
mkEntry(standbyTaskToBeCreated.id(),
standbyTaskToBeCreated.inputPartitions())))
@@ -953,7 +964,7 @@ public class TaskManagerTest {
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToRecycle));
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle,
taskId01Partitions))
.thenReturn(standbyTask);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01,
taskId01Partitions)));
@@ -964,28 +975,6 @@ public class TaskManagerTest {
verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
- @Test
- public void
shouldAddRecycledStandbyTasksFromActiveToTaskRegistryWithStateUpdaterDisabled()
{
- final StreamTask activeTaskToRecycle = statefulTask(taskId01,
taskId01ChangelogPartitions)
- .withInputPartitions(taskId01Partitions)
- .inState(State.RUNNING).build();
- final StandbyTask standbyTask = standbyTask(taskId01,
taskId01ChangelogPartitions)
- .withInputPartitions(taskId01Partitions)
- .inState(State.CREATED).build();
- final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(activeTaskToRecycle));
-
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle,
taskId01Partitions))
- .thenReturn(standbyTask);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
-
- taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01,
taskId01Partitions)));
-
- verify(activeTaskToRecycle).prepareCommit(true);
- verify(tasks).replaceActiveWithStandby(standbyTask);
- verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
- verify(standbyTaskCreator).createTasks(Collections.emptyMap());
- }
-
@Test
public void
shouldThrowDuringAssignmentIfStandbyTaskToRecycleIsFoundInTasksRegistryWithStateUpdaterEnabled()
{
final StandbyTask standbyTaskToRecycle = standbyTask(taskId03,
taskId03ChangelogPartitions)
@@ -993,7 +982,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToRecycle));
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
final IllegalStateException illegalStateException = assertThrows(
IllegalStateException.class,
@@ -1014,7 +1003,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToClose));
taskManager.handleAssignment(Collections.emptyMap(),
Collections.emptyMap());
@@ -1032,7 +1021,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToClose));
final IllegalStateException illegalStateException = assertThrows(
@@ -1052,7 +1041,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final Set<TopicPartition> newInputPartitions = taskId02Partitions;
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToUpdateInputPartitions));
when(tasks.updateActiveTaskInputPartitions(activeTaskToUpdateInputPartitions,
newInputPartitions)).thenReturn(true);
@@ -1072,7 +1061,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToResume));
taskManager.handleAssignment(
@@ -1090,7 +1079,7 @@ public class TaskManagerTest {
.inState(State.SUSPENDED)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToResume));
taskManager.handleAssignment(
@@ -1112,7 +1101,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId02Partitions).build();
final Set<TopicPartition> newInputPartitions = taskId03Partitions;
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToUpdateInputPartitions));
final IllegalStateException illegalStateException = assertThrows(
@@ -1137,7 +1126,7 @@ public class TaskManagerTest {
.inState(State.CREATED)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToClose));
taskManager.handleAssignment(
@@ -1163,7 +1152,7 @@ public class TaskManagerTest {
.inState(State.RUNNING).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00,
task01));
- taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ taskManager =
setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
tasks, false);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
@@ -1185,7 +1174,7 @@ public class TaskManagerTest {
when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00,
task01));
final LockException lockException = new LockException("Where are my
keys??");
doThrow(lockException).when(task00).initializeIfNeeded();
- taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ taskManager =
setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
tasks, false);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
@@ -1209,7 +1198,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00,
task01));
doThrow(new LockException("Lock
Exception!")).when(task00).initializeIfNeeded();
- taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ taskManager =
setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
tasks, false);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
@@ -1255,7 +1244,7 @@ public class TaskManagerTest {
when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00));
final RuntimeException runtimeException = new
RuntimeException("KABOOM!");
doThrow(runtimeException).when(task00).initializeIfNeeded();
- taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ taskManager =
setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
tasks, false);
final StreamsException streamsException = assertThrows(
StreamsException.class,
@@ -1281,7 +1270,7 @@ public class TaskManagerTest {
.inState(State.CREATED)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks, false);
when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(statefulTask0,
statefulTask1, statefulTask2));
doThrow(new
TaskCorruptedException(Collections.singleton(statefulTask0.id))).when(statefulTask0).initializeIfNeeded();
doThrow(new
TaskCorruptedException(Collections.singleton(statefulTask1.id))).when(statefulTask1).initializeIfNeeded();
@@ -1302,7 +1291,7 @@ public class TaskManagerTest {
public void
shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreRestoring() {
when(stateUpdater.restoresActiveTasks()).thenReturn(true);
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
assertFalse(taskManager.checkStateUpdater(time.milliseconds(),
noOpResetter));
}
@@ -1311,7 +1300,7 @@ public class TaskManagerTest {
public void
shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingTaskToRecycleButPendingTasksToInit()
{
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.hasPendingTasksToInit()).thenReturn(true);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
assertFalse(taskManager.checkStateUpdater(time.milliseconds(),
noOpResetter));
}
@@ -1319,7 +1308,7 @@ public class TaskManagerTest {
@Test
public void
shouldReturnTrueFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingInit()
{
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
assertTrue(taskManager.checkStateUpdater(time.milliseconds(),
noOpResetter));
}
@@ -1539,7 +1528,7 @@ public class TaskManagerTest {
private TaskManager setupForRevocationAndLost(final Set<Task>
tasksInStateUpdater,
final TasksRegistry tasks) {
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(tasksInStateUpdater);
return taskManager;
@@ -1608,12 +1597,12 @@ public class TaskManagerTest {
when(stateUpdater.restoresActiveTasks()).thenReturn(true);
when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(statefulTasks);
- return setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ return setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE,
tasks);
}
@Test
public void
shouldReturnCorrectBooleanWhenTryingToCompleteRestorationWithStateUpdater() {
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, null, false);
when(stateUpdater.restoresActiveTasks()).thenReturn(false);
assertTrue(taskManager.checkStateUpdater(time.milliseconds(),
noOpResetter));
when(stateUpdater.restoresActiveTasks()).thenReturn(true);
@@ -1631,7 +1620,7 @@ public class TaskManagerTest {
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(exceptionAndTasks));
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
final StreamsException thrown = assertThrows(
StreamsException.class,
@@ -1658,7 +1647,7 @@ public class TaskManagerTest {
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Arrays.asList(exceptionAndTasks0,
exceptionAndTasks1));
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
final TaskCorruptedException thrown = assertThrows(
TaskCorruptedException.class,
@@ -1742,7 +1731,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(statefulTask0));
final Set<TopicPartition> assigned = Set.of(t1p0, t1p1);
when(consumer.assignment()).thenReturn(assigned);
@@ -1787,7 +1776,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
runningStatefulTask)));
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask,
restoringStatefulTask));
when(tasks.allNonFailedTasks()).thenReturn(Set.of(runningStatefulTask));
@@ -1819,7 +1808,7 @@ public class TaskManagerTest {
when(runningStatefulTask.changelogOffsets())
.thenReturn(mkMap(mkEntry(t1p0changelog,
changelogOffsetOfRunningTask)));
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManagerWithoutStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, false);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
runningStatefulTask)));
assertThat(
@@ -1850,7 +1839,7 @@ public class TaskManagerTest {
final Map<TopicPartition, Long> changelogOffsetInCheckpoint =
mkMap(mkEntry(t1p0changelog, 24L));
writeCheckpointFile(taskId00, changelogOffsetInCheckpoint);
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
taskManager.handleRebalanceStart(singleton("topic"));
@@ -1868,7 +1857,7 @@ public class TaskManagerTest {
final Map<TopicPartition, Long> changelogOffsetInCheckpoint =
mkMap(mkEntry(t1p0changelog, 24L));
writeCheckpointFile(taskId00, changelogOffsetInCheckpoint);
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStandbyTask));
taskManager.handleRebalanceStart(singleton("topic"));
@@ -1893,7 +1882,7 @@ public class TaskManagerTest {
when(restoringStandbyTask.changelogOffsets())
.thenReturn(mkMap(mkEntry(t1p2changelog,
changelogOffsetOfRestoringStandbyTask)));
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
runningStatefulTask)));
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStandbyTask,
restoringStatefulTask));
@@ -1918,7 +1907,7 @@ public class TaskManagerTest {
mkEntry(t1p1changelog2, OffsetCheckpoint.OFFSET_UNKNOWN)
));
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
+ final TaskManager taskManager =
setUpTaskManagerWithoutStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, false);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId01,
restoringStatefulTask)));
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
@@ -1955,17 +1944,23 @@ public class TaskManagerTest {
);
final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00,
15L));
+ final StandbyTask standbyTask = standbyTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId00Partitions)
+ .build();
+ when(standbyTask.changelogOffsets()).thenReturn(changelogOffsets);
+
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ taskManager =
setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE,
tasks, false);
+
+ when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask));
+
expectLockObtainedFor(taskId00);
expectDirectoryNotEmpty(taskId00);
makeTaskFolders(taskId00.toString());
taskManager.handleRebalanceStart(singleton("topic"));
- final StateMachineTask restoringTask = handleAssignment(
- emptyMap(),
- taskId00Assignment,
- emptyMap()
- ).get(taskId00);
- restoringTask.setChangelogOffsets(changelogOffsets);
+ taskManager.handleAssignment(emptyMap(), taskId00Assignment);
assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums));
}
@@ -2174,7 +2169,7 @@ public class TaskManagerTest {
@Test
public void
shouldReInitializeStreamsProducerOnHandleLostAllIfEosV2Enabled() {
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
+ final TaskManager taskManager =
setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null,
false);
taskManager.handleLostAll();
@@ -2190,7 +2185,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.task(taskId03)).thenReturn(corruptedActiveTask);
when(tasks.task(taskId02)).thenReturn(corruptedStandbyTask);
@@ -2281,38 +2276,43 @@ public class TaskManagerTest {
@Test
public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
- final ProcessorStateManager stateManager =
mock(ProcessorStateManager.class);
-
- final StateMachineTask corruptedTask = new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager);
- final StateMachineTask nonCorruptedTask = new
StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
+ final StreamTask corruptedTask = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RUNNING)
+ .build();
- final Map<TaskId, Set<TopicPartition>> firstAssignment = new
HashMap<>(taskId00Assignment);
- firstAssignment.putAll(taskId01Assignment);
+ final StreamTask nonCorruptedTask = statefulTask(taskId01,
taskId01ChangelogPartitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.RUNNING)
+ .build();
- // `handleAssignment`
- when(activeTaskCreator.createTasks(any(), eq(firstAssignment)))
- .thenReturn(asList(corruptedTask, nonCorruptedTask));
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.task(taskId00)).thenReturn(corruptedTask);
+ when(tasks.allTasksPerId()).thenReturn(mkMap(
+ mkEntry(taskId00, corruptedTask),
+ mkEntry(taskId01, nonCorruptedTask)
+ ));
+ when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
- when(consumer.assignment())
- .thenReturn(assignment)
- .thenReturn(taskId00Partitions);
+ when(nonCorruptedTask.commitNeeded()).thenReturn(true);
+ when(nonCorruptedTask.prepareCommit(true)).thenReturn(emptyMap());
+ when(corruptedTask.prepareCommit(false)).thenReturn(emptyMap());
+ doNothing().when(corruptedTask).postCommit(anyBoolean());
- taskManager.handleAssignment(firstAssignment, emptyMap());
- assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
tp -> assertThat(tp, is(empty()))), is(true));
+ when(consumer.assignment()).thenReturn(taskId00Partitions);
- assertThat(nonCorruptedTask.state(), is(Task.State.RUNNING));
- nonCorruptedTask.setCommitNeeded();
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- corruptedTask.setChangelogOffsets(singletonMap(t1p0, 0L));
- taskManager.handleCorruption(singleton(taskId00));
+ taskManager.handleCorruption(Set.of(taskId00));
- assertTrue(nonCorruptedTask.commitPrepared);
- assertThat(nonCorruptedTask.partitionsForOffsetReset,
equalTo(Collections.emptySet()));
- assertThat(corruptedTask.partitionsForOffsetReset,
equalTo(taskId00Partitions));
+ verify(nonCorruptedTask).prepareCommit(true);
+ verify(nonCorruptedTask, never()).addPartitionsForOffsetReset(any());
+ verify(corruptedTask).addPartitionsForOffsetReset(taskId00Partitions);
+ verify(corruptedTask).changelogPartitions();
+ verify(corruptedTask).postCommit(true);
// check that we should not commit empty map either
verify(consumer, never()).commitSync(emptyMap());
- verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}
@Test
@@ -2359,7 +2359,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId02,
corruptedTask)));
when(tasks.task(taskId02)).thenReturn(corruptedTask);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(consumer.assignment()).thenReturn(intersection(HashSet::new,
taskId00Partitions, taskId01Partitions, taskId02Partitions));
taskManager.handleCorruption(Set.of(taskId02));
@@ -2372,37 +2372,6 @@ public class TaskManagerTest {
verify(standbyTask, never()).postCommit(anyBoolean());
}
- @Test
- public void
shouldNotCommitNonCorruptedRestoringActiveTasksAndCommitRunningStandbyTasksWithStateUpdaterDisabled()
{
- final StreamTask activeRestoringTask = statefulTask(taskId00,
taskId00ChangelogPartitions)
- .withInputPartitions(taskId00Partitions)
- .inState(State.RESTORING).build();
- final StandbyTask standbyTask = standbyTask(taskId01,
taskId01ChangelogPartitions)
- .withInputPartitions(taskId01Partitions)
- .inState(State.RUNNING).build();
- when(standbyTask.commitNeeded()).thenReturn(true);
- final StreamTask corruptedTask = statefulTask(taskId02,
taskId02ChangelogPartitions)
- .withInputPartitions(taskId02Partitions)
- .inState(State.RUNNING).build();
- final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasksPerId()).thenReturn(mkMap(
- mkEntry(taskId00, activeRestoringTask),
- mkEntry(taskId01, standbyTask),
- mkEntry(taskId02, corruptedTask)
- ));
- when(tasks.task(taskId02)).thenReturn(corruptedTask);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
- when(consumer.assignment()).thenReturn(intersection(HashSet::new,
taskId00Partitions, taskId01Partitions, taskId02Partitions));
-
- taskManager.handleCorruption(Set.of(taskId02));
-
- verify(activeRestoringTask, never()).commitNeeded();
- verify(activeRestoringTask, never()).prepareCommit(true);
- verify(activeRestoringTask, never()).postCommit(anyBoolean());
- verify(standbyTask).prepareCommit(true);
- verify(standbyTask).postCommit(anyBoolean());
- }
-
@Test
public void
shouldCleanAndReviveCorruptedStandbyTasksBeforeCommittingNonCorruptedTasks() {
final ProcessorStateManager stateManager =
mock(ProcessorStateManager.class);
@@ -2545,7 +2514,7 @@ public class TaskManagerTest {
@Test
public void
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringHandleCorruptedWithEOS()
{
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
+ final TaskManager taskManager =
setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null,
false);
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.streamsProducer()).thenReturn(producer);
final ProcessorStateManager stateManager =
mock(ProcessorStateManager.class);
@@ -2677,7 +2646,7 @@ public class TaskManagerTest {
@Test
public void
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithEOS()
{
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
+ final TaskManager taskManager =
setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null,
false);
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.streamsProducer()).thenReturn(producer);
final ProcessorStateManager stateManager =
mock(ProcessorStateManager.class);
@@ -2919,7 +2888,7 @@ public class TaskManagerTest {
@Test
public void
shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEosV2() {
final StreamsProducer producer = mock(StreamsProducer.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
+ final TaskManager taskManager =
setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null,
false);
final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager);
final Map<TopicPartition, OffsetAndMetadata> offsets00 =
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
@@ -3076,7 +3045,7 @@ public class TaskManagerTest {
@Test
public void shouldNotCommitIfNoRevokedTasksNeedCommittingWithEOSv2() {
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
+ final TaskManager taskManager =
setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null,
false);
final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager);
@@ -3212,7 +3181,7 @@ public class TaskManagerTest {
}
private void
shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown(final
ProcessingMode processingMode) {
- final TaskManager taskManager = setUpTaskManager(processingMode, null,
false);
+ final TaskManager taskManager =
setUpTaskManagerWithoutStateUpdater(processingMode, null, false);
final TopicPartition changelog = new TopicPartition("changelog", 0);
final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
@@ -3367,7 +3336,7 @@ public class TaskManagerTest {
@Test
public void
shouldOnlyCommitRevokedStandbyTaskAndPropagatePrepareCommitException() {
- setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
+ setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2,
null, false);
final Task task00 = new StateMachineTask(taskId00, taskId00Partitions,
false, stateManager);
@@ -3533,7 +3502,7 @@ public class TaskManagerTest {
new ExceptionAndTask(new RuntimeException(),
failedStatefulTask),
new ExceptionAndTask(new RuntimeException(),
failedStandbyTask))
);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.shutdown(true);
@@ -3547,7 +3516,7 @@ public class TaskManagerTest {
@Test
public void shouldShutdownSchedulingTaskManager() {
final TasksRegistry tasks = mock(TasksRegistry.class);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
taskManager.shutdown(true);
@@ -3596,7 +3565,7 @@ public class TaskManagerTest {
new ExceptionAndTask(new StreamsException("KABOOM!"),
removedFailedStatefulTaskDuringRemoval),
new ExceptionAndTask(new StreamsException("KABOOM!"),
removedFailedStandbyTaskDuringRemoval)
));
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
futureForRemovedStatefulTask.complete(new
StateUpdater.RemovedTaskResult(removedStatefulTask));
futureForRemovedStandbyTask.complete(new
StateUpdater.RemovedTaskResult(removedStandbyTask));
futureForRemovedFailedStatefulTask
@@ -3825,7 +3794,7 @@ public class TaskManagerTest {
allOffsets.putAll(offsetsT01);
allOffsets.putAll(offsetsT02);
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
+ final TaskManager taskManager =
setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null,
false);
final StateMachineTask task01 = new StateMachineTask(taskId01,
taskId01Partitions, true, stateManager);
task01.setCommittableOffsetsAndMetadata(offsetsT01);
@@ -4248,28 +4217,30 @@ public class TaskManagerTest {
@Test
public void shouldPunctuateActiveTasks() {
- final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager) {
- @Override
- public boolean maybePunctuateStreamTime() {
- return true;
- }
- @Override
- public boolean maybePunctuateSystemTime() {
- return true;
- }
- };
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RUNNING)
+ .build();
- when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(),
eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ when(task00.maybePunctuateStreamTime()).thenReturn(true);
+ when(task00.maybePunctuateSystemTime()).thenReturn(true);
- taskManager.handleAssignment(taskId00Assignment, emptyMap());
- assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.activeTasks()).thenReturn(Set.of(task00));
- assertThat(task00.state(), is(Task.State.RUNNING));
+ when(stateUpdater.restoresActiveTasks()).thenReturn(false);
+ when(stateUpdater.hasExceptionsAndFailedTasks()).thenReturn(false);
+
when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(Set.of());
+
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(List.of());
+
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
// one for stream and one for system time
assertThat(taskManager.punctuate(), equalTo(2));
+
+ verify(task00).maybePunctuateStreamTime();
+ verify(task00).maybePunctuateSystemTime();
}
@Test
@@ -4543,7 +4514,7 @@ public class TaskManagerTest {
@Test
public void
shouldThrowTaskCorruptedExceptionForTimeoutExceptionOnCommitWithEosV2() {
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
+ final TaskManager taskManager =
setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null,
false);
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.streamsProducer()).thenReturn(producer);
@@ -4748,7 +4719,7 @@ public class TaskManagerTest {
@Test
public void
shouldRecycleStartupTasksFromStateDirectoryAsActiveWithStateUpdater() {
final Tasks taskRegistry = new Tasks(new LogContext());
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, taskRegistry, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, taskRegistry);
final StandbyTask startupTask = standbyTask(taskId00,
taskId00ChangelogPartitions).build();
final StreamTask activeTask = statefulTask(taskId00,
taskId00ChangelogPartitions).build();
@@ -4786,7 +4757,7 @@ public class TaskManagerTest {
@Test
public void
shouldUseStartupTasksFromStateDirectoryAsStandbyWithStateUpdater() {
final Tasks taskRegistry = new Tasks(new LogContext());
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, taskRegistry, true);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, taskRegistry);
final StandbyTask startupTask = standbyTask(taskId00,
taskId00ChangelogPartitions).build();
when(stateDirectory.hasStartupTasks()).thenReturn(true, true, false);
@@ -4816,16 +4787,11 @@ public class TaskManagerTest {
assertEquals(Collections.singletonMap(taskId00, startupTask),
taskManager.standbyTaskMap());
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void shouldStartStateUpdaterOnInit(final boolean
stateUpdaterEnabled) {
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, stateUpdaterEnabled);
+ @Test
+ public void shouldStartStateUpdaterOnInit() {
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, null);
taskManager.init();
- if (stateUpdaterEnabled) {
- verify(stateUpdater).start();
- } else {
- verify(stateUpdater, never()).start();
- }
+ verify(stateUpdater).start();
}
private static KafkaFutureImpl<DeletedRecords> completedFuture() {