cadonna commented on code in PR #14180: URL: https://github.com/apache/kafka/pull/14180#discussion_r1316995442
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java: ########## @@ -94,6 +103,114 @@ public void shouldAssignTaskThatCanBeProcessed() { assertNull(taskManager.assignNextTask(taskExecutor)); } + private class AwaitingRunnable implements Runnable { + private final CountDownLatch awaitDone = new CountDownLatch(1); + private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); + @Override + public void run() { + while (!shutdownRequested.get()) { + try { + taskManager.awaitProcessableTasks(); + } catch (final InterruptedException ignored) { + } + awaitDone.countDown(); + } + } + + public void shutdown() { + shutdownRequested.set(true); + taskManager.signalProcessableTasks(); + } + } + + @Test + public void shouldBlockOnAwait() throws InterruptedException { Review Comment: Waiting for the whole verification timeout for a successful test result seems long to me. Does `shouldReturnFromAwaitOnInterruption()` not imply that the thread is waiting? Do we miss anything if we remove `shouldBlockOnAwait()`? ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java: ########## @@ -85,6 +85,16 @@ public void shouldShutdownTaskExecutor() { assertNull(taskExecutor.currentTask(), "Have task assigned after shutdown"); } + @Test + public void shouldAwaitProcessableTasksIfNoneAssignable() throws InterruptedException { + assertNull(taskExecutor.currentTask(), "Have task assigned before startup"); + when(task.isProcessable(anyLong())).thenReturn(false); + + taskExecutor.start(); + + verify(taskManager, timeout(VERIFICATION_TIMEOUT).atLeastOnce()).awaitProcessableTasks(); + } + Review Comment: This test is a bit hard to follow. I think the test would be easier to understand if you specify the simpler stub `when(taskManager.assignNextTask(taskExecutor)).thenReturn(null);` here in the test instead of using the stub specified in `setUp()`. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java: ########## @@ -73,6 +80,8 @@ public void setUp() { when(task.id()).thenReturn(new TaskId(0, 0, "A")); Review Comment: nit: You could reuse `org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask` here instead of creating your own stubs. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java: ########## @@ -94,6 +103,114 @@ public void shouldAssignTaskThatCanBeProcessed() { assertNull(taskManager.assignNextTask(taskExecutor)); } + private class AwaitingRunnable implements Runnable { + private final CountDownLatch awaitDone = new CountDownLatch(1); + private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); + @Override + public void run() { + while (!shutdownRequested.get()) { + try { + taskManager.awaitProcessableTasks(); + } catch (final InterruptedException ignored) { + } + awaitDone.countDown(); + } + } + + public void shutdown() { + shutdownRequested.set(true); + taskManager.signalProcessableTasks(); + } + } + + @Test + public void shouldBlockOnAwait() throws InterruptedException { + final AwaitingRunnable awaitingRunnable = new AwaitingRunnable(); + final Thread awaitingThread = new Thread(awaitingRunnable); + awaitingThread.start(); + + assertFalse(awaitingRunnable.awaitDone.await(VERIFICATION_TIMEOUT, TimeUnit.MILLISECONDS)); + + awaitingRunnable.shutdown(); + } + + @Test + public void shouldReturnFromAwaitOnInterruption() throws InterruptedException { + final AwaitingRunnable awaitingRunnable = new AwaitingRunnable(); + final Thread awaitingThread = new Thread(awaitingRunnable); + awaitingThread.start(); + verify(tasks, timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks(); + + awaitingThread.interrupt(); + + assertTrue(awaitingRunnable.awaitDone.await(VERIFICATION_TIMEOUT, TimeUnit.MILLISECONDS)); + + awaitingRunnable.shutdown(); + } + + @Test + public void shouldReturnFromAwaitOnSignalProcessableTasks() throws InterruptedException { + final AwaitingRunnable awaitingRunnable = new AwaitingRunnable(); + final Thread awaitingThread = new Thread(awaitingRunnable); + awaitingThread.start(); + verify(tasks, timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks(); + + taskManager.signalProcessableTasks(); + + assertTrue(awaitingRunnable.awaitDone.await(VERIFICATION_TIMEOUT, TimeUnit.MILLISECONDS)); + + awaitingRunnable.shutdown(); + } + + @Test + public void shouldReturnFromAwaitOnUnassignment() throws InterruptedException { + taskManager.add(Collections.singleton(task)); + when(taskExecutionMetadata.canProcessTask(eq(task), anyLong())).thenReturn(true); + + final StreamTask t = taskManager.assignNextTask(taskExecutor); + assertNotNull(t); Review Comment: nit: ```suggestion final StreamTask task = taskManager.assignNextTask(taskExecutor); assertNotNull(task); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org