cadonna commented on code in PR #12312:
URL: https://github.com/apache/kafka/pull/12312#discussion_r904948682


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -161,41 +159,62 @@ private void handleTaskCorruptedException(final 
TaskCorruptedException taskCorru
             final Set<TaskId> corruptedTaskIds = 
taskCorruptedException.corruptedTasks();
             final Set<Task> corruptedTasks = new HashSet<>();
             for (final TaskId taskId : corruptedTaskIds) {
-                final Task corruptedTask = updatingTasks.remove(taskId);
+                final Task corruptedTask = updatingTasks.get(taskId);
                 if (corruptedTask == null) {
                     throw new IllegalStateException("Task " + taskId + " is 
corrupted but is not updating. " + BUG_ERROR_MESSAGE);
                 }
                 corruptedTasks.add(corruptedTask);
             }
-            exceptionsAndFailedTasks.add(new ExceptionAndTasks(corruptedTasks, 
taskCorruptedException));
+            addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks(new 
ExceptionAndTasks(corruptedTasks, taskCorruptedException));
         }
 
         private void handleStreamsException(final StreamsException 
streamsException) {
             log.info("Encountered streams exception: ", streamsException);
-            final ExceptionAndTasks exceptionAndTasks;
             if (streamsException.taskId().isPresent()) {
-                exceptionAndTasks = 
handleStreamsExceptionWithTask(streamsException);
+                handleStreamsExceptionWithTask(streamsException);
             } else {
-                exceptionAndTasks = 
handleStreamsExceptionWithoutTask(streamsException);
+                handleStreamsExceptionWithoutTask(streamsException);
             }
-            exceptionsAndFailedTasks.add(exceptionAndTasks);
         }
 
-        private ExceptionAndTasks handleStreamsExceptionWithTask(final 
StreamsException streamsException) {
+        private void handleStreamsExceptionWithTask(final StreamsException 
streamsException) {
             final TaskId failedTaskId = streamsException.taskId().get();
             if (!updatingTasks.containsKey(failedTaskId)) {
                 throw new IllegalStateException("Task " + failedTaskId + " 
failed but is not updating. " + BUG_ERROR_MESSAGE);
             }
             final Set<Task> failedTask = new HashSet<>();
             failedTask.add(updatingTasks.get(failedTaskId));
-            updatingTasks.remove(failedTaskId);
-            return new ExceptionAndTasks(failedTask, streamsException);
+            addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks(new 
ExceptionAndTasks(failedTask, streamsException));
+        }
+
+        private void handleStreamsExceptionWithoutTask(final StreamsException 
streamsException) {
+            addToExceptionsAndFailedTasksThenClearUpdatingTasks(
+                new ExceptionAndTasks(new HashSet<>(updatingTasks.values()), 
streamsException));
         }
 
-        private ExceptionAndTasks handleStreamsExceptionWithoutTask(final 
StreamsException streamsException) {
-            final ExceptionAndTasks exceptionAndTasks = new 
ExceptionAndTasks(new HashSet<>(updatingTasks.values()), streamsException);
+        // It is important to remove the corrupted tasks from the updating 
tasks after they were added to the
+        // failed tasks.
+        // This ensures that all tasks are found in 
DefaultStateUpdater#getTasks().
+        private void 
addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks(final 
ExceptionAndTasks exceptionAndTasks) {
+            addToExceptionsAndFailedTasks(exceptionAndTasks);
+            
exceptionAndTasks.getTasks().stream().map(Task::id).forEach(updatingTasks::remove);
+            if (onlyStandbyTasksLeft()) {

Review Comment:
   Thanks! How could I miss that?!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to