cadonna commented on code in PR #14437:
URL: https://github.com/apache/kafka/pull/14437#discussion_r1335624082


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -1043,6 +1046,28 @@ public void shouldResumeActiveStatefulTask() throws Exception {
         verify(changelogReader, times(2)).enforceRestoreActive();
     }
 
+    @Test
+    public void shouldAwaitWhenAllTasksPaused() throws Exception {
+        final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        stateUpdater.start();
+        stateUpdater.add(task);
+
+        when(topologyMetadata.isPaused(null)).thenReturn(true);
+
+        verifyPausedTasks(task);
+
+        reset(changelogReader);

Review Comment:
   I think you do not need to reset the mock, since we guarantee that paused 
tasks added to the state updater are not restored. We are missing a test for 
that case, though.
   
   



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -1043,6 +1046,28 @@ public void shouldResumeActiveStatefulTask() throws Exception {
         verify(changelogReader, times(2)).enforceRestoreActive();
     }
 
+    @Test
+    public void shouldAwaitWhenAllTasksPaused() throws Exception {
+        final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        stateUpdater.start();
+        stateUpdater.add(task);
+
+        when(topologyMetadata.isPaused(null)).thenReturn(true);
+
+        verifyPausedTasks(task);
+
+        reset(changelogReader);
+        Thread.sleep(100);
+        verify(changelogReader, never()).restore(any());

Review Comment:
   Instead of sleeping and then verifying, we could expose the thread state on 
the implementation of the state updater, but not on the interface. With that 
and a wait-until verification, we could verify that the thread changes to the 
waiting state. We do something similar with `getActiveTasks()` or 
`getPausedTasks()`. That also smells a bit, but it avoids the sleep and maybe 
smells a little bit less? 🙂
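
   To illustrate what I mean, here is a minimal sketch using only the JDK. 
`SketchUpdater`, `threadState()`, and `Poll.waitUntil` are hypothetical names 
invented for this example and are not part of the Kafka code base; the real 
test would use whatever accessor and wait utility fit the existing test class.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for the state updater implementation; not Kafka code.
class SketchUpdater {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition wakeUp = lock.newCondition();
    private boolean running = true; // guarded by lock
    private final Thread thread = new Thread(this::runLoop, "sketch-updater");

    void start() {
        thread.start();
    }

    // Exposed on the implementation only (not on an interface), for tests.
    Thread.State threadState() {
        return thread.getState();
    }

    void shutdown() {
        lock.lock();
        try {
            running = false;
            wakeUp.signalAll();
        } finally {
            lock.unlock();
        }
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // There is never any work in this sketch, so the thread parks on the
    // condition until shutdown() is called.
    private void runLoop() {
        lock.lock();
        try {
            while (running) {
                wakeUp.awaitUninterruptibly();
            }
        } finally {
            lock.unlock();
        }
    }
}

// Hypothetical polling helper: re-checks the condition until it holds or the timeout expires.
final class Poll {
    static boolean waitUntil(final BooleanSupplier condition, final long timeoutMs) {
        final long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

   A test would then call something like 
`Poll.waitUntil(() -> updater.threadState() == Thread.State.WAITING, 5000)` 
and assert on the result, so it returns as soon as the thread parks instead of 
always sleeping a fixed amount. One caveat: `Thread.State.WAITING` alone does 
not tell you whether the thread parked on the condition or while acquiring the 
lock, so a dedicated state on the implementation might be more precise.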



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -307,7 +307,7 @@ private void waitIfAllChangelogsCompletelyRead() {
             tasksAndActionsLock.lock();
             try {
                 while (isRunning.get() &&
-                    changelogReader.allChangelogsCompleted() &&
+                    (changelogReader.allChangelogsCompleted() || updatingTasks.isEmpty()) &&

Review Comment:
   Nice catch!
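
   For readers of this thread, a small sketch of why the change matters. It 
covers only the part of the loop condition visible in the hunk above (the rest 
of the expression is cut off in the diff and is left out here). It assumes 
that when all tasks are paused, the set of updating tasks is empty while the 
changelogs are not completely read. `WaitCondition` and its methods are 
hypothetical names used only for illustration.

```java
// Hypothetical helper that mirrors only the visible part of the loop condition.
final class WaitCondition {
    // Before the change: wait only if all changelogs are completely read.
    static boolean before(final boolean running, final boolean allChangelogsCompleted) {
        return running && allChangelogsCompleted;
    }

    // After the change: also wait if there are no updating tasks at all.
    static boolean after(final boolean running,
                         final boolean allChangelogsCompleted,
                         final boolean updatingTasksEmpty) {
        return running && (allChangelogsCompleted || updatingTasksEmpty);
    }
}
```

   Under that assumption, `before(true, false)` is false, so the thread would 
keep looping, whereas `after(true, false, true)` is true and the thread waits.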



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
