This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push:
new 64952756ff8 KAFKA-17489: Do not handle failed tasks as tasks to assign
(#17115)
64952756ff8 is described below
commit 64952756ff82efa010ab8d023ede3050b17adc20
Author: Bruno Cadonna <[email protected]>
AuthorDate: Fri Sep 13 10:41:45 2024 +0200
KAFKA-17489: Do not handle failed tasks as tasks to assign (#17115)
Failed tasks discovered when removed from the state updater during
assignment or revocation are added to the task registry. From there they are
retrieved and handled as normal tasks. This leads to a couple of
IllegalStateExceptions because it breaks some invariants that ensure that only
good tasks are assigned and processed.
This commit solves this bug by distinguishing failed from non-failed tasks in
the task registry.
Reviewer: Lucas Brutschy <[email protected]>
---
.../streams/processor/internals/TaskManager.java | 10 +-
.../kafka/streams/processor/internals/Tasks.java | 20 ++++
.../streams/processor/internals/TasksRegistry.java | 4 +
.../processor/internals/TaskManagerTest.java | 133 +++++++++++++++++----
.../streams/processor/internals/TasksTest.java | 44 +++++++
5 files changed, 185 insertions(+), 26 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 62e64878af9..fa0328447fd 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -196,7 +196,7 @@ public class TaskManager {
mainConsumer.pause(mainConsumer.assignment());
} else {
// All tasks that are owned by the task manager are ready and do
not need to be paused
- final Set<TopicPartition> partitionsNotToPause = tasks.allTasks()
+ final Set<TopicPartition> partitionsNotToPause =
tasks.allNonFailedTasks()
.stream()
.flatMap(task -> task.inputPartitions().stream())
.collect(Collectors.toSet());
@@ -523,7 +523,7 @@ public class TaskManager {
final Map<TaskId,
Set<TopicPartition>> standbyTasksToCreate,
final Map<Task,
Set<TopicPartition>> tasksToRecycle,
final Set<Task>
tasksToCloseClean) {
- for (final Task task : tasks.allTasks()) {
+ for (final Task task : tasks.allNonFailedTasks()) {
if (!task.isActive()) {
throw new IllegalStateException("Standby tasks should only be
managed by the state updater, " +
"but standby task " + task.id() + " is managed by the
stream thread");
@@ -685,7 +685,7 @@ public class TaskManager {
final Task task = removedTaskResult.task();
if (removedTaskResult.exception().isPresent()) {
failedTasks.put(task.id(), removedTaskResult.exception().get());
- tasks.addTask(task);
+ tasks.addFailedTask(task);
return null;
}
return task;
@@ -997,7 +997,7 @@ public class TaskManager {
addTaskToStateUpdater(task);
} catch (final RuntimeException e) {
// need to add task back to the bookkeeping to be handled by
the stream thread
- tasks.addTask(task);
+ tasks.addFailedTask(task);
taskExceptions.put(task.id(), e);
}
}
@@ -1029,7 +1029,7 @@ public class TaskManager {
final RuntimeException exception = exceptionAndTask.exception();
final Task failedTask = exceptionAndTask.task();
// need to add task back to the bookkeeping to be handled by the
stream thread
- tasks.addTask(failedTask);
+ tasks.addFailedTask(failedTask);
taskExceptions.put(failedTask.id(), exception);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index 54347fd86ab..d75e944efc1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -57,6 +57,7 @@ class Tasks implements TasksRegistry {
private final Map<TaskId, Set<TopicPartition>> pendingActiveTasksToCreate
= new HashMap<>();
private final Map<TaskId, Set<TopicPartition>> pendingStandbyTasksToCreate
= new HashMap<>();
private final Set<Task> pendingTasksToInit = new HashSet<>();
+ private final Set<TaskId> failedTaskIds = new HashSet<>();
// TODO: convert to Stream/StandbyTask when we remove
TaskManager#StateMachineTask with mocks
private final Map<TopicPartition, Task> activeTasksPerPartition = new
HashMap<>();
@@ -177,6 +178,12 @@ class Tasks implements TasksRegistry {
}
}
+ @Override
+ public void addFailedTask(final Task task) {
+ failedTaskIds.add(task.id());
+ addTask(task);
+ }
+
@Override
public synchronized void removeTask(final Task taskToRemove) {
final TaskId taskId = taskToRemove.id();
@@ -195,6 +202,7 @@ class Tasks implements TasksRegistry {
throw new IllegalArgumentException("Attempted to remove a
standby task that is not owned: " + taskId);
}
}
+ failedTaskIds.remove(taskToRemove.id());
}
@Override
@@ -252,6 +260,7 @@ class Tasks implements TasksRegistry {
activeTasksPerId.clear();
standbyTasksPerId.clear();
activeTasksPerPartition.clear();
+ failedTaskIds.clear();
}
// TODO: change return type to `StreamTask`
@@ -308,6 +317,17 @@ class Tasks implements TasksRegistry {
return union(HashSet::new, new HashSet<>(activeTasksPerId.values()),
new HashSet<>(standbyTasksPerId.values()));
}
+ @Override
+ public synchronized Set<Task> allNonFailedTasks() {
+ final Set<Task> nonFailedActiveTasks =
activeTasksPerId.values().stream()
+ .filter(task -> !failedTaskIds.contains(task.id()))
+ .collect(Collectors.toSet());
+ final Set<Task> nonFailedStandbyTasks =
standbyTasksPerId.values().stream()
+ .filter(task -> !failedTaskIds.contains(task.id()))
+ .collect(Collectors.toSet());
+ return union(HashSet::new, nonFailedActiveTasks,
nonFailedStandbyTasks);
+ }
+
@Override
public synchronized Set<TaskId> allTaskIds() {
return union(HashSet::new, activeTasksPerId.keySet(),
standbyTasksPerId.keySet());
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
index 64f24702d2f..20bee575ebc 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
@@ -51,6 +51,8 @@ public interface TasksRegistry {
void addTask(final Task task);
+ void addFailedTask(final Task task);
+
void removeTask(final Task taskToRemove);
void replaceActiveWithStandby(final StandbyTask standbyTask);
@@ -73,6 +75,8 @@ public interface TasksRegistry {
Set<Task> allTasks();
+ Set<Task> allNonFailedTasks();
+
Map<TaskId, Task> allTasksPerId();
Set<TaskId> allTaskIds();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index c6b958beee5..ee8d8f1da72 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -539,7 +539,7 @@ public class TaskManagerTest {
assertEquals("Encounter unexpected fatal error for task " +
failedStandbyTask.id(), exception.getMessage());
assertInstanceOf(RuntimeException.class, exception.getCause());
assertEquals(kaboom.getMessage(), exception.getCause().getMessage());
- verify(tasks).addTask(failedStandbyTask);
+ verify(tasks).addFailedTask(failedStandbyTask);
}
@Test
@@ -705,7 +705,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- when(tasks.allTasks()).thenReturn(mkSet(reassignedActiveTask));
+
when(tasks.allNonFailedTasks()).thenReturn(mkSet(reassignedActiveTask));
taskManager.handleAssignment(
mkMap(mkEntry(reassignedActiveTask.id(),
reassignedActiveTask.inputPartitions())),
@@ -718,6 +718,96 @@ public class TaskManagerTest {
verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
+ @Test
+ public void
shouldAddFailedActiveTaskToRecycleDuringAssignmentToTaskRegistry() {
+ final StreamTask failedActiveTaskToRecycle = statefulTask(taskId03,
taskId03ChangelogPartitions)
+ .inState(State.RESTORING)
+ .withInputPartitions(taskId03Partitions).build();
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
when(stateUpdater.getTasks()).thenReturn(mkSet(failedActiveTaskToRecycle));
+ final RuntimeException taskException = new RuntimeException("Nobody
expects the Spanish inquisition!");
+ when(stateUpdater.remove(failedActiveTaskToRecycle.id()))
+ .thenReturn(CompletableFuture.completedFuture(
+ new StateUpdater.RemovedTaskResult(failedActiveTaskToRecycle,
taskException)
+ ));
+
+ final StreamsException exception = assertThrows(
+ StreamsException.class,
+ () -> taskManager.handleAssignment(
+ Collections.emptyMap(),
+ mkMap(mkEntry(failedActiveTaskToRecycle.id(),
failedActiveTaskToRecycle.inputPartitions()))
+ )
+ );
+
+ assertEquals("Encounter unexpected fatal error for task " +
failedActiveTaskToRecycle.id(), exception.getMessage());
+ assertEquals(taskException, exception.getCause());
+ verify(tasks).addFailedTask(failedActiveTaskToRecycle);
+ verify(tasks, never()).addTask(failedActiveTaskToRecycle);
+ verify(tasks).allNonFailedTasks();
+ verify(standbyTaskCreator,
never()).createStandbyTaskFromActive(failedActiveTaskToRecycle,
taskId03Partitions);
+ }
+
+ @Test
+ public void
shouldAddFailedStandbyTaskToRecycleDuringAssignmentToTaskRegistry() {
+ final StandbyTask failedStandbyTaskToRecycle = standbyTask(taskId03,
taskId03ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId03Partitions).build();
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
when(stateUpdater.getTasks()).thenReturn(mkSet(failedStandbyTaskToRecycle));
+ final RuntimeException taskException = new RuntimeException("Nobody
expects the Spanish inquisition!");
+ when(stateUpdater.remove(failedStandbyTaskToRecycle.id()))
+ .thenReturn(CompletableFuture.completedFuture(
+ new StateUpdater.RemovedTaskResult(failedStandbyTaskToRecycle,
taskException)
+ ));
+
+ final StreamsException exception = assertThrows(
+ StreamsException.class,
+ () -> taskManager.handleAssignment(
+ mkMap(mkEntry(failedStandbyTaskToRecycle.id(),
failedStandbyTaskToRecycle.inputPartitions())),
+ Collections.emptyMap()
+ )
+ );
+
+ assertEquals("Encounter unexpected fatal error for task " +
failedStandbyTaskToRecycle.id(), exception.getMessage());
+ assertEquals(taskException, exception.getCause());
+ verify(tasks).addFailedTask(failedStandbyTaskToRecycle);
+ verify(tasks, never()).addTask(failedStandbyTaskToRecycle);
+ verify(tasks).allNonFailedTasks();
+ verify(activeTaskCreator,
never()).createActiveTaskFromStandby(failedStandbyTaskToRecycle,
taskId03Partitions, consumer);
+ }
+
+ @Test
+ public void
shouldAddFailedActiveTasksToReassignWithDifferentInputPartitionsDuringAssignmentToTaskRegistry()
{
+ final StreamTask failedActiveTaskToReassign = statefulTask(taskId03,
taskId03ChangelogPartitions)
+ .inState(State.RESTORING)
+ .withInputPartitions(taskId03Partitions).build();
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
when(stateUpdater.getTasks()).thenReturn(mkSet(failedActiveTaskToReassign));
+ final RuntimeException taskException = new RuntimeException("Nobody
expects the Spanish inquisition!");
+ when(stateUpdater.remove(failedActiveTaskToReassign.id()))
+ .thenReturn(CompletableFuture.completedFuture(
+ new StateUpdater.RemovedTaskResult(failedActiveTaskToReassign,
taskException)
+ ));
+
+ final StreamsException exception = assertThrows(
+ StreamsException.class,
+ () -> taskManager.handleAssignment(
+ mkMap(mkEntry(failedActiveTaskToReassign.id(),
taskId00Partitions)),
+ Collections.emptyMap()
+ )
+ );
+
+ assertEquals("Encounter unexpected fatal error for task " +
failedActiveTaskToReassign.id(), exception.getMessage());
+ assertEquals(taskException, exception.getCause());
+ verify(tasks).addFailedTask(failedActiveTaskToReassign);
+ verify(tasks, never()).addTask(failedActiveTaskToReassign);
+ verify(tasks).allNonFailedTasks();
+ verify(tasks,
never()).updateActiveTaskInputPartitions(failedActiveTaskToReassign,
taskId00Partitions);
+ }
+
@Test
public void
shouldFirstHandleTasksInStateUpdaterThenSuspendedActiveTasksInTaskRegistry() {
final StreamTask reassignedActiveTask1 = statefulTask(taskId03,
taskId03ChangelogPartitions)
@@ -728,7 +818,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- when(tasks.allTasks()).thenReturn(mkSet(reassignedActiveTask1));
+
when(tasks.allNonFailedTasks()).thenReturn(mkSet(reassignedActiveTask1));
when(stateUpdater.getTasks()).thenReturn(mkSet(reassignedActiveTask2));
when(stateUpdater.remove(reassignedActiveTask2.id()))
.thenReturn(CompletableFuture.completedFuture(new
StateUpdater.RemovedTaskResult(reassignedActiveTask2)));
@@ -909,7 +999,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId01Partitions)
.inState(State.CREATED).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(mkSet(activeTaskToRecycle));
+ when(tasks.allNonFailedTasks()).thenReturn(mkSet(activeTaskToRecycle));
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle,
taskId01Partitions))
.thenReturn(standbyTask);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
@@ -953,7 +1043,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(mkSet(standbyTaskToRecycle));
+
when(tasks.allNonFailedTasks()).thenReturn(mkSet(standbyTaskToRecycle));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final IllegalStateException illegalStateException = assertThrows(
@@ -976,7 +1066,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- when(tasks.allTasks()).thenReturn(mkSet(activeTaskToClose));
+ when(tasks.allNonFailedTasks()).thenReturn(mkSet(activeTaskToClose));
taskManager.handleAssignment(Collections.emptyMap(),
Collections.emptyMap());
@@ -995,7 +1085,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- when(tasks.allTasks()).thenReturn(mkSet(standbyTaskToClose));
+ when(tasks.allNonFailedTasks()).thenReturn(mkSet(standbyTaskToClose));
final IllegalStateException illegalStateException = assertThrows(
IllegalStateException.class,
@@ -1015,7 +1105,7 @@ public class TaskManagerTest {
final Set<TopicPartition> newInputPartitions = taskId02Partitions;
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-
when(tasks.allTasks()).thenReturn(mkSet(activeTaskToUpdateInputPartitions));
+
when(tasks.allNonFailedTasks()).thenReturn(mkSet(activeTaskToUpdateInputPartitions));
when(tasks.updateActiveTaskInputPartitions(activeTaskToUpdateInputPartitions,
newInputPartitions)).thenReturn(true);
taskManager.handleAssignment(
@@ -1035,7 +1125,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- when(tasks.allTasks()).thenReturn(mkSet(activeTaskToResume));
+ when(tasks.allNonFailedTasks()).thenReturn(mkSet(activeTaskToResume));
taskManager.handleAssignment(
mkMap(mkEntry(activeTaskToResume.id(),
activeTaskToResume.inputPartitions())),
@@ -1053,7 +1143,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- when(tasks.allTasks()).thenReturn(mkSet(activeTaskToResume));
+ when(tasks.allNonFailedTasks()).thenReturn(mkSet(activeTaskToResume));
taskManager.handleAssignment(
mkMap(mkEntry(activeTaskToResume.id(),
activeTaskToResume.inputPartitions())),
@@ -1075,7 +1165,7 @@ public class TaskManagerTest {
final Set<TopicPartition> newInputPartitions = taskId03Partitions;
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-
when(tasks.allTasks()).thenReturn(mkSet(standbyTaskToUpdateInputPartitions));
+
when(tasks.allNonFailedTasks()).thenReturn(mkSet(standbyTaskToUpdateInputPartitions));
final IllegalStateException illegalStateException = assertThrows(
IllegalStateException.class,
@@ -1100,7 +1190,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- when(tasks.allTasks()).thenReturn(mkSet(activeTaskToClose));
+ when(tasks.allNonFailedTasks()).thenReturn(mkSet(activeTaskToClose));
taskManager.handleAssignment(
mkMap(mkEntry(activeTaskToCreate.id(),
activeTaskToCreate.inputPartitions())),
@@ -1176,7 +1266,7 @@ public class TaskManagerTest {
() -> taskManager.checkStateUpdater(time.milliseconds(),
noOpResetter)
);
verify(stateUpdater, never()).add(task00);
- verify(tasks).addTask(task00);
+ verify(tasks).addFailedTask(task00);
assertTrue(streamsException.taskId().isPresent());
assertEquals(task00.id(), streamsException.taskId().get());
assertEquals("Encounter unexpected fatal error for task 0_0",
streamsException.getMessage());
@@ -1205,8 +1295,8 @@ public class TaskManagerTest {
() -> taskManager.checkStateUpdater(time.milliseconds(),
noOpResetter)
);
- verify(tasks).addTask(statefulTask0);
- verify(tasks).addTask(statefulTask1);
+ verify(tasks).addFailedTask(statefulTask0);
+ verify(tasks).addFailedTask(statefulTask1);
verify(stateUpdater).add(statefulTask2);
assertEquals(mkSet(taskId00, taskId01), thrown.corruptedTasks());
assertEquals("Tasks [0_1, 0_0] are corrupted and hence need to be
re-initialized", thrown.getMessage());
@@ -1327,19 +1417,20 @@ public class TaskManagerTest {
future1.complete(new StateUpdater.RemovedTaskResult(task1));
final CompletableFuture<StateUpdater.RemovedTaskResult> future2 = new
CompletableFuture<>();
when(stateUpdater.remove(task2.id())).thenReturn(future2);
- final StreamsException streamsException = new
StreamsException("Something happened");
- future2.complete(new StateUpdater.RemovedTaskResult(task2,
streamsException));
+ final RuntimeException taskException = new RuntimeException("Nobody
expects the Spanish inquisition!");
+ future2.complete(new StateUpdater.RemovedTaskResult(task2,
taskException));
final StreamsException thrownException = assertThrows(
StreamsException.class,
() -> taskManager.handleRevocation(union(HashSet::new,
taskId00Partitions, taskId01Partitions))
);
- assertEquals(thrownException, streamsException);
+ assertEquals("Encounter unexpected fatal error for task " +
task2.id(), thrownException.getMessage());
+ assertEquals(thrownException.getCause(), taskException);
verify(task1).suspend();
verify(tasks).addTask(task1);
verify(task2, never()).suspend();
- verify(tasks).addTask(task2);
+ verify(tasks).addFailedTask(task2);
}
@Test
@@ -1654,7 +1745,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- when(tasks.allTasks()).thenReturn(mkSet(statefulTask0));
+ when(tasks.allNonFailedTasks()).thenReturn(mkSet(statefulTask0));
final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
when(consumer.assignment()).thenReturn(assigned);
@@ -1704,7 +1795,7 @@ public class TaskManagerTest {
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
runningStatefulTask)));
when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTask,
restoringStatefulTask));
- when(tasks.allTasks()).thenReturn(mkSet(runningStatefulTask));
+ when(tasks.allNonFailedTasks()).thenReturn(mkSet(runningStatefulTask));
expectLockObtainedFor(taskId00, taskId01, taskId02, taskId03);
expectDirectoryNotEmpty(taskId00, taskId01, taskId02, taskId03);
makeTaskFolders(
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
index e9ebf59a800..9eb40ae0124 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
@@ -163,4 +163,48 @@ public class TasksTest {
assertTrue(tasks.hasPendingTasksToInit());
assertTrue(tasks.pendingTasksToInit().containsAll(mkSet(standbyTask1,
standbyTask2)));
}
+
+ @Test
+ public void shouldAddFailedTask() {
+ final StreamTask activeTask1 = statefulTask(TASK_0_0,
mkSet(TOPIC_PARTITION_B_0)).build();
+ final StreamTask activeTask2 = statefulTask(TASK_0_1,
mkSet(TOPIC_PARTITION_B_1)).build();
+ tasks.addTask(activeTask2);
+
+ tasks.addFailedTask(activeTask1);
+
+ assertEquals(activeTask1, tasks.task(TASK_0_0));
+ assertEquals(activeTask2, tasks.task(TASK_0_1));
+ assertTrue(tasks.allTasks().contains(activeTask1));
+ assertTrue(tasks.allTasks().contains(activeTask2));
+ assertFalse(tasks.allNonFailedTasks().contains(activeTask1));
+ assertTrue(tasks.allNonFailedTasks().contains(activeTask2));
+ }
+
+ @Test
+ public void shouldRemoveFailedTask() {
+ final StreamTask activeTask1 = statefulTask(TASK_0_0,
mkSet(TOPIC_PARTITION_B_0))
+ .inState(State.SUSPENDED).build();
+ tasks.addFailedTask(activeTask1);
+
+ tasks.removeTask(activeTask1);
+ assertFalse(tasks.allNonFailedTasks().contains(activeTask1));
+ assertFalse(tasks.allTasks().contains(activeTask1));
+
+ tasks.addTask(activeTask1);
+ assertTrue(tasks.allNonFailedTasks().contains(activeTask1));
+ }
+
+ @Test
+ public void shouldClearFailedTask() {
+ final StreamTask activeTask1 = statefulTask(TASK_0_0,
mkSet(TOPIC_PARTITION_B_0))
+ .inState(State.SUSPENDED).build();
+ tasks.addFailedTask(activeTask1);
+
+ tasks.clear();
+ assertFalse(tasks.allNonFailedTasks().contains(activeTask1));
+ assertFalse(tasks.allTasks().contains(activeTask1));
+
+ tasks.addTask(activeTask1);
+ assertTrue(tasks.allNonFailedTasks().contains(activeTask1));
+ }
}
\ No newline at end of file