wcarlson5 commented on code in PR #12562:
URL: https://github.com/apache/kafka/pull/12562#discussion_r956444131


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -86,6 +87,10 @@ public boolean onlyStandbyTasksLeft() {
             return !updatingTasks.isEmpty() && 
updatingTasks.values().stream().noneMatch(Task::isActive);
         }
 
+        public Collection<Task> getPausedTasks() {

Review Comment:
   Just to confirm this will not change the pause resume API?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -557,7 +577,9 @@ public Set<Task> getRemovedTasks() {
     }
 
     public Set<Task> getPausedTasks() {
-        return Collections.unmodifiableSet(new 
HashSet<>(pausedTasks.values()));
+        return stateUpdaterThread != null

Review Comment:
   We can only pause tasks with the state updated thread? What if they are 
using the old/different implementation? I think that can get confusing



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -427,6 +430,23 @@ public void shutdown(final Duration timeout) {
                 stateUpdaterThread = null;
             } catch (final InterruptedException ignored) {
             }
+        } else {
+            removeAddedTasksFromInputQueue();

Review Comment:
   Do we also want to remove the paused tasks here? If not why do we need 
remove these?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -125,6 +125,80 @@ public void shouldShutdownStateUpdaterAndRestart() {
         verify(changelogReader, times(2)).clear();
     }
 
+    @Test
+    public void shouldRemoveTasksFromAndClearInputQueueOnShutdown() throws 
Exception {
+        stateUpdater.shutdown(Duration.ofMinutes(1));
+        final StreamTask statelessTask = 
statelessTask(TASK_0_0).inState(State.RESTORING).build();
+        final StreamTask statefulTask = statefulTask(TASK_1_0, 
mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+        final StandbyTask standbyTask = standbyTask(TASK_0_2, 
mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
+        stateUpdater.pause(TASK_0_1);
+        stateUpdater.add(statelessTask);
+        stateUpdater.add(statefulTask);
+        stateUpdater.remove(TASK_1_1);
+        stateUpdater.add(standbyTask);
+        stateUpdater.resume(TASK_0_1);
+        verifyRemovedTasks();
+
+        stateUpdater.shutdown(Duration.ofMinutes(1));
+
+        verifyRemovedTasks(statelessTask, statefulTask, standbyTask);
+    }
+
+    @Test
+    public void shouldRemoveUpdatingTasksOnShutdown() throws Exception {
+        stateUpdater.shutdown(Duration.ofMinutes(1));
+        stateUpdater = new DefaultStateUpdater(new 
StreamsConfig(configProps(Integer.MAX_VALUE)), changelogReader, time);
+        final StreamTask activeTask = statefulTask(TASK_0_0, 
mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        final StandbyTask standbyTask = standbyTask(TASK_0_2, 
mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
+        
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted()).thenReturn(false);
+        stateUpdater.start();
+        stateUpdater.add(activeTask);
+        stateUpdater.add(standbyTask);
+        verifyUpdatingTasks(activeTask, standbyTask);
+        verifyRemovedTasks();
+
+        stateUpdater.shutdown(Duration.ofMinutes(1));
+
+        verifyRemovedTasks(activeTask, standbyTask);
+        verify(activeTask).maybeCheckpoint(true);
+        verify(standbyTask).maybeCheckpoint(true);
+    }
+
+    @Test
+    public void shouldRemovePausedTasksOnShutdown() throws Exception {

Review Comment:
   Nice test coverage! I am only confused on why we call 
`removeAddedTasksFromInputQueue` on shutdown and not 
`removeUpdatingAndPausedTasks` as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to