cadonna commented on code in PR #12270:
URL: https://github.com/apache/kafka/pull/12270#discussion_r892863627


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -293,47 +321,165 @@ public void 
shouldRestoreActiveStatefulTaskThenUpdateStandbyTaskAndAgainRestoreA
 
         stateUpdater.add(task3);
 
-        verifyRestoredActiveTasks(task3);
+        verifyRestoredActiveTasks(task1, task3);
         verify(task3).completeRestoration(offsetResetter);
         orderVerifier.verify(changelogReader, times(1)).enforceRestoreActive();
         orderVerifier.verify(changelogReader, 
times(1)).transitToUpdateStandby();
     }
 
+    @Test
+    public void shouldRemoveActiveStatefulTask() throws Exception {
+        final StreamTask task = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldRemoveStatefulTask(task);
+    }
+
+    @Test
+    public void shouldRemoveStandbyTask() throws Exception {
+        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldRemoveStatefulTask(task);
+    }
+
+    private void shouldRemoveStatefulTask(final Task task) throws Exception {
+        when(changelogReader.completedChangelogs())
+            .thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted())
+            .thenReturn(false);
+        stateUpdater.add(task);
+
+        stateUpdater.remove(TASK_0_0);
+
+        verifyRemovedTasks(task);
+        verifyRestoredActiveTasks();
+        verifyUpdatingTasks();
+        verifyExceptionsAndFailedTasks();
+        
verify(changelogReader).unregister(Collections.singletonList(TOPIC_PARTITION_A_0));
+    }
+
+    @Test
+    public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() 
throws Exception {
+        final StreamTask task = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldNotRemoveTaskFromRestoredActiveTasks(task);
+    }
+
+    @Test
+    public void shouldNotRemoveStatelessTaskFromRestoredActiveTasks() throws 
Exception {
+        final StreamTask task = createStatelessTaskInStateRestoring(TASK_0_0);
+        shouldNotRemoveTaskFromRestoredActiveTasks(task);
+    }
+
+    private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask 
task) throws Exception {
+        final StreamTask controlTask = 
createActiveStatefulTaskInStateRestoring(TASK_1_0, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+        when(changelogReader.completedChangelogs())
+            .thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
+        when(changelogReader.allChangelogsCompleted())
+            .thenReturn(false);
+        stateUpdater.add(task);
+        stateUpdater.add(controlTask);
+        verifyRestoredActiveTasks(task);
+
+        stateUpdater.remove(task.id());
+        stateUpdater.remove(controlTask.id());
+
+        verifyRemovedTasks(controlTask);
+        verifyRestoredActiveTasks(task);
+        verifyUpdatingTasks();
+        verifyExceptionsAndFailedTasks();
+    }
+
+    @Test
+    public void shouldNotRemoveActiveStatefulTaskFromFailedTasks() throws 
Exception {
+        final StreamTask task = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldNotRemoveTaskFromFailedTasks(task);
+    }
+
+    @Test
+    public void shouldNotRemoveStandbyTaskFromFailedTasks() throws Exception {
+        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldNotRemoveTaskFromFailedTasks(task);
+    }
+
+    private void shouldNotRemoveTaskFromFailedTasks(final Task task) throws 
Exception {
+        final StreamTask controlTask = 
createActiveStatefulTaskInStateRestoring(TASK_1_0, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StreamsException streamsException = new 
StreamsException("Something happened", task.id());
+        when(changelogReader.completedChangelogs())
+            .thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted())
+            .thenReturn(false);
+        doNothing()
+            .doThrow(streamsException)
+            .doNothing()
+            .when(changelogReader).restore(anyMap());
+        stateUpdater.add(task);
+        stateUpdater.add(controlTask);
+        final ExceptionAndTasks expectedExceptionAndTasks = new 
ExceptionAndTasks(mkSet(task), streamsException);
+        verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
+
+        stateUpdater.remove(task.id());
+        stateUpdater.remove(controlTask.id());
+
+        verifyRemovedTasks(controlTask);
+        verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
+        verifyUpdatingTasks();
+        verifyRestoredActiveTasks();
+    }
+
+    @Test
+    public void shouldDrainRemovedTasks() throws Exception {
+        assertTrue(stateUpdater.drainRemovedTasks().isEmpty());
+        when(changelogReader.completedChangelogs())
+            .thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted())
+            .thenReturn(false);
+
+        final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+        stateUpdater.add(task1);
+        stateUpdater.remove(task1.id());
+
+        verifyDrainingRemovedTasks(task1);
+
+        final StreamTask task2 = 
createActiveStatefulTaskInStateRestoring(TASK_1_1, 
Collections.singletonList(TOPIC_PARTITION_C_0));
+        final StreamTask task3 = 
createActiveStatefulTaskInStateRestoring(TASK_1_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask task4 = 
createActiveStatefulTaskInStateRestoring(TASK_0_2, 
Collections.singletonList(TOPIC_PARTITION_D_0));
+        stateUpdater.add(task2);
+        stateUpdater.remove(task2.id());
+        stateUpdater.add(task3);
+        stateUpdater.remove(task3.id());
+        stateUpdater.add(task4);
+        stateUpdater.remove(task4.id());
+
+        verifyDrainingRemovedTasks(task2, task3, task4);
+    }
+
     @Test
     public void 
shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithoutTask() 
throws Exception {
         final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
         final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, 
Collections.singletonList(TOPIC_PARTITION_B_0));
-        final String expectedMessage = "The Streams were crossed!";
-        final StreamsException expectedStreamsException = new 
StreamsException(expectedMessage);
+        final String exceptionMessage = "The Streams were crossed!";
+        final StreamsException streamsException = new 
StreamsException(exceptionMessage);
         final Map<TaskId, Task> updatingTasks = mkMap(
             mkEntry(task1.id(), task1),
             mkEntry(task2.id(), task2)
         );
-        
doNothing().doThrow(expectedStreamsException).doNothing().when(changelogReader).restore(updatingTasks);
+        
doNothing().doThrow(streamsException).doNothing().when(changelogReader).restore(updatingTasks);
 
         stateUpdater.add(task1);
         stateUpdater.add(task2);
 
-        final List<ExceptionAndTasks> failedTasks = getFailedTasks(1);
-        assertEquals(1, failedTasks.size());
-        final ExceptionAndTasks actualFailedTasks = failedTasks.get(0);
-        assertEquals(2, actualFailedTasks.tasks.size());
-        assertTrue(actualFailedTasks.tasks.containsAll(Arrays.asList(task1, 
task2)));
-        assertTrue(actualFailedTasks.exception instanceof StreamsException);
-        final StreamsException actualException = (StreamsException) 
actualFailedTasks.exception;
-        assertFalse(actualException.taskId().isPresent());
-        assertEquals(expectedMessage, actualException.getMessage());
-        assertTrue(stateUpdater.getAllTasks().isEmpty());

Review Comment:
   Replaced with a call to `containsAll()` in 
`verifyExceptionsAndFailedTasks()`. There reference equality is verified for 
exception and task which I think is fine in this case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to