cadonna commented on code in PR #14281:
URL: https://github.com/apache/kafka/pull/14281#discussion_r1331428275


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java:
##########
@@ -194,12 +204,18 @@ public KafkaFuture<Void> lockTasks(final Set<TaskId> 
taskIds) {
                 }
 
                 if (assignedTasks.containsKey(taskId)) {
-                    final KafkaFuture<StreamTask> future = 
assignedTasks.get(taskId).unassign();
+                    final TaskExecutor executor = assignedTasks.get(taskId);
+                    log.debug("Requesting release of task {} from {}", taskId, 
executor.name());
+                    final KafkaFuture<StreamTask> future = executor.unassign();
                     future.whenComplete((streamTask, throwable) -> {
                         if (throwable != null) {
                             result.completeExceptionally(throwable);
                         } else {
-                            remainingTaskIds.remove(streamTask.id());
+                            assert !assignedTasks.containsKey(taskId);
+                            // It can happen that the executor handed back the 
task before we asked it to
+                            // in which case `streamTask` will be null here.
+                            assert streamTask == null || streamTask.id() == 
taskId;

Review Comment:
   Usually, we do not use `assert` but rather `IllegalStateException`. Is there 
a good reason to use `assert`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java:
##########
@@ -333,5 +360,21 @@ private boolean canProgress(final StreamTask task, final 
long nowMs) {
             taskExecutionMetadata.canProcessTask(task, nowMs) && 
task.isProcessable(nowMs) ||
                 taskExecutionMetadata.canPunctuateTask(task) && 
(task.canPunctuateStreamTime() || task.canPunctuateSystemTime());
     }
+
+    public void start() {

Review Comment:
   nit: Should this be `startTaskExecutors()` ?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java:
##########
@@ -60,29 +64,52 @@ public void setUp() {
         
when(taskManager.assignNextTask(taskExecutor)).thenReturn(task).thenReturn(null);
         when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
         when(task.isProcessable(anyLong())).thenReturn(true);
+        when(task.id()).thenReturn(new TaskId(0, 0, "A"));
         when(task.process(anyLong())).thenReturn(true);
         when(task.prepareCommit()).thenReturn(Collections.emptyMap());
     }
 
     @AfterEach
     public void tearDown() {
-        taskExecutor.shutdown(Duration.ofMinutes(1));
+        taskExecutor.requestShutdown();
+        taskExecutor.awaitShutdown(Duration.ofMinutes(1));
     }
 
     @Test
     public void shouldShutdownTaskExecutor() {
         assertNull(taskExecutor.currentTask(), "Have task assigned before 
startup");
+        assertFalse(taskExecutor.isRunning());
 
         taskExecutor.start();
 
+        assertTrue(taskExecutor.isRunning());
         verify(taskManager, 
timeout(VERIFICATION_TIMEOUT)).assignNextTask(taskExecutor);
 
-        taskExecutor.shutdown(Duration.ofMinutes(1));
+        taskExecutor.requestShutdown();
+        taskExecutor.awaitShutdown(Duration.ofMinutes(1));
 
-        verify(task).prepareCommit();
+        verify(task).flush();
         verify(taskManager).unassignTask(task, taskExecutor);
 
         assertNull(taskExecutor.currentTask(), "Have task assigned after 
shutdown");
+        assertFalse(taskExecutor.isRunning());
+    }
+
+    @Test
+    public void shouldClearTaskReleaseFutureOnShutdown() throws 
InterruptedException {
+        assertNull(taskExecutor.currentTask(), "Have task assigned before 
startup");
+
+        taskExecutor.start();
+
+        verify(taskManager, 
timeout(VERIFICATION_TIMEOUT)).assignNextTask(taskExecutor);
+
+        final KafkaFuture<StreamTask> future = taskExecutor.unassign();
+        taskExecutor.requestShutdown();
+        taskExecutor.awaitShutdown(Duration.ofMinutes(1));
+
+        waitForCondition(future::isDone, "Await for unassign future to 
complete");
+        assertTrue(future.isDone());

Review Comment:
   Why do you verify whether the future is done twice?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java:
##########
@@ -233,6 +236,19 @@ public void 
shouldAssignTasksThatCanBeStreamTimePunctuated() {
         assertNull(taskManager.assignNextTask(taskExecutor));
     }
 
+    @Test
+    public void shouldNotAssignTasksIfUncaughtExceptionPresent() {
+        taskManager.add(Collections.singleton(task));
+        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(taskExecutionMetadata.canPunctuateTask(eq(task))).thenReturn(true);
+        when(task.canPunctuateStreamTime()).thenReturn(true);

Review Comment:
   Could you please extract these two lines to a method named 
`ensureTaskMakesProgress()` or similar and use that method where progress is 
needed?
   I was a bit puzzled when I saw the two calls. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to