cadonna commented on code in PR #12519:
URL: https://github.com/apache/kafka/pull/12519#discussion_r959304723


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -872,14 +872,92 @@ public void shouldHandleMultipleRestoredTasks() {
 
         taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter);
 
-        Mockito.verify(tasks).addNewActiveTask(taskToTransitToRunning);
+        Mockito.verify(tasks).addTask(taskToTransitToRunning);
         Mockito.verify(stateUpdater).add(recycledStandbyTask);
         Mockito.verify(stateUpdater).add(recycledStandbyTask);
         Mockito.verify(taskToCloseClean).closeClean();
         Mockito.verify(taskToCloseDirty).closeDirty();
         Mockito.verify(taskToUpdateInputPartitions).updateInputPartitions(Mockito.eq(taskId05Partitions), isNull());
     }
 
+    @Test
+    public void shouldRethrowStreamsExceptionFromStateUpdater() {
+        final StreamTask statefulTask = statefulTask(taskId00, taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        final StreamsException exception = new StreamsException("boom!");
+        final StateUpdater.ExceptionAndTasks exceptionAndTasks = new StateUpdater.ExceptionAndTasks(
+            Collections.singleton(statefulTask),
+            exception
+        );
+        when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(exceptionAndTasks));
+
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+        final StreamsException thrown = assertThrows(
+            StreamsException.class,
+            () -> taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter)
+        );
+
+        assertEquals(exception, thrown);
+        assertEquals(statefulTask.id(), thrown.taskId().get());
+    }
+
+    @Test
+    public void shouldRethrowRuntimeExceptionFromStateUpdater() {
+        final StreamTask statefulTask = statefulTask(taskId00, taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        final RuntimeException exception = new RuntimeException("boom!");
+        final StateUpdater.ExceptionAndTasks exceptionAndTasks = new StateUpdater.ExceptionAndTasks(
+            Collections.singleton(statefulTask),
+            exception
+        );
+        when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(exceptionAndTasks));
+
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+        final StreamsException thrown = assertThrows(
+            StreamsException.class,
+            () -> taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter)
+        );
+
+        assertEquals(exception, thrown.getCause());

Review Comment:
   I think it would also be good here to verify the exception message, since it should be "First unexpected error for task ...", AFAIU.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -229,34 +229,36 @@ boolean handleCorruption(final Set<TaskId> corruptedTasks) {
 
     private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, final boolean markAsCorrupted) {
         for (final Task task : taskWithChangelogs) {
-            final Collection<TopicPartition> corruptedPartitions = task.changelogPartitions();
+            if (task.state() != State.CLOSED) {
+                final Collection<TopicPartition> corruptedPartitions = task.changelogPartitions();
 
-            // mark corrupted partitions to not be checkpointed, and then close the task as dirty
-            if (markAsCorrupted) {
-                task.markChangelogAsCorrupted(corruptedPartitions);
-            }
+                // mark corrupted partitions to not be checkpointed, and then close the task as dirty
+                // TODO: this step should be removed as we complete migrating to state updater
+                if (markAsCorrupted && stateUpdater == null) {

Review Comment:
   Are you sure about this condition and the TODO? A `TaskCorruptedException` can also be thrown outside the state updater when the task is not restoring.
   Maybe we should instead remove the checkpointing due to corruption from the state updater, since at the moment we mark all changelogs as corrupted there as well? We can create a ticket for the improvement you are proposing, namely only marking the actually corrupted changelogs as corrupted.
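   To sketch the ticket idea (hypothetical, nothing to change in this PR; `corruptedPartitionsByTask` is an assumed map derived from the `TaskCorruptedException`):
   ```java
   // Hypothetical: mark only the changelog partitions actually reported as corrupted
   // for this task, instead of all of task.changelogPartitions().
   final Collection<TopicPartition> actuallyCorrupted =
       corruptedPartitionsByTask.getOrDefault(task.id(), Collections.emptySet());
   if (markAsCorrupted && !actuallyCorrupted.isEmpty()) {
       task.markChangelogAsCorrupted(actuallyCorrupted);
   }
   ```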



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java:
##########
@@ -57,10 +57,10 @@ public interface TasksRegistry {
 
     void addNewActiveTasks(final Collection<Task> newTasks);
 
-    void addNewActiveTask(final Task task);
-
     void addNewStandbyTasks(final Collection<Task> newTasks);

Review Comment:
   I think in the future we can consolidate `addNewActiveTasks()` and `addNewStandbyTasks()` into `addTasks()`.
   Just an idea, no action needed for this PR!
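   Roughly something like this on `TasksRegistry` (hypothetical, just to illustrate; active vs. standby would then be derived from the `Task` itself, e.g. via `Task#isActive()`):
   ```java
   // Hypothetical consolidated method replacing addNewActiveTasks() and addNewStandbyTasks().
   void addTasks(final Collection<Task> tasks);
   ```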



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -322,40 +324,46 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
         final Map<TaskId, RuntimeException> taskCloseExceptions = closeAndRecycleTasks(tasksToRecycle, tasksToCloseClean);
 
-        throwTaskExceptions(taskCloseExceptions);
+        maybeThrowTaskExceptions(taskCloseExceptions);
 
         createNewTasks(activeTasksToCreate, standbyTasksToCreate);
     }
 
-    private void throwTaskExceptions(final Map<TaskId, RuntimeException> taskExceptions) {
+    // if at least one of the exceptions is a task-migrated exception, then directly throw since it indicates all tasks are lost
+    // if at least one of the exceptions is a streams exception, then directly throw since it should be handled by the thread's handler
+    // if at least one of the exceptions is a non-streams exception, then wrap and throw since it should be handled by the thread's handler
+    // otherwise, all the exceptions are task-corrupted, so merge their tasks and throw a single one
+    // TODO: move task-corrupted and task-migrated out of the public errors package since they are internal errors and are always
+    //       handled by the Streams library itself
+    private void maybeThrowTaskExceptions(final Map<TaskId, RuntimeException> taskExceptions) {
         if (!taskExceptions.isEmpty()) {
             log.error("Get exceptions for the following tasks: {}", 
taskExceptions);
 
+            final TaskCorruptedException allTaskCorrupts = new TaskCorruptedException(new HashSet<>());
             for (final Map.Entry<TaskId, RuntimeException> entry : taskExceptions.entrySet()) {
-                if (!(entry.getValue() instanceof TaskMigratedException)) {
-                    final TaskId taskId = entry.getKey();
-                    final RuntimeException exception = entry.getValue();
-                    if (exception instanceof StreamsException) {
+                final TaskId taskId = entry.getKey();
+                final RuntimeException exception = entry.getValue();
+
+                if (exception instanceof StreamsException) {
+                    if (exception instanceof TaskMigratedException) {
+                        throw entry.getValue();
+                    } else if (exception instanceof TaskCorruptedException) {
+                        log.warn("Encounter corrupted task" + taskId + ", will 
group it with other corrupted tasks " +
+                            "and handle together", exception);
+                        allTaskCorrupts.corruptedTasks().add(taskId);
+                    } else {
                         ((StreamsException) exception).setTaskId(taskId);
                         throw exception;
-                    } else if (exception instanceof KafkaException) {
-                        throw new StreamsException(exception, taskId);
-                    } else {
-                        throw new StreamsException(
-                            "Unexpected failure to close " + 
taskExceptions.size() +
-                                " task(s) [" + taskExceptions.keySet() + "]. " 
+
-                                "First unexpected exception (for task " + 
taskId + ") follows.",
-                            exception,
-                            taskId
-                        );
+

Review Comment:
   nit: remove line



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
