This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new 4f0675d5e92 KAFKA-17489: Do not handle failed tasks as tasks to assign
(#17115)
4f0675d5e92 is described below
commit 4f0675d5e925bb8f7ad66d14ca7c5d955e777c71
Author: Bruno Cadonna <[email protected]>
AuthorDate: Fri Sep 13 10:41:45 2024 +0200
KAFKA-17489: Do not handle failed tasks as tasks to assign (#17115)
Failed tasks discovered when removed from the state updater during
assignment or revocation are added to the task registry. From there they are
retrieved and handled as normal tasks. This leads to a couple of
IllegalStateExceptions because it breaks some invariants that ensure that only
good tasks are assigned and processed.
This commit solves this bug by distinguishing failed from non-failed tasks in
the task registry.
Reviewer: Lucas Brutschy <[email protected]>
---
.../streams/processor/internals/TaskManager.java | 10 +-
.../kafka/streams/processor/internals/Tasks.java | 20 ++++
.../streams/processor/internals/TasksRegistry.java | 4 +
.../processor/internals/TaskManagerTest.java | 133 +++++++++++++++++----
.../streams/processor/internals/TasksTest.java | 44 +++++++
5 files changed, 185 insertions(+), 26 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index bde092cfe49..d6a0280e06e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -197,7 +197,7 @@ public class TaskManager {
mainConsumer.pause(mainConsumer.assignment());
} else {
// All tasks that are owned by the task manager are ready and do
not need to be paused
- final Set<TopicPartition> partitionsNotToPause = tasks.allTasks()
+ final Set<TopicPartition> partitionsNotToPause =
tasks.allNonFailedTasks()
.stream()
.flatMap(task -> task.inputPartitions().stream())
.collect(Collectors.toSet());
@@ -524,7 +524,7 @@ public class TaskManager {
final Map<TaskId,
Set<TopicPartition>> standbyTasksToCreate,
final Map<Task,
Set<TopicPartition>> tasksToRecycle,
final Set<Task>
tasksToCloseClean) {
- for (final Task task : tasks.allTasks()) {
+ for (final Task task : tasks.allNonFailedTasks()) {
if (!task.isActive()) {
throw new IllegalStateException("Standby tasks should only be
managed by the state updater, " +
"but standby task " + task.id() + " is managed by the
stream thread");
@@ -686,7 +686,7 @@ public class TaskManager {
final Task task = removedTaskResult.task();
if (removedTaskResult.exception().isPresent()) {
failedTasks.put(task.id(), removedTaskResult.exception().get());
- tasks.addTask(task);
+ tasks.addFailedTask(task);
return null;
}
return task;
@@ -998,7 +998,7 @@ public class TaskManager {
addTaskToStateUpdater(task);
} catch (final RuntimeException e) {
// need to add task back to the bookkeeping to be handled by
the stream thread
- tasks.addTask(task);
+ tasks.addFailedTask(task);
taskExceptions.put(task.id(), e);
}
}
@@ -1030,7 +1030,7 @@ public class TaskManager {
final RuntimeException exception = exceptionAndTask.exception();
final Task failedTask = exceptionAndTask.task();
// need to add task back to the bookkeeping to be handled by the
stream thread
- tasks.addTask(failedTask);
+ tasks.addFailedTask(failedTask);
taskExceptions.put(failedTask.id(), exception);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index e30333aabfb..92dd07ba974 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -58,6 +58,7 @@ class Tasks implements TasksRegistry {
private final Map<TaskId, Set<TopicPartition>> pendingActiveTasksToCreate
= new HashMap<>();
private final Map<TaskId, Set<TopicPartition>> pendingStandbyTasksToCreate
= new HashMap<>();
private final Set<Task> pendingTasksToInit = new HashSet<>();
+ private final Set<TaskId> failedTaskIds = new HashSet<>();
// TODO: convert to Stream/StandbyTask when we remove
TaskManager#StateMachineTask with mocks
private final Map<TopicPartition, Task> activeTasksPerPartition = new
HashMap<>();
@@ -178,6 +179,12 @@ class Tasks implements TasksRegistry {
}
}
+ @Override
+ public void addFailedTask(final Task task) {
+ failedTaskIds.add(task.id());
+ addTask(task);
+ }
+
@Override
public synchronized void removeTask(final Task taskToRemove) {
final TaskId taskId = taskToRemove.id();
@@ -196,6 +203,7 @@ class Tasks implements TasksRegistry {
throw new IllegalArgumentException("Attempted to remove a
standby task that is not owned: " + taskId);
}
}
+ failedTaskIds.remove(taskToRemove.id());
}
@Override
@@ -253,6 +261,7 @@ class Tasks implements TasksRegistry {
activeTasksPerId.clear();
standbyTasksPerId.clear();
activeTasksPerPartition.clear();
+ failedTaskIds.clear();
}
// TODO: change return type to `StreamTask`
@@ -309,6 +318,17 @@ class Tasks implements TasksRegistry {
return union(HashSet::new, new HashSet<>(activeTasksPerId.values()),
new HashSet<>(standbyTasksPerId.values()));
}
+ @Override
+ public synchronized Set<Task> allNonFailedTasks() {
+ final Set<Task> nonFailedActiveTasks =
activeTasksPerId.values().stream()
+ .filter(task -> !failedTaskIds.contains(task.id()))
+ .collect(Collectors.toSet());
+ final Set<Task> nonFailedStandbyTasks =
standbyTasksPerId.values().stream()
+ .filter(task -> !failedTaskIds.contains(task.id()))
+ .collect(Collectors.toSet());
+ return union(HashSet::new, nonFailedActiveTasks,
nonFailedStandbyTasks);
+ }
+
@Override
public synchronized Set<TaskId> allTaskIds() {
return union(HashSet::new, activeTasksPerId.keySet(),
standbyTasksPerId.keySet());
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
index 64f24702d2f..20bee575ebc 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
@@ -51,6 +51,8 @@ public interface TasksRegistry {
void addTask(final Task task);
+ void addFailedTask(final Task task);
+
void removeTask(final Task taskToRemove);
void replaceActiveWithStandby(final StandbyTask standbyTask);
@@ -73,6 +75,8 @@ public interface TasksRegistry {
Set<Task> allTasks();
+ Set<Task> allNonFailedTasks();
+
Map<TaskId, Task> allTasksPerId();
Set<TaskId> allTaskIds();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 0ae179a914d..1dc4cf00b6a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -536,7 +536,7 @@ public class TaskManagerTest {
assertEquals("Encounter unexpected fatal error for task " +
failedStandbyTask.id(), exception.getMessage());
assertInstanceOf(RuntimeException.class, exception.getCause());
assertEquals(kaboom.getMessage(), exception.getCause().getMessage());
- verify(tasks).addTask(failedStandbyTask);
+ verify(tasks).addFailedTask(failedStandbyTask);
}
@Test
@@ -702,7 +702,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- when(tasks.allTasks()).thenReturn(mkSet(reassignedActiveTask));
+
when(tasks.allNonFailedTasks()).thenReturn(mkSet(reassignedActiveTask));
taskManager.handleAssignment(
mkMap(mkEntry(reassignedActiveTask.id(),
reassignedActiveTask.inputPartitions())),
@@ -715,6 +715,96 @@ public class TaskManagerTest {
verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
+ @Test
+ public void
shouldAddFailedActiveTaskToRecycleDuringAssignmentToTaskRegistry() {
+ final StreamTask failedActiveTaskToRecycle = statefulTask(taskId03,
taskId03ChangelogPartitions)
+ .inState(State.RESTORING)
+ .withInputPartitions(taskId03Partitions).build();
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
when(stateUpdater.getTasks()).thenReturn(mkSet(failedActiveTaskToRecycle));
+ final RuntimeException taskException = new RuntimeException("Nobody
expects the Spanish inquisition!");
+ when(stateUpdater.remove(failedActiveTaskToRecycle.id()))
+ .thenReturn(CompletableFuture.completedFuture(
+ new StateUpdater.RemovedTaskResult(failedActiveTaskToRecycle,
taskException)
+ ));
+
+ final StreamsException exception = assertThrows(
+ StreamsException.class,
+ () -> taskManager.handleAssignment(
+ Collections.emptyMap(),
+ mkMap(mkEntry(failedActiveTaskToRecycle.id(),
failedActiveTaskToRecycle.inputPartitions()))
+ )
+ );
+
+ assertEquals("Encounter unexpected fatal error for task " +
failedActiveTaskToRecycle.id(), exception.getMessage());
+ assertEquals(taskException, exception.getCause());
+ verify(tasks).addFailedTask(failedActiveTaskToRecycle);
+ verify(tasks, never()).addTask(failedActiveTaskToRecycle);
+ verify(tasks).allNonFailedTasks();
+ verify(standbyTaskCreator,
never()).createStandbyTaskFromActive(failedActiveTaskToRecycle,
taskId03Partitions);
+ }
+
+ @Test
+ public void
shouldAddFailedStandbyTaskToRecycleDuringAssignmentToTaskRegistry() {
+ final StandbyTask failedStandbyTaskToRecycle = standbyTask(taskId03,
taskId03ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId03Partitions).build();
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
when(stateUpdater.getTasks()).thenReturn(mkSet(failedStandbyTaskToRecycle));
+ final RuntimeException taskException = new RuntimeException("Nobody
expects the Spanish inquisition!");
+ when(stateUpdater.remove(failedStandbyTaskToRecycle.id()))
+ .thenReturn(CompletableFuture.completedFuture(
+ new StateUpdater.RemovedTaskResult(failedStandbyTaskToRecycle,
taskException)
+ ));
+
+ final StreamsException exception = assertThrows(
+ StreamsException.class,
+ () -> taskManager.handleAssignment(
+ mkMap(mkEntry(failedStandbyTaskToRecycle.id(),
failedStandbyTaskToRecycle.inputPartitions())),
+ Collections.emptyMap()
+ )
+ );
+
+ assertEquals("Encounter unexpected fatal error for task " +
failedStandbyTaskToRecycle.id(), exception.getMessage());
+ assertEquals(taskException, exception.getCause());
+ verify(tasks).addFailedTask(failedStandbyTaskToRecycle);
+ verify(tasks, never()).addTask(failedStandbyTaskToRecycle);
+ verify(tasks).allNonFailedTasks();
+ verify(activeTaskCreator,
never()).createActiveTaskFromStandby(failedStandbyTaskToRecycle,
taskId03Partitions, consumer);
+ }
+
+ @Test
+ public void
shouldAddFailedActiveTasksToReassignWithDifferentInputPartitionsDuringAssignmentToTaskRegistry()
{
+ final StreamTask failedActiveTaskToReassign = statefulTask(taskId03,
taskId03ChangelogPartitions)
+ .inState(State.RESTORING)
+ .withInputPartitions(taskId03Partitions).build();
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
when(stateUpdater.getTasks()).thenReturn(mkSet(failedActiveTaskToReassign));
+ final RuntimeException taskException = new RuntimeException("Nobody
expects the Spanish inquisition!");
+ when(stateUpdater.remove(failedActiveTaskToReassign.id()))
+ .thenReturn(CompletableFuture.completedFuture(
+ new StateUpdater.RemovedTaskResult(failedActiveTaskToReassign,
taskException)
+ ));
+
+ final StreamsException exception = assertThrows(
+ StreamsException.class,
+ () -> taskManager.handleAssignment(
+ mkMap(mkEntry(failedActiveTaskToReassign.id(),
taskId00Partitions)),
+ Collections.emptyMap()
+ )
+ );
+
+ assertEquals("Encounter unexpected fatal error for task " +
failedActiveTaskToReassign.id(), exception.getMessage());
+ assertEquals(taskException, exception.getCause());
+ verify(tasks).addFailedTask(failedActiveTaskToReassign);
+ verify(tasks, never()).addTask(failedActiveTaskToReassign);
+ verify(tasks).allNonFailedTasks();
+ verify(tasks,
never()).updateActiveTaskInputPartitions(failedActiveTaskToReassign,
taskId00Partitions);
+ }
+
@Test
public void
shouldFirstHandleTasksInStateUpdaterThenSuspendedActiveTasksInTaskRegistry() {
final StreamTask reassignedActiveTask1 = statefulTask(taskId03,
taskId03ChangelogPartitions)
@@ -725,7 +815,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- when(tasks.allTasks()).thenReturn(mkSet(reassignedActiveTask1));
+
when(tasks.allNonFailedTasks()).thenReturn(mkSet(reassignedActiveTask1));
when(stateUpdater.getTasks()).thenReturn(mkSet(reassignedActiveTask2));
when(stateUpdater.remove(reassignedActiveTask2.id()))
.thenReturn(CompletableFuture.completedFuture(new
StateUpdater.RemovedTaskResult(reassignedActiveTask2)));
@@ -906,7 +996,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId01Partitions)
.inState(State.CREATED).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(mkSet(activeTaskToRecycle));
+ when(tasks.allNonFailedTasks()).thenReturn(mkSet(activeTaskToRecycle));
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle,
taskId01Partitions))
.thenReturn(standbyTask);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
@@ -950,7 +1040,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(mkSet(standbyTaskToRecycle));
+
when(tasks.allNonFailedTasks()).thenReturn(mkSet(standbyTaskToRecycle));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final IllegalStateException illegalStateException = assertThrows(
@@ -973,7 +1063,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- when(tasks.allTasks()).thenReturn(mkSet(activeTaskToClose));
+ when(tasks.allNonFailedTasks()).thenReturn(mkSet(activeTaskToClose));
taskManager.handleAssignment(Collections.emptyMap(),
Collections.emptyMap());
@@ -992,7 +1082,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- when(tasks.allTasks()).thenReturn(mkSet(standbyTaskToClose));
+ when(tasks.allNonFailedTasks()).thenReturn(mkSet(standbyTaskToClose));
final IllegalStateException illegalStateException = assertThrows(
IllegalStateException.class,
@@ -1012,7 +1102,7 @@ public class TaskManagerTest {
final Set<TopicPartition> newInputPartitions = taskId02Partitions;
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-
when(tasks.allTasks()).thenReturn(mkSet(activeTaskToUpdateInputPartitions));
+
when(tasks.allNonFailedTasks()).thenReturn(mkSet(activeTaskToUpdateInputPartitions));
when(tasks.updateActiveTaskInputPartitions(activeTaskToUpdateInputPartitions,
newInputPartitions)).thenReturn(true);
taskManager.handleAssignment(
@@ -1032,7 +1122,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- when(tasks.allTasks()).thenReturn(mkSet(activeTaskToResume));
+ when(tasks.allNonFailedTasks()).thenReturn(mkSet(activeTaskToResume));
taskManager.handleAssignment(
mkMap(mkEntry(activeTaskToResume.id(),
activeTaskToResume.inputPartitions())),
@@ -1050,7 +1140,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- when(tasks.allTasks()).thenReturn(mkSet(activeTaskToResume));
+ when(tasks.allNonFailedTasks()).thenReturn(mkSet(activeTaskToResume));
taskManager.handleAssignment(
mkMap(mkEntry(activeTaskToResume.id(),
activeTaskToResume.inputPartitions())),
@@ -1072,7 +1162,7 @@ public class TaskManagerTest {
final Set<TopicPartition> newInputPartitions = taskId03Partitions;
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-
when(tasks.allTasks()).thenReturn(mkSet(standbyTaskToUpdateInputPartitions));
+
when(tasks.allNonFailedTasks()).thenReturn(mkSet(standbyTaskToUpdateInputPartitions));
final IllegalStateException illegalStateException = assertThrows(
IllegalStateException.class,
@@ -1097,7 +1187,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- when(tasks.allTasks()).thenReturn(mkSet(activeTaskToClose));
+ when(tasks.allNonFailedTasks()).thenReturn(mkSet(activeTaskToClose));
taskManager.handleAssignment(
mkMap(mkEntry(activeTaskToCreate.id(),
activeTaskToCreate.inputPartitions())),
@@ -1173,7 +1263,7 @@ public class TaskManagerTest {
() -> taskManager.checkStateUpdater(time.milliseconds(),
noOpResetter)
);
verify(stateUpdater, never()).add(task00);
- verify(tasks).addTask(task00);
+ verify(tasks).addFailedTask(task00);
assertTrue(streamsException.taskId().isPresent());
assertEquals(task00.id(), streamsException.taskId().get());
assertEquals("Encounter unexpected fatal error for task 0_0",
streamsException.getMessage());
@@ -1202,8 +1292,8 @@ public class TaskManagerTest {
() -> taskManager.checkStateUpdater(time.milliseconds(),
noOpResetter)
);
- verify(tasks).addTask(statefulTask0);
- verify(tasks).addTask(statefulTask1);
+ verify(tasks).addFailedTask(statefulTask0);
+ verify(tasks).addFailedTask(statefulTask1);
verify(stateUpdater).add(statefulTask2);
assertEquals(mkSet(taskId00, taskId01), thrown.corruptedTasks());
assertEquals("Tasks [0_1, 0_0] are corrupted and hence need to be
re-initialized", thrown.getMessage());
@@ -1324,19 +1414,20 @@ public class TaskManagerTest {
future1.complete(new StateUpdater.RemovedTaskResult(task1));
final CompletableFuture<StateUpdater.RemovedTaskResult> future2 = new
CompletableFuture<>();
when(stateUpdater.remove(task2.id())).thenReturn(future2);
- final StreamsException streamsException = new
StreamsException("Something happened");
- future2.complete(new StateUpdater.RemovedTaskResult(task2,
streamsException));
+ final RuntimeException taskException = new RuntimeException("Nobody
expects the Spanish inquisition!");
+ future2.complete(new StateUpdater.RemovedTaskResult(task2,
taskException));
final StreamsException thrownException = assertThrows(
StreamsException.class,
() -> taskManager.handleRevocation(union(HashSet::new,
taskId00Partitions, taskId01Partitions))
);
- assertEquals(thrownException, streamsException);
+ assertEquals("Encounter unexpected fatal error for task " +
task2.id(), thrownException.getMessage());
+ assertEquals(thrownException.getCause(), taskException);
verify(task1).suspend();
verify(tasks).addTask(task1);
verify(task2, never()).suspend();
- verify(tasks).addTask(task2);
+ verify(tasks).addFailedTask(task2);
}
@Test
@@ -1651,7 +1742,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- when(tasks.allTasks()).thenReturn(mkSet(statefulTask0));
+ when(tasks.allNonFailedTasks()).thenReturn(mkSet(statefulTask0));
final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
when(consumer.assignment()).thenReturn(assigned);
@@ -1701,7 +1792,7 @@ public class TaskManagerTest {
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
runningStatefulTask)));
when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTask,
restoringStatefulTask));
- when(tasks.allTasks()).thenReturn(mkSet(runningStatefulTask));
+ when(tasks.allNonFailedTasks()).thenReturn(mkSet(runningStatefulTask));
expectLockObtainedFor(taskId00, taskId01, taskId02, taskId03);
expectDirectoryNotEmpty(taskId00, taskId01, taskId02, taskId03);
makeTaskFolders(
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
index 0f572a548e4..0620dcfb006 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
@@ -164,4 +164,48 @@ public class TasksTest {
assertTrue(tasks.hasPendingTasksToInit());
assertTrue(tasks.pendingTasksToInit().containsAll(mkSet(standbyTask1,
standbyTask2)));
}
+
+ @Test
+ public void shouldAddFailedTask() {
+ final StreamTask activeTask1 = statefulTask(TASK_0_0,
mkSet(TOPIC_PARTITION_B_0)).build();
+ final StreamTask activeTask2 = statefulTask(TASK_0_1,
mkSet(TOPIC_PARTITION_B_1)).build();
+ tasks.addTask(activeTask2);
+
+ tasks.addFailedTask(activeTask1);
+
+ assertEquals(activeTask1, tasks.task(TASK_0_0));
+ assertEquals(activeTask2, tasks.task(TASK_0_1));
+ assertTrue(tasks.allTasks().contains(activeTask1));
+ assertTrue(tasks.allTasks().contains(activeTask2));
+ assertFalse(tasks.allNonFailedTasks().contains(activeTask1));
+ assertTrue(tasks.allNonFailedTasks().contains(activeTask2));
+ }
+
+ @Test
+ public void shouldRemoveFailedTask() {
+ final StreamTask activeTask1 = statefulTask(TASK_0_0,
mkSet(TOPIC_PARTITION_B_0))
+ .inState(State.SUSPENDED).build();
+ tasks.addFailedTask(activeTask1);
+
+ tasks.removeTask(activeTask1);
+ assertFalse(tasks.allNonFailedTasks().contains(activeTask1));
+ assertFalse(tasks.allTasks().contains(activeTask1));
+
+ tasks.addTask(activeTask1);
+ assertTrue(tasks.allNonFailedTasks().contains(activeTask1));
+ }
+
+ @Test
+ public void shouldClearFailedTask() {
+ final StreamTask activeTask1 = statefulTask(TASK_0_0,
mkSet(TOPIC_PARTITION_B_0))
+ .inState(State.SUSPENDED).build();
+ tasks.addFailedTask(activeTask1);
+
+ tasks.clear();
+ assertFalse(tasks.allNonFailedTasks().contains(activeTask1));
+ assertFalse(tasks.allTasks().contains(activeTask1));
+
+ tasks.addTask(activeTask1);
+ assertTrue(tasks.allNonFailedTasks().contains(activeTask1));
+ }
}