This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f568932e45d MINOR: rename TaskRegistry methods to better reflect their
purpose. (#21448)
f568932e45d is described below
commit f568932e45dd055bc7b95b3f2f0e3ca09ca0ee57
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Tue Feb 17 11:43:12 2026 -0800
MINOR: rename TaskRegistry methods to better reflect their purpose. (#21448)
Changed the name of method that work only with initialized tasks(not
pending) to better reflect their purpose.
Reviewers: Matthias J. Sax <[email protected]>, Lucas Brutschy
<[email protected]>
---
.../streams/processor/internals/TaskExecutor.java | 6 +-
.../streams/processor/internals/TaskManager.java | 54 +++---
.../kafka/streams/processor/internals/Tasks.java | 25 +--
.../streams/processor/internals/TasksRegistry.java | 22 +--
.../internals/tasks/DefaultTaskManager.java | 16 +-
.../processor/internals/TaskExecutorTest.java | 4 +-
.../processor/internals/TaskManagerTest.java | 202 ++++++++++-----------
.../streams/processor/internals/TasksTest.java | 52 +++---
.../internals/tasks/DefaultTaskManagerTest.java | 74 ++++----
9 files changed, 228 insertions(+), 227 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
index 91deab0dd9d..8294bf407d0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
@@ -68,7 +68,7 @@ public class TaskExecutor {
int totalProcessed = 0;
Task lastProcessed = null;
- for (final Task task : tasks.activeTasks()) {
+ for (final Task task : tasks.activeInitializedTasks()) {
final long now = time.milliseconds();
try {
if (executionMetadata.canProcessTask(task, now)) {
@@ -233,7 +233,7 @@ public class TaskExecutor {
private void updateTaskCommitMetadata(final Map<TopicPartition,
OffsetAndMetadata> allOffsets) {
if (!allOffsets.isEmpty()) {
- for (final Task task : tasks.activeTasks()) {
+ for (final Task task : tasks.activeInitializedTasks()) {
if (task instanceof StreamTask) {
for (final TopicPartition topicPartition :
task.inputPartitions()) {
if (allOffsets.containsKey(topicPartition)) {
@@ -261,7 +261,7 @@ public class TaskExecutor {
int punctuate() {
int punctuated = 0;
- for (final Task task : tasks.activeTasks()) {
+ for (final Task task : tasks.activeInitializedTasks()) {
try {
if (executionMetadata.canPunctuateTask(task)) {
if (task.maybePunctuateStreamTime()) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 8e6433c0090..f91663cf0c8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -197,7 +197,7 @@ public class TaskManager {
// we should pause consumer only within the listener since
// before then the assignment has not been updated yet.
// All tasks that are owned by the task manager are ready and do not
need to be paused
- final Set<TopicPartition> partitionsNotToPause =
tasks.allNonFailedTasks()
+ final Set<TopicPartition> partitionsNotToPause =
tasks.allNonFailedInitializedTasks()
.stream()
.flatMap(task -> task.inputPartitions().stream())
.collect(Collectors.toSet());
@@ -214,7 +214,7 @@ public class TaskManager {
* @throws TaskMigratedException
*/
boolean handleCorruption(final Set<TaskId> corruptedTasks) {
- final Set<TaskId> activeTasks = new HashSet<>(tasks.activeTaskIds());
+ final Set<TaskId> activeTasks = new
HashSet<>(tasks.activeInitializedTaskIds());
// We need to stop all processing, since we need to commit
non-corrupted tasks as well.
maybeLockTasks(activeTasks);
@@ -223,7 +223,7 @@ public class TaskManager {
final Set<Task> corruptedStandbyTasks = new
TreeSet<>(Comparator.comparing(Task::id));
for (final TaskId taskId : corruptedTasks) {
- final Task task = tasks.task(taskId);
+ final Task task = tasks.initializedTask(taskId);
if (task.isActive()) {
corruptedActiveTasks.add(task);
} else {
@@ -237,7 +237,7 @@ public class TaskManager {
// We need to commit before closing the corrupted active tasks since
this will force the ongoing txn to abort
try {
- final Collection<Task> tasksToCommit = tasks.allTasksPerId()
+ final Collection<Task> tasksToCommit =
tasks.allInitializedTasksPerId()
.values()
.stream()
.filter(t -> t.state() == Task.State.RUNNING)
@@ -247,10 +247,10 @@ public class TaskManager {
} catch (final TaskCorruptedException e) {
log.info("Some additional tasks were found corrupted while trying
to commit, these will be added to the " +
"tasks to clean and revive: {}", e.corruptedTasks());
- corruptedActiveTasks.addAll(tasks.tasks(e.corruptedTasks()));
+
corruptedActiveTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
} catch (final TimeoutException e) {
log.info("Hit TimeoutException when committing all non-corrupted
tasks, these will be closed and revived");
- final Collection<Task> uncorruptedTasks = new
HashSet<>(tasks.activeTasks());
+ final Collection<Task> uncorruptedTasks = new
HashSet<>(tasks.activeInitializedTasks());
uncorruptedTasks.removeAll(corruptedActiveTasks);
// Those tasks which just timed out can just be closed dirty
without marking changelogs as corrupted
closeDirtyAndRevive(uncorruptedTasks, false);
@@ -366,7 +366,7 @@ public class TaskManager {
final Set<Task> tasksToCloseClean = new
TreeSet<>(Comparator.comparing(Task::id));
final Set<TaskId> tasksToLock =
- tasks.allTaskIds().stream()
+ tasks.allInitializedTaskIds().stream()
.filter(x -> activeTasksToCreate.containsKey(x) ||
standbyTasksToCreate.containsKey(x))
.collect(Collectors.toSet());
@@ -533,7 +533,7 @@ public class TaskManager {
final Map<TaskId,
Set<TopicPartition>> standbyTasksToCreate,
final Map<Task,
Set<TopicPartition>> tasksToRecycle,
final Set<Task>
tasksToCloseClean) {
- for (final Task task : tasks.allNonFailedTasks()) {
+ for (final Task task : tasks.allNonFailedInitializedTasks()) {
if (!task.isActive()) {
throw new IllegalStateException("Standby tasks should only be
managed by the state updater, " +
"but standby task " + task.id() + " is managed by the
stream thread");
@@ -733,7 +733,7 @@ public class TaskManager {
while (iter.hasNext()) {
final Map.Entry<TaskId, Set<TopicPartition>> entry = iter.next();
final TaskId taskId = entry.getKey();
- final boolean taskIsOwned = tasks.allTaskIds().contains(taskId)
+ final boolean taskIsOwned =
tasks.allInitializedTaskIds().contains(taskId)
|| (stateUpdater.tasks().stream().anyMatch(task ->
task.id().equals(taskId)));
if (taskId.topologyName() != null && !taskIsOwned &&
!topologyMetadata.namedTopologiesView().contains(taskId.topologyName())) {
log.info("Cannot create the assigned task {} since it's
topology name cannot be recognized, will put it " +
@@ -1079,7 +1079,7 @@ public class TaskManager {
e.corruptedTasks());
// If we hit a TaskCorruptedException it must be EOS, just handle
the cleanup for those corrupted tasks right here
- dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+ dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
closeDirtyAndRevive(dirtyTasks, true);
} catch (final TimeoutException e) {
log.warn("Timed out while trying to commit all tasks during
revocation, these will be cleaned and revived");
@@ -1193,8 +1193,8 @@ public class TaskManager {
}
private void closeRunningTasksDirty() {
- final Set<Task> allTask = tasks.allTasks();
- final Set<TaskId> allTaskIds = tasks.allTaskIds();
+ final Set<Task> allTask = tasks.allInitializedTasks();
+ final Set<TaskId> allTaskIds = tasks.allInitializedTaskIds();
maybeLockTasks(allTaskIds);
for (final Task task : allTask) {
// Even though we've apparently dropped out of the group, we can
continue safely to maintain our
@@ -1413,10 +1413,10 @@ public class TaskManager {
// TODO: change type to `StreamTask`
final Set<Task> activeTasks = new
TreeSet<>(Comparator.comparing(Task::id));
- activeTasks.addAll(tasks.activeTasks());
+ activeTasks.addAll(tasks.activeInitializedTasks());
// TODO: change type to `StandbyTask`
final Set<Task> standbyTasks = new
TreeSet<>(Comparator.comparing(Task::id));
- standbyTasks.addAll(tasks.standbyTasks());
+ standbyTasks.addAll(tasks.standbyInitializedTasks());
final Set<Task> pendingActiveTasks =
tasks.drainPendingActiveTasksToInit();
activeTasks.addAll(pendingActiveTasks);
@@ -1670,7 +1670,7 @@ public class TaskManager {
// not bothering with an unmodifiable map, since the tasks themselves
are mutable, but
// if any outside code modifies the map or the tasks, it would be a
severe transgression.
final Map<TaskId, Task> ret =
stateUpdater.tasks().stream().collect(Collectors.toMap(Task::id, x -> x));
- ret.putAll(tasks.allTasksPerId());
+ ret.putAll(tasks.allInitializedTasksPerId());
ret.putAll(tasks.pendingTasksToInit().stream().collect(Collectors.toMap(Task::id,
x -> x)));
return ret;
}
@@ -1683,19 +1683,19 @@ public class TaskManager {
Map<TaskId, Task> allRunningTasks() {
// not bothering with an unmodifiable map, since the tasks themselves
are mutable, but
// if any outside code modifies the map or the tasks, it would be a
severe transgression.
- return tasks.allTasksPerId();
+ return tasks.allInitializedTasksPerId();
}
Set<Task> readOnlyAllTasks() {
// not bothering with an unmodifiable map, since the tasks themselves
are mutable, but
// if any outside code modifies the map or the tasks, it would be a
severe transgression.
final HashSet<Task> ret = new HashSet<>(stateUpdater.tasks());
- ret.addAll(tasks.allTasks());
+ ret.addAll(tasks.allInitializedTasks());
return Collections.unmodifiableSet(ret);
}
Map<TaskId, Task> notPausedTasks() {
- return Collections.unmodifiableMap(tasks.allTasks()
+ return Collections.unmodifiableMap(tasks.allInitializedTasks()
.stream()
.filter(t -> !topologyMetadata.isPaused(t.id().topologyName()))
.collect(Collectors.toMap(Task::id, v -> v)));
@@ -1721,7 +1721,7 @@ public class TaskManager {
}
private Stream<Task> activeRunningTaskStream() {
- return tasks.allTasks().stream().filter(Task::isActive);
+ return tasks.allInitializedTasks().stream().filter(Task::isActive);
}
Map<TaskId, Task> standbyTaskMap() {
@@ -1733,7 +1733,7 @@ public class TaskManager {
}
private Stream<Task> standbyTaskStream() {
- final Stream<Task> standbyTasksInTaskRegistry =
tasks.allTasks().stream().filter(t -> !t.isActive());
+ final Stream<Task> standbyTasksInTaskRegistry =
tasks.allInitializedTasks().stream().filter(t -> !t.isActive());
return Stream.concat(
stateUpdater.standbyTasks().stream(),
standbyTasksInTaskRegistry
@@ -1741,7 +1741,7 @@ public class TaskManager {
}
// For testing only.
int commitAll() {
- return commit(tasks.allTasks());
+ return commit(tasks.allInitializedTasks());
}
/**
@@ -1749,7 +1749,7 @@ public class TaskManager {
* the corresponding record queues have capacity (again).
*/
public void resumePollingForPartitionsWithAvailableSpace() {
- for (final Task t: tasks.activeTasks()) {
+ for (final Task t: tasks.activeInitializedTasks()) {
t.resumePollingForPartitionsWithAvailableSpace();
}
}
@@ -1758,7 +1758,7 @@ public class TaskManager {
* Fetches up-to-date lag information from the consumer.
*/
public void updateLags() {
- for (final Task t: tasks.activeTasks()) {
+ for (final Task t: tasks.activeInitializedTasks()) {
t.updateLags();
}
}
@@ -1808,7 +1808,7 @@ public class TaskManager {
}
private Task getActiveTask(final TopicPartition partition) {
- final Task activeTask = tasks.activeTasksForInputPartition(partition);
+ final Task activeTask =
tasks.activeInitializedTasksForInputPartition(partition);
if (activeTask == null) {
log.error("Unable to locate active task for received-record
partition {}. Current tasks: {}",
@@ -1912,7 +1912,7 @@ public class TaskManager {
}
public void updateTaskEndMetadata(final TopicPartition topicPartition,
final Long offset) {
- for (final Task task : tasks.activeTasks()) {
+ for (final Task task : tasks.activeInitializedTasks()) {
if (task instanceof StreamTask) {
if (task.inputPartitions().contains(topicPartition)) {
((StreamTask) task).updateEndOffsets(topicPartition,
offset);
@@ -1943,7 +1943,7 @@ public class TaskManager {
try {
final Set<Task> activeTasksToRemove = new
TreeSet<>(Comparator.comparing(Task::id));
final Set<Task> standbyTasksToRemove = new
TreeSet<>(Comparator.comparing(Task::id));
- for (final Task task : tasks.allTasks()) {
+ for (final Task task : tasks.allInitializedTasks()) {
if
(!currentNamedTopologies.contains(task.id().topologyName())) {
if (task.isActive()) {
activeTasksToRemove.add(task);
@@ -2032,7 +2032,7 @@ public class TaskManager {
stringBuilder.append("TaskManager\n");
stringBuilder.append(indent).append("\tMetadataState:\n");
stringBuilder.append(indent).append("\tTasks:\n");
- for (final Task task : tasks.allTasks()) {
+ for (final Task task : tasks.allInitializedTasks()) {
stringBuilder.append(indent)
.append("\t\t")
.append(task.id())
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index 4af90d181c6..2e6175446ad 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -280,6 +280,7 @@ class Tasks implements TasksRegistry {
@Override
public synchronized void clear() {
pendingTasksToInit.clear();
+ pendingTasksToClose.clear();
pendingActiveTasksToCreate.clear();
pendingStandbyTasksToCreate.clear();
activeTasksPerId.clear();
@@ -290,7 +291,7 @@ class Tasks implements TasksRegistry {
// TODO: change return type to `StreamTask`
@Override
- public Task activeTasksForInputPartition(final TopicPartition partition) {
+ public Task activeInitializedTasksForInputPartition(final TopicPartition
partition) {
return activeTasksPerPartition.get(partition);
}
@@ -305,7 +306,7 @@ class Tasks implements TasksRegistry {
}
@Override
- public Task task(final TaskId taskId) {
+ public Task initializedTask(final TaskId taskId) {
final Task task = getTask(taskId);
if (task != null)
@@ -315,26 +316,26 @@ class Tasks implements TasksRegistry {
}
@Override
- public Collection<Task> tasks(final Collection<TaskId> taskIds) {
+ public Collection<Task> initializedTasks(final Collection<TaskId> taskIds)
{
final Set<Task> tasks = new HashSet<>();
for (final TaskId taskId : taskIds) {
- tasks.add(task(taskId));
+ tasks.add(initializedTask(taskId));
}
return tasks;
}
@Override
- public synchronized Collection<TaskId> activeTaskIds() {
+ public synchronized Collection<TaskId> activeInitializedTaskIds() {
return Collections.unmodifiableCollection(activeTasksPerId.keySet());
}
@Override
- public synchronized Collection<Task> activeTasks() {
+ public synchronized Collection<Task> activeInitializedTasks() {
return Collections.unmodifiableCollection(activeTasksPerId.values());
}
@Override
- public synchronized Collection<Task> standbyTasks() {
+ public synchronized Collection<Task> standbyInitializedTasks() {
return Collections.unmodifiableCollection(standbyTasksPerId.values());
}
@@ -343,12 +344,12 @@ class Tasks implements TasksRegistry {
* and the returned task could be modified by other threads concurrently
*/
@Override
- public synchronized Set<Task> allTasks() {
+ public synchronized Set<Task> allInitializedTasks() {
return union(HashSet::new, new HashSet<>(activeTasksPerId.values()),
new HashSet<>(standbyTasksPerId.values()));
}
@Override
- public synchronized Set<Task> allNonFailedTasks() {
+ public synchronized Set<Task> allNonFailedInitializedTasks() {
final Set<Task> nonFailedActiveTasks =
activeTasksPerId.values().stream()
.filter(task -> !failedTaskIds.contains(task.id()))
.collect(Collectors.toSet());
@@ -359,12 +360,12 @@ class Tasks implements TasksRegistry {
}
@Override
- public synchronized Set<TaskId> allTaskIds() {
+ public synchronized Set<TaskId> allInitializedTaskIds() {
return union(HashSet::new, activeTasksPerId.keySet(),
standbyTasksPerId.keySet());
}
@Override
- public synchronized Map<TaskId, Task> allTasksPerId() {
+ public synchronized Map<TaskId, Task> allInitializedTasksPerId() {
final Map<TaskId, Task> ret = new HashMap<>();
ret.putAll(activeTasksPerId);
ret.putAll(standbyTasksPerId);
@@ -372,7 +373,7 @@ class Tasks implements TasksRegistry {
}
@Override
- public boolean contains(final TaskId taskId) {
+ public boolean containsInitialized(final TaskId taskId) {
return getTask(taskId) != null;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
index 6099efb5bc7..4e037fbd6f6 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
@@ -69,25 +69,25 @@ public interface TasksRegistry {
void clear();
- Task activeTasksForInputPartition(final TopicPartition partition);
+ Task activeInitializedTasksForInputPartition(final TopicPartition
partition);
- Task task(final TaskId taskId);
+ Task initializedTask(final TaskId taskId);
- Collection<Task> tasks(final Collection<TaskId> taskIds);
+ Collection<Task> initializedTasks(final Collection<TaskId> taskIds);
- Collection<TaskId> activeTaskIds();
+ Collection<TaskId> activeInitializedTaskIds();
- Collection<Task> activeTasks();
+ Collection<Task> activeInitializedTasks();
- Collection<Task> standbyTasks();
+ Collection<Task> standbyInitializedTasks();
- Set<Task> allTasks();
+ Set<Task> allInitializedTasks();
- Set<Task> allNonFailedTasks();
+ Set<Task> allNonFailedInitializedTasks();
- Map<TaskId, Task> allTasksPerId();
+ Map<TaskId, Task> allInitializedTasksPerId();
- Set<TaskId> allTaskIds();
+ Set<TaskId> allInitializedTaskIds();
- boolean contains(final TaskId taskId);
+ boolean containsInitialized(final TaskId taskId);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
index 51ec80d05ac..2259f7768c2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
@@ -102,7 +102,7 @@ public final class DefaultTaskManager implements
TaskManager {
}
// the most naive scheduling algorithm for now: give the next
unlocked, unassigned, and processable task
- for (final Task task : tasks.activeTasks()) {
+ for (final Task task : tasks.activeInitializedTasks()) {
if (!assignedTasks.containsKey(task.id()) &&
!lockedTasks.contains(task.id()) &&
canProgress((StreamTask) task, time.milliseconds()) &&
@@ -126,7 +126,7 @@ public final class DefaultTaskManager implements
TaskManager {
@Override
public void awaitProcessableTasks(final Supplier<Boolean> isShuttingDown)
throws InterruptedException {
final boolean interrupted = returnWithTasksLocked(() -> {
- for (final Task task : tasks.activeTasks()) {
+ for (final Task task : tasks.activeInitializedTasks()) {
if (!assignedTasks.containsKey(task.id()) &&
!lockedTasks.contains(task.id()) &&
canProgress((StreamTask) task, time.milliseconds()) &&
@@ -200,7 +200,7 @@ public final class DefaultTaskManager implements
TaskManager {
final Set<TaskId> remainingTaskIds = new
ConcurrentSkipListSet<>(taskIds);
for (final TaskId taskId : taskIds) {
- final Task task = tasks.task(taskId);
+ final Task task = tasks.initializedTask(taskId);
if (task == null) {
throw new IllegalArgumentException("Trying to lock task "
+ taskId + " but it's not owned");
@@ -243,7 +243,7 @@ public final class DefaultTaskManager implements
TaskManager {
@Override
public KafkaFuture<Void> lockAllTasks() {
return returnWithTasksLocked(() ->
-
lockTasks(tasks.activeTasks().stream().map(Task::id).collect(Collectors.toSet()))
+
lockTasks(tasks.activeInitializedTasks().stream().map(Task::id).collect(Collectors.toSet()))
);
}
@@ -263,7 +263,7 @@ public final class DefaultTaskManager implements
TaskManager {
@Override
public void unlockAllTasks() {
- executeWithTasksLocked(() ->
unlockTasks(tasks.activeTasks().stream().map(Task::id).collect(Collectors.toSet())));
+ executeWithTasksLocked(() ->
unlockTasks(tasks.activeInitializedTasks().stream().map(Task::id).collect(Collectors.toSet())));
}
@Override
@@ -290,11 +290,11 @@ public final class DefaultTaskManager implements
TaskManager {
throw new IllegalArgumentException("The task to remove is not
locked yet by the task manager");
}
- if (!tasks.contains(taskId)) {
+ if (!tasks.containsInitialized(taskId)) {
throw new IllegalArgumentException("The task to remove is not
owned by the task manager");
}
- tasks.removeTask(tasks.task(taskId));
+ tasks.removeTask(tasks.initializedTask(taskId));
});
log.info("Removed task {} from the task manager", taskId);
@@ -302,7 +302,7 @@ public final class DefaultTaskManager implements
TaskManager {
@Override
public Set<ReadOnlyTask> getTasks() {
- return returnWithTasksLocked(() ->
tasks.activeTasks().stream().map(ReadOnlyTask::new).collect(Collectors.toSet()));
+ return returnWithTasksLocked(() ->
tasks.activeInitializedTasks().stream().map(ReadOnlyTask::new).collect(Collectors.toSet()));
}
@Override
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
index 8d9ef70c5e3..568998931c9 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
@@ -38,7 +38,7 @@ public class TaskExecutorTest {
final TaskExecutor taskExecutor = new TaskExecutor(tasks, taskManager,
metadata, new LogContext());
taskExecutor.punctuate();
- verify(tasks).activeTasks();
+ verify(tasks).activeInitializedTasks();
}
@Test
@@ -59,4 +59,4 @@ public class TaskExecutorTest {
verify(producer).commitTransaction(Collections.emptyMap(),
groupMetadata);
}
-}
\ No newline at end of file
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 86e732185fb..92d150096a7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -239,8 +239,8 @@ public class TaskManagerTest {
.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
- when(tasks.task(taskId00)).thenReturn(activeTask1);
+ when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00,
taskId01));
+ when(tasks.initializedTask(taskId00)).thenReturn(activeTask1);
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
@@ -274,7 +274,7 @@ public class TaskManagerTest {
public void shouldLockActiveOnHandleAssignmentWithProcessingThreads() {
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- when(tasks.allTaskIds()).thenReturn(Set.of(taskId00, taskId01));
+ when(tasks.allInitializedTaskIds()).thenReturn(Set.of(taskId00,
taskId01));
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
@@ -297,7 +297,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- when(tasks.allTasks()).thenReturn(Set.of(activeTask1, activeTask2));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(activeTask1,
activeTask2));
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
@@ -333,7 +333,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.activeTasks()).thenReturn(Set.of(activeTask1, activeTask2));
+ when(tasks.activeInitializedTasks()).thenReturn(Set.of(activeTask1,
activeTask2));
taskManager.resumePollingForPartitionsWithAvailableSpace();
@@ -351,7 +351,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.activeTasks()).thenReturn(Set.of(activeTask1, activeTask2));
+ when(tasks.activeInitializedTasks()).thenReturn(Set.of(activeTask1,
activeTask2));
taskManager.updateLags();
@@ -630,7 +630,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
-
when(tasks.allNonFailedTasks()).thenReturn(Set.of(reassignedActiveTask));
+
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(reassignedActiveTask));
taskManager.handleAssignment(
mkMap(mkEntry(reassignedActiveTask.id(),
reassignedActiveTask.inputPartitions())),
@@ -669,7 +669,7 @@ public class TaskManagerTest {
assertEquals(taskException, exception.getCause());
verify(tasks).addFailedTask(failedActiveTaskToRecycle);
verify(tasks, never()).addTask(failedActiveTaskToRecycle);
- verify(tasks).allNonFailedTasks();
+ verify(tasks).allNonFailedInitializedTasks();
verify(standbyTaskCreator,
never()).createStandbyTaskFromActive(failedActiveTaskToRecycle,
taskId03Partitions);
}
@@ -699,7 +699,7 @@ public class TaskManagerTest {
assertEquals(taskException, exception.getCause());
verify(tasks).addFailedTask(failedStandbyTaskToRecycle);
verify(tasks, never()).addTask(failedStandbyTaskToRecycle);
- verify(tasks).allNonFailedTasks();
+ verify(tasks).allNonFailedInitializedTasks();
verify(activeTaskCreator,
never()).createActiveTaskFromStandby(failedStandbyTaskToRecycle,
taskId03Partitions, consumer);
}
@@ -729,7 +729,7 @@ public class TaskManagerTest {
assertEquals(taskException, exception.getCause());
verify(tasks).addFailedTask(failedActiveTaskToReassign);
verify(tasks, never()).addTask(failedActiveTaskToReassign);
- verify(tasks).allNonFailedTasks();
+ verify(tasks).allNonFailedInitializedTasks();
verify(tasks,
never()).updateActiveTaskInputPartitions(failedActiveTaskToReassign,
taskId00Partitions);
}
@@ -743,7 +743,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
-
when(tasks.allNonFailedTasks()).thenReturn(Set.of(reassignedActiveTask1));
+
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(reassignedActiveTask1));
when(stateUpdater.tasks()).thenReturn(Set.of(reassignedActiveTask2));
when(stateUpdater.remove(reassignedActiveTask2.id()))
.thenReturn(CompletableFuture.completedFuture(new
StateUpdater.RemovedTaskResult(reassignedActiveTask2)));
@@ -848,7 +848,7 @@ public class TaskManagerTest {
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskInStateUpdater));
- when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03,
runningActiveTask)));
+
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId03,
runningActiveTask)));
when(tasks.pendingTasksToInit()).thenReturn(Set.of(activeTaskToInit));
assertEquals(
taskManager.allTasks(),
@@ -871,7 +871,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03,
activeTask)));
+
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId03,
activeTask)));
assertEquals(taskManager.allRunningTasks(), mkMap(mkEntry(taskId03,
activeTask)));
}
@@ -923,7 +923,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId01Partitions)
.inState(State.CREATED).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
-
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToRecycle));
+
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(activeTaskToRecycle));
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle,
taskId01Partitions))
.thenReturn(standbyTask);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -943,7 +943,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
-
when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToRecycle));
+
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(standbyTaskToRecycle));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
final IllegalStateException illegalStateException = assertThrows(
@@ -966,7 +966,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToClose));
+
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(activeTaskToClose));
taskManager.handleAssignment(Collections.emptyMap(),
Collections.emptyMap());
@@ -984,7 +984,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToClose));
+
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(standbyTaskToClose));
final IllegalStateException illegalStateException = assertThrows(
IllegalStateException.class,
@@ -1004,7 +1004,7 @@ public class TaskManagerTest {
final Set<TopicPartition> newInputPartitions = taskId02Partitions;
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
-
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToUpdateInputPartitions));
+
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(activeTaskToUpdateInputPartitions));
when(tasks.updateActiveTaskInputPartitions(activeTaskToUpdateInputPartitions,
newInputPartitions)).thenReturn(true);
taskManager.handleAssignment(
@@ -1024,7 +1024,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToResume));
+
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(activeTaskToResume));
taskManager.handleAssignment(
mkMap(mkEntry(activeTaskToResume.id(),
activeTaskToResume.inputPartitions())),
@@ -1042,7 +1042,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToResume));
+
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(activeTaskToResume));
taskManager.handleAssignment(
mkMap(mkEntry(activeTaskToResume.id(),
activeTaskToResume.inputPartitions())),
@@ -1064,7 +1064,7 @@ public class TaskManagerTest {
final Set<TopicPartition> newInputPartitions = taskId03Partitions;
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
-
when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToUpdateInputPartitions));
+
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(standbyTaskToUpdateInputPartitions));
final IllegalStateException illegalStateException = assertThrows(
IllegalStateException.class,
@@ -1089,7 +1089,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToClose));
+
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(activeTaskToClose));
taskManager.handleAssignment(
mkMap(mkEntry(activeTaskToCreate.id(),
activeTaskToCreate.inputPartitions())),
@@ -1770,7 +1770,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allNonFailedTasks()).thenReturn(Set.of(statefulTask0));
+
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(statefulTask0));
final Set<TopicPartition> assigned = Set.of(t1p0, t1p1);
when(consumer.assignment()).thenReturn(assigned);
@@ -1792,9 +1792,9 @@ public class TaskManagerTest {
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
runningStatefulTask)));
+
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
runningStatefulTask)));
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask,
restoringStatefulTask));
-
when(tasks.allNonFailedTasks()).thenReturn(Set.of(runningStatefulTask));
+
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(runningStatefulTask));
expectLockObtainedFor(taskId00, taskId01, taskId02, taskId03);
expectDirectoryNotEmpty(taskId00, taskId01, taskId02, taskId03);
makeTaskFolders(
@@ -1826,7 +1826,7 @@ public class TaskManagerTest {
when(runningStatefulTask.changelogOffsets()).thenReturn(changelogOffsets);
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
runningStatefulTask)));
+
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
runningStatefulTask)));
assertThat(
taskManager.taskOffsetSums(),
@@ -1912,7 +1912,7 @@ public class TaskManagerTest {
.thenReturn(mkMap(mkEntry(t1p2changelog,
changelogOffsetOfRestoringStandbyTask)));
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
runningStatefulTask)));
+
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
runningStatefulTask)));
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStandbyTask,
restoringStatefulTask));
assertThat(
@@ -1937,7 +1937,7 @@ public class TaskManagerTest {
));
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
- when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId01,
restoringStatefulTask)));
+
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId01,
restoringStatefulTask)));
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
assertThat(
@@ -2009,7 +2009,7 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, task)));
+
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
task)));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -2069,7 +2069,7 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+ when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(task00));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -2090,7 +2090,7 @@ public class TaskManagerTest {
doThrow(new RuntimeException("KABOOM!")).when(task00).closeClean();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+ when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(task00));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -2121,8 +2121,8 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
- when(tasks.allTaskIds()).thenReturn(Set.of(taskId00, taskId01));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01));
+ when(tasks.allInitializedTaskIds()).thenReturn(Set.of(taskId00,
taskId01));
final ArrayList<TaskDirectory> taskFolders = new ArrayList<>(2);
taskFolders.add(new
TaskDirectory(testFolder.resolve(taskId00.toString()).toFile(), null));
@@ -2183,8 +2183,8 @@ public class TaskManagerTest {
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.task(taskId03)).thenReturn(corruptedActiveTask);
- when(tasks.task(taskId02)).thenReturn(corruptedStandbyTask);
+ when(tasks.initializedTask(taskId03)).thenReturn(corruptedActiveTask);
+ when(tasks.initializedTask(taskId02)).thenReturn(corruptedStandbyTask);
taskManager.handleCorruption(Set.of(corruptedActiveTask.id(),
corruptedStandbyTask.id()));
@@ -2209,9 +2209,9 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.task(taskId00)).thenReturn(task00);
- when(tasks.allTasksPerId()).thenReturn(singletonMap(taskId00, task00));
- when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00));
+ when(tasks.initializedTask(taskId00)).thenReturn(task00);
+
when(tasks.allInitializedTasksPerId()).thenReturn(singletonMap(taskId00,
task00));
+ when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00));
when(task00.prepareCommit(false)).thenReturn(emptyMap());
doNothing().when(task00).postCommit(anyBoolean());
@@ -2241,9 +2241,9 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.task(taskId00)).thenReturn(task00);
- when(tasks.allTasksPerId()).thenReturn(singletonMap(taskId00, task00));
- when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00));
+ when(tasks.initializedTask(taskId00)).thenReturn(task00);
+
when(tasks.allInitializedTasksPerId()).thenReturn(singletonMap(taskId00,
task00));
+ when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00));
when(task00.prepareCommit(false)).thenReturn(emptyMap());
when(task00.changelogPartitions()).thenReturn(taskId00ChangelogPartitions);
@@ -2276,12 +2276,12 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.task(taskId00)).thenReturn(corruptedTask);
- when(tasks.allTasksPerId()).thenReturn(mkMap(
+ when(tasks.initializedTask(taskId00)).thenReturn(corruptedTask);
+ when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(
mkEntry(taskId00, corruptedTask),
mkEntry(taskId01, nonCorruptedTask)
));
- when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
+ when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00,
taskId01));
when(nonCorruptedTask.commitNeeded()).thenReturn(true);
when(nonCorruptedTask.prepareCommit(true)).thenReturn(emptyMap());
@@ -2315,8 +2315,8 @@ public class TaskManagerTest {
.withInputPartitions(taskId02Partitions)
.inState(State.RUNNING).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId02,
corruptedTask)));
- when(tasks.task(taskId02)).thenReturn(corruptedTask);
+
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId02,
corruptedTask)));
+ when(tasks.initializedTask(taskId02)).thenReturn(corruptedTask);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(consumer.assignment()).thenReturn(intersection(HashSet::new,
taskId00Partitions, taskId01Partitions, taskId02Partitions));
@@ -2340,12 +2340,12 @@ public class TaskManagerTest {
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.task(taskId00)).thenReturn(corruptedStandby);
- when(tasks.allTasksPerId()).thenReturn(mkMap(
+ when(tasks.initializedTask(taskId00)).thenReturn(corruptedStandby);
+ when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(
mkEntry(taskId00, corruptedStandby),
mkEntry(taskId01, runningNonCorruptedActive)
));
- when(tasks.activeTaskIds()).thenReturn(Set.of(taskId01));
+ when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId01));
when(runningNonCorruptedActive.commitNeeded()).thenReturn(true);
when(runningNonCorruptedActive.prepareCommit(true))
@@ -2386,12 +2386,12 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.task(taskId00)).thenReturn(corruptedActive);
- when(tasks.allTasksPerId()).thenReturn(mkMap(
+ when(tasks.initializedTask(taskId00)).thenReturn(corruptedActive);
+ when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(
mkEntry(taskId00, corruptedActive),
mkEntry(taskId01, uncorruptedActive)
));
- when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
+ when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00,
taskId01));
when(uncorruptedActive.commitNeeded()).thenReturn(true);
when(uncorruptedActive.prepareCommit(true)).thenReturn(emptyMap());
@@ -2430,12 +2430,12 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.task(taskId00)).thenReturn(corruptedActive);
- when(tasks.allTasksPerId()).thenReturn(mkMap(
+ when(tasks.initializedTask(taskId00)).thenReturn(corruptedActive);
+ when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(
mkEntry(taskId00, corruptedActive),
mkEntry(taskId01, uncorruptedActive)
));
- when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
+ when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00,
taskId01));
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.streamsProducer()).thenReturn(producer);
@@ -2446,7 +2446,7 @@ public class TaskManagerTest {
// mock uncorrupted task to indicate that it needs commit and will
return offsets
final Map<TopicPartition, OffsetAndMetadata> offsets =
singletonMap(t1p1, new OffsetAndMetadata(0L, null));
-
when(tasks.tasks(singleton(taskId01))).thenReturn(Set.of(uncorruptedActive));
+
when(tasks.initializedTasks(singleton(taskId01))).thenReturn(Set.of(uncorruptedActive));
when(uncorruptedActive.commitNeeded()).thenReturn(true);
when(uncorruptedActive.prepareCommit(true)).thenReturn(offsets);
when(uncorruptedActive.prepareCommit(false)).thenReturn(emptyMap());
@@ -2511,13 +2511,13 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.task(taskId00)).thenReturn(corruptedActive);
- when(tasks.allTasksPerId()).thenReturn(mkMap(
+ when(tasks.initializedTask(taskId00)).thenReturn(corruptedActive);
+ when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(
mkEntry(taskId00, corruptedActive),
mkEntry(taskId01, uncorruptedActive)
));
- when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
- when(tasks.activeTasks()).thenReturn(Set.of(corruptedActive,
uncorruptedActive));
+ when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00,
taskId01));
+
when(tasks.activeInitializedTasks()).thenReturn(Set.of(corruptedActive,
uncorruptedActive));
// we need to mock uncorrupted task to indicate that it needs commit
and will return offsets
final Map<TopicPartition, OffsetAndMetadata> offsets =
singletonMap(t1p1, new OffsetAndMetadata(0L, null));
@@ -2591,7 +2591,7 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(revokedActiveTask,
unrevokedActiveTaskWithCommit, unrevokedActiveTaskWithoutCommit));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(revokedActiveTask,
unrevokedActiveTaskWithCommit, unrevokedActiveTaskWithoutCommit));
when(consumer.assignment()).thenReturn(union(HashSet::new,
taskId00Partitions, taskId01Partitions, taskId02Partitions));
@@ -2677,8 +2677,8 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(revokedActiveTask,
unrevokedActiveTaskWithCommit, unrevokedActiveTaskWithoutCommit));
- when(tasks.tasks(Set.of(taskId00,
taskId01))).thenReturn(Set.of(revokedActiveTask,
unrevokedActiveTaskWithCommit));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(revokedActiveTask,
unrevokedActiveTaskWithCommit, unrevokedActiveTaskWithoutCommit));
+ when(tasks.initializedTasks(Set.of(taskId00,
taskId01))).thenReturn(Set.of(revokedActiveTask,
unrevokedActiveTaskWithCommit));
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.streamsProducer()).thenReturn(producer);
@@ -2791,7 +2791,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+ when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(task00));
when(tasks.drainPendingTasksToInit()).thenReturn(emptySet());
when(tasks.hasPendingTasksToInit()).thenReturn(false);
@@ -2827,7 +2827,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
final Set<TopicPartition> newPartitionsSet = Set.of(t1p1);
- when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+ when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(task00));
when(tasks.drainPendingTasksToInit()).thenReturn(emptySet());
when(tasks.hasPendingTasksToInit()).thenReturn(false);
when(tasks.updateActiveTaskInputPartitions(task00,
newPartitionsSet)).thenReturn(true);
@@ -2963,7 +2963,7 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
final Map<TopicPartition, OffsetAndMetadata> offsets =
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
@@ -3007,7 +3007,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02,
task10));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01,
task02, task10));
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.streamsProducer()).thenReturn(producer);
@@ -3099,7 +3099,7 @@ public class TaskManagerTest {
expectedCommittedOffsets.putAll(offsets01);
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02,
task03));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01,
task02, task03));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -3150,7 +3150,7 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01,
task02));
when(task00.commitNeeded()).thenReturn(false);
when(task01.commitNeeded()).thenReturn(true); // only task01 needs
commit
@@ -3181,7 +3181,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+ when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(task00));
when(stateUpdater.tasks()).thenReturn(Set.of(task01));
final Map<TaskId, Set<TopicPartition>> assignmentActive =
singletonMap(taskId00, taskId00Partitions);
@@ -3215,7 +3215,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+ when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(task00));
when(stateUpdater.tasks()).thenReturn(Set.of(task01));
// mock to remove standby task from state updater
@@ -3283,7 +3283,7 @@ public class TaskManagerTest {
doThrow(new RuntimeException("KABOOM!")).when(task00).suspend();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -3323,7 +3323,7 @@ public class TaskManagerTest {
doThrow(new RuntimeException("oops"))
.when(task02).suspend();
- when(tasks.activeTasks()).thenReturn(Set.of(task00, task01, task02));
+ when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00, task01,
task02));
final RuntimeException exception = assertThrows(
RuntimeException.class,
@@ -3360,7 +3360,7 @@ public class TaskManagerTest {
doThrow(new
RuntimeException("whatever")).when(activeTaskCreator).close();
- when(tasks.activeTasks()).thenReturn(Set.of(task00));
+ when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00));
final RuntimeException exception = assertThrows(
RuntimeException.class,
@@ -3460,7 +3460,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01,
task02));
final RuntimeException thrown = assertThrows(RuntimeException.class,
() -> taskManager.handleRevocation(union(HashSet::new,
taskId01Partitions, taskId02Partitions)));
@@ -3497,7 +3497,7 @@ public class TaskManagerTest {
.when(task02).suspend();
doThrow(new
RuntimeException("whatever")).when(activeTaskCreator).close();
- when(tasks.activeTasks()).thenReturn(Set.of(task00, task01, task02));
+ when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00, task01,
task02));
taskManager.shutdown(false);
@@ -3530,7 +3530,7 @@ public class TaskManagerTest {
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask00)).thenReturn(Set.of());
when(stateUpdater.standbyTasks()).thenReturn(Set.of(standbyTask00));
- when(tasks.standbyTasks()).thenReturn(Set.of(standbyTask00));
+
when(tasks.standbyInitializedTasks()).thenReturn(Set.of(standbyTask00));
final CompletableFuture<StateUpdater.RemovedTaskResult>
futureForStandbyTask = new CompletableFuture<>();
when(stateUpdater.remove(taskId00)).thenReturn(futureForStandbyTask);
@@ -3750,7 +3750,7 @@ public class TaskManagerTest {
when(task01.prepareCommit(true)).thenReturn(emptyMap());
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -3846,7 +3846,7 @@ public class TaskManagerTest {
when(task00.prepareCommit(true)).thenReturn(emptyMap());
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -3874,7 +3874,7 @@ public class TaskManagerTest {
when(task01.commitNeeded()).thenReturn(true);
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -3903,7 +3903,7 @@ public class TaskManagerTest {
when(task01.prepareCommit(true)).thenReturn(offsets);
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task01));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task01));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -3928,7 +3928,7 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task01, task02));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task01, task02));
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.streamsProducer()).thenReturn(producer);
@@ -3975,7 +3975,7 @@ public class TaskManagerTest {
when(task00.prepareCommit(true)).thenThrow(new
RuntimeException("opsh."));
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -3998,7 +3998,7 @@ public class TaskManagerTest {
when(task01.prepareCommit(true)).thenThrow(new
RuntimeException("opsh."));
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task01));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task01));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4030,7 +4030,7 @@ public class TaskManagerTest {
.thenReturn(singletonMap(t1p1, 17L));
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4060,7 +4060,7 @@ public class TaskManagerTest {
.thenReturn(singletonMap(t1p1, 17L));
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4088,7 +4088,7 @@ public class TaskManagerTest {
when(task00.purgeableOffsets()).thenReturn(singletonMap(t1p1, 5L));
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4145,7 +4145,7 @@ public class TaskManagerTest {
expectedCommittedOffsets.putAll(offsets1);
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02,
task03));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01,
task02, task03));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4205,7 +4205,7 @@ public class TaskManagerTest {
.thenReturn(false); // no more records
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.activeTasks()).thenReturn(Set.of(task00, task01));
+ when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00,
task01));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4250,7 +4250,7 @@ public class TaskManagerTest {
.thenReturn(false);
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.activeTasks()).thenReturn(Set.of(task00, task01, task02));
+ when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00, task01,
task02));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4277,7 +4277,7 @@ public class TaskManagerTest {
.thenThrow(new TaskMigratedException("migrated", new
RuntimeException("cause")));
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.activeTasks()).thenReturn(Set.of(task00));
+ when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4294,7 +4294,7 @@ public class TaskManagerTest {
when(task00.process(anyLong())).thenThrow(new
RuntimeException("oops"));
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.activeTasks()).thenReturn(Set.of(task00));
+ when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4315,7 +4315,7 @@ public class TaskManagerTest {
.thenThrow(new TaskMigratedException("migrated", new
RuntimeException("cause")));
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.activeTasks()).thenReturn(Set.of(task00));
+ when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4332,7 +4332,7 @@ public class TaskManagerTest {
when(task00.maybePunctuateStreamTime()).thenThrow(new
KafkaException("oops"));
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.activeTasks()).thenReturn(Set.of(task00));
+ when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4350,7 +4350,7 @@ public class TaskManagerTest {
when(task00.maybePunctuateSystemTime()).thenReturn(true);
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.activeTasks()).thenReturn(Set.of(task00));
+ when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4386,7 +4386,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allTasks()).thenReturn(Set.of(task00));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(TaskManager.class)) {
appender.setClassLogger(TaskManager.class, Level.DEBUG);
@@ -4591,7 +4591,7 @@ public class TaskManagerTest {
when(task01.prepareCommit(true)).thenReturn(offsets);
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task01));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task01));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4706,7 +4706,7 @@ public class TaskManagerTest {
when(task01.prepareCommit(true)).thenReturn(offsets);
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task01));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task01));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4737,7 +4737,7 @@ public class TaskManagerTest {
when(task01.prepareCommit(true)).thenReturn(offsets);
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task01));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task01));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4769,7 +4769,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4863,7 +4863,7 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
index 0887c982873..bb2865d223c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
@@ -75,25 +75,25 @@ public class TasksTest {
tasks.addActiveTasks(Set.of(statefulTask, statelessTask));
tasks.addStandbyTasks(Collections.singletonList(standbyTask));
- assertEquals(statefulTask, tasks.task(statefulTask.id()));
- assertEquals(statelessTask, tasks.task(statelessTask.id()));
- assertEquals(standbyTask, tasks.task(standbyTask.id()));
-
- assertEquals(Set.of(statefulTask, statelessTask), new
HashSet<>(tasks.activeTasks()));
- assertEquals(Set.of(standbyTask), new HashSet<>(tasks.standbyTasks()));
- assertEquals(Set.of(statefulTask, statelessTask, standbyTask),
tasks.allTasks());
- assertEquals(Set.of(statefulTask, standbyTask),
tasks.tasks(Set.of(statefulTask.id(), standbyTask.id())));
- assertEquals(Set.of(statefulTask.id(), statelessTask.id(),
standbyTask.id()), tasks.allTaskIds());
+ assertEquals(statefulTask, tasks.initializedTask(statefulTask.id()));
+ assertEquals(statelessTask, tasks.initializedTask(statelessTask.id()));
+ assertEquals(standbyTask, tasks.initializedTask(standbyTask.id()));
+
+ assertEquals(Set.of(statefulTask, statelessTask), new
HashSet<>(tasks.activeInitializedTasks()));
+ assertEquals(Set.of(standbyTask), new
HashSet<>(tasks.standbyInitializedTasks()));
+ assertEquals(Set.of(statefulTask, statelessTask, standbyTask),
tasks.allInitializedTasks());
+ assertEquals(Set.of(statefulTask, standbyTask),
tasks.initializedTasks(Set.of(statefulTask.id(), standbyTask.id())));
+ assertEquals(Set.of(statefulTask.id(), statelessTask.id(),
standbyTask.id()), tasks.allInitializedTaskIds());
assertEquals(
mkMap(
mkEntry(statefulTask.id(), statefulTask),
mkEntry(statelessTask.id(), statelessTask),
mkEntry(standbyTask.id(), standbyTask)
),
- tasks.allTasksPerId());
- assertTrue(tasks.contains(statefulTask.id()));
- assertTrue(tasks.contains(statelessTask.id()));
- assertTrue(tasks.contains(statefulTask.id()));
+ tasks.allInitializedTasksPerId());
+ assertTrue(tasks.containsInitialized(statefulTask.id()));
+ assertTrue(tasks.containsInitialized(statelessTask.id()));
+ assertTrue(tasks.containsInitialized(statefulTask.id()));
}
@Test
@@ -192,12 +192,12 @@ public class TasksTest {
tasks.addFailedTask(activeTask1);
- assertEquals(activeTask1, tasks.task(TASK_0_0));
- assertEquals(activeTask2, tasks.task(TASK_0_1));
- assertTrue(tasks.allTasks().contains(activeTask1));
- assertTrue(tasks.allTasks().contains(activeTask2));
- assertFalse(tasks.allNonFailedTasks().contains(activeTask1));
- assertTrue(tasks.allNonFailedTasks().contains(activeTask2));
+ assertEquals(activeTask1, tasks.initializedTask(TASK_0_0));
+ assertEquals(activeTask2, tasks.initializedTask(TASK_0_1));
+ assertTrue(tasks.allInitializedTasks().contains(activeTask1));
+ assertTrue(tasks.allInitializedTasks().contains(activeTask2));
+
assertFalse(tasks.allNonFailedInitializedTasks().contains(activeTask1));
+ assertTrue(tasks.allNonFailedInitializedTasks().contains(activeTask2));
}
@Test
@@ -207,11 +207,11 @@ public class TasksTest {
tasks.addFailedTask(activeTask1);
tasks.removeTask(activeTask1);
- assertFalse(tasks.allNonFailedTasks().contains(activeTask1));
- assertFalse(tasks.allTasks().contains(activeTask1));
+
assertFalse(tasks.allNonFailedInitializedTasks().contains(activeTask1));
+ assertFalse(tasks.allInitializedTasks().contains(activeTask1));
tasks.addTask(activeTask1);
- assertTrue(tasks.allNonFailedTasks().contains(activeTask1));
+ assertTrue(tasks.allNonFailedInitializedTasks().contains(activeTask1));
}
@Test
@@ -221,11 +221,11 @@ public class TasksTest {
tasks.addFailedTask(activeTask1);
tasks.clear();
- assertFalse(tasks.allNonFailedTasks().contains(activeTask1));
- assertFalse(tasks.allTasks().contains(activeTask1));
+
assertFalse(tasks.allNonFailedInitializedTasks().contains(activeTask1));
+ assertFalse(tasks.allInitializedTasks().contains(activeTask1));
tasks.addTask(activeTask1);
- assertTrue(tasks.allNonFailedTasks().contains(activeTask1));
+ assertTrue(tasks.allNonFailedInitializedTasks().contains(activeTask1));
}
@Test
@@ -260,6 +260,6 @@ public class TasksTest {
tasks.removeTask(activeTask1);
assertFalse(tasks.pendingTasksToInit().contains(activeTask1));
- assertFalse(tasks.allTasks().contains(activeTask1));
+ assertFalse(tasks.allInitializedTasks().contains(activeTask1));
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
index 0261ac99720..879f62f0b08 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
@@ -69,8 +69,8 @@ public class DefaultTaskManagerTest {
@BeforeEach
public void setUp() {
when(task.isProcessable(anyLong())).thenReturn(true);
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
- when(tasks.task(taskId)).thenReturn(task);
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
+ when(tasks.initializedTask(taskId)).thenReturn(task);
}
@Test
@@ -94,14 +94,14 @@ public class DefaultTaskManagerTest {
taskManager.add(Collections.singleton(task));
verify(tasks).addTask(task);
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
assertEquals(1, taskManager.getTasks().size());
}
@Test
public void shouldAssignTaskThatCanBeProcessed() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);
assertEquals(task, taskManager.assignNextTask(taskExecutor));
@@ -145,7 +145,7 @@ public class DefaultTaskManagerTest {
final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
final Thread awaitingThread = new Thread(awaitingRunnable);
awaitingThread.start();
- verify(tasks,
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();
+ verify(tasks,
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeInitializedTasks();
awaitingThread.interrupt();
@@ -160,7 +160,7 @@ public class DefaultTaskManagerTest {
final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
final Thread awaitingThread = new Thread(awaitingRunnable);
awaitingThread.start();
- verify(tasks,
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();
+ verify(tasks,
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeInitializedTasks();
taskManager.signalTaskExecutors();
@@ -180,7 +180,7 @@ public class DefaultTaskManagerTest {
final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
final Thread awaitingThread = new Thread(awaitingRunnable);
awaitingThread.start();
- verify(tasks,
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();
+ verify(tasks,
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeInitializedTasks();
taskManager.unassignTask(task, taskExecutor);
@@ -195,7 +195,7 @@ public class DefaultTaskManagerTest {
final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
final Thread awaitingThread = new Thread(awaitingRunnable);
awaitingThread.start();
- verify(tasks,
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();
+ verify(tasks,
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeInitializedTasks();
taskManager.add(Collections.singleton(task));
@@ -212,7 +212,7 @@ public class DefaultTaskManagerTest {
final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
final Thread awaitingThread = new Thread(awaitingRunnable);
awaitingThread.start();
- verify(tasks,
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();
+ verify(tasks,
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeInitializedTasks();
taskManager.unlockAllTasks();
@@ -225,7 +225,7 @@ public class DefaultTaskManagerTest {
@Test
public void shouldAssignTasksThatCanBeSystemTimePunctuated() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);
when(task.canPunctuateSystemTime()).thenReturn(true);
@@ -236,7 +236,7 @@ public class DefaultTaskManagerTest {
@Test
public void shouldAssignTasksThatCanBeStreamTimePunctuated() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canPunctuateTask(eq(task))).thenReturn(true);
when(task.canPunctuateStreamTime()).thenReturn(true);
@@ -247,7 +247,7 @@ public class DefaultTaskManagerTest {
@Test
public void shouldNotAssignTasksIfUncaughtExceptionPresent() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
ensureTaskMakesProgress();
taskManager.assignNextTask(taskExecutor);
taskManager.setUncaughtException(new StreamsException("Exception"),
taskId);
@@ -259,7 +259,7 @@ public class DefaultTaskManagerTest {
@Test
public void shouldNotAssignTasksForPunctuationIfPunctuationDisabled() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canPunctuateTask(eq(task))).thenReturn(false);
when(task.canPunctuateStreamTime()).thenReturn(true);
when(task.canPunctuateSystemTime()).thenReturn(true);
@@ -270,7 +270,7 @@ public class DefaultTaskManagerTest {
@Test
public void shouldNotAssignTasksForProcessingIfProcessingDisabled() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(false);
when(task.isProcessable(anyLong())).thenReturn(true);
@@ -280,7 +280,7 @@ public class DefaultTaskManagerTest {
@Test
public void shouldUnassignTask() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);
assertEquals(task, taskManager.assignNextTask(taskExecutor));
@@ -292,7 +292,7 @@ public class DefaultTaskManagerTest {
@Test
public void shouldNotUnassignNotOwnedTask() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);
assertEquals(task, taskManager.assignNextTask(taskExecutor));
@@ -319,16 +319,16 @@ public class DefaultTaskManagerTest {
@Test
public void shouldRemoveTask() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
- when(tasks.task(task.id())).thenReturn(task);
- when(tasks.contains(task.id())).thenReturn(true);
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
+ when(tasks.initializedTask(task.id())).thenReturn(task);
+ when(tasks.containsInitialized(task.id())).thenReturn(true);
taskManager.lockTasks(Collections.singleton(task.id()));
taskManager.remove(task.id());
verify(tasks).removeTask(task);
reset(tasks);
- when(tasks.activeTasks()).thenReturn(Collections.emptySet());
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.emptySet());
assertEquals(0, taskManager.getTasks().size());
}
@@ -336,10 +336,10 @@ public class DefaultTaskManagerTest {
@Test
public void shouldNotAssignLockedTask() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);
- when(tasks.task(task.id())).thenReturn(task);
- when(tasks.contains(task.id())).thenReturn(true);
+ when(tasks.initializedTask(task.id())).thenReturn(task);
+ when(tasks.containsInitialized(task.id())).thenReturn(true);
assertTrue(taskManager.lockTasks(Collections.singleton(task.id())).isDone());
@@ -349,10 +349,10 @@ public class DefaultTaskManagerTest {
@Test
public void shouldLockAnEmptySetOfTasks() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);
- when(tasks.task(task.id())).thenReturn(task);
- when(tasks.contains(task.id())).thenReturn(true);
+ when(tasks.initializedTask(task.id())).thenReturn(task);
+ when(tasks.containsInitialized(task.id())).thenReturn(true);
assertTrue(taskManager.lockTasks(Collections.emptySet()).isDone());
@@ -365,10 +365,10 @@ public class DefaultTaskManagerTest {
final KafkaFutureImpl<StreamTask> future = new KafkaFutureImpl<>();
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);
- when(tasks.task(task.id())).thenReturn(task);
- when(tasks.contains(task.id())).thenReturn(true);
+ when(tasks.initializedTask(task.id())).thenReturn(task);
+ when(tasks.containsInitialized(task.id())).thenReturn(true);
when(taskExecutor.unassign()).thenReturn(future);
assertEquals(task, taskManager.assignNextTask(taskExecutor));
@@ -386,10 +386,10 @@ public class DefaultTaskManagerTest {
@Test
public void shouldNotAssignAnyLockedTask() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);
- when(tasks.task(task.id())).thenReturn(task);
- when(tasks.contains(task.id())).thenReturn(true);
+ when(tasks.initializedTask(task.id())).thenReturn(task);
+ when(tasks.containsInitialized(task.id())).thenReturn(true);
assertTrue(taskManager.lockAllTasks().isDone());
@@ -407,7 +407,7 @@ public class DefaultTaskManagerTest {
@Test
public void shouldNotSetUncaughtExceptionsTwice() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);
taskManager.assignNextTask(taskExecutor);
taskManager.setUncaughtException(exception, task.id());
@@ -419,7 +419,7 @@ public class DefaultTaskManagerTest {
@Test
public void shouldReturnAndClearExceptionsOnDrainExceptions() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);
taskManager.assignNextTask(taskExecutor);
taskManager.setUncaughtException(exception, task.id());
@@ -433,9 +433,9 @@ public class DefaultTaskManagerTest {
final KafkaFutureImpl<StreamTask> future = new KafkaFutureImpl<>();
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
- when(tasks.task(task.id())).thenReturn(task);
- when(tasks.contains(task.id())).thenReturn(true);
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
+ when(tasks.initializedTask(task.id())).thenReturn(task);
+ when(tasks.containsInitialized(task.id())).thenReturn(true);
when(taskExecutor.unassign()).thenReturn(future);
when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);