cadonna commented on code in PR #14180:
URL: https://github.com/apache/kafka/pull/14180#discussion_r1316995442


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java:
##########
@@ -94,6 +103,114 @@ public void shouldAssignTaskThatCanBeProcessed() {
         assertNull(taskManager.assignNextTask(taskExecutor));
     }
 
+    private class AwaitingRunnable implements Runnable {
+        private final CountDownLatch awaitDone = new CountDownLatch(1);
+        private final AtomicBoolean shutdownRequested = new 
AtomicBoolean(false);
+        @Override
+        public void run() {
+            while (!shutdownRequested.get()) {
+                try {
+                    taskManager.awaitProcessableTasks();
+                } catch (final InterruptedException ignored) {
+                }
+                awaitDone.countDown();
+            }
+        }
+
+        public void shutdown() {
+            shutdownRequested.set(true);
+            taskManager.signalProcessableTasks();
+        }
+    }
+
+    @Test
+    public void shouldBlockOnAwait() throws InterruptedException {

Review Comment:
   Waiting for the whole verification timeout for a successful test result 
seems long to me.  
   Does `shouldReturnFromAwaitOnInterruption()` not imply that the thread is 
waiting? Do we miss anything if we remove `shouldBlockOnAwait()`?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java:
##########
@@ -85,6 +85,16 @@ public void shouldShutdownTaskExecutor() {
         assertNull(taskExecutor.currentTask(), "Have task assigned after 
shutdown");
     }
 
+    @Test
+    public void shouldAwaitProcessableTasksIfNoneAssignable() throws 
InterruptedException {
+        assertNull(taskExecutor.currentTask(), "Have task assigned before 
startup");
+        when(task.isProcessable(anyLong())).thenReturn(false);
+
+        taskExecutor.start();
+
+        verify(taskManager, 
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).awaitProcessableTasks();
+    }
+

Review Comment:
   This test is a bit hard to follow. I think the test would be easier to 
understand if you specify the simpler stub 
`when(taskManager.assignNextTask(taskExecutor)).thenReturn(null);` here in the 
test instead of using the stub specified in `setUp()`.   



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java:
##########
@@ -73,6 +80,8 @@ public void setUp() {
         when(task.id()).thenReturn(new TaskId(0, 0, "A"));

Review Comment:
   nit: 
   You could reuse 
`org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask` here instead 
of creating your own stubs. 



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java:
##########
@@ -94,6 +103,114 @@ public void shouldAssignTaskThatCanBeProcessed() {
         assertNull(taskManager.assignNextTask(taskExecutor));
     }
 
+    private class AwaitingRunnable implements Runnable {
+        private final CountDownLatch awaitDone = new CountDownLatch(1);
+        private final AtomicBoolean shutdownRequested = new 
AtomicBoolean(false);
+        @Override
+        public void run() {
+            while (!shutdownRequested.get()) {
+                try {
+                    taskManager.awaitProcessableTasks();
+                } catch (final InterruptedException ignored) {
+                }
+                awaitDone.countDown();
+            }
+        }
+
+        public void shutdown() {
+            shutdownRequested.set(true);
+            taskManager.signalProcessableTasks();
+        }
+    }
+
+    @Test
+    public void shouldBlockOnAwait() throws InterruptedException {
+        final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
+        final Thread awaitingThread = new Thread(awaitingRunnable);
+        awaitingThread.start();
+
+        assertFalse(awaitingRunnable.awaitDone.await(VERIFICATION_TIMEOUT, 
TimeUnit.MILLISECONDS));
+
+        awaitingRunnable.shutdown();
+    }
+
+    @Test
+    public void shouldReturnFromAwaitOnInterruption() throws 
InterruptedException {
+        final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
+        final Thread awaitingThread = new Thread(awaitingRunnable);
+        awaitingThread.start();
+        verify(tasks, 
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();
+
+        awaitingThread.interrupt();
+
+        assertTrue(awaitingRunnable.awaitDone.await(VERIFICATION_TIMEOUT, 
TimeUnit.MILLISECONDS));
+
+        awaitingRunnable.shutdown();
+    }
+
+    @Test
+    public void shouldReturnFromAwaitOnSignalProcessableTasks() throws 
InterruptedException {
+        final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
+        final Thread awaitingThread = new Thread(awaitingRunnable);
+        awaitingThread.start();
+        verify(tasks, 
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();
+
+        taskManager.signalProcessableTasks();
+
+        assertTrue(awaitingRunnable.awaitDone.await(VERIFICATION_TIMEOUT, 
TimeUnit.MILLISECONDS));
+
+        awaitingRunnable.shutdown();
+    }
+
+    @Test
+    public void shouldReturnFromAwaitOnUnassignment() throws 
InterruptedException {
+        taskManager.add(Collections.singleton(task));
+        when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
+
+        final StreamTask t = taskManager.assignNextTask(taskExecutor);
+        assertNotNull(t);

Review Comment:
   nit:
   ```suggestion
           final StreamTask task = taskManager.assignNextTask(taskExecutor);
           assertNotNull(task);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to