This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new f191e4781e MINOR: Use builder for mock task in DefaultStateUpdaterTest (#12436) f191e4781e is described below commit f191e4781ee16d131a58e4bccf5eb1fbd476ada6 Author: Bruno Cadonna <cado...@apache.org> AuthorDate: Tue Jul 26 10:12:20 2022 +0200 MINOR: Use builder for mock task in DefaultStateUpdaterTest (#12436) Reviewer: Guozhang Wang <wangg...@gmail.com> --- .../internals/DefaultStateUpdaterTest.java | 290 +++++++++------------ .../org/apache/kafka/test/StreamsTestUtils.java | 5 + 2 files changed, 125 insertions(+), 170 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java index 087559742b..adc417ede6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java @@ -45,6 +45,9 @@ import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkObjectProperties; import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.streams.StreamsConfig.producerPrefix; +import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.standbyTask; +import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statefulTask; +import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask; import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.easymock.EasyMock.anyBoolean; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -123,12 +126,12 @@ class DefaultStateUpdaterTest { @Test public void shouldThrowIfStatelessTaskNotInStateRestoring() { - shouldThrowIfActiveTaskNotInStateRestoring(createStatelessTask(TASK_0_0)); + shouldThrowIfActiveTaskNotInStateRestoring(statelessTask(TASK_0_0).build()); } @Test public void shouldThrowIfStatefulTaskNotInStateRestoring() { - shouldThrowIfActiveTaskNotInStateRestoring(createActiveStatefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0))); + shouldThrowIfActiveTaskNotInStateRestoring(statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).build()); } private void shouldThrowIfActiveTaskNotInStateRestoring(final StreamTask task) { @@ -137,7 +140,7 @@ class DefaultStateUpdaterTest { @Test public void shouldThrowIfStandbyTaskNotInStateRunning() { - final StandbyTask task = createStandbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)); + final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).build(); shouldThrowIfTaskNotInGivenState(task, State.RUNNING); } @@ -152,29 +155,29 @@ class DefaultStateUpdaterTest { @Test public void shouldThrowIfAddingActiveTasksWithSameId() throws Exception { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); - final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final StreamTask task2 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); shouldThrowIfAddingTasksWithSameId(task1, task2); } @Test public void shouldThrowIfAddingStandbyTasksWithSameId() throws Exception { - final StandbyTask task1 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)); - final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)); + final StandbyTask task1 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build(); + final StandbyTask task2 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build(); shouldThrowIfAddingTasksWithSameId(task1, task2); } @Test public void shouldThrowIfAddingActiveAndStandbyTaskWithSameId() throws Exception { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); - final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)); + final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final StandbyTask task2 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build(); shouldThrowIfAddingTasksWithSameId(task1, task2); } @Test public void shouldThrowIfAddingStandbyAndActiveTaskWithSameId() throws Exception { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); - final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)); + final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final StandbyTask task2 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build(); shouldThrowIfAddingTasksWithSameId(task2, task1); } @@ -188,15 +191,15 @@ class DefaultStateUpdaterTest { @Test public void shouldImmediatelyAddSingleStatelessTaskToRestoredTasks() throws Exception { - final StreamTask task1 = createStatelessTaskInStateRestoring(TASK_0_0); + final StreamTask task1 = statelessTask(TASK_0_0).inState(State.RESTORING).build(); shouldImmediatelyAddStatelessTasksToRestoredTasks(task1); } @Test public void shouldImmediatelyAddMultipleStatelessTasksToRestoredTasks() throws Exception { - final StreamTask task1 = createStatelessTaskInStateRestoring(TASK_0_0); - final StreamTask task2 = createStatelessTaskInStateRestoring(TASK_0_2); - final StreamTask task3 = createStatelessTaskInStateRestoring(TASK_1_0); + final StreamTask task1 = statelessTask(TASK_0_0).inState(State.RESTORING).build(); + final StreamTask task2 = statelessTask(TASK_0_2).inState(State.RESTORING).build(); + final StreamTask task3 = statelessTask(TASK_1_0).inState(State.RESTORING).build(); shouldImmediatelyAddStatelessTasksToRestoredTasks(task1, task2, task3); } @@ -217,7 +220,7 @@ class DefaultStateUpdaterTest { @Test public void shouldRestoreSingleActiveStatefulTask() throws Exception { final StreamTask task = - createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0)); + statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); when(changelogReader.completedChangelogs()) .thenReturn(Collections.emptySet()) .thenReturn(mkSet(TOPIC_PARTITION_A_0)) @@ -243,9 +246,9 @@ class DefaultStateUpdaterTest { @Test public void shouldRestoreMultipleActiveStatefulTasks() throws Exception { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); - final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)); - final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)); + final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); + final StreamTask task3 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RESTORING).build(); when(changelogReader.completedChangelogs()) .thenReturn(Collections.emptySet()) .thenReturn(mkSet(TOPIC_PARTITION_C_0)) @@ -277,15 +280,15 @@ class DefaultStateUpdaterTest { public void shouldDrainRestoredActiveTasks() throws Exception { assertTrue(stateUpdater.drainRestoredActiveTasks(Duration.ZERO).isEmpty()); - final StreamTask task1 = createStatelessTaskInStateRestoring(TASK_0_0); + final StreamTask task1 = statelessTask(TASK_0_0).inState(State.RESTORING).build(); stateUpdater.start(); stateUpdater.add(task1); verifyDrainingRestoredActiveTasks(task1); - final StreamTask task2 = createStatelessTaskInStateRestoring(TASK_1_1); - final StreamTask task3 = createStatelessTaskInStateRestoring(TASK_1_0); - final StreamTask task4 = createStatelessTaskInStateRestoring(TASK_0_2); + final StreamTask task2 = statelessTask(TASK_1_1).inState(State.RESTORING).build(); + final StreamTask task3 = statelessTask(TASK_1_0).inState(State.RESTORING).build(); + final StreamTask task4 = statelessTask(TASK_0_2).inState(State.RESTORING).build(); stateUpdater.add(task2); stateUpdater.add(task3); stateUpdater.add(task4); @@ -295,18 +298,16 @@ class DefaultStateUpdaterTest { @Test public void shouldUpdateSingleStandbyTask() throws Exception { - final StandbyTask task = createStandbyTaskInStateRunning( - TASK_0_0, - mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0) - ); + final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0)) + .inState(State.RUNNING).build(); shouldUpdateStandbyTasks(task); } @Test public void shouldUpdateMultipleStandbyTasks() throws Exception { - final StandbyTask task1 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); - final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)); - final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)); + final StandbyTask task1 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build(); + final StandbyTask task2 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build(); + final StandbyTask task3 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build(); shouldUpdateStandbyTasks(task1, task2, task3); } @@ -331,10 +332,10 @@ class DefaultStateUpdaterTest { @Test public void shouldRestoreActiveStatefulTasksAndUpdateStandbyTasks() throws Exception { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); - final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)); - final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)); - final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)); + final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); + final StandbyTask task3 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build(); + final StandbyTask task4 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build(); when(changelogReader.completedChangelogs()) .thenReturn(Collections.emptySet()) .thenReturn(mkSet(TOPIC_PARTITION_A_0)) @@ -361,9 +362,9 @@ class DefaultStateUpdaterTest { @Test public void shouldRestoreActiveStatefulTaskThenUpdateStandbyTaskAndAgainRestoreActiveStatefulTask() throws Exception { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); - final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)); - final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)); + final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final StandbyTask task2 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build(); + final StreamTask task3 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); when(changelogReader.completedChangelogs()) .thenReturn(Collections.emptySet()) .thenReturn(mkSet(TOPIC_PARTITION_A_0)) @@ -391,9 +392,9 @@ class DefaultStateUpdaterTest { @Test public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksFailed() throws Exception { - final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); - final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)); - final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)); + final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final StreamTask activeTask2 = statefulTask(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); + final StandbyTask standbyTask = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build(); final TaskCorruptedException taskCorruptedException = new TaskCorruptedException(mkSet(activeTask1.id(), activeTask2.id())); final Map<TaskId, Task> updatingTasks1 = mkMap( @@ -419,9 +420,9 @@ class DefaultStateUpdaterTest { @Test public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksRemoved() throws Exception { - final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); - final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)); - final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)); + final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final StreamTask activeTask2 = statefulTask(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); + final StandbyTask standbyTask = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build(); when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); when(changelogReader.allChangelogsCompleted()).thenReturn(false); stateUpdater.start(); @@ -441,13 +442,13 @@ class DefaultStateUpdaterTest { @Test public void shouldRemoveActiveStatefulTask() throws Exception { - final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); shouldRemoveStatefulTask(task); } @Test public void shouldRemoveStandbyTask() throws Exception { - final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build(); shouldRemoveStatefulTask(task); } @@ -470,8 +471,8 @@ class DefaultStateUpdaterTest { @Test public void shouldRemovePausedTask() throws Exception { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); - final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)); + final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final StandbyTask task2 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build(); stateUpdater.start(); stateUpdater.add(task1); @@ -496,18 +497,18 @@ class DefaultStateUpdaterTest { @Test public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() throws Exception { - final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); shouldNotRemoveTaskFromRestoredActiveTasks(task); } @Test public void shouldNotRemoveStatelessTaskFromRestoredActiveTasks() throws Exception { - final StreamTask task = createStatelessTaskInStateRestoring(TASK_0_0); + final StreamTask task = statelessTask(TASK_0_0).inState(State.RESTORING).build(); shouldNotRemoveTaskFromRestoredActiveTasks(task); } private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask task) throws Exception { - final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)); + final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0)); when(changelogReader.allChangelogsCompleted()).thenReturn(false); stateUpdater.start(); @@ -527,18 +528,18 @@ class DefaultStateUpdaterTest { @Test public void shouldNotRemoveActiveStatefulTaskFromFailedTasks() throws Exception { - final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); shouldNotRemoveTaskFromFailedTasks(task); } @Test public void shouldNotRemoveStandbyTaskFromFailedTasks() throws Exception { - final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build(); shouldNotRemoveTaskFromFailedTasks(task); } private void shouldNotRemoveTaskFromFailedTasks(final Task task) throws Exception { - final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)); + final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); final StreamsException streamsException = new StreamsException("Something happened", task.id()); when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); when(changelogReader.allChangelogsCompleted()).thenReturn(false); @@ -568,22 +569,22 @@ class DefaultStateUpdaterTest { @Test public void shouldPauseActiveStatefulTask() throws Exception { - final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); shouldPauseStatefulTask(task); verify(changelogReader, never()).transitToUpdateStandby(); } @Test public void shouldPauseStandbyTask() throws Exception { - final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build(); shouldPauseStatefulTask(task); verify(changelogReader, times(1)).transitToUpdateStandby(); } @Test public void shouldPauseActiveTaskAndTransitToUpdateStandby() throws Exception { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); - final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)); + final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final StandbyTask task2 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build(); stateUpdater.start(); stateUpdater.add(task1); @@ -629,8 +630,8 @@ class DefaultStateUpdaterTest { @Test public void shouldNotPauseActiveStatefulTaskInRestoredActiveTasks() throws Exception { - final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); - final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)); + final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0)); when(changelogReader.allChangelogsCompleted()).thenReturn(false); stateUpdater.start(); @@ -649,18 +650,18 @@ class DefaultStateUpdaterTest { @Test public void shouldNotPauseActiveStatefulTaskInFailedTasks() throws Exception { - final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); shouldNotPauseTaskInFailedTasks(task); } @Test public void shouldNotPauseStandbyTaskInFailedTasks() throws Exception { - final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build(); shouldNotPauseTaskInFailedTasks(task); } private void shouldNotPauseTaskInFailedTasks(final Task task) throws Exception { - final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)); + final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); final StreamsException streamsException = new StreamsException("Something happened", task.id()); when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); when(changelogReader.allChangelogsCompleted()).thenReturn(false); @@ -689,13 +690,13 @@ class DefaultStateUpdaterTest { @Test public void shouldNotPauseActiveStatefulTaskInRemovedTasks() throws Exception { - final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); shouldNotPauseTaskInRemovedTasks(task); } @Test public void shouldNotPauseStandbyTaskInRemovedTasks() throws Exception { - final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build(); shouldNotPauseTaskInRemovedTasks(task); } @@ -724,14 +725,14 @@ class DefaultStateUpdaterTest { @Test public void shouldResumeActiveStatefulTask() throws Exception { - final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); shouldResumeStatefulTask(task); verify(changelogReader, times(2)).enforceRestoreActive(); } @Test public void shouldResumeStandbyTask() throws Exception { - final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build(); shouldResumeStatefulTask(task); verify(changelogReader, times(2)).transitToUpdateStandby(); } @@ -765,8 +766,8 @@ class DefaultStateUpdaterTest { @Test public void shouldNotResumeActiveStatefulTaskInRestoredActiveTasks() throws Exception { - final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); - final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)); + final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0)); when(changelogReader.allChangelogsCompleted()).thenReturn(false); stateUpdater.start(); @@ -786,13 +787,13 @@ class DefaultStateUpdaterTest { @Test public void shouldNotResumeActiveStatefulTaskInRemovedTasks() throws Exception { - final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); shouldNotPauseTaskInRemovedTasks(task); } @Test public void shouldNotResumeStandbyTaskInRemovedTasks() throws Exception { - final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build(); shouldNotResumeTaskInRemovedTasks(task); } @@ -817,18 +818,18 @@ class DefaultStateUpdaterTest { @Test public void shouldNotResumeActiveStatefulTaskInFailedTasks() throws Exception { - final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); shouldNotPauseTaskInFailedTasks(task); } @Test public void shouldNotResumeStandbyTaskInFailedTasks() throws Exception { - final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build(); shouldNotResumeTaskInFailedTasks(task); } private void shouldNotResumeTaskInFailedTasks(final Task task) throws Exception { - final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)); + final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); final StreamsException streamsException = new StreamsException("Something happened", task.id()); when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); when(changelogReader.allChangelogsCompleted()).thenReturn(false); @@ -861,15 +862,15 @@ class DefaultStateUpdaterTest { when(changelogReader.allChangelogsCompleted()).thenReturn(false); stateUpdater.start(); - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)); + final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); stateUpdater.add(task1); stateUpdater.remove(task1.id()); verifyDrainingRemovedTasks(task1); - final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_1_1, mkSet(TOPIC_PARTITION_C_0)); - final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_A_0)); - final StreamTask task4 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_D_0)); + final StreamTask task2 = statefulTask(TASK_1_1, mkSet(TOPIC_PARTITION_C_0)).inState(State.RESTORING).build(); + final StreamTask task3 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final StreamTask task4 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_D_0)).inState(State.RESTORING).build(); stateUpdater.add(task2); stateUpdater.remove(task2.id()); stateUpdater.add(task3); @@ -882,8 +883,8 @@ class DefaultStateUpdaterTest { @Test public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithoutTask() throws Exception { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); - final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)); + final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final StandbyTask task2 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build(); final String exceptionMessage = "The Streams were crossed!"; final StreamsException streamsException = new StreamsException(exceptionMessage); final Map<TaskId, Task> updatingTasks = mkMap( @@ -906,9 +907,9 @@ class DefaultStateUpdaterTest { @Test public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithTask() throws Exception { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); - final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)); - final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)); + final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); + final StandbyTask task3 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build(); final String exceptionMessage = "The Streams were crossed!"; final StreamsException streamsException1 = new StreamsException(exceptionMessage, task1.id()); final StreamsException streamsException2 = new StreamsException(exceptionMessage, task3.id()); @@ -944,9 +945,9 @@ class DefaultStateUpdaterTest { @Test public void shouldAddFailedTasksToQueueWhenRestoreThrowsTaskCorruptedException() throws Exception { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); - final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)); - final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)); + final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final StandbyTask task2 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build(); + final StreamTask task3 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RESTORING).build(); final Set<TaskId> expectedTaskIds = mkSet(task1.id(), task2.id()); final TaskCorruptedException taskCorruptedException = new TaskCorruptedException(expectedTaskIds); final Map<TaskId, Task> updatingTasks = mkMap( @@ -970,8 +971,8 @@ class DefaultStateUpdaterTest { @Test public void shouldAddFailedTasksToQueueWhenUncaughtExceptionIsThrown() throws Exception { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); - final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)); + final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final StandbyTask task2 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build(); final IllegalStateException illegalStateException = new IllegalStateException("Nobody expects the Spanish inquisition!"); final Map<TaskId, Task> updatingTasks = mkMap( mkEntry(task1.id(), task1), @@ -995,10 +996,10 @@ class DefaultStateUpdaterTest { public void shouldDrainFailedTasksAndExceptions() throws Exception { assertTrue(stateUpdater.drainExceptionsAndFailedTasks().isEmpty()); - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)); - final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_1_1, mkSet(TOPIC_PARTITION_C_0)); - final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_A_0)); - final StreamTask task4 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_D_0)); + final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); + final StreamTask task2 = statefulTask(TASK_1_1, mkSet(TOPIC_PARTITION_C_0)).inState(State.RESTORING).build(); + final StreamTask task3 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final StreamTask task4 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_D_0)).inState(State.RESTORING).build(); final String exceptionMessage = "The Streams were crossed!"; final StreamsException streamsException1 = new StreamsException(exceptionMessage, task1.id()); final Map<TaskId, Task> updatingTasks1 = mkMap( @@ -1043,10 +1044,10 @@ class DefaultStateUpdaterTest { @Test public void shouldAutoCheckpointTasksOnInterval() throws Exception { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); - final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)); - final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)); - final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)); + final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); + final StandbyTask task3 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build(); + final StandbyTask task4 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build(); when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); when(changelogReader.allChangelogsCompleted()).thenReturn(false); stateUpdater.start(); @@ -1069,10 +1070,10 @@ class DefaultStateUpdaterTest { final Time time = new MockTime(); final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, time); try { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); - final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)); - final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)); - final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)); + final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); + final StandbyTask task3 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build(); + final StandbyTask task4 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build(); when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); when(changelogReader.allChangelogsCompleted()).thenReturn(false); stateUpdater.start(); @@ -1105,11 +1106,11 @@ class DefaultStateUpdaterTest { public void shouldGetTasksFromInputQueue() { stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE)); - final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); - final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)); - final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)); - final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)); - final StandbyTask standbyTask3 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)); + final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final StreamTask activeTask2 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); + final StandbyTask standbyTask1 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build(); + final StandbyTask standbyTask2 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build(); + final StandbyTask standbyTask3 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build(); stateUpdater.add(activeTask1); stateUpdater.add(standbyTask1); stateUpdater.add(standbyTask2); @@ -1135,11 +1136,11 @@ class DefaultStateUpdaterTest { @Test public void shouldGetTasksFromUpdatingTasks() throws Exception { - final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); - final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)); - final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)); - final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)); - final StandbyTask standbyTask3 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)); + final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final StreamTask activeTask2 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); + final StandbyTask standbyTask1 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build(); + final StandbyTask standbyTask2 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build(); + final StandbyTask standbyTask3 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build(); when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); when(changelogReader.allChangelogsCompleted()).thenReturn(false); stateUpdater.start(); @@ -1168,8 +1169,8 @@ class DefaultStateUpdaterTest { @Test public void shouldGetTasksFromRestoredActiveTasks() throws Exception { - final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); - final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)); + final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final StreamTask activeTask2 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); when(changelogReader.completedChangelogs()).thenReturn(mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0)); when(changelogReader.allChangelogsCompleted()).thenReturn(false); stateUpdater.start(); @@ -1186,9 +1187,9 @@ class DefaultStateUpdaterTest { @Test public void shouldGetTasksFromExceptionsAndFailedTasks() throws Exception { - final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)); - final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)); - final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)); + final StreamTask activeTask1 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); + final StandbyTask standbyTask2 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build(); + final StandbyTask standbyTask1 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build(); final TaskCorruptedException taskCorruptedException = new TaskCorruptedException(mkSet(standbyTask1.id(), standbyTask2.id())); final StreamsException streamsException = new StreamsException("The Streams were crossed!", activeTask1.id()); @@ -1220,9 +1221,9 @@ class DefaultStateUpdaterTest { @Test public void shouldGetTasksFromRemovedTasks() throws Exception { - final StreamTask activeTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)); - final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)); - final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)); + final StreamTask activeTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); + final StandbyTask standbyTask2 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build(); + final StandbyTask standbyTask1 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build(); when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); when(changelogReader.allChangelogsCompleted()).thenReturn(false); stateUpdater.start(); @@ -1243,8 +1244,8 @@ class DefaultStateUpdaterTest { @Test public void shouldGetTasksFromPausedTasks() throws Exception { - final StreamTask activeTask = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); - final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_0)); + final StreamTask activeTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final StandbyTask standbyTask = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build(); stateUpdater.start(); stateUpdater.add(activeTask); stateUpdater.add(standbyTask); @@ -1453,55 +1454,4 @@ class DefaultStateUpdaterTest { ); assertTrue(stateUpdater.drainExceptionsAndFailedTasks().isEmpty()); } - - private StreamTask createActiveStatefulTaskInStateRestoring(final TaskId taskId, - final Set<TopicPartition> changelogPartitions) { - final StreamTask task = createActiveStatefulTask(taskId, changelogPartitions); - when(task.state()).thenReturn(State.RESTORING); - return task; - } - - private StreamTask createActiveStatefulTask(final TaskId taskId, - final Set<TopicPartition> changelogPartitions) { - final StreamTask task = mock(StreamTask.class); - setupStatefulTask(task, taskId, changelogPartitions); - when(task.isActive()).thenReturn(true); - return task; - } - - private StreamTask createStatelessTaskInStateRestoring(final TaskId taskId) { - final StreamTask task = createStatelessTask(taskId); - when(task.state()).thenReturn(State.RESTORING); - return task; - } - - private StreamTask createStatelessTask(final TaskId taskId) { - final StreamTask task = mock(StreamTask.class); - when(task.changelogPartitions()).thenReturn(Collections.emptySet()); - when(task.isActive()).thenReturn(true); - when(task.id()).thenReturn(taskId); - return task; - } - - private StandbyTask createStandbyTaskInStateRunning(final TaskId taskId, - final Set<TopicPartition> changelogPartitions) { - final StandbyTask task = createStandbyTask(taskId, changelogPartitions); - when(task.state()).thenReturn(State.RUNNING); - return task; - } - - private StandbyTask createStandbyTask(final TaskId taskId, - final Set<TopicPartition> changelogPartitions) { - final StandbyTask task = mock(StandbyTask.class); - setupStatefulTask(task, taskId, changelogPartitions); - when(task.isActive()).thenReturn(false); - return task; - } - - private void setupStatefulTask(final Task task, - final TaskId taskId, - final Set<TopicPartition> changelogPartitions) { - when(task.changelogPartitions()).thenReturn(changelogPartitions); - when(task.id()).thenReturn(taskId); - } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java index c88515e01d..77e3bec562 100644 --- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java @@ -343,6 +343,11 @@ public final class StreamsTestUtils { when(task.id()).thenReturn(taskId); } + public TaskBuilder<T> inState(final Task.State state) { + when(task.state()).thenReturn(state); + return this; + } + public T build() { return task; }