cadonna commented on code in PR #14281:
URL: https://github.com/apache/kafka/pull/14281#discussion_r1338117870

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java:
##########
@@ -237,8 +275,18 @@ public KafkaFuture<StreamTask> unassign() {
         final KafkaFutureImpl<StreamTask> future = new KafkaFutureImpl<>();
         if (taskExecutorThread != null) {
-            taskExecutorThread.pauseRequested.set(future);
+            log.debug("Asking {} to hand back task", taskExecutorThread.getName());
+            if (!taskExecutorThread.taskReleaseRequested.compareAndSet(null, future)) {
+                throw new IllegalStateException("There was already a task release request registered");

Review Comment:
   Could we also just ignore that there has already been a task release request? Or is there a correctness issue if we ignore?

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java:
##########
@@ -56,10 +58,10 @@ public interface TaskManager {
      *
      * This method does not block, instead a future is returned.
      */
-    KafkaFuture<Void> lockTasks(final Set<TaskId> taskIds);
+    KafkaFuture<Void> lockTasks(final Collection<TaskId> taskIds);

Review Comment:
   Is it important to keep duplicate tasks?

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java:
##########
@@ -168,9 +192,10 @@ private StreamTask unassignCurrentTask() {
         if (currentTask == null)
             throw new IllegalStateException("Does not own any task while being ask to unassign from task manager");
-        // flush the task before giving it back to task manager
-        // TODO: we can add a separate function in StreamTask to just flush and not return offsets
-        currentTask.prepareCommit();
+        // flush the task before giving it back to task manager, if we are not handing it back because of an error.
+        if (!taskManager.hasUncaughtException(currentTask.id())) {
+            currentTask.flush();

Review Comment:
   Just a question for my understanding: Why do we need to flush the task before we hand it back?
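
To make the first question above concrete (the `compareAndSet(null, future)` check in `unassign()`), here is a minimal standalone sketch of three ways a second release request could be handled. It is not the PR's code: `ReleaseRequestSketch`, its method names, and the use of `CompletableFuture<String>` in place of `KafkaFutureImpl<StreamTask>` are all assumptions made for illustration. The point is only that "ignoring" can mean either overwriting the pending future (the first caller's future is then never completed) or handing the pending future back to the second caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the executor's release-request handling; not Kafka code.
final class ReleaseRequestSketch {
    // Plays the role of taskReleaseRequested: null means no request is pending.
    private final AtomicReference<CompletableFuture<String>> releaseRequested = new AtomicReference<>();

    // Strict: a second request while one is pending is an error (what the diff does).
    CompletableFuture<String> requestStrict() {
        final CompletableFuture<String> future = new CompletableFuture<>();
        if (!releaseRequested.compareAndSet(null, future)) {
            throw new IllegalStateException("There was already a task release request registered");
        }
        return future;
    }

    // Overwrite: unconditionally replace the pending future, like a plain set().
    CompletableFuture<String> requestOverwrite() {
        final CompletableFuture<String> future = new CompletableFuture<>();
        releaseRequested.set(future);
        return future;
    }

    // Lenient: keep the pending future if there is one, otherwise install a new one.
    CompletableFuture<String> requestLenient() {
        final CompletableFuture<String> future = new CompletableFuture<>();
        return releaseRequested.updateAndGet(existing -> existing != null ? existing : future);
    }

    // Executor side: complete whatever request is pending and clear the slot.
    void handBack(final String taskId) {
        final CompletableFuture<String> pending = releaseRequested.getAndSet(null);
        if (pending != null) {
            pending.complete(taskId);
        }
    }

    public static void main(final String[] args) {
        final ReleaseRequestSketch overwrite = new ReleaseRequestSketch();
        final CompletableFuture<String> first = overwrite.requestOverwrite();
        final CompletableFuture<String> second = overwrite.requestOverwrite();
        overwrite.handBack("0_1");
        // Only the most recent future is completed; the first one is orphaned.
        System.out.println("overwrite: first done=" + first.isDone() + ", second done=" + second.isDone());

        final ReleaseRequestSketch lenient = new ReleaseRequestSketch();
        final CompletableFuture<String> a = lenient.requestLenient();
        final CompletableFuture<String> b = lenient.requestLenient();
        lenient.handBack("0_1");
        // Both callers share one future, so both observe the handed-back task.
        System.out.println("lenient: same future=" + (a == b) + ", done=" + a.isDone());
    }
}
```

Whether the orphaned-future case can actually occur in the PR depends on how callers of `unassign()` use the returned future, which this sketch does not model.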

##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java:
##########
@@ -233,6 +236,18 @@ public void shouldAssignTasksThatCanBeStreamTimePunctuated() {
         assertNull(taskManager.assignNextTask(taskExecutor));
     }
+    @Test
+    public void shouldNotAssignTasksIfUncaughtExceptionPresent() {
+        taskManager.add(Collections.singleton(task));
+        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        ensureTaskMakesProgress();
+        taskManager.assignNextTask(taskExecutor);
+        taskManager.setUncaughtException(mock(StreamsException.class), taskId);

Review Comment:
   Why do you use a mock of the `StreamsException` and not the exception directly?

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java:
##########
@@ -168,9 +192,10 @@ private StreamTask unassignCurrentTask() {
         if (currentTask == null)
             throw new IllegalStateException("Does not own any task while being ask to unassign from task manager");
-        // flush the task before giving it back to task manager
-        // TODO: we can add a separate function in StreamTask to just flush and not return offsets
-        currentTask.prepareCommit();
+        // flush the task before giving it back to task manager, if we are not handing it back because of an error.
+        if (!taskManager.hasUncaughtException(currentTask.id())) {
+            currentTask.flush();

Review Comment:
   OK, I saw now in the javadocs of `unassign()`. I am wondering if it makes more sense to call flush on the task in the task manager. You do not need to change that in this PR.