cadonna commented on code in PR #12270:
URL: https://github.com/apache/kafka/pull/12270#discussion_r892863627
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -293,47 +321,165 @@ public void
shouldRestoreActiveStatefulTaskThenUpdateStandbyTaskAndAgainRestoreA
stateUpdater.add(task3);
- verifyRestoredActiveTasks(task3);
+ verifyRestoredActiveTasks(task1, task3);
verify(task3).completeRestoration(offsetResetter);
orderVerifier.verify(changelogReader, times(1)).enforceRestoreActive();
orderVerifier.verify(changelogReader,
times(1)).transitToUpdateStandby();
}
+ @Test
+ public void shouldRemoveActiveStatefulTask() throws Exception {
+ final StreamTask task =
createActiveStatefulTaskInStateRestoring(TASK_0_0,
Collections.singletonList(TOPIC_PARTITION_A_0));
+ shouldRemoveStatefulTask(task);
+ }
+
+ @Test
+ public void shouldRemoveStandbyTask() throws Exception {
+ final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0,
Collections.singletonList(TOPIC_PARTITION_A_0));
+ shouldRemoveStatefulTask(task);
+ }
+
+ private void shouldRemoveStatefulTask(final Task task) throws Exception {
+ when(changelogReader.completedChangelogs())
+ .thenReturn(Collections.emptySet());
+ when(changelogReader.allChangelogsCompleted())
+ .thenReturn(false);
+ stateUpdater.add(task);
+
+ stateUpdater.remove(TASK_0_0);
+
+ verifyRemovedTasks(task);
+ verifyRestoredActiveTasks();
+ verifyUpdatingTasks();
+ verifyExceptionsAndFailedTasks();
+
verify(changelogReader).unregister(Collections.singletonList(TOPIC_PARTITION_A_0));
+ }
+
+ @Test
+ public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks()
throws Exception {
+ final StreamTask task =
createActiveStatefulTaskInStateRestoring(TASK_0_0,
Collections.singletonList(TOPIC_PARTITION_A_0));
+ shouldNotRemoveTaskFromRestoredActiveTasks(task);
+ }
+
+ @Test
+ public void shouldNotRemoveStatelessTaskFromRestoredActiveTasks() throws
Exception {
+ final StreamTask task = createStatelessTaskInStateRestoring(TASK_0_0);
+ shouldNotRemoveTaskFromRestoredActiveTasks(task);
+ }
+
+ private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask
task) throws Exception {
+ final StreamTask controlTask =
createActiveStatefulTaskInStateRestoring(TASK_1_0,
Collections.singletonList(TOPIC_PARTITION_B_0));
+ when(changelogReader.completedChangelogs())
+ .thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
+ when(changelogReader.allChangelogsCompleted())
+ .thenReturn(false);
+ stateUpdater.add(task);
+ stateUpdater.add(controlTask);
+ verifyRestoredActiveTasks(task);
+
+ stateUpdater.remove(task.id());
+ stateUpdater.remove(controlTask.id());
+
+ verifyRemovedTasks(controlTask);
+ verifyRestoredActiveTasks(task);
+ verifyUpdatingTasks();
+ verifyExceptionsAndFailedTasks();
+ }
+
+ @Test
+ public void shouldNotRemoveActiveStatefulTaskFromFailedTasks() throws
Exception {
+ final StreamTask task =
createActiveStatefulTaskInStateRestoring(TASK_0_0,
Collections.singletonList(TOPIC_PARTITION_A_0));
+ shouldNotRemoveTaskFromFailedTasks(task);
+ }
+
+ @Test
+ public void shouldNotRemoveStandbyTaskFromFailedTasks() throws Exception {
+ final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0,
Collections.singletonList(TOPIC_PARTITION_A_0));
+ shouldNotRemoveTaskFromFailedTasks(task);
+ }
+
+ private void shouldNotRemoveTaskFromFailedTasks(final Task task) throws
Exception {
+ final StreamTask controlTask =
createActiveStatefulTaskInStateRestoring(TASK_1_0,
Collections.singletonList(TOPIC_PARTITION_B_0));
+ final StreamsException streamsException = new
StreamsException("Something happened", task.id());
+ when(changelogReader.completedChangelogs())
+ .thenReturn(Collections.emptySet());
+ when(changelogReader.allChangelogsCompleted())
+ .thenReturn(false);
+ doNothing()
+ .doThrow(streamsException)
+ .doNothing()
+ .when(changelogReader).restore(anyMap());
+ stateUpdater.add(task);
+ stateUpdater.add(controlTask);
+ final ExceptionAndTasks expectedExceptionAndTasks = new
ExceptionAndTasks(mkSet(task), streamsException);
+ verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
+
+ stateUpdater.remove(task.id());
+ stateUpdater.remove(controlTask.id());
+
+ verifyRemovedTasks(controlTask);
+ verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
+ verifyUpdatingTasks();
+ verifyRestoredActiveTasks();
+ }
+
+ @Test
+ public void shouldDrainRemovedTasks() throws Exception {
+ assertTrue(stateUpdater.drainRemovedTasks().isEmpty());
+ when(changelogReader.completedChangelogs())
+ .thenReturn(Collections.emptySet());
+ when(changelogReader.allChangelogsCompleted())
+ .thenReturn(false);
+
+ final StreamTask task1 =
createActiveStatefulTaskInStateRestoring(TASK_0_0,
Collections.singletonList(TOPIC_PARTITION_B_0));
+ stateUpdater.add(task1);
+ stateUpdater.remove(task1.id());
+
+ verifyDrainingRemovedTasks(task1);
+
+ final StreamTask task2 =
createActiveStatefulTaskInStateRestoring(TASK_1_1,
Collections.singletonList(TOPIC_PARTITION_C_0));
+ final StreamTask task3 =
createActiveStatefulTaskInStateRestoring(TASK_1_0,
Collections.singletonList(TOPIC_PARTITION_A_0));
+ final StreamTask task4 =
createActiveStatefulTaskInStateRestoring(TASK_0_2,
Collections.singletonList(TOPIC_PARTITION_D_0));
+ stateUpdater.add(task2);
+ stateUpdater.remove(task2.id());
+ stateUpdater.add(task3);
+ stateUpdater.remove(task3.id());
+ stateUpdater.add(task4);
+ stateUpdater.remove(task4.id());
+
+ verifyDrainingRemovedTasks(task2, task3, task4);
+ }
+
@Test
public void
shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithoutTask()
throws Exception {
final StreamTask task1 =
createActiveStatefulTaskInStateRestoring(TASK_0_0,
Collections.singletonList(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2,
Collections.singletonList(TOPIC_PARTITION_B_0));
- final String expectedMessage = "The Streams were crossed!";
- final StreamsException expectedStreamsException = new
StreamsException(expectedMessage);
+ final String exceptionMessage = "The Streams were crossed!";
+ final StreamsException streamsException = new
StreamsException(exceptionMessage);
final Map<TaskId, Task> updatingTasks = mkMap(
mkEntry(task1.id(), task1),
mkEntry(task2.id(), task2)
);
-
doNothing().doThrow(expectedStreamsException).doNothing().when(changelogReader).restore(updatingTasks);
+
doNothing().doThrow(streamsException).doNothing().when(changelogReader).restore(updatingTasks);
stateUpdater.add(task1);
stateUpdater.add(task2);
- final List<ExceptionAndTasks> failedTasks = getFailedTasks(1);
- assertEquals(1, failedTasks.size());
- final ExceptionAndTasks actualFailedTasks = failedTasks.get(0);
- assertEquals(2, actualFailedTasks.tasks.size());
- assertTrue(actualFailedTasks.tasks.containsAll(Arrays.asList(task1,
task2)));
- assertTrue(actualFailedTasks.exception instanceof StreamsException);
- final StreamsException actualException = (StreamsException)
actualFailedTasks.exception;
- assertFalse(actualException.taskId().isPresent());
- assertEquals(expectedMessage, actualException.getMessage());
- assertTrue(stateUpdater.getAllTasks().isEmpty());
Review Comment:
Replaced with a call to `containsAll()` in
`verifyExceptionsAndFailedTasks()`. There reference equality is verified for
exception and task which I think is fine in this case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]