cadonna commented on code in PR #14437:
URL: https://github.com/apache/kafka/pull/14437#discussion_r1335624082
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -1043,6 +1046,28 @@ public void shouldResumeActiveStatefulTask() throws Exception {
         verify(changelogReader, times(2)).enforceRestoreActive();
     }
+    @Test
+    public void shouldAwaitWhenAllTasksPaused() throws Exception {
+        final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        stateUpdater.start();
+        stateUpdater.add(task);
+
+        when(topologyMetadata.isPaused(null)).thenReturn(true);
+
+        verifyPausedTasks(task);
+
+        reset(changelogReader);
Review Comment:
I think you do not need to reset the mock, since we guarantee that paused
tasks that are added to the state updater are not restored. We are missing a
test for that case, though.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -1043,6 +1046,28 @@ public void shouldResumeActiveStatefulTask() throws Exception {
         verify(changelogReader, times(2)).enforceRestoreActive();
     }
+    @Test
+    public void shouldAwaitWhenAllTasksPaused() throws Exception {
+        final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        stateUpdater.start();
+        stateUpdater.add(task);
+
+        when(topologyMetadata.isPaused(null)).thenReturn(true);
+
+        verifyPausedTasks(task);
+
+        reset(changelogReader);
+        Thread.sleep(100);
+        verify(changelogReader, never()).restore(any());
Review Comment:
Instead of this, we could expose the thread state on the implementation of
the state updater, but not on the interface. With that and a wait-until
verification, we could verify that the thread changes to the waiting state. We
do something similar with `getActiveTasks()` and `getPausedTasks()`. That also
smells a bit, but it avoids the sleep and maybe smells a little bit less? 🙂
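
A minimal, JDK-only sketch of the idea, not the actual test: instead of
sleeping for a fixed time, poll until the worker thread reports the waiting
state. `WaitUntilSketch`, `waitUntil`, and `workerReachesWaitingState` are
hypothetical names for illustration; in the real test the thread state would
have to be exposed by the state updater implementation, and an existing
wait-until test utility could take the place of the hand-rolled helper.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

public class WaitUntilSketch {

    // Hypothetical helper: polls until the condition holds or the timeout expires.
    static boolean waitUntil(final BooleanSupplier condition, final long timeoutMs) {
        final long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        try {
            while (System.nanoTime() < deadline) {
                if (condition.getAsBoolean()) {
                    return true;
                }
                Thread.sleep(5); // short poll interval, not a fixed "hope it is done" sleep
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return condition.getAsBoolean();
    }

    // Starts a worker that blocks on a condition variable (a stand-in for the
    // state updater thread, by assumption) and verifies that it reaches WAITING.
    static boolean workerReachesWaitingState() {
        final ReentrantLock lock = new ReentrantLock();
        final Condition wakeUp = lock.newCondition();
        final AtomicBoolean released = new AtomicBoolean(false);

        final Thread worker = new Thread(() -> {
            lock.lock();
            try {
                while (!released.get()) { // loop guards against spurious wakeups
                    wakeUp.await();
                }
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        worker.setDaemon(true);
        worker.start();

        final boolean reached =
            waitUntil(() -> worker.getState() == Thread.State.WAITING, 5_000);

        // Clean up: release the worker and wait for it to exit.
        released.set(true);
        worker.interrupt();
        try {
            worker.join(5_000);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return reached;
    }

    public static void main(final String[] args) {
        System.out.println("reached WAITING: " + workerReachesWaitingState());
    }
}
```

The polling version returns as soon as the thread parks, so it is typically
faster than a fixed sleep and does not depend on 100 ms being long enough.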
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -307,7 +307,7 @@ private void waitIfAllChangelogsCompletelyRead() {
         tasksAndActionsLock.lock();
         try {
             while (isRunning.get() &&
-                changelogReader.allChangelogsCompleted() &&
+                (changelogReader.allChangelogsCompleted() || updatingTasks.isEmpty()) &&
Review Comment:
Nice catch!
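
To spell out why the change matters, here is a small sketch that models only
the boolean logic of the loop condition. It assumes that, when every task is
paused, `updatingTasks` is empty while the changelog reader may still report
unfinished changelogs. The class and method names are hypothetical, and the
conjuncts that follow in the real condition (cut off in the hunk above) are
folded into a single placeholder parameter.

```java
public class WaitConditionSketch {

    // Condition before the change: wait only if all changelogs are completed.
    static boolean shouldWaitBefore(final boolean running,
                                    final boolean allChangelogsCompleted,
                                    final boolean updatingTasksEmpty,
                                    final boolean remainingConjuncts) {
        return running && allChangelogsCompleted && remainingConjuncts;
    }

    // Condition after the change: also wait if there is nothing to update.
    static boolean shouldWaitAfter(final boolean running,
                                   final boolean allChangelogsCompleted,
                                   final boolean updatingTasksEmpty,
                                   final boolean remainingConjuncts) {
        return running
            && (allChangelogsCompleted || updatingTasksEmpty)
            && remainingConjuncts;
    }

    public static void main(final String[] args) {
        // Assumed scenario: all tasks paused, changelogs not fully read.
        final boolean running = true;
        final boolean allChangelogsCompleted = false;
        final boolean updatingTasksEmpty = true;
        final boolean remainingConjuncts = true; // placeholder for the elided part

        System.out.println("before: " + shouldWaitBefore(
            running, allChangelogsCompleted, updatingTasksEmpty, remainingConjuncts));
        System.out.println("after:  " + shouldWaitAfter(
            running, allChangelogsCompleted, updatingTasksEmpty, remainingConjuncts));
    }
}
```

Under that assumption, the old condition never lets the thread wait when all
tasks are paused, so it keeps looping; the new condition lets it block.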