This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f568932e45d MINOR: rename TaskRegistry methods to better reflect their 
purpose. (#21448)
f568932e45d is described below

commit f568932e45dd055bc7b95b3f2f0e3ca09ca0ee57
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Tue Feb 17 11:43:12 2026 -0800

    MINOR: rename TaskRegistry methods to better reflect their purpose. (#21448)
    
    Changed the name of method that work only with initialized tasks(not
    pending) to better reflect their purpose.
    
    Reviewers: Matthias J. Sax <[email protected]>, Lucas Brutschy
     <[email protected]>
---
 .../streams/processor/internals/TaskExecutor.java  |   6 +-
 .../streams/processor/internals/TaskManager.java   |  54 +++---
 .../kafka/streams/processor/internals/Tasks.java   |  25 +--
 .../streams/processor/internals/TasksRegistry.java |  22 +--
 .../internals/tasks/DefaultTaskManager.java        |  16 +-
 .../processor/internals/TaskExecutorTest.java      |   4 +-
 .../processor/internals/TaskManagerTest.java       | 202 ++++++++++-----------
 .../streams/processor/internals/TasksTest.java     |  52 +++---
 .../internals/tasks/DefaultTaskManagerTest.java    |  74 ++++----
 9 files changed, 228 insertions(+), 227 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
index 91deab0dd9d..8294bf407d0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
@@ -68,7 +68,7 @@ public class TaskExecutor {
         int totalProcessed = 0;
         Task lastProcessed = null;
 
-        for (final Task task : tasks.activeTasks()) {
+        for (final Task task : tasks.activeInitializedTasks()) {
             final long now = time.milliseconds();
             try {
                 if (executionMetadata.canProcessTask(task, now)) {
@@ -233,7 +233,7 @@ public class TaskExecutor {
 
     private void updateTaskCommitMetadata(final Map<TopicPartition, 
OffsetAndMetadata> allOffsets) {
         if (!allOffsets.isEmpty()) {
-            for (final Task task : tasks.activeTasks()) {
+            for (final Task task : tasks.activeInitializedTasks()) {
                 if (task instanceof StreamTask) {
                     for (final TopicPartition topicPartition : 
task.inputPartitions()) {
                         if (allOffsets.containsKey(topicPartition)) {
@@ -261,7 +261,7 @@ public class TaskExecutor {
     int punctuate() {
         int punctuated = 0;
 
-        for (final Task task : tasks.activeTasks()) {
+        for (final Task task : tasks.activeInitializedTasks()) {
             try {
                 if (executionMetadata.canPunctuateTask(task)) {
                     if (task.maybePunctuateStreamTime()) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 8e6433c0090..f91663cf0c8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -197,7 +197,7 @@ public class TaskManager {
         // we should pause consumer only within the listener since
         // before then the assignment has not been updated yet.
         // All tasks that are owned by the task manager are ready and do not 
need to be paused
-        final Set<TopicPartition> partitionsNotToPause = 
tasks.allNonFailedTasks()
+        final Set<TopicPartition> partitionsNotToPause = 
tasks.allNonFailedInitializedTasks()
             .stream()
             .flatMap(task -> task.inputPartitions().stream())
             .collect(Collectors.toSet());
@@ -214,7 +214,7 @@ public class TaskManager {
      * @throws TaskMigratedException
      */
     boolean handleCorruption(final Set<TaskId> corruptedTasks) {
-        final Set<TaskId> activeTasks = new HashSet<>(tasks.activeTaskIds());
+        final Set<TaskId> activeTasks = new 
HashSet<>(tasks.activeInitializedTaskIds());
 
         // We need to stop all processing, since we need to commit 
non-corrupted tasks as well.
         maybeLockTasks(activeTasks);
@@ -223,7 +223,7 @@ public class TaskManager {
         final Set<Task> corruptedStandbyTasks = new 
TreeSet<>(Comparator.comparing(Task::id));
 
         for (final TaskId taskId : corruptedTasks) {
-            final Task task = tasks.task(taskId);
+            final Task task = tasks.initializedTask(taskId);
             if (task.isActive()) {
                 corruptedActiveTasks.add(task);
             } else {
@@ -237,7 +237,7 @@ public class TaskManager {
 
         // We need to commit before closing the corrupted active tasks since 
this will force the ongoing txn to abort
         try {
-            final Collection<Task> tasksToCommit = tasks.allTasksPerId()
+            final Collection<Task> tasksToCommit = 
tasks.allInitializedTasksPerId()
                 .values()
                 .stream()
                 .filter(t -> t.state() == Task.State.RUNNING)
@@ -247,10 +247,10 @@ public class TaskManager {
         } catch (final TaskCorruptedException e) {
             log.info("Some additional tasks were found corrupted while trying 
to commit, these will be added to the " +
                          "tasks to clean and revive: {}", e.corruptedTasks());
-            corruptedActiveTasks.addAll(tasks.tasks(e.corruptedTasks()));
+            
corruptedActiveTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
         } catch (final TimeoutException e) {
             log.info("Hit TimeoutException when committing all non-corrupted 
tasks, these will be closed and revived");
-            final Collection<Task> uncorruptedTasks = new 
HashSet<>(tasks.activeTasks());
+            final Collection<Task> uncorruptedTasks = new 
HashSet<>(tasks.activeInitializedTasks());
             uncorruptedTasks.removeAll(corruptedActiveTasks);
             // Those tasks which just timed out can just be closed dirty 
without marking changelogs as corrupted
             closeDirtyAndRevive(uncorruptedTasks, false);
@@ -366,7 +366,7 @@ public class TaskManager {
         final Set<Task> tasksToCloseClean = new 
TreeSet<>(Comparator.comparing(Task::id));
 
         final Set<TaskId> tasksToLock =
-            tasks.allTaskIds().stream()
+            tasks.allInitializedTaskIds().stream()
                 .filter(x -> activeTasksToCreate.containsKey(x) || 
standbyTasksToCreate.containsKey(x))
                 .collect(Collectors.toSet());
 
@@ -533,7 +533,7 @@ public class TaskManager {
                                                 final Map<TaskId, 
Set<TopicPartition>> standbyTasksToCreate,
                                                 final Map<Task, 
Set<TopicPartition>> tasksToRecycle,
                                                 final Set<Task> 
tasksToCloseClean) {
-        for (final Task task : tasks.allNonFailedTasks()) {
+        for (final Task task : tasks.allNonFailedInitializedTasks()) {
             if (!task.isActive()) {
                 throw new IllegalStateException("Standby tasks should only be 
managed by the state updater, " +
                     "but standby task " + task.id() + " is managed by the 
stream thread");
@@ -733,7 +733,7 @@ public class TaskManager {
         while (iter.hasNext()) {
             final Map.Entry<TaskId, Set<TopicPartition>> entry = iter.next();
             final TaskId taskId = entry.getKey();
-            final boolean taskIsOwned = tasks.allTaskIds().contains(taskId)
+            final boolean taskIsOwned = 
tasks.allInitializedTaskIds().contains(taskId)
                 || (stateUpdater.tasks().stream().anyMatch(task -> 
task.id().equals(taskId)));
             if (taskId.topologyName() != null && !taskIsOwned && 
!topologyMetadata.namedTopologiesView().contains(taskId.topologyName())) {
                 log.info("Cannot create the assigned task {} since it's 
topology name cannot be recognized, will put it " +
@@ -1079,7 +1079,7 @@ public class TaskManager {
                      e.corruptedTasks());
 
             // If we hit a TaskCorruptedException it must be EOS, just handle 
the cleanup for those corrupted tasks right here
-            dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+            dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
             closeDirtyAndRevive(dirtyTasks, true);
         } catch (final TimeoutException e) {
             log.warn("Timed out while trying to commit all tasks during 
revocation, these will be cleaned and revived");
@@ -1193,8 +1193,8 @@ public class TaskManager {
     }
 
     private void closeRunningTasksDirty() {
-        final Set<Task> allTask = tasks.allTasks();
-        final Set<TaskId> allTaskIds = tasks.allTaskIds();
+        final Set<Task> allTask = tasks.allInitializedTasks();
+        final Set<TaskId> allTaskIds = tasks.allInitializedTaskIds();
         maybeLockTasks(allTaskIds);
         for (final Task task : allTask) {
             // Even though we've apparently dropped out of the group, we can 
continue safely to maintain our
@@ -1413,10 +1413,10 @@ public class TaskManager {
 
         // TODO: change type to `StreamTask`
         final Set<Task> activeTasks = new 
TreeSet<>(Comparator.comparing(Task::id));
-        activeTasks.addAll(tasks.activeTasks());
+        activeTasks.addAll(tasks.activeInitializedTasks());
         // TODO: change type to `StandbyTask`
         final Set<Task> standbyTasks = new 
TreeSet<>(Comparator.comparing(Task::id));
-        standbyTasks.addAll(tasks.standbyTasks());
+        standbyTasks.addAll(tasks.standbyInitializedTasks());
 
         final Set<Task> pendingActiveTasks = 
tasks.drainPendingActiveTasksToInit();
         activeTasks.addAll(pendingActiveTasks);
@@ -1670,7 +1670,7 @@ public class TaskManager {
         // not bothering with an unmodifiable map, since the tasks themselves 
are mutable, but
         // if any outside code modifies the map or the tasks, it would be a 
severe transgression.
         final Map<TaskId, Task> ret = 
stateUpdater.tasks().stream().collect(Collectors.toMap(Task::id, x -> x));
-        ret.putAll(tasks.allTasksPerId());
+        ret.putAll(tasks.allInitializedTasksPerId());
         
ret.putAll(tasks.pendingTasksToInit().stream().collect(Collectors.toMap(Task::id,
 x -> x)));
         return ret;
     }
@@ -1683,19 +1683,19 @@ public class TaskManager {
     Map<TaskId, Task> allRunningTasks() {
         // not bothering with an unmodifiable map, since the tasks themselves 
are mutable, but
         // if any outside code modifies the map or the tasks, it would be a 
severe transgression.
-        return tasks.allTasksPerId();
+        return tasks.allInitializedTasksPerId();
     }
 
     Set<Task> readOnlyAllTasks() {
         // not bothering with an unmodifiable map, since the tasks themselves 
are mutable, but
         // if any outside code modifies the map or the tasks, it would be a 
severe transgression.
         final HashSet<Task> ret = new HashSet<>(stateUpdater.tasks());
-        ret.addAll(tasks.allTasks());
+        ret.addAll(tasks.allInitializedTasks());
         return Collections.unmodifiableSet(ret);
     }
 
     Map<TaskId, Task> notPausedTasks() {
-        return Collections.unmodifiableMap(tasks.allTasks()
+        return Collections.unmodifiableMap(tasks.allInitializedTasks()
             .stream()
             .filter(t -> !topologyMetadata.isPaused(t.id().topologyName()))
             .collect(Collectors.toMap(Task::id, v -> v)));
@@ -1721,7 +1721,7 @@ public class TaskManager {
     }
 
     private Stream<Task> activeRunningTaskStream() {
-        return tasks.allTasks().stream().filter(Task::isActive);
+        return tasks.allInitializedTasks().stream().filter(Task::isActive);
     }
 
     Map<TaskId, Task> standbyTaskMap() {
@@ -1733,7 +1733,7 @@ public class TaskManager {
     }
 
     private Stream<Task> standbyTaskStream() {
-        final Stream<Task> standbyTasksInTaskRegistry = 
tasks.allTasks().stream().filter(t -> !t.isActive());
+        final Stream<Task> standbyTasksInTaskRegistry = 
tasks.allInitializedTasks().stream().filter(t -> !t.isActive());
         return Stream.concat(
             stateUpdater.standbyTasks().stream(),
             standbyTasksInTaskRegistry
@@ -1741,7 +1741,7 @@ public class TaskManager {
     }
     // For testing only.
     int commitAll() {
-        return commit(tasks.allTasks());
+        return commit(tasks.allInitializedTasks());
     }
 
     /**
@@ -1749,7 +1749,7 @@ public class TaskManager {
      * the corresponding record queues have capacity (again).
      */
     public void resumePollingForPartitionsWithAvailableSpace() {
-        for (final Task t: tasks.activeTasks()) {
+        for (final Task t: tasks.activeInitializedTasks()) {
             t.resumePollingForPartitionsWithAvailableSpace();
         }
     }
@@ -1758,7 +1758,7 @@ public class TaskManager {
      * Fetches up-to-date lag information from the consumer.
      */
     public void updateLags() {
-        for (final Task t: tasks.activeTasks()) {
+        for (final Task t: tasks.activeInitializedTasks()) {
             t.updateLags();
         }
     }
@@ -1808,7 +1808,7 @@ public class TaskManager {
     }
 
     private Task getActiveTask(final TopicPartition partition) {
-        final Task activeTask = tasks.activeTasksForInputPartition(partition);
+        final Task activeTask = 
tasks.activeInitializedTasksForInputPartition(partition);
 
         if (activeTask == null) {
             log.error("Unable to locate active task for received-record 
partition {}. Current tasks: {}",
@@ -1912,7 +1912,7 @@ public class TaskManager {
     }
 
     public void updateTaskEndMetadata(final TopicPartition topicPartition, 
final Long offset) {
-        for (final Task task : tasks.activeTasks()) {
+        for (final Task task : tasks.activeInitializedTasks()) {
             if (task instanceof StreamTask) {
                 if (task.inputPartitions().contains(topicPartition)) {
                     ((StreamTask) task).updateEndOffsets(topicPartition, 
offset);
@@ -1943,7 +1943,7 @@ public class TaskManager {
         try {
             final Set<Task> activeTasksToRemove = new 
TreeSet<>(Comparator.comparing(Task::id));
             final Set<Task> standbyTasksToRemove = new 
TreeSet<>(Comparator.comparing(Task::id));
-            for (final Task task : tasks.allTasks()) {
+            for (final Task task : tasks.allInitializedTasks()) {
                 if 
(!currentNamedTopologies.contains(task.id().topologyName())) {
                     if (task.isActive()) {
                         activeTasksToRemove.add(task);
@@ -2032,7 +2032,7 @@ public class TaskManager {
         stringBuilder.append("TaskManager\n");
         stringBuilder.append(indent).append("\tMetadataState:\n");
         stringBuilder.append(indent).append("\tTasks:\n");
-        for (final Task task : tasks.allTasks()) {
+        for (final Task task : tasks.allInitializedTasks()) {
             stringBuilder.append(indent)
                          .append("\t\t")
                          .append(task.id())
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index 4af90d181c6..2e6175446ad 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -280,6 +280,7 @@ class Tasks implements TasksRegistry {
     @Override
     public synchronized void clear() {
         pendingTasksToInit.clear();
+        pendingTasksToClose.clear();
         pendingActiveTasksToCreate.clear();
         pendingStandbyTasksToCreate.clear();
         activeTasksPerId.clear();
@@ -290,7 +291,7 @@ class Tasks implements TasksRegistry {
 
     // TODO: change return type to `StreamTask`
     @Override
-    public Task activeTasksForInputPartition(final TopicPartition partition) {
+    public Task activeInitializedTasksForInputPartition(final TopicPartition 
partition) {
         return activeTasksPerPartition.get(partition);
     }
 
@@ -305,7 +306,7 @@ class Tasks implements TasksRegistry {
     }
 
     @Override
-    public Task task(final TaskId taskId) {
+    public Task initializedTask(final TaskId taskId) {
         final Task task = getTask(taskId);
 
         if (task != null)
@@ -315,26 +316,26 @@ class Tasks implements TasksRegistry {
     }
 
     @Override
-    public Collection<Task> tasks(final Collection<TaskId> taskIds) {
+    public Collection<Task> initializedTasks(final Collection<TaskId> taskIds) 
{
         final Set<Task> tasks = new HashSet<>();
         for (final TaskId taskId : taskIds) {
-            tasks.add(task(taskId));
+            tasks.add(initializedTask(taskId));
         }
         return tasks;
     }
 
     @Override
-    public synchronized Collection<TaskId> activeTaskIds() {
+    public synchronized Collection<TaskId> activeInitializedTaskIds() {
         return Collections.unmodifiableCollection(activeTasksPerId.keySet());
     }
 
     @Override
-    public synchronized Collection<Task> activeTasks() {
+    public synchronized Collection<Task> activeInitializedTasks() {
         return Collections.unmodifiableCollection(activeTasksPerId.values());
     }
 
     @Override
-    public synchronized Collection<Task> standbyTasks() {
+    public synchronized Collection<Task> standbyInitializedTasks() {
         return Collections.unmodifiableCollection(standbyTasksPerId.values());
     }
 
@@ -343,12 +344,12 @@ class Tasks implements TasksRegistry {
      * and the returned task could be modified by other threads concurrently
      */
     @Override
-    public synchronized Set<Task> allTasks() {
+    public synchronized Set<Task> allInitializedTasks() {
         return union(HashSet::new, new HashSet<>(activeTasksPerId.values()), 
new HashSet<>(standbyTasksPerId.values()));
     }
 
     @Override
-    public synchronized Set<Task> allNonFailedTasks() {
+    public synchronized Set<Task> allNonFailedInitializedTasks() {
         final Set<Task> nonFailedActiveTasks = 
activeTasksPerId.values().stream()
             .filter(task -> !failedTaskIds.contains(task.id()))
             .collect(Collectors.toSet());
@@ -359,12 +360,12 @@ class Tasks implements TasksRegistry {
     }
 
     @Override
-    public synchronized Set<TaskId> allTaskIds() {
+    public synchronized Set<TaskId> allInitializedTaskIds() {
         return union(HashSet::new, activeTasksPerId.keySet(), 
standbyTasksPerId.keySet());
     }
 
     @Override
-    public synchronized Map<TaskId, Task> allTasksPerId() {
+    public synchronized Map<TaskId, Task> allInitializedTasksPerId() {
         final Map<TaskId, Task> ret = new HashMap<>();
         ret.putAll(activeTasksPerId);
         ret.putAll(standbyTasksPerId);
@@ -372,7 +373,7 @@ class Tasks implements TasksRegistry {
     }
 
     @Override
-    public boolean contains(final TaskId taskId) {
+    public boolean containsInitialized(final TaskId taskId) {
         return getTask(taskId) != null;
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
index 6099efb5bc7..4e037fbd6f6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
@@ -69,25 +69,25 @@ public interface TasksRegistry {
 
     void clear();
 
-    Task activeTasksForInputPartition(final TopicPartition partition);
+    Task activeInitializedTasksForInputPartition(final TopicPartition 
partition);
 
-    Task task(final TaskId taskId);
+    Task initializedTask(final TaskId taskId);
 
-    Collection<Task> tasks(final Collection<TaskId> taskIds);
+    Collection<Task> initializedTasks(final Collection<TaskId> taskIds);
 
-    Collection<TaskId> activeTaskIds();
+    Collection<TaskId> activeInitializedTaskIds();
 
-    Collection<Task> activeTasks();
+    Collection<Task> activeInitializedTasks();
 
-    Collection<Task> standbyTasks();
+    Collection<Task> standbyInitializedTasks();
 
-    Set<Task> allTasks();
+    Set<Task> allInitializedTasks();
 
-    Set<Task> allNonFailedTasks();
+    Set<Task> allNonFailedInitializedTasks();
 
-    Map<TaskId, Task> allTasksPerId();
+    Map<TaskId, Task> allInitializedTasksPerId();
 
-    Set<TaskId> allTaskIds();
+    Set<TaskId> allInitializedTaskIds();
 
-    boolean contains(final TaskId taskId);
+    boolean containsInitialized(final TaskId taskId);
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
index 51ec80d05ac..2259f7768c2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
@@ -102,7 +102,7 @@ public final class DefaultTaskManager implements 
TaskManager {
             }
 
             // the most naive scheduling algorithm for now: give the next 
unlocked, unassigned, and  processable task
-            for (final Task task : tasks.activeTasks()) {
+            for (final Task task : tasks.activeInitializedTasks()) {
                 if (!assignedTasks.containsKey(task.id()) &&
                     !lockedTasks.contains(task.id()) &&
                     canProgress((StreamTask) task, time.milliseconds()) &&
@@ -126,7 +126,7 @@ public final class DefaultTaskManager implements 
TaskManager {
     @Override
     public void awaitProcessableTasks(final Supplier<Boolean> isShuttingDown) 
throws InterruptedException {
         final boolean interrupted = returnWithTasksLocked(() -> {
-            for (final Task task : tasks.activeTasks()) {
+            for (final Task task : tasks.activeInitializedTasks()) {
                 if (!assignedTasks.containsKey(task.id()) &&
                     !lockedTasks.contains(task.id()) &&
                     canProgress((StreamTask) task, time.milliseconds()) &&
@@ -200,7 +200,7 @@ public final class DefaultTaskManager implements 
TaskManager {
             final Set<TaskId> remainingTaskIds = new 
ConcurrentSkipListSet<>(taskIds);
 
             for (final TaskId taskId : taskIds) {
-                final Task task = tasks.task(taskId);
+                final Task task = tasks.initializedTask(taskId);
 
                 if (task == null) {
                     throw new IllegalArgumentException("Trying to lock task " 
+ taskId + " but it's not owned");
@@ -243,7 +243,7 @@ public final class DefaultTaskManager implements 
TaskManager {
     @Override
     public KafkaFuture<Void> lockAllTasks() {
         return returnWithTasksLocked(() ->
-            
lockTasks(tasks.activeTasks().stream().map(Task::id).collect(Collectors.toSet()))
+            
lockTasks(tasks.activeInitializedTasks().stream().map(Task::id).collect(Collectors.toSet()))
         );
     }
 
@@ -263,7 +263,7 @@ public final class DefaultTaskManager implements 
TaskManager {
 
     @Override
     public void unlockAllTasks() {
-        executeWithTasksLocked(() -> 
unlockTasks(tasks.activeTasks().stream().map(Task::id).collect(Collectors.toSet())));
+        executeWithTasksLocked(() -> 
unlockTasks(tasks.activeInitializedTasks().stream().map(Task::id).collect(Collectors.toSet())));
     }
 
     @Override
@@ -290,11 +290,11 @@ public final class DefaultTaskManager implements 
TaskManager {
                 throw new IllegalArgumentException("The task to remove is not 
locked yet by the task manager");
             }
 
-            if (!tasks.contains(taskId)) {
+            if (!tasks.containsInitialized(taskId)) {
                 throw new IllegalArgumentException("The task to remove is not 
owned by the task manager");
             }
 
-            tasks.removeTask(tasks.task(taskId));
+            tasks.removeTask(tasks.initializedTask(taskId));
         });
 
         log.info("Removed task {} from the task manager", taskId);
@@ -302,7 +302,7 @@ public final class DefaultTaskManager implements 
TaskManager {
 
     @Override
     public Set<ReadOnlyTask> getTasks() {
-        return returnWithTasksLocked(() -> 
tasks.activeTasks().stream().map(ReadOnlyTask::new).collect(Collectors.toSet()));
+        return returnWithTasksLocked(() -> 
tasks.activeInitializedTasks().stream().map(ReadOnlyTask::new).collect(Collectors.toSet()));
     }
 
     @Override
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
index 8d9ef70c5e3..568998931c9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
@@ -38,7 +38,7 @@ public class TaskExecutorTest {
         final TaskExecutor taskExecutor = new TaskExecutor(tasks, taskManager, 
metadata, new LogContext());
 
         taskExecutor.punctuate();
-        verify(tasks).activeTasks();
+        verify(tasks).activeInitializedTasks();
     }
 
     @Test
@@ -59,4 +59,4 @@ public class TaskExecutorTest {
 
         verify(producer).commitTransaction(Collections.emptyMap(), 
groupMetadata);
     }
-}
\ No newline at end of file
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 86e732185fb..92d150096a7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -239,8 +239,8 @@ public class TaskManagerTest {
             .withInputPartitions(taskId00Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
-        when(tasks.task(taskId00)).thenReturn(activeTask1);
+        when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00, 
taskId01));
+        when(tasks.initializedTask(taskId00)).thenReturn(activeTask1);
         final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
         when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
 
@@ -274,7 +274,7 @@ public class TaskManagerTest {
     public void shouldLockActiveOnHandleAssignmentWithProcessingThreads() {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        when(tasks.allTaskIds()).thenReturn(Set.of(taskId00, taskId01));
+        when(tasks.allInitializedTaskIds()).thenReturn(Set.of(taskId00, 
taskId01));
         final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
         when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
 
@@ -297,7 +297,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId01Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        when(tasks.allTasks()).thenReturn(Set.of(activeTask1, activeTask2));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(activeTask1, 
activeTask2));
         final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
         when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
 
@@ -333,7 +333,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId01Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
-        when(tasks.activeTasks()).thenReturn(Set.of(activeTask1, activeTask2));
+        when(tasks.activeInitializedTasks()).thenReturn(Set.of(activeTask1, 
activeTask2));
 
         taskManager.resumePollingForPartitionsWithAvailableSpace();
 
@@ -351,7 +351,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId01Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
-        when(tasks.activeTasks()).thenReturn(Set.of(activeTask1, activeTask2));
+        when(tasks.activeInitializedTasks()).thenReturn(Set.of(activeTask1, 
activeTask2));
 
         taskManager.updateLags();
 
@@ -630,7 +630,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
-        
when(tasks.allNonFailedTasks()).thenReturn(Set.of(reassignedActiveTask));
+        
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(reassignedActiveTask));
 
         taskManager.handleAssignment(
             mkMap(mkEntry(reassignedActiveTask.id(), 
reassignedActiveTask.inputPartitions())),
@@ -669,7 +669,7 @@ public class TaskManagerTest {
         assertEquals(taskException, exception.getCause());
         verify(tasks).addFailedTask(failedActiveTaskToRecycle);
         verify(tasks, never()).addTask(failedActiveTaskToRecycle);
-        verify(tasks).allNonFailedTasks();
+        verify(tasks).allNonFailedInitializedTasks();
         verify(standbyTaskCreator, 
never()).createStandbyTaskFromActive(failedActiveTaskToRecycle, 
taskId03Partitions);
     }
 
@@ -699,7 +699,7 @@ public class TaskManagerTest {
         assertEquals(taskException, exception.getCause());
         verify(tasks).addFailedTask(failedStandbyTaskToRecycle);
         verify(tasks, never()).addTask(failedStandbyTaskToRecycle);
-        verify(tasks).allNonFailedTasks();
+        verify(tasks).allNonFailedInitializedTasks();
         verify(activeTaskCreator, 
never()).createActiveTaskFromStandby(failedStandbyTaskToRecycle, 
taskId03Partitions, consumer);
     }
 
@@ -729,7 +729,7 @@ public class TaskManagerTest {
         assertEquals(taskException, exception.getCause());
         verify(tasks).addFailedTask(failedActiveTaskToReassign);
         verify(tasks, never()).addTask(failedActiveTaskToReassign);
-        verify(tasks).allNonFailedTasks();
+        verify(tasks).allNonFailedInitializedTasks();
         verify(tasks, 
never()).updateActiveTaskInputPartitions(failedActiveTaskToReassign, 
taskId00Partitions);
     }
 
@@ -743,7 +743,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
-        
when(tasks.allNonFailedTasks()).thenReturn(Set.of(reassignedActiveTask1));
+        
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(reassignedActiveTask1));
         when(stateUpdater.tasks()).thenReturn(Set.of(reassignedActiveTask2));
         when(stateUpdater.remove(reassignedActiveTask2.id()))
             .thenReturn(CompletableFuture.completedFuture(new 
StateUpdater.RemovedTaskResult(reassignedActiveTask2)));
@@ -848,7 +848,7 @@ public class TaskManagerTest {
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskInStateUpdater));
-        when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, 
runningActiveTask)));
+        
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, 
runningActiveTask)));
         when(tasks.pendingTasksToInit()).thenReturn(Set.of(activeTaskToInit));
         assertEquals(
             taskManager.allTasks(),
@@ -871,7 +871,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
-        when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, 
activeTask)));
+        
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, 
activeTask)));
         assertEquals(taskManager.allRunningTasks(), mkMap(mkEntry(taskId03, 
activeTask)));
     }
 
@@ -923,7 +923,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId01Partitions)
             .inState(State.CREATED).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToRecycle));
+        
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(activeTaskToRecycle));
         
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, 
taskId01Partitions))
             .thenReturn(standbyTask);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -943,7 +943,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        
when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToRecycle));
+        
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(standbyTaskToRecycle));
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         final IllegalStateException illegalStateException = assertThrows(
@@ -966,7 +966,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
-        when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToClose));
+        
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(activeTaskToClose));
 
         taskManager.handleAssignment(Collections.emptyMap(), 
Collections.emptyMap());
 
@@ -984,7 +984,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
-        when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToClose));
+        
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(standbyTaskToClose));
 
         final IllegalStateException illegalStateException = assertThrows(
             IllegalStateException.class,
@@ -1004,7 +1004,7 @@ public class TaskManagerTest {
         final Set<TopicPartition> newInputPartitions = taskId02Partitions;
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
-        
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToUpdateInputPartitions));
+        
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(activeTaskToUpdateInputPartitions));
         
when(tasks.updateActiveTaskInputPartitions(activeTaskToUpdateInputPartitions, 
newInputPartitions)).thenReturn(true);
 
         taskManager.handleAssignment(
@@ -1024,7 +1024,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
-        when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToResume));
+        
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(activeTaskToResume));
 
         taskManager.handleAssignment(
             mkMap(mkEntry(activeTaskToResume.id(), 
activeTaskToResume.inputPartitions())),
@@ -1042,7 +1042,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
-        when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToResume));
+        
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(activeTaskToResume));
 
         taskManager.handleAssignment(
             mkMap(mkEntry(activeTaskToResume.id(), 
activeTaskToResume.inputPartitions())),
@@ -1064,7 +1064,7 @@ public class TaskManagerTest {
         final Set<TopicPartition> newInputPartitions = taskId03Partitions;
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
-        
when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToUpdateInputPartitions));
+        
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(standbyTaskToUpdateInputPartitions));
 
         final IllegalStateException illegalStateException = assertThrows(
             IllegalStateException.class,
@@ -1089,7 +1089,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
-        when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToClose));
+        
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(activeTaskToClose));
 
         taskManager.handleAssignment(
             mkMap(mkEntry(activeTaskToCreate.id(), 
activeTaskToCreate.inputPartitions())),
@@ -1770,7 +1770,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId00Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
-        when(tasks.allNonFailedTasks()).thenReturn(Set.of(statefulTask0));
+        
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(statefulTask0));
         final Set<TopicPartition> assigned = Set.of(t1p0, t1p1);
         when(consumer.assignment()).thenReturn(assigned);
 
@@ -1792,9 +1792,9 @@ public class TaskManagerTest {
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
-        when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, 
runningStatefulTask)));
+        
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, 
runningStatefulTask)));
         when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask, 
restoringStatefulTask));
-        
when(tasks.allNonFailedTasks()).thenReturn(Set.of(runningStatefulTask));
+        
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(runningStatefulTask));
         expectLockObtainedFor(taskId00, taskId01, taskId02, taskId03);
         expectDirectoryNotEmpty(taskId00, taskId01, taskId02, taskId03);
         makeTaskFolders(
@@ -1826,7 +1826,7 @@ public class TaskManagerTest {
         
when(runningStatefulTask.changelogOffsets()).thenReturn(changelogOffsets);
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
-        when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, 
runningStatefulTask)));
+        
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, 
runningStatefulTask)));
 
         assertThat(
             taskManager.taskOffsetSums(),
@@ -1912,7 +1912,7 @@ public class TaskManagerTest {
             .thenReturn(mkMap(mkEntry(t1p2changelog, 
changelogOffsetOfRestoringStandbyTask)));
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
-        when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, 
runningStatefulTask)));
+        
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, 
runningStatefulTask)));
         when(stateUpdater.tasks()).thenReturn(Set.of(restoringStandbyTask, 
restoringStatefulTask));
 
         assertThat(
@@ -1937,7 +1937,7 @@ public class TaskManagerTest {
             ));
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
-        when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId01, 
restoringStatefulTask)));
+        
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId01, 
restoringStatefulTask)));
         when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
 
         assertThat(
@@ -2009,7 +2009,7 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, task)));
+        
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, 
task)));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -2069,7 +2069,7 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+        when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(task00));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -2090,7 +2090,7 @@ public class TaskManagerTest {
         doThrow(new RuntimeException("KABOOM!")).when(task00).closeClean();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+        when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(task00));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -2121,8 +2121,8 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
-        when(tasks.allTaskIds()).thenReturn(Set.of(taskId00, taskId01));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01));
+        when(tasks.allInitializedTaskIds()).thenReturn(Set.of(taskId00, 
taskId01));
 
         final ArrayList<TaskDirectory> taskFolders = new ArrayList<>(2);
         taskFolders.add(new 
TaskDirectory(testFolder.resolve(taskId00.toString()).toFile(), null));
@@ -2183,8 +2183,8 @@ public class TaskManagerTest {
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
-        when(tasks.task(taskId03)).thenReturn(corruptedActiveTask);
-        when(tasks.task(taskId02)).thenReturn(corruptedStandbyTask);
+        when(tasks.initializedTask(taskId03)).thenReturn(corruptedActiveTask);
+        when(tasks.initializedTask(taskId02)).thenReturn(corruptedStandbyTask);
 
         taskManager.handleCorruption(Set.of(corruptedActiveTask.id(), 
corruptedStandbyTask.id()));
 
@@ -2209,9 +2209,9 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.task(taskId00)).thenReturn(task00);
-        when(tasks.allTasksPerId()).thenReturn(singletonMap(taskId00, task00));
-        when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00));
+        when(tasks.initializedTask(taskId00)).thenReturn(task00);
+        
when(tasks.allInitializedTasksPerId()).thenReturn(singletonMap(taskId00, 
task00));
+        when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00));
 
         when(task00.prepareCommit(false)).thenReturn(emptyMap());
         doNothing().when(task00).postCommit(anyBoolean());
@@ -2241,9 +2241,9 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.task(taskId00)).thenReturn(task00);
-        when(tasks.allTasksPerId()).thenReturn(singletonMap(taskId00, task00));
-        when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00));
+        when(tasks.initializedTask(taskId00)).thenReturn(task00);
+        
when(tasks.allInitializedTasksPerId()).thenReturn(singletonMap(taskId00, 
task00));
+        when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00));
 
         when(task00.prepareCommit(false)).thenReturn(emptyMap());
         
when(task00.changelogPartitions()).thenReturn(taskId00ChangelogPartitions);
@@ -2276,12 +2276,12 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.task(taskId00)).thenReturn(corruptedTask);
-        when(tasks.allTasksPerId()).thenReturn(mkMap(
+        when(tasks.initializedTask(taskId00)).thenReturn(corruptedTask);
+        when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(
             mkEntry(taskId00, corruptedTask),
             mkEntry(taskId01, nonCorruptedTask)
         ));
-        when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
+        when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00, 
taskId01));
 
         when(nonCorruptedTask.commitNeeded()).thenReturn(true);
         when(nonCorruptedTask.prepareCommit(true)).thenReturn(emptyMap());
@@ -2315,8 +2315,8 @@ public class TaskManagerTest {
             .withInputPartitions(taskId02Partitions)
             .inState(State.RUNNING).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId02, 
corruptedTask)));
-        when(tasks.task(taskId02)).thenReturn(corruptedTask);
+        
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId02, 
corruptedTask)));
+        when(tasks.initializedTask(taskId02)).thenReturn(corruptedTask);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(consumer.assignment()).thenReturn(intersection(HashSet::new, 
taskId00Partitions, taskId01Partitions, taskId02Partitions));
 
@@ -2340,12 +2340,12 @@ public class TaskManagerTest {
             .withInputPartitions(taskId01Partitions).build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.task(taskId00)).thenReturn(corruptedStandby);
-        when(tasks.allTasksPerId()).thenReturn(mkMap(
+        when(tasks.initializedTask(taskId00)).thenReturn(corruptedStandby);
+        when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(
             mkEntry(taskId00, corruptedStandby),
             mkEntry(taskId01, runningNonCorruptedActive)
         ));
-        when(tasks.activeTaskIds()).thenReturn(Set.of(taskId01));
+        when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId01));
 
         when(runningNonCorruptedActive.commitNeeded()).thenReturn(true);
         when(runningNonCorruptedActive.prepareCommit(true))
@@ -2386,12 +2386,12 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.task(taskId00)).thenReturn(corruptedActive);
-        when(tasks.allTasksPerId()).thenReturn(mkMap(
+        when(tasks.initializedTask(taskId00)).thenReturn(corruptedActive);
+        when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(
             mkEntry(taskId00, corruptedActive),
             mkEntry(taskId01, uncorruptedActive)
         ));
-        when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
+        when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00, 
taskId01));
 
         when(uncorruptedActive.commitNeeded()).thenReturn(true);
         when(uncorruptedActive.prepareCommit(true)).thenReturn(emptyMap());
@@ -2430,12 +2430,12 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.task(taskId00)).thenReturn(corruptedActive);
-        when(tasks.allTasksPerId()).thenReturn(mkMap(
+        when(tasks.initializedTask(taskId00)).thenReturn(corruptedActive);
+        when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(
             mkEntry(taskId00, corruptedActive),
             mkEntry(taskId01, uncorruptedActive)
         ));
-        when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
+        when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00, 
taskId01));
 
         final StreamsProducer producer = mock(StreamsProducer.class);
         when(activeTaskCreator.streamsProducer()).thenReturn(producer);
@@ -2446,7 +2446,7 @@ public class TaskManagerTest {
 
         // mock uncorrupted task to indicate that it needs commit and will 
return offsets
         final Map<TopicPartition, OffsetAndMetadata> offsets = 
singletonMap(t1p1, new OffsetAndMetadata(0L, null));
-        
when(tasks.tasks(singleton(taskId01))).thenReturn(Set.of(uncorruptedActive));
+        
when(tasks.initializedTasks(singleton(taskId01))).thenReturn(Set.of(uncorruptedActive));
         when(uncorruptedActive.commitNeeded()).thenReturn(true);
         when(uncorruptedActive.prepareCommit(true)).thenReturn(offsets);
         when(uncorruptedActive.prepareCommit(false)).thenReturn(emptyMap());
@@ -2511,13 +2511,13 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.task(taskId00)).thenReturn(corruptedActive);
-        when(tasks.allTasksPerId()).thenReturn(mkMap(
+        when(tasks.initializedTask(taskId00)).thenReturn(corruptedActive);
+        when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(
             mkEntry(taskId00, corruptedActive),
             mkEntry(taskId01, uncorruptedActive)
         ));
-        when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
-        when(tasks.activeTasks()).thenReturn(Set.of(corruptedActive, 
uncorruptedActive));
+        when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00, 
taskId01));
+        
when(tasks.activeInitializedTasks()).thenReturn(Set.of(corruptedActive, 
uncorruptedActive));
 
         // we need to mock uncorrupted task to indicate that it needs commit 
and will return offsets
         final Map<TopicPartition, OffsetAndMetadata> offsets = 
singletonMap(t1p1, new OffsetAndMetadata(0L, null));
@@ -2591,7 +2591,7 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(revokedActiveTask, 
unrevokedActiveTaskWithCommit, unrevokedActiveTaskWithoutCommit));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(revokedActiveTask, 
unrevokedActiveTaskWithCommit, unrevokedActiveTaskWithoutCommit));
 
         when(consumer.assignment()).thenReturn(union(HashSet::new, 
taskId00Partitions, taskId01Partitions, taskId02Partitions));
 
@@ -2677,8 +2677,8 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(revokedActiveTask, 
unrevokedActiveTaskWithCommit, unrevokedActiveTaskWithoutCommit));
-        when(tasks.tasks(Set.of(taskId00, 
taskId01))).thenReturn(Set.of(revokedActiveTask, 
unrevokedActiveTaskWithCommit));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(revokedActiveTask, 
unrevokedActiveTaskWithCommit, unrevokedActiveTaskWithoutCommit));
+        when(tasks.initializedTasks(Set.of(taskId00, 
taskId01))).thenReturn(Set.of(revokedActiveTask, 
unrevokedActiveTaskWithCommit));
 
         final StreamsProducer producer = mock(StreamsProducer.class);
         when(activeTaskCreator.streamsProducer()).thenReturn(producer);
@@ -2791,7 +2791,7 @@ public class TaskManagerTest {
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
 
-        when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+        when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(task00));
 
         when(tasks.drainPendingTasksToInit()).thenReturn(emptySet());
         when(tasks.hasPendingTasksToInit()).thenReturn(false);
@@ -2827,7 +2827,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final Set<TopicPartition> newPartitionsSet = Set.of(t1p1);
 
-        when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+        when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(task00));
         when(tasks.drainPendingTasksToInit()).thenReturn(emptySet());
         when(tasks.hasPendingTasksToInit()).thenReturn(false);
         when(tasks.updateActiveTaskInputPartitions(task00, 
newPartitionsSet)).thenReturn(true);
@@ -2963,7 +2963,7 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
 
         final Map<TopicPartition, OffsetAndMetadata> offsets = 
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
 
@@ -3007,7 +3007,7 @@ public class TaskManagerTest {
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
 
-        when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02, 
task10));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01, 
task02, task10));
 
         final StreamsProducer producer = mock(StreamsProducer.class);
         when(activeTaskCreator.streamsProducer()).thenReturn(producer);
@@ -3099,7 +3099,7 @@ public class TaskManagerTest {
         expectedCommittedOffsets.putAll(offsets01);
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02, 
task03));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01, 
task02, task03));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -3150,7 +3150,7 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01, 
task02));
 
         when(task00.commitNeeded()).thenReturn(false);
         when(task01.commitNeeded()).thenReturn(true); // only task01 needs 
commit
@@ -3181,7 +3181,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
-        when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+        when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(task00));
         when(stateUpdater.tasks()).thenReturn(Set.of(task01));
 
         final Map<TaskId, Set<TopicPartition>> assignmentActive = 
singletonMap(taskId00, taskId00Partitions);
@@ -3215,7 +3215,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
-        when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+        when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(task00));
         when(stateUpdater.tasks()).thenReturn(Set.of(task01));
 
         // mock to remove standby task from state updater
@@ -3283,7 +3283,7 @@ public class TaskManagerTest {
         doThrow(new RuntimeException("KABOOM!")).when(task00).suspend();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -3323,7 +3323,7 @@ public class TaskManagerTest {
         doThrow(new RuntimeException("oops"))
             .when(task02).suspend();
 
-        when(tasks.activeTasks()).thenReturn(Set.of(task00, task01, task02));
+        when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00, task01, 
task02));
 
         final RuntimeException exception = assertThrows(
             RuntimeException.class,
@@ -3360,7 +3360,7 @@ public class TaskManagerTest {
 
         doThrow(new 
RuntimeException("whatever")).when(activeTaskCreator).close();
 
-        when(tasks.activeTasks()).thenReturn(Set.of(task00));
+        when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00));
 
         final RuntimeException exception = assertThrows(
             RuntimeException.class,
@@ -3460,7 +3460,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
-        when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01, 
task02));
 
         final RuntimeException thrown = assertThrows(RuntimeException.class,
             () -> taskManager.handleRevocation(union(HashSet::new, 
taskId01Partitions, taskId02Partitions)));
@@ -3497,7 +3497,7 @@ public class TaskManagerTest {
             .when(task02).suspend();
         doThrow(new 
RuntimeException("whatever")).when(activeTaskCreator).close();
 
-        when(tasks.activeTasks()).thenReturn(Set.of(task00, task01, task02));
+        when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00, task01, 
task02));
 
         taskManager.shutdown(false);
 
@@ -3530,7 +3530,7 @@ public class TaskManagerTest {
 
         
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask00)).thenReturn(Set.of());
         when(stateUpdater.standbyTasks()).thenReturn(Set.of(standbyTask00));
-        when(tasks.standbyTasks()).thenReturn(Set.of(standbyTask00));
+        
when(tasks.standbyInitializedTasks()).thenReturn(Set.of(standbyTask00));
 
         final CompletableFuture<StateUpdater.RemovedTaskResult> 
futureForStandbyTask = new CompletableFuture<>();
         when(stateUpdater.remove(taskId00)).thenReturn(futureForStandbyTask);
@@ -3750,7 +3750,7 @@ public class TaskManagerTest {
         when(task01.prepareCommit(true)).thenReturn(emptyMap());
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -3846,7 +3846,7 @@ public class TaskManagerTest {
         when(task00.prepareCommit(true)).thenReturn(emptyMap());
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -3874,7 +3874,7 @@ public class TaskManagerTest {
         when(task01.commitNeeded()).thenReturn(true);
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -3903,7 +3903,7 @@ public class TaskManagerTest {
         when(task01.prepareCommit(true)).thenReturn(offsets);
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task01));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task01));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -3928,7 +3928,7 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task01, task02));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task01, task02));
 
         final StreamsProducer producer = mock(StreamsProducer.class);
         when(activeTaskCreator.streamsProducer()).thenReturn(producer);
