This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new 64952756ff8 KAFKA-17489: Do not handle failed tasks as tasks to assign 
(#17115)
64952756ff8 is described below

commit 64952756ff82efa010ab8d023ede3050b17adc20
Author: Bruno Cadonna <[email protected]>
AuthorDate: Fri Sep 13 10:41:45 2024 +0200

    KAFKA-17489: Do not handle failed tasks as tasks to assign (#17115)
    
    Failed tasks that are discovered when they are removed from the state 
updater during assignment or revocation are added to the task registry. From 
there they are retrieved and handled like normal tasks. This leads to a couple 
of IllegalStateExceptions because it breaks invariants that ensure that only 
non-failed tasks are assigned and processed.
    
    This commit solves this bug by distinguishing failed from non-failed tasks 
in the task registry.
    
    Reviewer: Lucas Brutschy <[email protected]>
---
 .../streams/processor/internals/TaskManager.java   |  10 +-
 .../kafka/streams/processor/internals/Tasks.java   |  20 ++++
 .../streams/processor/internals/TasksRegistry.java |   4 +
 .../processor/internals/TaskManagerTest.java       | 133 +++++++++++++++++----
 .../streams/processor/internals/TasksTest.java     |  44 +++++++
 5 files changed, 185 insertions(+), 26 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 62e64878af9..fa0328447fd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -196,7 +196,7 @@ public class TaskManager {
             mainConsumer.pause(mainConsumer.assignment());
         } else {
             // All tasks that are owned by the task manager are ready and do 
not need to be paused
-            final Set<TopicPartition> partitionsNotToPause = tasks.allTasks()
+            final Set<TopicPartition> partitionsNotToPause = 
tasks.allNonFailedTasks()
                 .stream()
                 .flatMap(task -> task.inputPartitions().stream())
                 .collect(Collectors.toSet());
@@ -523,7 +523,7 @@ public class TaskManager {
                                                 final Map<TaskId, 
Set<TopicPartition>> standbyTasksToCreate,
                                                 final Map<Task, 
Set<TopicPartition>> tasksToRecycle,
                                                 final Set<Task> 
tasksToCloseClean) {
-        for (final Task task : tasks.allTasks()) {
+        for (final Task task : tasks.allNonFailedTasks()) {
             if (!task.isActive()) {
                 throw new IllegalStateException("Standby tasks should only be 
managed by the state updater, " +
                     "but standby task " + task.id() + " is managed by the 
stream thread");
@@ -685,7 +685,7 @@ public class TaskManager {
         final Task task = removedTaskResult.task();
         if (removedTaskResult.exception().isPresent()) {
             failedTasks.put(task.id(), removedTaskResult.exception().get());
-            tasks.addTask(task);
+            tasks.addFailedTask(task);
             return null;
         }
         return task;
@@ -997,7 +997,7 @@ public class TaskManager {
                 addTaskToStateUpdater(task);
             } catch (final RuntimeException e) {
                 // need to add task back to the bookkeeping to be handled by 
the stream thread
-                tasks.addTask(task);
+                tasks.addFailedTask(task);
                 taskExceptions.put(task.id(), e);
             }
         }
@@ -1029,7 +1029,7 @@ public class TaskManager {
             final RuntimeException exception = exceptionAndTask.exception();
             final Task failedTask = exceptionAndTask.task();
             // need to add task back to the bookkeeping to be handled by the 
stream thread
-            tasks.addTask(failedTask);
+            tasks.addFailedTask(failedTask);
             taskExceptions.put(failedTask.id(), exception);
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index 54347fd86ab..d75e944efc1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -57,6 +57,7 @@ class Tasks implements TasksRegistry {
     private final Map<TaskId, Set<TopicPartition>> pendingActiveTasksToCreate 
= new HashMap<>();
     private final Map<TaskId, Set<TopicPartition>> pendingStandbyTasksToCreate 
= new HashMap<>();
     private final Set<Task> pendingTasksToInit = new HashSet<>();
+    private final Set<TaskId> failedTaskIds = new HashSet<>();
 
     // TODO: convert to Stream/StandbyTask when we remove 
TaskManager#StateMachineTask with mocks
     private final Map<TopicPartition, Task> activeTasksPerPartition = new 
HashMap<>();
@@ -177,6 +178,12 @@ class Tasks implements TasksRegistry {
         }
     }
 
+    @Override
+    public void addFailedTask(final Task task) {
+        failedTaskIds.add(task.id());
+        addTask(task);
+    }
+
     @Override
     public synchronized void removeTask(final Task taskToRemove) {
         final TaskId taskId = taskToRemove.id();
@@ -195,6 +202,7 @@ class Tasks implements TasksRegistry {
                 throw new IllegalArgumentException("Attempted to remove a 
standby task that is not owned: " + taskId);
             }
         }
+        failedTaskIds.remove(taskToRemove.id());
     }
 
     @Override
@@ -252,6 +260,7 @@ class Tasks implements TasksRegistry {
         activeTasksPerId.clear();
         standbyTasksPerId.clear();
         activeTasksPerPartition.clear();
+        failedTaskIds.clear();
     }
 
     // TODO: change return type to `StreamTask`
@@ -308,6 +317,17 @@ class Tasks implements TasksRegistry {
         return union(HashSet::new, new HashSet<>(activeTasksPerId.values()), 
new HashSet<>(standbyTasksPerId.values()));
     }
 
+    @Override
+    public synchronized Set<Task> allNonFailedTasks() {
+        final Set<Task> nonFailedActiveTasks = 
activeTasksPerId.values().stream()
+            .filter(task -> !failedTaskIds.contains(task.id()))
+            .collect(Collectors.toSet());
+        final Set<Task> nonFailedStandbyTasks = 
standbyTasksPerId.values().stream()
+            .filter(task -> !failedTaskIds.contains(task.id()))
+            .collect(Collectors.toSet());
+        return union(HashSet::new, nonFailedActiveTasks, 
nonFailedStandbyTasks);
+    }
+
     @Override
     public synchronized Set<TaskId> allTaskIds() {
         return union(HashSet::new, activeTasksPerId.keySet(), 
standbyTasksPerId.keySet());
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
index 64f24702d2f..20bee575ebc 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
@@ -51,6 +51,8 @@ public interface TasksRegistry {
 
     void addTask(final Task task);
 
+    void addFailedTask(final Task task);
+
     void removeTask(final Task taskToRemove);
 
     void replaceActiveWithStandby(final StandbyTask standbyTask);
@@ -73,6 +75,8 @@ public interface TasksRegistry {
 
     Set<Task> allTasks();
 
+    Set<Task> allNonFailedTasks();
+
     Map<TaskId, Task> allTasksPerId();
 
     Set<TaskId> allTaskIds();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index c6b958beee5..ee8d8f1da72 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -539,7 +539,7 @@ public class TaskManagerTest {
         assertEquals("Encounter unexpected fatal error for task " + 
failedStandbyTask.id(), exception.getMessage());
         assertInstanceOf(RuntimeException.class, exception.getCause());
         assertEquals(kaboom.getMessage(), exception.getCause().getMessage());
-        verify(tasks).addTask(failedStandbyTask);
+        verify(tasks).addFailedTask(failedStandbyTask);
     }
 
     @Test
@@ -705,7 +705,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        when(tasks.allTasks()).thenReturn(mkSet(reassignedActiveTask));
+        
when(tasks.allNonFailedTasks()).thenReturn(mkSet(reassignedActiveTask));
 
         taskManager.handleAssignment(
             mkMap(mkEntry(reassignedActiveTask.id(), 
reassignedActiveTask.inputPartitions())),
@@ -718,6 +718,96 @@ public class TaskManagerTest {
         verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
+    @Test
+    public void 
shouldAddFailedActiveTaskToRecycleDuringAssignmentToTaskRegistry() {
+        final StreamTask failedActiveTaskToRecycle = statefulTask(taskId03, 
taskId03ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId03Partitions).build();
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        
when(stateUpdater.getTasks()).thenReturn(mkSet(failedActiveTaskToRecycle));
+        final RuntimeException taskException = new RuntimeException("Nobody 
expects the Spanish inquisition!");
+        when(stateUpdater.remove(failedActiveTaskToRecycle.id()))
+            .thenReturn(CompletableFuture.completedFuture(
+                new StateUpdater.RemovedTaskResult(failedActiveTaskToRecycle, 
taskException)
+            ));
+
+        final StreamsException exception = assertThrows(
+            StreamsException.class,
+            () -> taskManager.handleAssignment(
+                Collections.emptyMap(),
+                mkMap(mkEntry(failedActiveTaskToRecycle.id(), 
failedActiveTaskToRecycle.inputPartitions()))
+            )
+        );
+
+        assertEquals("Encounter unexpected fatal error for task " + 
failedActiveTaskToRecycle.id(), exception.getMessage());
+        assertEquals(taskException, exception.getCause());
+        verify(tasks).addFailedTask(failedActiveTaskToRecycle);
+        verify(tasks, never()).addTask(failedActiveTaskToRecycle);
+        verify(tasks).allNonFailedTasks();
+        verify(standbyTaskCreator, 
never()).createStandbyTaskFromActive(failedActiveTaskToRecycle, 
taskId03Partitions);
+    }
+
+    @Test
+    public void 
shouldAddFailedStandbyTaskToRecycleDuringAssignmentToTaskRegistry() {
+        final StandbyTask failedStandbyTaskToRecycle = standbyTask(taskId03, 
taskId03ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId03Partitions).build();
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        
when(stateUpdater.getTasks()).thenReturn(mkSet(failedStandbyTaskToRecycle));
+        final RuntimeException taskException = new RuntimeException("Nobody 
expects the Spanish inquisition!");
+        when(stateUpdater.remove(failedStandbyTaskToRecycle.id()))
+            .thenReturn(CompletableFuture.completedFuture(
+                new StateUpdater.RemovedTaskResult(failedStandbyTaskToRecycle, 
taskException)
+            ));
+
+        final StreamsException exception = assertThrows(
+            StreamsException.class,
+            () -> taskManager.handleAssignment(
+                mkMap(mkEntry(failedStandbyTaskToRecycle.id(), 
failedStandbyTaskToRecycle.inputPartitions())),
+                Collections.emptyMap()
+            )
+        );
+
+        assertEquals("Encounter unexpected fatal error for task " + 
failedStandbyTaskToRecycle.id(), exception.getMessage());
+        assertEquals(taskException, exception.getCause());
+        verify(tasks).addFailedTask(failedStandbyTaskToRecycle);
+        verify(tasks, never()).addTask(failedStandbyTaskToRecycle);
+        verify(tasks).allNonFailedTasks();
+        verify(activeTaskCreator, 
never()).createActiveTaskFromStandby(failedStandbyTaskToRecycle, 
taskId03Partitions, consumer);
+    }
+
+    @Test
+    public void 
shouldAddFailedActiveTasksToReassignWithDifferentInputPartitionsDuringAssignmentToTaskRegistry()
 {
+        final StreamTask failedActiveTaskToReassign = statefulTask(taskId03, 
taskId03ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId03Partitions).build();
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        
when(stateUpdater.getTasks()).thenReturn(mkSet(failedActiveTaskToReassign));
+        final RuntimeException taskException = new RuntimeException("Nobody 
expects the Spanish inquisition!");
+        when(stateUpdater.remove(failedActiveTaskToReassign.id()))
+            .thenReturn(CompletableFuture.completedFuture(
+                new StateUpdater.RemovedTaskResult(failedActiveTaskToReassign, 
taskException)
+            ));
+
+        final StreamsException exception = assertThrows(
+            StreamsException.class,
+            () -> taskManager.handleAssignment(
+                mkMap(mkEntry(failedActiveTaskToReassign.id(), 
taskId00Partitions)),
+                Collections.emptyMap()
+            )
+        );
+
+        assertEquals("Encounter unexpected fatal error for task " + 
failedActiveTaskToReassign.id(), exception.getMessage());
+        assertEquals(taskException, exception.getCause());
+        verify(tasks).addFailedTask(failedActiveTaskToReassign);
+        verify(tasks, never()).addTask(failedActiveTaskToReassign);
+        verify(tasks).allNonFailedTasks();
+        verify(tasks, 
never()).updateActiveTaskInputPartitions(failedActiveTaskToReassign, 
taskId00Partitions);
+    }
+
     @Test
     public void 
shouldFirstHandleTasksInStateUpdaterThenSuspendedActiveTasksInTaskRegistry() {
         final StreamTask reassignedActiveTask1 = statefulTask(taskId03, 
taskId03ChangelogPartitions)
@@ -728,7 +818,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        when(tasks.allTasks()).thenReturn(mkSet(reassignedActiveTask1));
+        
when(tasks.allNonFailedTasks()).thenReturn(mkSet(reassignedActiveTask1));
         when(stateUpdater.getTasks()).thenReturn(mkSet(reassignedActiveTask2));
         when(stateUpdater.remove(reassignedActiveTask2.id()))
             .thenReturn(CompletableFuture.completedFuture(new 
StateUpdater.RemovedTaskResult(reassignedActiveTask2)));
@@ -909,7 +999,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId01Partitions)
             .inState(State.CREATED).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(mkSet(activeTaskToRecycle));
+        when(tasks.allNonFailedTasks()).thenReturn(mkSet(activeTaskToRecycle));
         
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, 
taskId01Partitions))
             .thenReturn(standbyTask);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
@@ -953,7 +1043,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(mkSet(standbyTaskToRecycle));
+        
when(tasks.allNonFailedTasks()).thenReturn(mkSet(standbyTaskToRecycle));
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
 
         final IllegalStateException illegalStateException = assertThrows(
@@ -976,7 +1066,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        when(tasks.allTasks()).thenReturn(mkSet(activeTaskToClose));
+        when(tasks.allNonFailedTasks()).thenReturn(mkSet(activeTaskToClose));
 
         taskManager.handleAssignment(Collections.emptyMap(), 
Collections.emptyMap());
 
@@ -995,7 +1085,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        when(tasks.allTasks()).thenReturn(mkSet(standbyTaskToClose));
+        when(tasks.allNonFailedTasks()).thenReturn(mkSet(standbyTaskToClose));
 
         final IllegalStateException illegalStateException = assertThrows(
             IllegalStateException.class,
@@ -1015,7 +1105,7 @@ public class TaskManagerTest {
         final Set<TopicPartition> newInputPartitions = taskId02Partitions;
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        
when(tasks.allTasks()).thenReturn(mkSet(activeTaskToUpdateInputPartitions));
+        
when(tasks.allNonFailedTasks()).thenReturn(mkSet(activeTaskToUpdateInputPartitions));
         
when(tasks.updateActiveTaskInputPartitions(activeTaskToUpdateInputPartitions, 
newInputPartitions)).thenReturn(true);
 
         taskManager.handleAssignment(
@@ -1035,7 +1125,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        when(tasks.allTasks()).thenReturn(mkSet(activeTaskToResume));
+        when(tasks.allNonFailedTasks()).thenReturn(mkSet(activeTaskToResume));
 
         taskManager.handleAssignment(
             mkMap(mkEntry(activeTaskToResume.id(), 
activeTaskToResume.inputPartitions())),
@@ -1053,7 +1143,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        when(tasks.allTasks()).thenReturn(mkSet(activeTaskToResume));
+        when(tasks.allNonFailedTasks()).thenReturn(mkSet(activeTaskToResume));
 
         taskManager.handleAssignment(
             mkMap(mkEntry(activeTaskToResume.id(), 
activeTaskToResume.inputPartitions())),
@@ -1075,7 +1165,7 @@ public class TaskManagerTest {
         final Set<TopicPartition> newInputPartitions = taskId03Partitions;
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        
when(tasks.allTasks()).thenReturn(mkSet(standbyTaskToUpdateInputPartitions));
+        
when(tasks.allNonFailedTasks()).thenReturn(mkSet(standbyTaskToUpdateInputPartitions));
 
         final IllegalStateException illegalStateException = assertThrows(
             IllegalStateException.class,
@@ -1100,7 +1190,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        when(tasks.allTasks()).thenReturn(mkSet(activeTaskToClose));
+        when(tasks.allNonFailedTasks()).thenReturn(mkSet(activeTaskToClose));
 
         taskManager.handleAssignment(
             mkMap(mkEntry(activeTaskToCreate.id(), 
activeTaskToCreate.inputPartitions())),
@@ -1176,7 +1266,7 @@ public class TaskManagerTest {
             () -> taskManager.checkStateUpdater(time.milliseconds(), 
noOpResetter)
         );
         verify(stateUpdater, never()).add(task00);
-        verify(tasks).addTask(task00);
+        verify(tasks).addFailedTask(task00);
         assertTrue(streamsException.taskId().isPresent());
         assertEquals(task00.id(), streamsException.taskId().get());
         assertEquals("Encounter unexpected fatal error for task 0_0", 
streamsException.getMessage());
@@ -1205,8 +1295,8 @@ public class TaskManagerTest {
             () -> taskManager.checkStateUpdater(time.milliseconds(), 
noOpResetter)
         );
 
-        verify(tasks).addTask(statefulTask0);
-        verify(tasks).addTask(statefulTask1);
+        verify(tasks).addFailedTask(statefulTask0);
+        verify(tasks).addFailedTask(statefulTask1);
         verify(stateUpdater).add(statefulTask2);
         assertEquals(mkSet(taskId00, taskId01), thrown.corruptedTasks());
         assertEquals("Tasks [0_1, 0_0] are corrupted and hence need to be 
re-initialized", thrown.getMessage());
@@ -1327,19 +1417,20 @@ public class TaskManagerTest {
         future1.complete(new StateUpdater.RemovedTaskResult(task1));
         final CompletableFuture<StateUpdater.RemovedTaskResult> future2 = new 
CompletableFuture<>();
         when(stateUpdater.remove(task2.id())).thenReturn(future2);
-        final StreamsException streamsException = new 
StreamsException("Something happened");
-        future2.complete(new StateUpdater.RemovedTaskResult(task2, 
streamsException));
+        final RuntimeException taskException = new RuntimeException("Nobody 
expects the Spanish inquisition!");
+        future2.complete(new StateUpdater.RemovedTaskResult(task2, 
taskException));
 
         final StreamsException thrownException = assertThrows(
             StreamsException.class,
             () -> taskManager.handleRevocation(union(HashSet::new, 
taskId00Partitions, taskId01Partitions))
         );
 
-        assertEquals(thrownException, streamsException);
+        assertEquals("Encounter unexpected fatal error for task " + 
task2.id(), thrownException.getMessage());
+        assertEquals(thrownException.getCause(), taskException);
         verify(task1).suspend();
         verify(tasks).addTask(task1);
         verify(task2, never()).suspend();
-        verify(tasks).addTask(task2);
+        verify(tasks).addFailedTask(task2);
     }
 
     @Test
@@ -1654,7 +1745,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId00Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        when(tasks.allTasks()).thenReturn(mkSet(statefulTask0));
+        when(tasks.allNonFailedTasks()).thenReturn(mkSet(statefulTask0));
         final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
         when(consumer.assignment()).thenReturn(assigned);
 
@@ -1704,7 +1795,7 @@ public class TaskManagerTest {
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, 
runningStatefulTask)));
         when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTask, 
restoringStatefulTask));
-        when(tasks.allTasks()).thenReturn(mkSet(runningStatefulTask));
+        when(tasks.allNonFailedTasks()).thenReturn(mkSet(runningStatefulTask));
         expectLockObtainedFor(taskId00, taskId01, taskId02, taskId03);
         expectDirectoryNotEmpty(taskId00, taskId01, taskId02, taskId03);
         makeTaskFolders(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
index e9ebf59a800..9eb40ae0124 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
@@ -163,4 +163,48 @@ public class TasksTest {
         assertTrue(tasks.hasPendingTasksToInit());
         assertTrue(tasks.pendingTasksToInit().containsAll(mkSet(standbyTask1, 
standbyTask2)));
     }
+
+    @Test
+    public void shouldAddFailedTask() {
+        final StreamTask activeTask1 = statefulTask(TASK_0_0, 
mkSet(TOPIC_PARTITION_B_0)).build();
+        final StreamTask activeTask2 = statefulTask(TASK_0_1, 
mkSet(TOPIC_PARTITION_B_1)).build();
+        tasks.addTask(activeTask2);
+
+        tasks.addFailedTask(activeTask1);
+
+        assertEquals(activeTask1, tasks.task(TASK_0_0));
+        assertEquals(activeTask2, tasks.task(TASK_0_1));
+        assertTrue(tasks.allTasks().contains(activeTask1));
+        assertTrue(tasks.allTasks().contains(activeTask2));
+        assertFalse(tasks.allNonFailedTasks().contains(activeTask1));
+        assertTrue(tasks.allNonFailedTasks().contains(activeTask2));
+    }
+
+    @Test
+    public void shouldRemoveFailedTask() {
+        final StreamTask activeTask1 = statefulTask(TASK_0_0, 
mkSet(TOPIC_PARTITION_B_0))
+            .inState(State.SUSPENDED).build();
+        tasks.addFailedTask(activeTask1);
+
+        tasks.removeTask(activeTask1);
+        assertFalse(tasks.allNonFailedTasks().contains(activeTask1));
+        assertFalse(tasks.allTasks().contains(activeTask1));
+
+        tasks.addTask(activeTask1);
+        assertTrue(tasks.allNonFailedTasks().contains(activeTask1));
+    }
+
+    @Test
+    public void shouldClearFailedTask() {
+        final StreamTask activeTask1 = statefulTask(TASK_0_0, 
mkSet(TOPIC_PARTITION_B_0))
+            .inState(State.SUSPENDED).build();
+        tasks.addFailedTask(activeTask1);
+
+        tasks.clear();
+        assertFalse(tasks.allNonFailedTasks().contains(activeTask1));
+        assertFalse(tasks.allTasks().contains(activeTask1));
+
+        tasks.addTask(activeTask1);
+        assertTrue(tasks.allNonFailedTasks().contains(activeTask1));
+    }
 }
\ No newline at end of file

Reply via email to