guozhangwang commented on code in PR #12270:
URL: https://github.com/apache/kafka/pull/12270#discussion_r893752714


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -230,23 +233,35 @@ private List<TaskAndAction> getTasksAndActions() {
 
         private void addTask(final Task task) {
             if (isStateless(task)) {
-                log.debug("Stateless active task " + task.id() + " was added to the state updater");
                 addTaskToRestoredTasks((StreamTask) task);
+                log.debug("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
             } else {
+                updatingTasks.put(task.id(), task);
                 if (task.isActive()) {
-                    updatingTasks.put(task.id(), task);
-                    log.debug("Stateful active task " + task.id() + " was added to the state updater");
+                    log.debug("Stateful active task " + task.id() + " was added to the updating tasks of the state updater");
                     changelogReader.enforceRestoreActive();
                 } else {
-                    updatingTasks.put(task.id(), task);
-                    log.debug("Standby task " + task.id() + " was added to the state updater");
+                    log.debug("Standby task " + task.id() + " was added to the updating tasks of the state updater");
                     if (updatingTasks.size() == 1) {
                         changelogReader.transitToUpdateStandby();
                     }
                 }
             }
         }
 
+        private void removeTask(final TaskId taskId) {
+            final Task task = updatingTasks.remove(taskId);

Review Comment:
   Not a comment, just a note: one of the bug fixes we want to piggy-back onto the restoration work is to write a new checkpoint when we stop restoring a task; otherwise, the restoration progress made so far would be lost. The restore callback should be triggered as well (KAFKA-10575).
   
   I will try to add this in a separate PR; it is also a good exercise in how the state updater could interact with the processor state manager.
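   A minimal sketch of that behavior, assuming a hypothetical `RestoringTask` that tracks the restored offset per changelog partition and a hypothetical `onRestoreStopped` callback (neither is an existing Kafka Streams API, and the checkpoint is kept in memory rather than written to a file). The ordering (checkpoint first, then callback, then unregistering the changelogs) is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical sketch: RestoringTask and the callback are illustrative stand-ins,
// not the real StreamTask / ProcessorStateManager / StateRestoreListener APIs.
final class CheckpointOnStopSketch {

    static final class RestoringTask {
        final String id;
        // changelog partition -> offset restored so far
        final Map<String, Long> restoredOffsets = new HashMap<>();
        // last checkpoint "written"; kept in memory instead of a checkpoint file
        Map<String, Long> checkpoint = new HashMap<>();

        RestoringTask(final String id) {
            this.id = id;
        }

        void writeCheckpoint() {
            checkpoint = new HashMap<>(restoredOffsets);
        }
    }

    // Hypothetical callback fired when restoration of a task stops before completion.
    private final BiConsumer<String, Map<String, Long>> onRestoreStopped;

    CheckpointOnStopSketch(final BiConsumer<String, Map<String, Long>> onRestoreStopped) {
        this.onRestoreStopped = onRestoreStopped;
    }

    // What removing a still-restoring task could do before its changelogs are unregistered.
    void stopRestoring(final RestoringTask task) {
        task.writeCheckpoint();                            // keep the progress made so far
        onRestoreStopped.accept(task.id, task.checkpoint); // notify the restore callback
        // ...then unregister the task's changelog partitions (omitted here)
    }

    public static void main(final String[] args) {
        final RestoringTask task = new RestoringTask("0_0");
        task.restoredOffsets.put("changelog-A-0", 42L);
        new CheckpointOnStopSketch((id, offsets) -> System.out.println(id + " stopped at " + offsets))
            .stopRestoring(task);
    }
}
```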



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -375,52 +384,42 @@ public Set<StreamTask> getRestoredActiveTasks(final Duration timeout) {
     }
 
     @Override
-    public List<ExceptionAndTasks> getFailedTasksAndExceptions() {
+    public Set<Task> drainRemovedTasks() {

Review Comment:
   When we add the logic for recycling a task, which is to be done in the task manager, we would still need two round-trips: first remove the task as active/standby, then, after recycling it, add the new task as standby/active. I'm still fleshing out the details here, but in case we also need a timeout for removed tasks, similar to the one for restored active tasks, this data structure may need to be changed from a blocking queue to a lock + condition + queue.
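   One possible shape for the lock + condition + queue variant, sketched under the assumption that removed tasks get a timed drain analogous to `drainRestoredActiveTasks(final Duration timeout)`. `RemovedTasksBuffer` is a hypothetical name and `T` stands in for `Task`. Unlike `BlockingQueue#drainTo`, which returns immediately with whatever is available, the drain below waits up to the timeout for the first removed task and then takes everything buffered so far.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical buffer of removed tasks guarded by a lock and a condition.
final class RemovedTasksBuffer<T> {
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Deque<T> removed = new ArrayDeque<>();

    // Called by the state updater thread after a task has been removed.
    void add(final T task) {
        lock.lock();
        try {
            removed.add(task);
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Called by the stream thread: waits up to `timeout` for at least one removed task,
    // then returns and clears everything buffered so far (possibly an empty set).
    Set<T> drain(final Duration timeout) throws InterruptedException {
        long remainingNanos = timeout.toNanos();
        lock.lock();
        try {
            // Loop to tolerate spurious wake-ups; awaitNanos returns the remaining wait time.
            while (removed.isEmpty() && remainingNanos > 0) {
                remainingNanos = notEmpty.awaitNanos(remainingNanos);
            }
            final Set<T> result = new HashSet<>(removed);
            removed.clear();
            return result;
        } finally {
            lock.unlock();
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        final RemovedTasksBuffer<String> buffer = new RemovedTasksBuffer<>();
        // Simulate the state updater thread removing a task concurrently.
        final Thread updaterThread = new Thread(() -> buffer.add("0_0"));
        updaterThread.start();
        System.out.println("Drained: " + buffer.drain(Duration.ofSeconds(1)));
        updaterThread.join();
    }
}
```

   For a single queue, `BlockingQueue#poll(timeout, unit)` followed by `drainTo` would also give a timed drain; an explicit lock and condition become more attractive if several collections must be updated atomically.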



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -375,52 +384,42 @@ public Set<StreamTask> getRestoredActiveTasks(final Duration timeout) {
     }
 
     @Override
-    public List<ExceptionAndTasks> getFailedTasksAndExceptions() {
+    public Set<Task> drainRemovedTasks() {
+        final List<Task> result = new ArrayList<>();
+        removedTasks.drainTo(result);
+        return new HashSet<>(result);
+    }
+
+    @Override
+    public List<ExceptionAndTasks> drainExceptionsAndFailedTasks() {
         final List<ExceptionAndTasks> result = new ArrayList<>();
-        failedTasks.drainTo(result);
+        exceptionsAndFailedTasks.drainTo(result);
         return result;
     }
 
-    @Override
-    public Set<Task> getAllTasks() {
-        tasksAndActionsLock.lock();
+    public Set<StandbyTask> getUpdatingStandbyTasks() {
+        return Collections.unmodifiableSet(new HashSet<>(stateUpdaterThread.getUpdatingStandbyTasks()));
+    }
+
+    public Set<Task> getUpdatingTasks() {
+        return Collections.unmodifiableSet(new HashSet<>(stateUpdaterThread.getUpdatingTasks()));
+    }
+
+    public Set<StreamTask> getRestoredActiveTasks() {
         restoredActiveTasksLock.lock();
         try {
-            final Set<Task> allTasks = new HashSet<>();
-            allTasks.addAll(tasksAndActions.stream()
-                .filter(t -> t.action == Action.ADD)
-                .map(t -> t.task)
-                .collect(Collectors.toList())
-            );
-            allTasks.addAll(stateUpdaterThread.getAllUpdatingTasks());
-            allTasks.addAll(restoredActiveTasks);
-            return Collections.unmodifiableSet(allTasks);
+            return Collections.unmodifiableSet(new HashSet<>(restoredActiveTasks));
         } finally {
             restoredActiveTasksLock.unlock();
-            tasksAndActionsLock.unlock();
         }
     }
 
-    @Override
-    public Set<StandbyTask> getStandbyTasks() {
-        tasksAndActionsLock.lock();
-        try {
-            final Set<StandbyTask> standbyTasks = new HashSet<>();
-            standbyTasks.addAll(tasksAndActions.stream()
-                .filter(t -> t.action == Action.ADD)
-                .filter(t -> !t.task.isActive())
-                .map(t -> (StandbyTask) t.task)
-                .collect(Collectors.toList())
-            );
-            standbyTasks.addAll(getUpdatingStandbyTasks());
-            return Collections.unmodifiableSet(standbyTasks);
-        } finally {
-            tasksAndActionsLock.unlock();
-        }
+    public List<ExceptionAndTasks> getExceptionsAndFailedTasks() {

Review Comment:
   I suspect some of the getters here will not be needed in non-testing code, since the interface declares the corresponding `drainXX` functions and those are what non-testing code would use. If that turns out to be true, let's cluster these getters together and add a comment that they are for testing only.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java:
##########
@@ -16,65 +16,101 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.streams.processor.TaskId;
+
 import java.time.Duration;
+import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 
 public interface StateUpdater {
 
     class ExceptionAndTasks {
-        public final Set<Task> tasks;
-        public final RuntimeException exception;
+        private final Set<Task> tasks;
+        private final RuntimeException exception;
 
         public ExceptionAndTasks(final Set<Task> tasks, final RuntimeException exception) {
-            this.tasks = tasks;
-            this.exception = exception;
+            this.tasks = Objects.requireNonNull(tasks);
+            this.exception = Objects.requireNonNull(exception);
+        }
+
+        public Set<Task> tasks() {
+            return Collections.unmodifiableSet(tasks);
+        }
+
+        public RuntimeException exception() {
+            return exception;
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) return true;
+            if (!(o instanceof ExceptionAndTasks)) return false;
+            final ExceptionAndTasks that = (ExceptionAndTasks) o;
+            return tasks.equals(that.tasks) && exception.equals(that.exception);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(tasks, exception);
         }
     }
 
     /**
      * Adds a task (active or standby) to the state updater.
      *
+     * This method does not block until the task is added to the state updater.
+     *
      * @param task task to add
      */
     void add(final Task task);
 
     /**
-     * Removes a task (active or standby) from the state updater.
+     * Removes a task (active or standby) from the state updater and adds the removed task to the removed tasks.
+     *
+     * This method does not block until the removed task is removed from the state updater.
      *
-     * @param task task ro remove
+     * The task to be removed is not removed from the restored active tasks and the failed tasks.
+     * Stateless tasks will never be added to the removed tasks since they are immediately added to the
+     * restored active tasks.
+     *
+     * @param taskId ID of the task to remove
      */
-    void remove(final Task task);
+    void remove(final TaskId taskId);
 
     /**
-     * Gets restored active tasks from state restoration/update
+     * Drains the restored active tasks from the state updater.
+     *
+     * The returned active tasks are removed from the state updater.
      *
      * @param timeout duration how long the calling thread should wait for restored active tasks
      *
      * @return set of active tasks with up-to-date states
      */
-    Set<StreamTask> getRestoredActiveTasks(final Duration timeout);
+    Set<StreamTask> drainRestoredActiveTasks(final Duration timeout);
 
-    /**
-     * Gets failed tasks and the corresponding exceptions
-     *
-     * @return list of failed tasks and the corresponding exceptions
-     */
-    List<ExceptionAndTasks> getFailedTasksAndExceptions();
 
     /**
-     * Get all tasks (active and standby) that are managed by the state updater.
+     * Drains the removed tasks (active and standbys) from the state updater.
+     *
+     * Removed tasks returned by this method are tasks extraordinarily removed from the state updater. These do not

Review Comment:
   I like the word "extraordinarily" :)
   
   Jokes aside, I have a slightly different thought about the semantics here: whether the drained removed tasks and the restored/failed tasks should be mutually exclusive or may overlap, mainly in terms of how easily the caller could handle the overlapping scenarios. I left an earlier comment above.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -293,47 +321,165 @@ public void shouldRestoreActiveStatefulTaskThenUpdateStandbyTaskAndAgainRestoreA
 
         stateUpdater.add(task3);
 
-        verifyRestoredActiveTasks(task3);
+        verifyRestoredActiveTasks(task1, task3);
         verify(task3).completeRestoration(offsetResetter);
         orderVerifier.verify(changelogReader, times(1)).enforceRestoreActive();
         orderVerifier.verify(changelogReader, times(1)).transitToUpdateStandby();
     }
 
+    @Test
+    public void shouldRemoveActiveStatefulTask() throws Exception {
+        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldRemoveStatefulTask(task);
+    }
+
+    @Test
+    public void shouldRemoveStandbyTask() throws Exception {
+        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldRemoveStatefulTask(task);
+    }
+
+    private void shouldRemoveStatefulTask(final Task task) throws Exception {
+        when(changelogReader.completedChangelogs())
+            .thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted())
+            .thenReturn(false);
+        stateUpdater.add(task);
+
+        stateUpdater.remove(TASK_0_0);
+
+        verifyRemovedTasks(task);
+        verifyRestoredActiveTasks();
+        verifyUpdatingTasks();
+        verifyExceptionsAndFailedTasks();
+        verify(changelogReader).unregister(Collections.singletonList(TOPIC_PARTITION_A_0));
+    }
+
+    @Test
+    public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() throws Exception {
+        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldNotRemoveTaskFromRestoredActiveTasks(task);
+    }
+
+    @Test
+    public void shouldNotRemoveStatelessTaskFromRestoredActiveTasks() throws Exception {
+        final StreamTask task = createStatelessTaskInStateRestoring(TASK_0_0);
+        shouldNotRemoveTaskFromRestoredActiveTasks(task);
+    }
+
+    private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask task) throws Exception {
+        final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+        when(changelogReader.completedChangelogs())
+            .thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
+        when(changelogReader.allChangelogsCompleted())
+            .thenReturn(false);
+        stateUpdater.add(task);
+        stateUpdater.add(controlTask);
+        verifyRestoredActiveTasks(task);
+
+        stateUpdater.remove(task.id());
+        stateUpdater.remove(controlTask.id());
+
+        verifyRemovedTasks(controlTask);
+        verifyRestoredActiveTasks(task);
+        verifyUpdatingTasks();
+        verifyExceptionsAndFailedTasks();
+    }
+
+    @Test
+    public void shouldNotRemoveActiveStatefulTaskFromFailedTasks() throws Exception {
+        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldNotRemoveTaskFromFailedTasks(task);
+    }
+
+    @Test
+    public void shouldNotRemoveStandbyTaskFromFailedTasks() throws Exception {
+        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldNotRemoveTaskFromFailedTasks(task);
+    }
+
+    private void shouldNotRemoveTaskFromFailedTasks(final Task task) throws Exception {
+        final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StreamsException streamsException = new StreamsException("Something happened", task.id());
+        when(changelogReader.completedChangelogs())
+            .thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted())
+            .thenReturn(false);
+        doNothing()
+            .doThrow(streamsException)
+            .doNothing()
+            .when(changelogReader).restore(anyMap());
+        stateUpdater.add(task);
+        stateUpdater.add(controlTask);
+        final ExceptionAndTasks expectedExceptionAndTasks = new ExceptionAndTasks(mkSet(task), streamsException);
+        verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
+
+        stateUpdater.remove(task.id());
+        stateUpdater.remove(controlTask.id());
+
+        verifyRemovedTasks(controlTask);
+        verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
+        verifyUpdatingTasks();
+        verifyRestoredActiveTasks();
+    }
+
+    @Test
+    public void shouldDrainRemovedTasks() throws Exception {
+        assertTrue(stateUpdater.drainRemovedTasks().isEmpty());
+        when(changelogReader.completedChangelogs())
+            .thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted())
+            .thenReturn(false);
+
+        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+        stateUpdater.add(task1);
+        stateUpdater.remove(task1.id());
+
+        verifyDrainingRemovedTasks(task1);
+
+        final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_C_0));
+        final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask task4 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_D_0));
+        stateUpdater.add(task2);
+        stateUpdater.remove(task2.id());
+        stateUpdater.add(task3);
+        stateUpdater.remove(task3.id());
+        stateUpdater.add(task4);
+        stateUpdater.remove(task4.id());
+
+        verifyDrainingRemovedTasks(task2, task3, task4);
+    }
+
     @Test
     public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithoutTask() throws Exception {
         final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
         final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
-        final String expectedMessage = "The Streams were crossed!";
-        final StreamsException expectedStreamsException = new StreamsException(expectedMessage);
+        final String exceptionMessage = "The Streams were crossed!";
+        final StreamsException streamsException = new StreamsException(exceptionMessage);
         final Map<TaskId, Task> updatingTasks = mkMap(
             mkEntry(task1.id(), task1),
             mkEntry(task2.id(), task2)
         );
-        doNothing().doThrow(expectedStreamsException).doNothing().when(changelogReader).restore(updatingTasks);
+        doNothing().doThrow(streamsException).doNothing().when(changelogReader).restore(updatingTasks);
 
         stateUpdater.add(task1);
         stateUpdater.add(task2);
 
-        final List<ExceptionAndTasks> failedTasks = getFailedTasks(1);
-        assertEquals(1, failedTasks.size());
-        final ExceptionAndTasks actualFailedTasks = failedTasks.get(0);
-        assertEquals(2, actualFailedTasks.tasks.size());
-        assertTrue(actualFailedTasks.tasks.containsAll(Arrays.asList(task1, task2)));
-        assertTrue(actualFailedTasks.exception instanceof StreamsException);
-        final StreamsException actualException = (StreamsException) actualFailedTasks.exception;
-        assertFalse(actualException.taskId().isPresent());
-        assertEquals(expectedMessage, actualException.getMessage());
-        assertTrue(stateUpdater.getAllTasks().isEmpty());

Review Comment:
   Yup, agreed.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -230,23 +233,35 @@ private List<TaskAndAction> getTasksAndActions() {
 
         private void addTask(final Task task) {
             if (isStateless(task)) {
-                log.debug("Stateless active task " + task.id() + " was added to the state updater");
                 addTaskToRestoredTasks((StreamTask) task);
+                log.debug("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
             } else {
+                updatingTasks.put(task.id(), task);
                 if (task.isActive()) {
-                    updatingTasks.put(task.id(), task);
-                    log.debug("Stateful active task " + task.id() + " was added to the state updater");
+                    log.debug("Stateful active task " + task.id() + " was added to the updating tasks of the state updater");
                     changelogReader.enforceRestoreActive();
                 } else {
-                    updatingTasks.put(task.id(), task);
-                    log.debug("Standby task " + task.id() + " was added to the state updater");
+                    log.debug("Standby task " + task.id() + " was added to the updating tasks of the state updater");
                     if (updatingTasks.size() == 1) {
                         changelogReader.transitToUpdateStandby();
                     }
                 }
             }
         }
 
+        private void removeTask(final TaskId taskId) {
+            final Task task = updatingTasks.remove(taskId);
+            if (task != null) {
+                final Collection<TopicPartition> changelogPartitions = task.changelogPartitions();
+                changelogReader.unregister(changelogPartitions);
+                removedTasks.add(task);
+                log.debug((task.isActive() ? "Active" : "Standby")
+                    + " task " + task.id() + " was removed from the updating tasks and added to the removed tasks.");
+            } else {
+                log.debug("Task " + taskId + " was not removed since it is not updating.");

Review Comment:
   This is a meta comment: for tasks that have already been restored or have failed, should we still include them in the removed tasks returned by the `drain` function?
   
   The reason I'm wondering is that the caller of `updater.remove` would likely expect the task to eventually show up in a future `drain` call (again, the recycle scenario is one example). If we do not add them there, the caller's logic becomes more complicated, since it must also check the `restored` / `failed` sets from the updater to determine when the task has been removed completely.
   
   The downside, of course, is that a task can show up in more than one of these channels, but I feel it would be easier for the caller to de-duplicate such cases, as long as there is a deterministic order for checking the removed/completed/failed tasks from the updater.
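   A minimal caller-side sketch of that de-duplication, assuming task IDs are plain strings and a hypothetical `PendingRemovals` helper in the task manager (not an existing class): removal requests are tracked in a set, and each round of drained results is checked in a fixed order (removed, then restored, then failed), so a task that shows up in several channels is reported as removed exactly once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper (not part of Kafka Streams): tracks tasks whose removal was requested
// via updater.remove(...) and reports each one once, even if it is drained from several channels.
final class PendingRemovals {
    private final Set<String> pending = new HashSet<>();

    void requestRemoval(final String taskId) {
        pending.add(taskId);
    }

    boolean isPending(final String taskId) {
        return pending.contains(taskId);
    }

    // Checks one round of drained results in a fixed order: removed, then restored, then failed.
    // Returns the IDs whose removal completed in this round, in the order they were first seen.
    List<String> onDrained(final Collection<String> removed,
                           final Collection<String> restored,
                           final Collection<String> failed) {
        final List<String> completed = new ArrayList<>();
        collect(removed, completed);
        collect(restored, completed);
        collect(failed, completed);
        return completed;
    }

    private void collect(final Collection<String> channel, final List<String> completed) {
        for (final String taskId : channel) {
            // Set.remove returns false if the task was never requested or was already reported,
            // which de-duplicates tasks that appear in more than one channel.
            if (pending.remove(taskId)) {
                completed.add(taskId);
            }
        }
    }

    public static void main(final String[] args) {
        final PendingRemovals removals = new PendingRemovals();
        removals.requestRemoval("0_0");
        removals.requestRemoval("1_0");
        // 0_0 was both removed and restored; 2_0 was restored but never requested for removal.
        System.out.println(removals.onDrained(
            Arrays.asList("0_0"), Arrays.asList("0_0", "2_0"), Arrays.asList("1_0"))); // prints [0_0, 1_0]
    }
}
```

   Restored or failed tasks that were never requested for removal (like `2_0` above) are ignored by this helper; they would still go through the normal restored/failed handling.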
   
   WDYT? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
