cadonna commented on code in PR #12562:
URL: https://github.com/apache/kafka/pull/12562#discussion_r957103903


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -125,6 +125,80 @@ public void shouldShutdownStateUpdaterAndRestart() {
         verify(changelogReader, times(2)).clear();
     }
 
+    @Test
+    public void shouldRemoveTasksFromAndClearInputQueueOnShutdown() throws 
Exception {
+        stateUpdater.shutdown(Duration.ofMinutes(1));
+        final StreamTask statelessTask = 
statelessTask(TASK_0_0).inState(State.RESTORING).build();
+        final StreamTask statefulTask = statefulTask(TASK_1_0, 
mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+        final StandbyTask standbyTask = standbyTask(TASK_0_2, 
mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
+        stateUpdater.pause(TASK_0_1);
+        stateUpdater.add(statelessTask);
+        stateUpdater.add(statefulTask);
+        stateUpdater.remove(TASK_1_1);
+        stateUpdater.add(standbyTask);
+        stateUpdater.resume(TASK_0_1);
+        verifyRemovedTasks();
+
+        stateUpdater.shutdown(Duration.ofMinutes(1));
+
+        verifyRemovedTasks(statelessTask, statefulTask, standbyTask);
+    }
+
+    @Test
+    public void shouldRemoveUpdatingTasksOnShutdown() throws Exception {
+        stateUpdater.shutdown(Duration.ofMinutes(1));
+        stateUpdater = new DefaultStateUpdater(new 
StreamsConfig(configProps(Integer.MAX_VALUE)), changelogReader, time);
+        final StreamTask activeTask = statefulTask(TASK_0_0, 
mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        final StandbyTask standbyTask = standbyTask(TASK_0_2, 
mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
+        
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted()).thenReturn(false);
+        stateUpdater.start();
+        stateUpdater.add(activeTask);
+        stateUpdater.add(standbyTask);
+        verifyUpdatingTasks(activeTask, standbyTask);
+        verifyRemovedTasks();
+
+        stateUpdater.shutdown(Duration.ofMinutes(1));
+
+        verifyRemovedTasks(activeTask, standbyTask);
+        verify(activeTask).maybeCheckpoint(true);
+        verify(standbyTask).maybeCheckpoint(true);
+    }
+
+    @Test
+    public void shouldRemovePausedTasksOnShutdown() throws Exception {

Review Comment:
   What Guozhang said. I also explained it as an answer to one of Walker's 
comments above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to