@@ -3975,7 +3975,7 @@ public class TaskManagerTest {
         when(task00.prepareCommit(true)).thenThrow(new 
RuntimeException("opsh."));
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -3998,7 +3998,7 @@ public class TaskManagerTest {
         when(task01.prepareCommit(true)).thenThrow(new 
RuntimeException("opsh."));
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task01));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task01));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4030,7 +4030,7 @@ public class TaskManagerTest {
             .thenReturn(singletonMap(t1p1, 17L));
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4060,7 +4060,7 @@ public class TaskManagerTest {
             .thenReturn(singletonMap(t1p1, 17L));
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4088,7 +4088,7 @@ public class TaskManagerTest {
         when(task00.purgeableOffsets()).thenReturn(singletonMap(t1p1, 5L));
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4145,7 +4145,7 @@ public class TaskManagerTest {
         expectedCommittedOffsets.putAll(offsets1);
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02, 
task03));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01, 
task02, task03));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4205,7 +4205,7 @@ public class TaskManagerTest {
             .thenReturn(false); // no more records
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.activeTasks()).thenReturn(Set.of(task00, task01));
+        when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00, 
task01));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4250,7 +4250,7 @@ public class TaskManagerTest {
             .thenReturn(false);
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.activeTasks()).thenReturn(Set.of(task00, task01, task02));
+        when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00, task01, 
task02));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4277,7 +4277,7 @@ public class TaskManagerTest {
             .thenThrow(new TaskMigratedException("migrated", new 
RuntimeException("cause")));
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.activeTasks()).thenReturn(Set.of(task00));
+        when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4294,7 +4294,7 @@ public class TaskManagerTest {
         when(task00.process(anyLong())).thenThrow(new 
RuntimeException("oops"));
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.activeTasks()).thenReturn(Set.of(task00));
+        when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4315,7 +4315,7 @@ public class TaskManagerTest {
             .thenThrow(new TaskMigratedException("migrated", new 
RuntimeException("cause")));
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.activeTasks()).thenReturn(Set.of(task00));
+        when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4332,7 +4332,7 @@ public class TaskManagerTest {
         when(task00.maybePunctuateStreamTime()).thenThrow(new 
KafkaException("oops"));
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.activeTasks()).thenReturn(Set.of(task00));
+        when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4350,7 +4350,7 @@ public class TaskManagerTest {
         when(task00.maybePunctuateSystemTime()).thenReturn(true);
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.activeTasks()).thenReturn(Set.of(task00));
+        when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4386,7 +4386,7 @@ public class TaskManagerTest {
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
-        when(tasks.allTasks()).thenReturn(Set.of(task00));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
 
         try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(TaskManager.class)) {
             appender.setClassLogger(TaskManager.class, Level.DEBUG);
@@ -4591,7 +4591,7 @@ public class TaskManagerTest {
         when(task01.prepareCommit(true)).thenReturn(offsets);
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task01));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task01));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4706,7 +4706,7 @@ public class TaskManagerTest {
         when(task01.prepareCommit(true)).thenReturn(offsets);
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task01));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task01));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4737,7 +4737,7 @@ public class TaskManagerTest {
         when(task01.prepareCommit(true)).thenReturn(offsets);
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task01));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task01));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4769,7 +4769,7 @@ public class TaskManagerTest {
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
 
-        when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4863,7 +4863,7 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01));
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
index 0887c982873..bb2865d223c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
@@ -75,25 +75,25 @@ public class TasksTest {
         tasks.addActiveTasks(Set.of(statefulTask, statelessTask));
         tasks.addStandbyTasks(Collections.singletonList(standbyTask));
 
-        assertEquals(statefulTask, tasks.task(statefulTask.id()));
-        assertEquals(statelessTask, tasks.task(statelessTask.id()));
-        assertEquals(standbyTask, tasks.task(standbyTask.id()));
-
-        assertEquals(Set.of(statefulTask, statelessTask), new 
HashSet<>(tasks.activeTasks()));
-        assertEquals(Set.of(standbyTask), new HashSet<>(tasks.standbyTasks()));
-        assertEquals(Set.of(statefulTask, statelessTask, standbyTask), 
tasks.allTasks());
-        assertEquals(Set.of(statefulTask, standbyTask), 
tasks.tasks(Set.of(statefulTask.id(), standbyTask.id())));
-        assertEquals(Set.of(statefulTask.id(), statelessTask.id(), 
standbyTask.id()), tasks.allTaskIds());
+        assertEquals(statefulTask, tasks.initializedTask(statefulTask.id()));
+        assertEquals(statelessTask, tasks.initializedTask(statelessTask.id()));
+        assertEquals(standbyTask, tasks.initializedTask(standbyTask.id()));
+
+        assertEquals(Set.of(statefulTask, statelessTask), new 
HashSet<>(tasks.activeInitializedTasks()));
+        assertEquals(Set.of(standbyTask), new 
HashSet<>(tasks.standbyInitializedTasks()));
+        assertEquals(Set.of(statefulTask, statelessTask, standbyTask), 
tasks.allInitializedTasks());
+        assertEquals(Set.of(statefulTask, standbyTask), 
tasks.initializedTasks(Set.of(statefulTask.id(), standbyTask.id())));
+        assertEquals(Set.of(statefulTask.id(), statelessTask.id(), 
standbyTask.id()), tasks.allInitializedTaskIds());
         assertEquals(
             mkMap(
                 mkEntry(statefulTask.id(), statefulTask),
                 mkEntry(statelessTask.id(), statelessTask),
                 mkEntry(standbyTask.id(), standbyTask)
             ),
-            tasks.allTasksPerId());
-        assertTrue(tasks.contains(statefulTask.id()));
-        assertTrue(tasks.contains(statelessTask.id()));
-        assertTrue(tasks.contains(statefulTask.id()));
+            tasks.allInitializedTasksPerId());
+        assertTrue(tasks.containsInitialized(statefulTask.id()));
+        assertTrue(tasks.containsInitialized(statelessTask.id()));
+        assertTrue(tasks.containsInitialized(statefulTask.id()));
     }
 
     @Test
