guozhangwang commented on code in PR #12562:
URL: https://github.com/apache/kafka/pull/12562#discussion_r956661497
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -125,6 +125,80 @@ public void shouldShutdownStateUpdaterAndRestart() {
verify(changelogReader, times(2)).clear();
}
+ @Test
+ public void shouldRemoveTasksFromAndClearInputQueueOnShutdown() throws Exception {
+ stateUpdater.shutdown(Duration.ofMinutes(1));
Review Comment:
Why do we call `shutdown` first? We should already call `shutdown` when tearing
down each test.
Ditto in `shouldRemoveUpdatingTasksOnShutdown`.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -100,7 +105,8 @@ public void run() {
} catch (final RuntimeException anyOtherException) {
handleRuntimeException(anyOtherException);
} finally {
- clear();
+ removeAddedTasksFromInputQueue();
+ removeUpdatingAndPausedTasks();
Review Comment:
We no longer clear `restoredActiveTasks`; is that intentional?
Do we assume that by the time the thread is shutting down, the stream thread
no longer cares about any restored tasks, or are you going to merge
the restored tasks with the removed tasks soon anyway?
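To make the question concrete, here is a minimal, self-contained sketch of the cleanup as it reads after this change. It is not the actual `DefaultStateUpdater` code: the class, the string task IDs, and the plain collections are hypothetical stand-ins, and only the two method names are taken from the diff above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for the state updater's bookkeeping (assumption, not the real class).
public class ShutdownCleanupSketch {
    final Queue<String> inputQueue = new ArrayDeque<>();        // tasks added but not yet picked up
    final List<String> updatingTasks = new ArrayList<>();       // tasks currently being updated
    final List<String> pausedTasks = new ArrayList<>();         // tasks paused by the caller
    final List<String> restoredActiveTasks = new ArrayList<>(); // tasks that finished restoring
    final List<String> removedTasks = new ArrayList<>();        // tasks handed back on removal

    // Mirrors the two calls in the new finally block.
    void cleanUpOnShutdown() {
        removeAddedTasksFromInputQueue();
        removeUpdatingAndPausedTasks();
        // restoredActiveTasks is not touched here, which is what the question is about.
    }

    void removeAddedTasksFromInputQueue() {
        String task;
        while ((task = inputQueue.poll()) != null) {
            removedTasks.add(task);
        }
    }

    void removeUpdatingAndPausedTasks() {
        removedTasks.addAll(updatingTasks);
        removedTasks.addAll(pausedTasks);
        updatingTasks.clear();
        pausedTasks.clear();
    }

    public static void main(String[] args) {
        ShutdownCleanupSketch sketch = new ShutdownCleanupSketch();
        sketch.inputQueue.add("0_0");
        sketch.updatingTasks.add("0_1");
        sketch.pausedTasks.add("0_2");
        sketch.restoredActiveTasks.add("1_0");

        sketch.cleanUpOnShutdown();

        System.out.println("removed:  " + sketch.removedTasks);
        System.out.println("restored: " + sketch.restoredActiveTasks);
    }
}
```

In this sketch the restored task stays in its own collection after shutdown, whereas the old `clear()` would presumably have emptied it.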
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -125,6 +125,80 @@ public void shouldShutdownStateUpdaterAndRestart() {
verify(changelogReader, times(2)).clear();
}
+ @Test
+ public void shouldRemoveTasksFromAndClearInputQueueOnShutdown() throws Exception {
+ stateUpdater.shutdown(Duration.ofMinutes(1));
+ final StreamTask statelessTask = statelessTask(TASK_0_0).inState(State.RESTORING).build();
+ final StreamTask statefulTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+ final StandbyTask standbyTask = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
+ stateUpdater.pause(TASK_0_1);
+ stateUpdater.add(statelessTask);
+ stateUpdater.add(statefulTask);
+ stateUpdater.remove(TASK_1_1);
+ stateUpdater.add(standbyTask);
+ stateUpdater.resume(TASK_0_1);
+ verifyRemovedTasks();
+
+ stateUpdater.shutdown(Duration.ofMinutes(1));
+
+ verifyRemovedTasks(statelessTask, statefulTask, standbyTask);
+ }
+
+ @Test
+ public void shouldRemoveUpdatingTasksOnShutdown() throws Exception {
+ stateUpdater.shutdown(Duration.ofMinutes(1));
+ stateUpdater = new DefaultStateUpdater(new StreamsConfig(configProps(Integer.MAX_VALUE)), changelogReader, time);
+ final StreamTask activeTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+ final StandbyTask standbyTask = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
+ when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
+ when(changelogReader.allChangelogsCompleted()).thenReturn(false);
+ stateUpdater.start();
+ stateUpdater.add(activeTask);
+ stateUpdater.add(standbyTask);
+ verifyUpdatingTasks(activeTask, standbyTask);
+ verifyRemovedTasks();
+
+ stateUpdater.shutdown(Duration.ofMinutes(1));
+
+ verifyRemovedTasks(activeTask, standbyTask);
+ verify(activeTask).maybeCheckpoint(true);
+ verify(standbyTask).maybeCheckpoint(true);
+ }
+
+ @Test
+ public void shouldRemovePausedTasksOnShutdown() throws Exception {
Review Comment:
It's possible that the state updater thread was never started when `shutdown`
was called; in that case we do not need to do the latter, since it is still
always empty.
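One possible reading of this, sketched below, assumes that "the latter" refers to removing the updating and paused tasks, and that only the input queue can hold tasks before the thread starts. All names are hypothetical and the thread is modelled by a plain boolean; this is not the actual `DefaultStateUpdater` implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of a state updater whose thread may never have been started.
public class NotStartedShutdownSketch {
    final Queue<String> inputQueue = new ArrayDeque<>();  // filled by add(), even before start()
    final List<String> updatingTasks = new ArrayList<>(); // only filled once the "thread" runs
    final List<String> removedTasks = new ArrayList<>();
    boolean started = false;                              // stands in for "the thread exists"

    void add(String task) {
        inputQueue.add(task);
    }

    // Stand-in for the thread picking up queued tasks (assumption: done eagerly here).
    void start() {
        started = true;
        String task;
        while ((task = inputQueue.poll()) != null) {
            updatingTasks.add(task);
        }
    }

    void shutdown() {
        // The input queue can be non-empty in both cases, so it is always drained.
        String task;
        while ((task = inputQueue.poll()) != null) {
            removedTasks.add(task);
        }
        // Updating tasks can only exist if the thread was started.
        if (started) {
            removedTasks.addAll(updatingTasks);
            updatingTasks.clear();
            started = false;
        }
    }

    public static void main(String[] args) {
        NotStartedShutdownSketch sketch = new NotStartedShutdownSketch();
        sketch.add("0_0");
        sketch.shutdown(); // never started: only the input queue is drained
        System.out.println("removed: " + sketch.removedTasks);
    }
}
```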