cadonna commented on code in PR #14281: URL: https://github.com/apache/kafka/pull/14281#discussion_r1331428275
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java: ########## @@ -194,12 +204,18 @@ public KafkaFuture<Void> lockTasks(final Set<TaskId> taskIds) { } if (assignedTasks.containsKey(taskId)) { - final KafkaFuture<StreamTask> future = assignedTasks.get(taskId).unassign(); + final TaskExecutor executor = assignedTasks.get(taskId); + log.debug("Requesting release of task {} from {}", taskId, executor.name()); + final KafkaFuture<StreamTask> future = executor.unassign(); future.whenComplete((streamTask, throwable) -> { if (throwable != null) { result.completeExceptionally(throwable); } else { - remainingTaskIds.remove(streamTask.id()); + assert !assignedTasks.containsKey(taskId); + // It can happen that the executor handed back the task before we asked it to + // in which case `streamTask` will be null here. + assert streamTask == null || streamTask.id() == taskId; Review Comment: Usually, we do not use `assert` but rather `IllegalStateException`. Is there a good reason to use `assert`? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java: ########## @@ -333,5 +360,21 @@ private boolean canProgress(final StreamTask task, final long nowMs) { taskExecutionMetadata.canProcessTask(task, nowMs) && task.isProcessable(nowMs) || taskExecutionMetadata.canPunctuateTask(task) && (task.canPunctuateStreamTime() || task.canPunctuateSystemTime()); } + + public void start() { Review Comment: nit: Should this be `startTaskExecutors()`?
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java: ########## @@ -60,29 +64,52 @@ public void setUp() { when(taskManager.assignNextTask(taskExecutor)).thenReturn(task).thenReturn(null); when(taskExecutionMetadata.canProcessTask(eq(task), anyLong())).thenReturn(true); when(task.isProcessable(anyLong())).thenReturn(true); + when(task.id()).thenReturn(new TaskId(0, 0, "A")); when(task.process(anyLong())).thenReturn(true); when(task.prepareCommit()).thenReturn(Collections.emptyMap()); } @AfterEach public void tearDown() { - taskExecutor.shutdown(Duration.ofMinutes(1)); + taskExecutor.requestShutdown(); + taskExecutor.awaitShutdown(Duration.ofMinutes(1)); } @Test public void shouldShutdownTaskExecutor() { assertNull(taskExecutor.currentTask(), "Have task assigned before startup"); + assertFalse(taskExecutor.isRunning()); taskExecutor.start(); + assertTrue(taskExecutor.isRunning()); verify(taskManager, timeout(VERIFICATION_TIMEOUT)).assignNextTask(taskExecutor); - taskExecutor.shutdown(Duration.ofMinutes(1)); + taskExecutor.requestShutdown(); + taskExecutor.awaitShutdown(Duration.ofMinutes(1)); - verify(task).prepareCommit(); + verify(task).flush(); verify(taskManager).unassignTask(task, taskExecutor); assertNull(taskExecutor.currentTask(), "Have task assigned after shutdown"); + assertFalse(taskExecutor.isRunning()); + } + + @Test + public void shouldClearTaskReleaseFutureOnShutdown() throws InterruptedException { + assertNull(taskExecutor.currentTask(), "Have task assigned before startup"); + + taskExecutor.start(); + + verify(taskManager, timeout(VERIFICATION_TIMEOUT)).assignNextTask(taskExecutor); + + final KafkaFuture<StreamTask> future = taskExecutor.unassign(); + taskExecutor.requestShutdown(); + taskExecutor.awaitShutdown(Duration.ofMinutes(1)); + + waitForCondition(future::isDone, "Await for unassign future to complete"); + assertTrue(future.isDone()); Review Comment: Why do you verify whether the 
future is done twice? ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java: ########## @@ -233,6 +236,19 @@ public void shouldAssignTasksThatCanBeStreamTimePunctuated() { assertNull(taskManager.assignNextTask(taskExecutor)); } + @Test + public void shouldNotAssignTasksIfUncaughtExceptionPresent() { + taskManager.add(Collections.singleton(task)); + when(tasks.activeTasks()).thenReturn(Collections.singleton(task)); + when(taskExecutionMetadata.canPunctuateTask(eq(task))).thenReturn(true); + when(task.canPunctuateStreamTime()).thenReturn(true); Review Comment: Could you please extract these two lines to a method named `ensureTaskMakesProgress()` or similar and use that method where progress is needed? I was a bit puzzled when I saw the two calls. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org