@@ -192,12 +192,12 @@ public class TasksTest {
 
         tasks.addFailedTask(activeTask1);
 
-        assertEquals(activeTask1, tasks.task(TASK_0_0));
-        assertEquals(activeTask2, tasks.task(TASK_0_1));
-        assertTrue(tasks.allTasks().contains(activeTask1));
-        assertTrue(tasks.allTasks().contains(activeTask2));
-        assertFalse(tasks.allNonFailedTasks().contains(activeTask1));
-        assertTrue(tasks.allNonFailedTasks().contains(activeTask2));
+        assertEquals(activeTask1, tasks.initializedTask(TASK_0_0));
+        assertEquals(activeTask2, tasks.initializedTask(TASK_0_1));
+        assertTrue(tasks.allInitializedTasks().contains(activeTask1));
+        assertTrue(tasks.allInitializedTasks().contains(activeTask2));
+        
assertFalse(tasks.allNonFailedInitializedTasks().contains(activeTask1));
+        assertTrue(tasks.allNonFailedInitializedTasks().contains(activeTask2));
     }
 
     @Test
@@ -207,11 +207,11 @@ public class TasksTest {
         tasks.addFailedTask(activeTask1);
 
         tasks.removeTask(activeTask1);
-        assertFalse(tasks.allNonFailedTasks().contains(activeTask1));
-        assertFalse(tasks.allTasks().contains(activeTask1));
+        
assertFalse(tasks.allNonFailedInitializedTasks().contains(activeTask1));
+        assertFalse(tasks.allInitializedTasks().contains(activeTask1));
 
         tasks.addTask(activeTask1);
-        assertTrue(tasks.allNonFailedTasks().contains(activeTask1));
+        assertTrue(tasks.allNonFailedInitializedTasks().contains(activeTask1));
     }
 
     @Test
