cadonna commented on code in PR #14281:
URL: https://github.com/apache/kafka/pull/14281#discussion_r1338117870


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java:
##########
@@ -237,8 +275,18 @@ public KafkaFuture<StreamTask> unassign() {
         final KafkaFutureImpl<StreamTask> future = new KafkaFutureImpl<>();
 
         if (taskExecutorThread != null) {
-            taskExecutorThread.pauseRequested.set(future);
+            log.debug("Asking {} to hand back task", taskExecutorThread.getName());
+            if (!taskExecutorThread.taskReleaseRequested.compareAndSet(null, future)) {
+                throw new IllegalStateException("There was already a task release request registered");

Review Comment:
   Could we instead just ignore the fact that a task release request has 
already been registered? Or would ignoring it cause a correctness issue?
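
To make the two options in this question concrete, here is a minimal sketch. It assumes `CompletableFuture` as a stand-in for `KafkaFutureImpl`, and the class and method names (`ReleaseRequestSketch`, `unassignStrict`, `unassignLenient`) are hypothetical and not part of the PR. Whether the lenient variant is safe depends on how the executor thread clears the request, which this sketch does not model.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class ReleaseRequestSketch {
    // Hypothetical stand-in for taskExecutorThread.taskReleaseRequested.
    private final AtomicReference<CompletableFuture<String>> taskReleaseRequested =
        new AtomicReference<>();

    // Strict variant (mirrors the diff): a second request is treated as a bug.
    public CompletableFuture<String> unassignStrict() {
        final CompletableFuture<String> future = new CompletableFuture<>();
        if (!taskReleaseRequested.compareAndSet(null, future)) {
            throw new IllegalStateException("There was already a task release request registered");
        }
        return future;
    }

    // Lenient variant (assumption): a second request is ignored and the caller
    // receives the future that was registered first.
    public CompletableFuture<String> unassignLenient() {
        final CompletableFuture<String> future = new CompletableFuture<>();
        return taskReleaseRequested.updateAndGet(existing -> existing != null ? existing : future);
    }
}
```

In the lenient variant both callers wait on the same future, so neither is left holding a future that never completes.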



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java:
##########
@@ -56,10 +58,10 @@ public interface TaskManager {
      *
      * This method does not block, instead a future is returned.
      */
-    KafkaFuture<Void> lockTasks(final Set<TaskId> taskIds);
+    KafkaFuture<Void> lockTasks(final Collection<TaskId> taskIds);

Review Comment:
   Is it important to keep duplicate tasks?
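
For illustration, a minimal sketch of what the wider parameter type permits. The names `LockTasksSketch` and `distinctTaskIds` are hypothetical, and `String` stands in for `TaskId`. A `Collection` argument may contain the same task ID more than once, whereas the previous `Set` signature ruled that out at the type level; an implementation that does not want duplicates would have to normalize them itself.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

public class LockTasksSketch {
    // Hypothetical helper: collapses duplicate IDs while keeping first-seen order,
    // which is what a Set-typed parameter would have guaranteed up front.
    public static Set<String> distinctTaskIds(final Collection<String> taskIds) {
        return new LinkedHashSet<>(taskIds);
    }
}
```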



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java:
##########
@@ -168,9 +192,10 @@ private StreamTask unassignCurrentTask() {
             if (currentTask == null)
                 throw new IllegalStateException("Does not own any task while being ask to unassign from task manager");
 
-            // flush the task before giving it back to task manager
-            // TODO: we can add a separate function in StreamTask to just flush and not return offsets
-            currentTask.prepareCommit();
+            // flush the task before giving it back to task manager, if we are not handing it back because of an error.
+            if (!taskManager.hasUncaughtException(currentTask.id())) {
+                currentTask.flush();

Review Comment:
   Just a question for my understanding: Why do we need to flush the task 
before we hand it back? 



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java:
##########
@@ -233,6 +236,18 @@ public void shouldAssignTasksThatCanBeStreamTimePunctuated() {
         assertNull(taskManager.assignNextTask(taskExecutor));
     }
 
+    @Test
+    public void shouldNotAssignTasksIfUncaughtExceptionPresent() {
+        taskManager.add(Collections.singleton(task));
+        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        ensureTaskMakesProgress();
+        taskManager.assignNextTask(taskExecutor);
+        taskManager.setUncaughtException(mock(StreamsException.class), taskId);

Review Comment:
   Why do you use a mock of the `StreamsException` and not the exception 
directly?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java:
##########
@@ -168,9 +192,10 @@ private StreamTask unassignCurrentTask() {
             if (currentTask == null)
                 throw new IllegalStateException("Does not own any task while being ask to unassign from task manager");
 
-            // flush the task before giving it back to task manager
-            // TODO: we can add a separate function in StreamTask to just flush and not return offsets
-            currentTask.prepareCommit();
+            // flush the task before giving it back to task manager, if we are not handing it back because of an error.
+            if (!taskManager.hasUncaughtException(currentTask.id())) {
+                currentTask.flush();

Review Comment:
   OK, I have now seen the explanation in the javadocs of `unassign()`. I am 
wondering whether it would make more sense to call flush on the task in the 
task manager. You do not need to change that in this PR. 


