cadonna commented on code in PR #12519: URL: https://github.com/apache/kafka/pull/12519#discussion_r959304723
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -872,14 +872,92 @@ public void shouldHandleMultipleRestoredTasks() { taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter); - Mockito.verify(tasks).addNewActiveTask(taskToTransitToRunning); + Mockito.verify(tasks).addTask(taskToTransitToRunning); Mockito.verify(stateUpdater).add(recycledStandbyTask); Mockito.verify(stateUpdater).add(recycledStandbyTask); Mockito.verify(taskToCloseClean).closeClean(); Mockito.verify(taskToCloseDirty).closeDirty(); Mockito.verify(taskToUpdateInputPartitions).updateInputPartitions(Mockito.eq(taskId05Partitions), isNull()); } + @Test + public void shouldRethrowStreamsExceptionFromStateUpdater() { + final StreamTask statefulTask = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RESTORING) + .withInputPartitions(taskId00Partitions).build(); + final StreamsException exception = new StreamsException("boom!"); + final StateUpdater.ExceptionAndTasks exceptionAndTasks = new StateUpdater.ExceptionAndTasks( + Collections.singleton(statefulTask), + exception + ); + when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(exceptionAndTasks)); + + final TasksRegistry tasks = mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + + final StreamsException thrown = assertThrows( + StreamsException.class, + () -> taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter) + ); + + assertEquals(exception, thrown); + assertEquals(statefulTask.id(), thrown.taskId().get()); + } + + @Test + public void shouldRethrowRuntimeExceptionFromStateUpdater() { + final StreamTask statefulTask = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RESTORING) + .withInputPartitions(taskId00Partitions).build(); + final RuntimeException exception = new RuntimeException("boom!"); + final 
StateUpdater.ExceptionAndTasks exceptionAndTasks = new StateUpdater.ExceptionAndTasks( + Collections.singleton(statefulTask), + exception + ); + when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(exceptionAndTasks)); + + final TasksRegistry tasks = mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + + final StreamsException thrown = assertThrows( + StreamsException.class, + () -> taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter) + ); + + assertEquals(exception, thrown.getCause()); Review Comment: I think here it would be good also to verify the exception message since it should be "First unexpected error for task ...", AFAIU. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ########## @@ -229,34 +229,36 @@ boolean handleCorruption(final Set<TaskId> corruptedTasks) { private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, final boolean markAsCorrupted) { for (final Task task : taskWithChangelogs) { - final Collection<TopicPartition> corruptedPartitions = task.changelogPartitions(); + if (task.state() != State.CLOSED) { + final Collection<TopicPartition> corruptedPartitions = task.changelogPartitions(); - // mark corrupted partitions to not be checkpointed, and then close the task as dirty - if (markAsCorrupted) { - task.markChangelogAsCorrupted(corruptedPartitions); - } + // mark corrupted partitions to not be checkpointed, and then close the task as dirty + // TODO: this step should be removed as we complete migrating to state updater + if (markAsCorrupted && stateUpdater == null) { Review Comment: Are you sure about this condition and the ToDo? A `TaskCorruptedException` can also be thrown outside the state updater when the task is not restoring. 
Maybe we should remove the checkpointing due to corruption from the state updater since at the moment we also mark all changelogs as corrupted there? We can create a ticket for the improvement you are proposing of marking only the actually corrupted changelogs as corrupted. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java: ########## @@ -57,10 +57,10 @@ public interface TasksRegistry { void addNewActiveTasks(final Collection<Task> newTasks); - void addNewActiveTask(final Task task); - void addNewStandbyTasks(final Collection<Task> newTasks); Review Comment: I think in the future we can consolidate `addNewActiveTasks()` and `addNewStandbyTasks()` into `addTasks()`. Just an idea, no action needed for this PR! ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ########## @@ -322,40 +324,46 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks, final Map<TaskId, RuntimeException> taskCloseExceptions = closeAndRecycleTasks(tasksToRecycle, tasksToCloseClean); - throwTaskExceptions(taskCloseExceptions); + maybeThrowTaskExceptions(taskCloseExceptions); createNewTasks(activeTasksToCreate, standbyTasksToCreate); } - private void throwTaskExceptions(final Map<TaskId, RuntimeException> taskExceptions) { + // if at least one of the exception is a task-migrated exception, then directly throw since it indicates all tasks are lost + // if at least one of the exception is a streams exception, then directly throw since it should be handled by thread's handler + // if at least one of the exception is a non-streams exception, then wrap and throw since it should be handled by thread's handler + // otherwise, all the exceptions are task-corrupted, then merge their tasks and throw a single one + // TODO: move task-corrupted and task-migrated out of the public errors package since they are internal errors and always be + // handled by Streams library itself + private void
maybeThrowTaskExceptions(final Map<TaskId, RuntimeException> taskExceptions) { if (!taskExceptions.isEmpty()) { log.error("Get exceptions for the following tasks: {}", taskExceptions); + final TaskCorruptedException allTaskCorrupts = new TaskCorruptedException(new HashSet<>()); for (final Map.Entry<TaskId, RuntimeException> entry : taskExceptions.entrySet()) { - if (!(entry.getValue() instanceof TaskMigratedException)) { - final TaskId taskId = entry.getKey(); - final RuntimeException exception = entry.getValue(); - if (exception instanceof StreamsException) { + final TaskId taskId = entry.getKey(); + final RuntimeException exception = entry.getValue(); + + if (exception instanceof StreamsException) { + if (exception instanceof TaskMigratedException) { + throw entry.getValue(); + } else if (exception instanceof TaskCorruptedException) { + log.warn("Encounter corrupted task" + taskId + ", will group it with other corrupted tasks " + + "and handle together", exception); + allTaskCorrupts.corruptedTasks().add(taskId); + } else { ((StreamsException) exception).setTaskId(taskId); throw exception; - } else if (exception instanceof KafkaException) { - throw new StreamsException(exception, taskId); - } else { - throw new StreamsException( - "Unexpected failure to close " + taskExceptions.size() + - " task(s) [" + taskExceptions.keySet() + "]. " + - "First unexpected exception (for task " + taskId + ") follows.", - exception, - taskId - ); + Review Comment: nit: remove line -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org