@@ -221,11 +221,11 @@ public class TasksTest {
         tasks.addFailedTask(activeTask1);
 
         tasks.clear();
-        assertFalse(tasks.allNonFailedTasks().contains(activeTask1));
-        assertFalse(tasks.allTasks().contains(activeTask1));
+        
assertFalse(tasks.allNonFailedInitializedTasks().contains(activeTask1));
+        assertFalse(tasks.allInitializedTasks().contains(activeTask1));
 
         tasks.addTask(activeTask1);
-        assertTrue(tasks.allNonFailedTasks().contains(activeTask1));
+        assertTrue(tasks.allNonFailedInitializedTasks().contains(activeTask1));
     }
 
     @Test
@@ -260,6 +260,6 @@ public class TasksTest {
 
         tasks.removeTask(activeTask1);
         assertFalse(tasks.pendingTasksToInit().contains(activeTask1));
-        assertFalse(tasks.allTasks().contains(activeTask1));
+        assertFalse(tasks.allInitializedTasks().contains(activeTask1));
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
index 0261ac99720..879f62f0b08 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
@@ -69,8 +69,8 @@ public class DefaultTaskManagerTest {
     @BeforeEach
     public void setUp() {
         when(task.isProcessable(anyLong())).thenReturn(true);
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
-        when(tasks.task(taskId)).thenReturn(task);
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
+        when(tasks.initializedTask(taskId)).thenReturn(task);
     }
 
     @Test
@@ -94,14 +94,14 @@ public class DefaultTaskManagerTest {
         taskManager.add(Collections.singleton(task));
 
         verify(tasks).addTask(task);
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         assertEquals(1, taskManager.getTasks().size());
     }
 
     @Test
     public void shouldAssignTaskThatCanBeProcessed() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
 
         assertEquals(task, taskManager.assignNextTask(taskExecutor));
@@ -145,7 +145,7 @@ public class DefaultTaskManagerTest {
         final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
         final Thread awaitingThread = new Thread(awaitingRunnable);
         awaitingThread.start();
-        verify(tasks, 
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();
+        verify(tasks, 
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeInitializedTasks();
 
         awaitingThread.interrupt();
 
@@ -160,7 +160,7 @@ public class DefaultTaskManagerTest {
         final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
         final Thread awaitingThread = new Thread(awaitingRunnable);
         awaitingThread.start();
-        verify(tasks, 
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();
+        verify(tasks, 
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeInitializedTasks();
 
         taskManager.signalTaskExecutors();
 
@@ -180,7 +180,7 @@ public class DefaultTaskManagerTest {
         final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
         final Thread awaitingThread = new Thread(awaitingRunnable);
         awaitingThread.start();
-        verify(tasks, 
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();
+        verify(tasks, 
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeInitializedTasks();
 
         taskManager.unassignTask(task, taskExecutor);
 
@@ -195,7 +195,7 @@ public class DefaultTaskManagerTest {
         final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
         final Thread awaitingThread = new Thread(awaitingRunnable);
         awaitingThread.start();
-        verify(tasks, 
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();
+        verify(tasks, 
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeInitializedTasks();
 
         taskManager.add(Collections.singleton(task));
 
@@ -212,7 +212,7 @@ public class DefaultTaskManagerTest {
         final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
         final Thread awaitingThread = new Thread(awaitingRunnable);
         awaitingThread.start();
-        verify(tasks, 
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();
+        verify(tasks, 
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeInitializedTasks();
 
         taskManager.unlockAllTasks();
 
@@ -225,7 +225,7 @@ public class DefaultTaskManagerTest {
     @Test
     public void shouldAssignTasksThatCanBeSystemTimePunctuated() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
         when(task.canPunctuateSystemTime()).thenReturn(true);
 
@@ -236,7 +236,7 @@ public class DefaultTaskManagerTest {
     @Test
     public void shouldAssignTasksThatCanBeStreamTimePunctuated() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         
when(taskExecutionMetadata.canPunctuateTask(eq(task))).thenReturn(true);
         when(task.canPunctuateStreamTime()).thenReturn(true);
 
@@ -247,7 +247,7 @@ public class DefaultTaskManagerTest {
     @Test
     public void shouldNotAssignTasksIfUncaughtExceptionPresent() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         ensureTaskMakesProgress();
         taskManager.assignNextTask(taskExecutor);
         taskManager.setUncaughtException(new StreamsException("Exception"), 
taskId);
@@ -259,7 +259,7 @@ public class DefaultTaskManagerTest {
     @Test
     public void shouldNotAssignTasksForPunctuationIfPunctuationDisabled() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         
when(taskExecutionMetadata.canPunctuateTask(eq(task))).thenReturn(false);
         when(task.canPunctuateStreamTime()).thenReturn(true);
         when(task.canPunctuateSystemTime()).thenReturn(true);
@@ -270,7 +270,7 @@ public class DefaultTaskManagerTest {
     @Test
     public void shouldNotAssignTasksForProcessingIfProcessingDisabled() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(false);
         when(task.isProcessable(anyLong())).thenReturn(true);
 
@@ -280,7 +280,7 @@ public class DefaultTaskManagerTest {
     @Test
     public void shouldUnassignTask() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
 
         assertEquals(task, taskManager.assignNextTask(taskExecutor));
@@ -292,7 +292,7 @@ public class DefaultTaskManagerTest {
     @Test
     public void shouldNotUnassignNotOwnedTask() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
 
         assertEquals(task, taskManager.assignNextTask(taskExecutor));
@@ -319,16 +319,16 @@ public class DefaultTaskManagerTest {
     @Test
     public void shouldRemoveTask() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
-        when(tasks.task(task.id())).thenReturn(task);
-        when(tasks.contains(task.id())).thenReturn(true);
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
+        when(tasks.initializedTask(task.id())).thenReturn(task);
+        when(tasks.containsInitialized(task.id())).thenReturn(true);
 
         taskManager.lockTasks(Collections.singleton(task.id()));
         taskManager.remove(task.id());
 
         verify(tasks).removeTask(task);
         reset(tasks);
-        when(tasks.activeTasks()).thenReturn(Collections.emptySet());
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.emptySet());
 
         assertEquals(0, taskManager.getTasks().size());
     }
@@ -336,10 +336,10 @@ public class DefaultTaskManagerTest {
     @Test
     public void shouldNotAssignLockedTask() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
-        when(tasks.task(task.id())).thenReturn(task);
-        when(tasks.contains(task.id())).thenReturn(true);
+        when(tasks.initializedTask(task.id())).thenReturn(task);
+        when(tasks.containsInitialized(task.id())).thenReturn(true);
 
         
assertTrue(taskManager.lockTasks(Collections.singleton(task.id())).isDone());
 
@@ -349,10 +349,10 @@ public class DefaultTaskManagerTest {
     @Test
     public void shouldLockAnEmptySetOfTasks() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
-        when(tasks.task(task.id())).thenReturn(task);
-        when(tasks.contains(task.id())).thenReturn(true);
+        when(tasks.initializedTask(task.id())).thenReturn(task);
+        when(tasks.containsInitialized(task.id())).thenReturn(true);
 
         assertTrue(taskManager.lockTasks(Collections.emptySet()).isDone());
 
@@ -365,10 +365,10 @@ public class DefaultTaskManagerTest {
         final KafkaFutureImpl<StreamTask> future = new KafkaFutureImpl<>();
 
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
-        when(tasks.task(task.id())).thenReturn(task);
-        when(tasks.contains(task.id())).thenReturn(true);
+        when(tasks.initializedTask(task.id())).thenReturn(task);
+        when(tasks.containsInitialized(task.id())).thenReturn(true);
         when(taskExecutor.unassign()).thenReturn(future);
 
         assertEquals(task, taskManager.assignNextTask(taskExecutor));
@@ -386,10 +386,10 @@ public class DefaultTaskManagerTest {
     @Test
     public void shouldNotAssignAnyLockedTask() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
-        when(tasks.task(task.id())).thenReturn(task);
-        when(tasks.contains(task.id())).thenReturn(true);
+        when(tasks.initializedTask(task.id())).thenReturn(task);
+        when(tasks.containsInitialized(task.id())).thenReturn(true);
 
         assertTrue(taskManager.lockAllTasks().isDone());
 
@@ -407,7 +407,7 @@ public class DefaultTaskManagerTest {
     @Test
     public void shouldNotSetUncaughtExceptionsTwice() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
         taskManager.assignNextTask(taskExecutor);
         taskManager.setUncaughtException(exception, task.id());
@@ -419,7 +419,7 @@ public class DefaultTaskManagerTest {
     @Test
     public void shouldReturnAndClearExceptionsOnDrainExceptions() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
         taskManager.assignNextTask(taskExecutor);
         taskManager.setUncaughtException(exception, task.id());
@@ -433,9 +433,9 @@ public class DefaultTaskManagerTest {
         final KafkaFutureImpl<StreamTask> future = new KafkaFutureImpl<>();
 
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
-        when(tasks.task(task.id())).thenReturn(task);
-        when(tasks.contains(task.id())).thenReturn(true);
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
+        when(tasks.initializedTask(task.id())).thenReturn(task);
+        when(tasks.containsInitialized(task.id())).thenReturn(true);
         when(taskExecutor.unassign()).thenReturn(future);
         when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
 


Reply via email to