guozhangwang commented on code in PR #12519:
URL: https://github.com/apache/kafka/pull/12519#discussion_r959853208
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java:
##########
@@ -57,10 +57,10 @@ public interface TasksRegistry {

     void addNewActiveTasks(final Collection<Task> newTasks);

-    void addNewActiveTask(final Task task);
-
     void addNewStandbyTasks(final Collection<Task> newTasks);

Review Comment:
Yes, I agree :) I'm actually thinking about a more aggressive change in the future: once we complete the state updater, the task manager / task registry would only handle running active tasks, and there would never be restoring active or standby tasks in this class, so we could slim it down further and even consider inlining it.

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -877,6 +877,8 @@ private void initializeAndRestorePhase() {
         // transit to restore active is idempotent so we can call it multiple times
         changelogReader.enforceRestoreActive();

+        taskManager.tryHandleExceptionsFromStateUpdater();

Review Comment:
Ack, we can do that.

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -229,34 +229,36 @@ boolean handleCorruption(final Set<TaskId> corruptedTasks) {
     private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, final boolean markAsCorrupted) {
         for (final Task task : taskWithChangelogs) {
-            final Collection<TopicPartition> corruptedPartitions = task.changelogPartitions();
+            if (task.state() != State.CLOSED) {
+                final Collection<TopicPartition> corruptedPartitions = task.changelogPartitions();

-            // mark corrupted partitions to not be checkpointed, and then close the task as dirty
-            if (markAsCorrupted) {
-                task.markChangelogAsCorrupted(corruptedPartitions);
-            }
+                // mark corrupted partitions to not be checkpointed, and then close the task as dirty
+                // TODO: this step should be removed as we complete migrating to state updater
+                if (markAsCorrupted && stateUpdater == null) {

Review Comment:
I have thought about that and filed a JIRA ticket about exception handling for task corruption once the state updater work is done; just to copy-paste my thoughts below:
```
Polling thread:

RecordCollector → Producer.send(callback)'s callback gets a retriable exception:
We cannot simply retry sending, as that would change the ordering appended to the topics. This can be thrown in the following cases:
  Task.suspend → RecordCollector.flush() → checkException()
  Task.closeClean → RecordCollector.closeClean() → checkException() (? Honestly I think it should never reach this, since if there were an error it should have been found in the previous flush call?)
  Normally, when RecordCollector.forward() → checkException()
```
We would handle it as a fine-grained TaskEmitOrderCorruptedException. This exception does not need to mark any changelogs as corrupted.

(EOS) Checkpoint not found when registering stores: this means we are unclear which snapshot the local state stores represent, and hence cannot restore from a specific position. We would throw it as a fine-grained TaskStateCorruptedException.

As for the handling logic (a sketch follows below):

1) Mark the task as corrupted so that it is not scheduled for any further processing.
2) At the beginning of each loop, check whether any tasks are marked as corrupted (either by the polling thread itself, or sent back from the restore thread, see below). If there are any:
3) Commit all other, non-corrupted tasks first.
4) Revive the corrupted tasks by first closing them dirty (which wipes the state stores under EOS), and then converting them back to created mode.
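To make those four steps concrete, here is a minimal sketch, not the actual Streams code: `CorruptedTaskHandling`, `markCorrupted`, `maybeReviveCorruptedTasks`, and `commitTasks` are hypothetical names, while `Task#closeDirty()` and `Task#revive()` are the existing lifecycle calls from the diff above.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task;

// Hypothetical sketch of the proposed corrupted-task handling; only
// Task#closeDirty() and Task#revive() are real Streams calls.
class CorruptedTaskHandling {
    private final Set<TaskId> corruptedTaskIds = new HashSet<>();

    // step 1): called wherever a fine-grained task-corrupted exception is caught;
    // a marked task is excluded from any further processing
    void markCorrupted(final TaskId taskId) {
        corruptedTaskIds.add(taskId);
    }

    // steps 2)-4): invoked at the beginning of each poll loop
    void maybeReviveCorruptedTasks(final Collection<Task> allTasks) {
        if (corruptedTaskIds.isEmpty()) {
            return;  // step 2): nothing marked corrupted, fast path out
        }
        // step 3): secure the progress of all other, non-corrupted tasks first
        commitTasks(allTasks.stream()
            .filter(task -> !corruptedTaskIds.contains(task.id()))
            .collect(Collectors.toList()));
        // step 4): close dirty (wipes local state under EOS), then back to CREATED
        for (final Task task : allTasks) {
            if (corruptedTaskIds.remove(task.id())) {
                task.closeDirty();
                task.revive();
            }
        }
    }

    private void commitTasks(final Collection<Task> tasks) {
        // stand-in for the task manager's commit path
    }
}
```

Committing the healthy tasks before the dirty close matters because, under EOS, reviving aborts the ongoing transaction, so the uncommitted progress of the healthy tasks would otherwise be thrown away with it.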
```
Restore thread:

Restore consumer throws an invalid offset exception, e.g. out-of-range, log truncated, no-offset-for-partition.
We would throw it as a fine-grained TaskChangelogCorruptedException.
```
As for handling: as we do here, we would mark the changelog as corrupted and delete the topic-partition entry in the checkpoint file inside the state updater, then remove the task from the state updater and send the exception back to the polling thread to handle.
```
EOS: when initializing the task, we find there is no checkpoint file.
```
Here I think we should just handle it directly inside the function (wipe out the store and re-initialize) rather than throw an exception all the way up to the stream thread to handle. As a result of the above, this would be the ONLY case in which we should mark a changelog as corrupted.

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -667,6 +672,25 @@ private void addTasksToStateUpdater() {
         }
     }

+    public void tryHandleExceptionsFromStateUpdater() {
+        if (stateUpdater != null) {
+            final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>();
+
+            for (final StateUpdater.ExceptionAndTasks exceptionAndTasks : stateUpdater.drainExceptionsAndFailedTasks()) {
+                final RuntimeException exception = exceptionAndTasks.exception();
+                final Set<Task> failedTasks = exceptionAndTasks.getTasks();
+
+                for (final Task failedTask : failedTasks) {
+                    // need to add task back to the bookkeeping to be handled by the stream thread
+                    tasks.addTask(failedTask);
+                    taskExceptions.put(failedTask.id(), exception);
+                }
+            }
+
+            maybeThrowTaskExceptions(taskExceptions);

Review Comment:
The motivation for `maybeThrowTaskExceptions` is that 1) if there is an exception that means "all is lost", we can throw it directly, since we are going to close all tasks dirty anyway; 2) otherwise there are only task-corrupted exceptions, and we should handle them all at once. If we did not do 2), we would need to bookkeep a list of not-yet-handled corrupted exceptions and let the thread throw and handle them one by one, which would be quite complicated.

I agree that doing two transformations here is a bit awkward; we could instead apply the quoted 1/2 logic directly inside this function (from the state updater we should only ever see a fatal streams exception or a task-corrupted exception). I'm doing it this way just for paranoid code-deduping; a rough sketch of that grouping follows below.

On losing the log information: yeah, I totally agree with you. After thinking about it a bit, I think we can just log the stack trace as we group the task-corrupted exceptions, wdyt? BTW, in the future we would categorize the different causes of task corruption into finer-grained exceptions, so this logic is likely only temporary.
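A rough sketch of that 1)/2) grouping, assuming the state updater only ever surfaces `TaskCorruptedException` or fatal exceptions; the wrapper class and logger are placeholders, and this is not the PR's exact implementation:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch of the grouping logic described above, not the PR's actual code.
class ExceptionGrouping {
    private static final Logger log = LoggerFactory.getLogger(ExceptionGrouping.class);

    void maybeThrowTaskExceptions(final Map<TaskId, RuntimeException> taskExceptions) {
        if (taskExceptions.isEmpty()) {
            return;
        }
        final Set<TaskId> corruptedTaskIds = new HashSet<>();
        for (final Map.Entry<TaskId, RuntimeException> entry : taskExceptions.entrySet()) {
            final RuntimeException exception = entry.getValue();
            if (exception instanceof TaskCorruptedException) {
                // 2) group all corrupted tasks to be handled at once; log the stack
                // trace here so the individual cause is not lost by the grouping
                log.warn("Task " + entry.getKey() + " is corrupted and will be revived", exception);
                corruptedTaskIds.add(entry.getKey());
            } else {
                // 1) "all is lost": throw directly, since every task will be closed
                // dirty anyway; wrap plain RuntimeExceptions so the task id is
                // attached, matching the tests below
                throw exception instanceof StreamsException
                    ? exception
                    : new StreamsException(exception, entry.getKey());
            }
        }
        if (!corruptedTaskIds.isEmpty()) {
            throw new TaskCorruptedException(corruptedTaskIds);
        }
    }
}
```

Note that `TaskCorruptedException` extends `StreamsException`, so the corrupted-task check must come before the fatal-exception branch.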
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -872,14 +872,92 @@ public void shouldHandleMultipleRestoredTasks() {

         taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter);

-        Mockito.verify(tasks).addNewActiveTask(taskToTransitToRunning);
+        Mockito.verify(tasks).addTask(taskToTransitToRunning);
         Mockito.verify(stateUpdater).add(recycledStandbyTask);
         Mockito.verify(stateUpdater).add(recycledStandbyTask);
         Mockito.verify(taskToCloseClean).closeClean();
         Mockito.verify(taskToCloseDirty).closeDirty();
         Mockito.verify(taskToUpdateInputPartitions).updateInputPartitions(Mockito.eq(taskId05Partitions), isNull());
     }

+    @Test
+    public void shouldRethrowStreamsExceptionFromStateUpdater() {
+        final StreamTask statefulTask = statefulTask(taskId00, taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        final StreamsException exception = new StreamsException("boom!");
+        final StateUpdater.ExceptionAndTasks exceptionAndTasks = new StateUpdater.ExceptionAndTasks(
+            Collections.singleton(statefulTask),
+            exception
+        );
+        when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(exceptionAndTasks));
+
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+        final StreamsException thrown = assertThrows(
+            StreamsException.class,
+            () -> taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter)
+        );
+
+        assertEquals(exception, thrown);
+        assertEquals(statefulTask.id(), thrown.taskId().get());
+    }
+
+    @Test
+    public void shouldRethrowRuntimeExceptionFromStateUpdater() {
+        final StreamTask statefulTask = statefulTask(taskId00, taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        final RuntimeException exception = new RuntimeException("boom!");
+        final StateUpdater.ExceptionAndTasks exceptionAndTasks = new StateUpdater.ExceptionAndTasks(
+            Collections.singleton(statefulTask),
+            exception
+        );
+        when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(exceptionAndTasks));
+
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+        final StreamsException thrown = assertThrows(
+            StreamsException.class,
+            () -> taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter)
+        );
+
+        assertEquals(exception, thrown.getCause());

Review Comment:
Yup, good idea! I will add it for those cases where we do not validate `equals(exception, thrown)` directly; see the sketch below.
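For illustration only, a hypothetical version of the strengthened assertions for the wrapped-exception test above (not the actual follow-up commit):

```java
// hypothetical extra assertions for shouldRethrowRuntimeExceptionFromStateUpdater:
// besides unwrapping the cause, also verify that the task id was attached to the wrapper
assertEquals(exception, thrown.getCause());
assertEquals(statefulTask.id(), thrown.taskId().get());
```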