cadonna commented on code in PR #12397:
URL: https://github.com/apache/kafka/pull/12397#discussion_r925471614


##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -1307,9 +1307,9 @@ public Set<Characteristics> characteristics() {
     }
 
     @SafeVarargs
-    public static <E> Set<E> union(final Supplier<Set<E>> constructor, final Set<E>... set) {
+    public static <E> Set<E> union(final Supplier<Set<E>> constructor, final Collection<E>... set) {
         final Set<E> result = constructor.get();
-        for (final Set<E> s : set) {
+        for (final Collection<E> s : set) {

Review Comment:
   This seems a bit weird to me. I see that you changed this because 
`Tasks#allTaskIds()` returns a `Collection` instead of a `Set`. I think it is 
fine to let `Tasks#allTaskIds()` return a `Set`. The other reason for the change 
is that `Tasks#allTasks()` computes its result from two collections. Those two 
collections can be transformed to sets. Maybe we should also consider changing 
the return type of `allTasks()` to a set.
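
   A minimal sketch of this suggestion, assuming `union` keeps its `Set`-only 
signature and that its body simply calls `addAll` (the hunk above does not show 
the body). `SetUnionSketch` and `allIds` are hypothetical names used only for 
illustration; they are not the actual `Utils` or `Tasks` code.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Supplier;

// Hypothetical stand-in for the discussion above; not the actual Kafka classes.
class SetUnionSketch {

    // Same shape as the pre-change signature: only sets are accepted.
    // The addAll body is an assumption, since the diff hunk ends at the loop header.
    @SafeVarargs
    static <E> Set<E> union(final Supplier<Set<E>> constructor, final Set<E>... sets) {
        final Set<E> result = constructor.get();
        for (final Set<E> s : sets) {
            result.addAll(s);
        }
        return result;
    }

    // A caller that holds plain collections converts them to sets before the call,
    // so union itself does not need to be widened to Collection.
    static Set<String> allIds(final Collection<String> activeIds,
                              final Collection<String> standbyIds) {
        return union(TreeSet::new, new HashSet<>(activeIds), new HashSet<>(standbyIds));
    }

    public static void main(final String[] args) {
        final List<String> active = Arrays.asList("0_0", "0_1");
        final List<String> standby = Arrays.asList("0_1", "1_0");
        System.out.println(allIds(active, standby));
    }
}
```

   The conversion costs one extra copy per collection at the call site; returning 
sets from the accessors in the first place would avoid even that.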



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -34,29 +33,40 @@
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.common.utils.Utils.filterMap;
+import static org.apache.kafka.common.utils.Utils.union;
+
+/**
+ * All tasks contained by the Streams instance.
+ *
+ * Note that these tasks are shared between the TaskManager (stream thread) and the StateUpdater (restore thread),
+ * i.e. all running active tasks are processed by the former and all restoring active tasks and standby tasks are
+ * processed by the latter.
+ */
 class Tasks {
     private final Logger log;
-    private final TopologyMetadata topologyMetadata;
-
-    private final Map<TaskId, Task> allTasksPerId = Collections.synchronizedSortedMap(new TreeMap<>());
-    private final Map<TaskId, Task> readOnlyTasksPerId = Collections.unmodifiableMap(allTasksPerId);
-    private final Collection<Task> readOnlyTasks = Collections.unmodifiableCollection(allTasksPerId.values());
 
     // TODO: change type to `StreamTask`
     private final Map<TaskId, Task> activeTasksPerId = new TreeMap<>();
+    // TODO: change type to `StandbyTask`
+    private final Map<TaskId, Task> standbyTasksPerId = new TreeMap<>();
+
+    // Tasks may have been assigned for a NamedTopology that is not yet known by this host. When that occurs we stash
+    // these unknown tasks until either the corresponding NamedTopology is added and we can create them at last, or
+    // we receive a new assignment and they are revoked from the thread.
+
+    // Tasks may have been assigned but not yet created because:
+    // 1. They are for a NamedTopology that is yet known by this host.
+    // 2. They are to be recycled from an existing restoring task yet to be returned from the task updater.

Review Comment:
   ```suggestion
       // 2. They are to be recycled from an existing restoring task yet to be returned from the state updater.
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java:
##########
@@ -55,25 +57,28 @@ public abstract class AbstractTask implements Task {
     protected Map<TopicPartition, Long> offsetSnapshotSinceLastFlush = null;
 
     protected final TaskId id;
+    protected final TaskConfig config;
     protected final ProcessorTopology topology;
     protected final StateDirectory stateDirectory;
     protected final ProcessorStateManager stateMgr;
+
     private final long taskTimeoutMs;
 
     AbstractTask(final TaskId id,
                  final ProcessorTopology topology,
                  final StateDirectory stateDirectory,
                  final ProcessorStateManager stateMgr,
                  final Set<TopicPartition> inputPartitions,
-                 final long taskTimeoutMs,
+                 final TaskConfig config,
                  final String taskType,
                  final Class<? extends AbstractTask> clazz) {
         this.id = id;
         this.stateMgr = stateMgr;
         this.topology = topology;
+        this.config = config;
         this.inputPartitions = inputPartitions;
         this.stateDirectory = stateDirectory;
-        this.taskTimeoutMs = taskTimeoutMs;
+        this.taskTimeoutMs = config.taskTimeoutMs;

Review Comment:
   I would not introduce a dedicated field for the task timeout; just read it 
directly from the task config object `config`.
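
   A minimal sketch of the idea, assuming only that the config exposes a 
`taskTimeoutMs` field as the hunk above suggests. `TaskConfigSketch`, 
`TaskSketch`, and `timedOut` are hypothetical names for illustration; the real 
`TaskConfig` and `AbstractTask` carry far more state.

```java
// Hypothetical, simplified stand-in for the task config object.
class TaskConfigSketch {
    final long taskTimeoutMs;

    TaskConfigSketch(final long taskTimeoutMs) {
        this.taskTimeoutMs = taskTimeoutMs;
    }
}

// Hypothetical, simplified stand-in for a task that needs the timeout.
class TaskSketch {
    // The config is the single source of truth; no copied timeout field.
    private final TaskConfigSketch config;

    TaskSketch(final TaskConfigSketch config) {
        this.config = config;
    }

    // Reads the timeout from the config at the point of use.
    boolean timedOut(final long startMs, final long nowMs) {
        return nowMs - startMs > config.taskTimeoutMs;
    }

    public static void main(final String[] args) {
        final TaskSketch task = new TaskSketch(new TaskConfigSketch(100L));
        System.out.println(task.timedOut(0L, 101L));
    }
}
```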



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -80,125 +87,165 @@ void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
         this.mainConsumer = mainConsumer;
     }
 
-    void handleNewAssignmentAndCreateTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                           final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                           final Set<TaskId> assignedActiveTasks,
-                                           final Set<TaskId> assignedStandbyTasks) {
-        activeTaskCreator.removeRevokedUnknownTasks(assignedActiveTasks);
-        standbyTaskCreator.removeRevokedUnknownTasks(assignedStandbyTasks);
-        createTasks(activeTasksToCreate, standbyTasksToCreate);
+    void purgePendingTasks(final Set<TaskId> assignedActiveTasks, final Set<TaskId> assignedStandbyTasks) {
+        pendingActiveTasks.keySet().retainAll(assignedActiveTasks);
+        pendingStandbyTasks.keySet().retainAll(assignedStandbyTasks);
+    }
+
+    void addActivePendingTasks(final Map<TaskId, Set<TopicPartition>> pendingTasks) {
+        pendingActiveTasks.putAll(pendingTasks);
+    }
+
+    void addStandbyPendingTasks(final Map<TaskId, Set<TopicPartition>> pendingTasks) {
+        pendingStandbyTasks.putAll(pendingTasks);
     }
 
-    void maybeCreateTasksFromNewTopologies(final Set<String> currentNamedTopologies) {
+    void createPendingTasks(final Set<String> currentNamedTopologies) {
         createTasks(
-            activeTaskCreator.uncreatedTasksForTopologies(currentNamedTopologies),
-            standbyTaskCreator.uncreatedTasksForTopologies(currentNamedTopologies)
+            pendingActiveTasksForTopologies(currentNamedTopologies),
+            pendingStandbyTasksForTopologies(currentNamedTopologies)
         );
     }
 
-    double totalProducerBlockedTime() {
-        return activeTaskCreator.totalProducerBlockedTime();
+    private Map<TaskId, Set<TopicPartition>> pendingActiveTasksForTopologies(final Set<String> currentTopologies) {
+        return filterMap(pendingActiveTasks, t -> currentTopologies.contains(t.getKey().topologyName()));
+    }
+
+    private Map<TaskId, Set<TopicPartition>> pendingStandbyTasksForTopologies(final Set<String> currentTopologies) {
+        return filterMap(pendingStandbyTasks, t -> currentTopologies.contains(t.getKey().topologyName()));
     }
 
     void createTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
                      final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate) {
+        createActiveTasks(activeTasksToCreate);
+        createStandbyTasks(standbyTasksToCreate);
+    }
+
+    private void createActiveTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate) {
         for (final Map.Entry<TaskId, Set<TopicPartition>> taskToBeCreated : activeTasksToCreate.entrySet()) {
             final TaskId taskId = taskToBeCreated.getKey();
 
             if (activeTasksPerId.containsKey(taskId)) {
                 throw new IllegalStateException("Attempted to create an active task that we already own: " + taskId);
             }
-        }
 
-        for (final Map.Entry<TaskId, Set<TopicPartition>> taskToBeCreated : standbyTasksToCreate.entrySet()) {
-            final TaskId taskId = taskToBeCreated.getKey();
-
-            if (standbyTasksPerId.containsKey(taskId)) {
-                throw new IllegalStateException("Attempted to create a standby task that we already own: " + taskId);
+            if (pendingStandbyTasks.containsKey(taskId)) {
+                throw new IllegalStateException("Attempted to create an active task while we already own its standby: " + taskId);
             }
         }
 
-        // keep this check to simplify testing (ie, no need to mock `activeTaskCreator`)
         if (!activeTasksToCreate.isEmpty()) {
-            // TODO: change type to `StreamTask`
             for (final Task activeTask : activeTaskCreator.createTasks(mainConsumer, activeTasksToCreate)) {
                 activeTasksPerId.put(activeTask.id(), activeTask);
-                allTasksPerId.put(activeTask.id(), activeTask);
+                pendingActiveTasks.remove(activeTask.id());
                 for (final TopicPartition topicPartition : activeTask.inputPartitions()) {
                     activeTasksPerPartition.put(topicPartition, activeTask);
                 }
             }
         }
+    }
+
+    private void createStandbyTasks(final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate) {
+        for (final Map.Entry<TaskId, Set<TopicPartition>> taskToBeCreated : standbyTasksToCreate.entrySet()) {
+            final TaskId taskId = taskToBeCreated.getKey();
+
+            if (standbyTasksPerId.containsKey(taskId)) {
+                throw new IllegalStateException("Attempted to create an active task that we already own: " + taskId);
+            }
+
+            if (pendingActiveTasks.containsKey(taskId)) {
+                throw new IllegalStateException("Attempted to create an active task while we already own its standby: " + taskId);
+            }
+        }
 
-        // keep this check to simplify testing (ie, no need to mock `standbyTaskCreator`)
         if (!standbyTasksToCreate.isEmpty()) {
-            // TODO: change type to `StandbyTask`
             for (final Task standbyTask : standbyTaskCreator.createTasks(standbyTasksToCreate)) {
                 standbyTasksPerId.put(standbyTask.id(), standbyTask);
-                allTasksPerId.put(standbyTask.id(), standbyTask);
+                pendingActiveTasks.remove(standbyTask.id());
+            }
+        }
+    }
+
+    void removeTask(final Task taskToRemove) {
+        final TaskId taskId = taskToRemove.id();
+
+        if (taskToRemove.state() != Task.State.CLOSED) {
+            throw new IllegalStateException("Attempted to remove an task that is not closed: " + taskId);

Review Comment:
   ```suggestion
               throw new IllegalStateException("Attempted to remove a task that is not closed: " + taskId);
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java:
##########
@@ -234,6 +236,43 @@ public void closeCleanAndRecycleState() {
         log.info("Closed clean and recycled state");
     }
 
+    /**
+     * Create an active task from this standby task without closing and re-initializing the state stores.
+     * The task should have been in suspended state when calling this function
+     *
+     * TODO: we should be able to not need the input partitions as input param in future but always reuse
+     *       the task's input partitions when we have fixed partitions -> tasks mapping
+     */
+    public StreamTask recycle(final Time time,

Review Comment:
   Could you add unit tests for this method?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1012,14 +1048,16 @@ Set<TaskId> standbyTaskIds() {
             .collect(Collectors.toSet());
     }
 
-    Map<TaskId, Task> tasks() {
+    Map<TaskId, Task> allTasks() {
         // not bothering with an unmodifiable map, since the tasks themselves are mutable, but
         // if any outside code modifies the map or the tasks, it would be a severe transgression.
-        return tasks.tasksPerId();
+        return tasks.allTasksPerId();
     }
 
     Map<TaskId, Task> notPausedTasks() {

Review Comment:
   Could you please add unit tests for this method? I think you can reuse the 
one that was deleted in `TasksTest`.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -573,6 +572,45 @@ public void closeCleanAndRecycleState() {
         log.info("Closed clean and recycled state");
     }
 
+    /**
+     * Create a standby task from this active task without closing and re-initializing the state stores.
+     * The task should have been in suspended state when calling this function
+     *
+     * TODO: we should be able to not need the input partitions as input param in future but always reuse
+     *       the task's input partitions when we have fixed partitions -> tasks mapping
+     */
+    public StandbyTask recycle(final Set<TopicPartition> inputPartitions) {

Review Comment:
   Could you add unit tests for this method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.