guozhangwang commented on code in PR #12519:
URL: https://github.com/apache/kafka/pull/12519#discussion_r959853208


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java:
##########
@@ -57,10 +57,10 @@ public interface TasksRegistry {
 
     void addNewActiveTasks(final Collection<Task> newTasks);
 
-    void addNewActiveTask(final Task task);
-
     void addNewStandbyTasks(final Collection<Task> newTasks);

Review Comment:
   Yes I agree :) I'm actually thinking about a more aggressive change in the future: once we complete the state updater, the task manager / task registry would only handle running active tasks, and restoring active and standby tasks would never appear in this class, so we could slim this class down further and even consider inlining it.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -877,6 +877,8 @@ private void initializeAndRestorePhase() {
            // transit to restore active is idempotent so we can call it multiple times
             changelogReader.enforceRestoreActive();
 
+            taskManager.tryHandleExceptionsFromStateUpdater();

Review Comment:
   Ack, we can do that.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -229,34 +229,36 @@ boolean handleCorruption(final Set<TaskId> corruptedTasks) {
 
     private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, final boolean markAsCorrupted) {
         for (final Task task : taskWithChangelogs) {
-            final Collection<TopicPartition> corruptedPartitions = task.changelogPartitions();
+            if (task.state() != State.CLOSED) {
+                final Collection<TopicPartition> corruptedPartitions = task.changelogPartitions();
 
-            // mark corrupted partitions to not be checkpointed, and then close the task as dirty
-            if (markAsCorrupted) {
-                task.markChangelogAsCorrupted(corruptedPartitions);
-            }
+                // mark corrupted partitions to not be checkpointed, and then close the task as dirty
+                // TODO: this step should be removed as we complete migrating to state updater
+                if (markAsCorrupted && stateUpdater == null) {

Review Comment:
   I have thought about that and filed a JIRA ticket about exception handling for task corruption once the state updater work is done; just to copy-paste my thoughts below:
   
   ```
   Polling thread:
   
   RecordCollector → Producer.send(callback)'s callback gets a retriable exception: we cannot simply retry sending, as it would change the ordering appended to the topics. This can be thrown in the following cases:
   
   Task.suspend → RecordCollector.flush() → checkException()
   
   Task.closeClean → RecordCollector.closeClean() → checkException() (? Honestly I think it should never reach this, since if there's an error it should have been found in the previous flush call?)
   
   Normally, when RecordCollector.forward() → checkException()
   ```
   
   We would handle it as a fine-grained TaskEmitOrderCorruptedException. This exception does not require marking any changelogs as corrupted.
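   
   To make this concrete, a rough sketch of how the send-error check could surface such an exception. `TaskEmitOrderCorruptedException` is the proposed name from these notes, not an existing class, and the method shape (e.g. the `sendException` holder) is illustrative, not the actual RecordCollector code:
   
   ```java
   // Illustrative only: rethrow a retriable send error as the proposed
   // fine-grained exception, since resending would reorder records already
   // appended to the output topics. Assumes a field
   // `AtomicReference<KafkaException> sendException` set by the send callback.
   private void checkException() {
       final KafkaException sendError = sendException.getAndSet(null);
       if (sendError != null) {
           if (sendError instanceof RetriableException) {
               throw new TaskEmitOrderCorruptedException(taskId, sendError);
           }
           throw new StreamsException("Error sending records", sendError);
       }
   }
   ```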
   
   
   (EOS) checkpoint not found when registering stores: this means it is unclear which snapshot the local state stores represent, and hence we cannot restore from a specific position.
   
   We would throw it as a fine-grained TaskStateCorruptedException.
   
   As for the handling logic (a sketch follows the list):
   
   1) Mark the task as corrupted so it is not scheduled for any further processing.
   2) At the beginning of each loop, check whether any tasks are marked as corrupted (either by the polling thread itself, or sent back from the restore thread, see below). If there are any:
   3) Commit all other non-corrupted tasks first.
   4) Revive the corrupted tasks by first closing them dirty (which will wipe the state stores under EOS), and then converting them back to created mode.
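   
   A minimal sketch of steps 2) - 4) in the polling thread's loop (the helper names are illustrative, not the actual TaskManager API):
   
   ```java
   // Illustrative sketch of the handling loop described in 2) - 4) above.
   void maybeHandleCorruptedTasks() {
       // 2) collect tasks marked corrupted, either by this thread
       //    or sent back from the restore thread
       final Set<Task> corruptedTasks = drainCorruptedTasks();
       if (!corruptedTasks.isEmpty()) {
           // 3) commit all other non-corrupted tasks first
           commit(nonCorruptedRunningTasks());
           // 4) close dirty (wiping state stores under EOS) and
           //    transit back to CREATED so the task can be re-initialized
           for (final Task task : corruptedTasks) {
               task.closeDirty();
               task.revive();
           }
       }
   }
   ```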
    
   ```
   Restore thread:
   
   Restore consumer throws an invalid offset exception, e.g. out-of-range, log truncated, no-offset-for-partition.
   
   We would throw it as a fine-grained TaskChangelogCorruptedException.
   ```
   
   As for handling: as we did here, we would mark the changelog as corrupted and delete the topic partition entry from the checkpoint file inside the state updater, and then remove the task from the state updater and send the exception back to the polling thread to handle.
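   
   A rough sketch of that state-updater-side handling (the exception type and the helper names are illustrative only, not the actual state updater API):
   
   ```java
   // Illustrative only: on an invalid offset exception from the restore consumer,
   // mark the changelogs corrupted, clean the checkpoint, and hand the task back.
   private void handleInvalidOffsetException(final Task task, final Set<TopicPartition> partitions) {
       // mark the corrupted changelog partitions so they will not be checkpointed
       task.markChangelogAsCorrupted(partitions);
       // drop the corresponding topic partition entries from the checkpoint file
       removeCheckpointEntries(task, partitions);
       // remove the task from the state updater, and send the exception back
       // to the polling thread via the failed-tasks queue
       removeUpdatingTask(task.id());
       exceptionsAndFailedTasks.add(new ExceptionAndTasks(
           Collections.singleton(task),
           new TaskChangelogCorruptedException(task.id(), partitions)));
   }
   ```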
   
   ```
   EOS:
   
   When initializing the task, we found there's no checkpoint file.
   ```
   Here I think we should just handle it directly inside the function, wiping out the store and re-initializing it, rather than throwing an exception all the way up to the stream thread to handle.
   
   As a result of the above, the ONLY case in which we should mark a changelog as corrupted would be the restore-thread (invalid offset) case above.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -667,6 +672,25 @@ private void addTasksToStateUpdater() {
         }
     }
 
+    public void tryHandleExceptionsFromStateUpdater() {
+        if (stateUpdater != null) {
+            final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>();
+
+            for (final StateUpdater.ExceptionAndTasks exceptionAndTasks : stateUpdater.drainExceptionsAndFailedTasks()) {
+                final RuntimeException exception = exceptionAndTasks.exception();
+                final Set<Task> failedTasks = exceptionAndTasks.getTasks();
+
+                for (final Task failedTask : failedTasks) {
+                    // need to add task back to the bookkeeping to be handled by the stream thread
+                    tasks.addTask(failedTask);
+                    taskExceptions.put(failedTask.id(), exception);
+                }
+            }
+
+            maybeThrowTaskExceptions(taskExceptions);

Review Comment:
   The motivation for `maybeThrowTaskExceptions` is that 1) if there's an exception that would cause "all is lost", we can directly throw that one since we are going to close-dirty all tasks anyway; 2) otherwise, only task-corrupted exceptions remain, and we should handle them all at once. If we did not do 2), we would need to bookkeep the list of "not yet handled" corrupted exceptions so the thread could throw and handle them one by one, which would be quite complicated. I agree that doing two transformations here is a bit awkward; we could instead just apply logic 1/2 directly inside the function (from the state updater we should only ever see a fatal streams exception or task-corrupted), I'm doing it this way just for paranoid code-deduping.
   
   On the lost log information, yeah I totally agree with you. After thinking about it a bit, I think we can just log the stack trace as we group the task-corrupted exceptions, wdyt? BTW in the future we would categorize the different causes of task-corrupted more finely as different exceptions, so this logic is likely only temporary.
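   
   To make the two branches concrete, a rough sketch of the grouping, including logging the stack trace as suggested (this only approximates the helper; the actual implementation in the PR may differ):
   
   ```java
   // Sketch only: rethrow a fatal exception directly (branch 1), otherwise
   // aggregate all task-corrupted exceptions so they are handled at once (branch 2).
   private void maybeThrowTaskExceptions(final Map<TaskId, RuntimeException> taskExceptions) {
       if (taskExceptions.isEmpty()) {
           return;
       }
       final Set<TaskId> corruptedTaskIds = new HashSet<>();
       for (final Map.Entry<TaskId, RuntimeException> entry : taskExceptions.entrySet()) {
           final RuntimeException exception = entry.getValue();
           if (exception instanceof TaskCorruptedException) {
               // log the stack trace here so the per-task cause is not lost in the grouping
               log.error("Task {} encountered a corruption error", entry.getKey(), exception);
               corruptedTaskIds.add(entry.getKey());
           } else if (exception instanceof StreamsException) {
               // "all is lost": rethrow directly, all tasks will be closed dirty
               throw (StreamsException) exception;
           } else {
               throw new StreamsException("Encountered an unexpected fatal error", exception);
           }
       }
       throw new TaskCorruptedException(corruptedTaskIds);
   }
   ```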



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -872,14 +872,92 @@ public void shouldHandleMultipleRestoredTasks() {
 
         taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter);
 
-        Mockito.verify(tasks).addNewActiveTask(taskToTransitToRunning);
+        Mockito.verify(tasks).addTask(taskToTransitToRunning);
         Mockito.verify(stateUpdater).add(recycledStandbyTask);
         Mockito.verify(stateUpdater).add(recycledStandbyTask);
         Mockito.verify(taskToCloseClean).closeClean();
         Mockito.verify(taskToCloseDirty).closeDirty();
         Mockito.verify(taskToUpdateInputPartitions).updateInputPartitions(Mockito.eq(taskId05Partitions), isNull());
     }
 
+    @Test
+    public void shouldRethrowStreamsExceptionFromStateUpdater() {
+        final StreamTask statefulTask = statefulTask(taskId00, taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        final StreamsException exception = new StreamsException("boom!");
+        final StateUpdater.ExceptionAndTasks exceptionAndTasks = new StateUpdater.ExceptionAndTasks(
+            Collections.singleton(statefulTask),
+            exception
+        );
+        when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(exceptionAndTasks));
+
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+        final StreamsException thrown = assertThrows(
+            StreamsException.class,
+            () -> taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter)
+        );
+
+        assertEquals(exception, thrown);
+        assertEquals(statefulTask.id(), thrown.taskId().get());
+    }
+
+    @Test
+    public void shouldRethrowRuntimeExceptionFromStateUpdater() {
+        final StreamTask statefulTask = statefulTask(taskId00, taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        final RuntimeException exception = new RuntimeException("boom!");
+        final StateUpdater.ExceptionAndTasks exceptionAndTasks = new StateUpdater.ExceptionAndTasks(
+            Collections.singleton(statefulTask),
+            exception
+        );
+        when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(exceptionAndTasks));
+
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+        final StreamsException thrown = assertThrows(
+            StreamsException.class,
+            () -> taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter)
+        );
+
+        assertEquals(exception, thrown.getCause());

Review Comment:
   Yup, good idea!
   
   I will add that for those cases where we do not validate `equals(exception, thrown)` directly.
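   
   For instance (sketch only, reusing the variables from the test above; the exact assertions may differ):
   
   ```java
   // e.g. for shouldRethrowRuntimeExceptionFromStateUpdater, also validate
   // the wrapped cause and the reported task id:
   assertEquals(exception, thrown.getCause());
   assertEquals(statefulTask.id(), thrown.taskId().get());
   ```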


