cadonna commented on code in PR #12439:
URL: https://github.com/apache/kafka/pull/12439#discussion_r930069951


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java:
##########
@@ -211,13 +209,36 @@ private RecordCollector createRecordCollector(final TaskId taskId,
         );
     }
 
+    /*
+     * TODO: we pass in the new input partitions to validate if they still match,
+     *       in the future we when we have fixed partitions -> tasks mapping,
+     *       we should always reuse the input partition and hence no need validations
+     */
     StreamTask createActiveTaskFromStandby(final StandbyTask standbyTask,
                                            final Set<TopicPartition> inputPartitions,
                                            final Consumer<byte[], byte[]> consumer) {
+        if (!inputPartitions.equals(standbyTask.inputPartitions)) {
+            log.warn("Detected unmatched input partitions for task {} when recycling it from standby to active", standbyTask.id);
+        }
+
+        final RecordCollector recordCollector = createRecordCollector(standbyTask.id, getLogContext(standbyTask.id), standbyTask.topology);
-        final StreamTask task = standbyTask.recycle(time, cache, recordCollector, inputPartitions, consumer);
+        final StreamTask task = new StreamTask(
+            standbyTask.id,
+            inputPartitions,
+            standbyTask.topology,
+            consumer,
+            standbyTask.config,
+            streamsMetrics,
+            stateDirectory,
+            cache,
+            time,
+            standbyTask.stateMgr,
+            recordCollector,
+            standbyTask.processorContext,
+            standbyTask.logContext
+        );
 
-        log.trace("Created active task {} with assigned partitions {}", task.id, inputPartitions);
+        log.trace("Recycled active task {} from recycled standby with assigned partitions {}", task.id, inputPartitions);

Review Comment:
   ```suggestion
           log.trace("Created active task {} from recycled standby task with assigned partitions {}", task.id, inputPartitions);
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -493,7 +531,9 @@ boolean tryToCompleteRestoration(final long now, final java.util.function.Consum
         final List<Task> activeTasks = new LinkedList<>();
         for (final Task task : tasks.allTasks()) {
             try {
-                task.initializeIfNeeded();
+                if (task.initializeIfNeeded() && stateUpdater != null) {
+                    stateUpdater.add(task);
+                }

Review Comment:
   I am not sure this is the right place to add the task to the state updater. I would rather add brand-new tasks in `createNewTasks()`. We can add recycled tasks once we have read the tasks to recycle from the removed-tasks queue of the state updater and have recycled them.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -219,7 +219,7 @@ public boolean isActive() {
      * @throws StreamsException fatal error, should close the thread
      */
     @Override
-    public void initializeIfNeeded() {
+    public boolean initializeIfNeeded() {

Review Comment:
   See my comment in `StandbyTask` regarding this return value.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java:
##########
@@ -109,11 +107,30 @@ Collection<Task> createTasks(final Map<TaskId, Set<TopicPartition>> tasksToBeCre
         return createdTasks;
     }
 
+    /*
+     * TODO: we pass in the new input partitions to validate if they still match,
+     *       in the future we when we have fixed partitions -> tasks mapping,
+     *       we should always reuse the input partition and hence no need validations
+     */
     StandbyTask createStandbyTaskFromActive(final StreamTask streamTask,

Review Comment:
   Could you please add a unit test for this method?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java:
##########
@@ -91,7 +92,7 @@ public boolean isActive() {
      * @throws StreamsException fatal error, should close the thread
      */
     @Override
-    public void initializeIfNeeded() {
+    public boolean initializeIfNeeded() {

Review Comment:
   I think you do not need to add the boolean return value here; see my other comment in `tryToCompleteRestoration()`. I would rather extract the if-branch of `if (state() == State.CREATED)` into a separate method and call that method in `TaskManager#createNewTasks()`. WDYT?
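A minimal sketch of that refactoring on a hypothetical, stripped-down task class. `SketchTask` and `initializeState()` are illustrative names, not existing Kafka Streams methods. `initializeIfNeeded()` keeps its `void` signature, and the body of the `CREATED` branch moves into a method that `createNewTasks()` could call directly for tasks it has just created.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified task; not the real StandbyTask/StreamTask classes.
final class InitializeRefactorSketch {

    enum State { CREATED, RESTORING, RUNNING }

    static final class SketchTask {
        private State state = State.CREATED;
        // Records side effects so the behavior can be observed.
        final List<String> actions = new ArrayList<>();

        State state() {
            return state;
        }

        // Unchanged void signature: callers do not need a boolean result.
        void initializeIfNeeded() {
            if (state() == State.CREATED) {
                initializeState();
            }
        }

        // Former body of the CREATED branch, extracted so that a caller such as
        // TaskManager#createNewTasks() can invoke it for tasks it just created.
        void initializeState() {
            if (state != State.CREATED) {
                throw new IllegalStateException("Task must be in CREATED state, but was " + state);
            }
            actions.add("register state stores");
            state = State.RESTORING;
        }
    }
}
```

The state guard in `initializeState()` is a choice of this sketch: it makes an accidental second direct call fail loudly instead of re-registering state.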



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java:
##########
@@ -77,6 +93,22 @@ public void registerTaskError(final Task task, final Throwable t, final long now
         }
     }
 
+    Collection<Task> successfullyProcessed() {
+        return successfullyProcessed;
+    }
+
+    void addToSuccessfullyProcessed(final Task task) {
+        successfullyProcessed.add(task);
+    }
+
+    void removeTaskFromSuccessfullyProcessedBeforeClosing(final Task task) {
+        successfullyProcessed.remove(task);
+    }
+
+    void clearSuccessfullyProcessed() {
+        successfullyProcessed.clear();
+    }
+

Review Comment:
   Is this better suited for `TaskExecutor`?
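If the bookkeeping moves, it might look roughly like this. The sketch assumes a hypothetical `TaskExecutorSketch` class that owns the set, uses a plain `String` task ID in place of `Task`, and returns an unmodifiable view so callers cannot mutate the set behind the executor's back (a choice of this sketch, not of the PR, whose accessor returns the collection itself).

```java
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch: the executor, which processes the tasks, also tracks which
// of them processed records successfully in the current iteration.
final class TaskExecutorSketch {

    // String task IDs stand in for the real Task type.
    private final Set<String> successfullyProcessed = new LinkedHashSet<>();

    // Read-only view of the tracked tasks.
    Collection<String> successfullyProcessed() {
        return Collections.unmodifiableSet(successfullyProcessed);
    }

    void addToSuccessfullyProcessed(final String taskId) {
        successfullyProcessed.add(taskId);
    }

    void removeTaskFromSuccessfullyProcessedBeforeClosing(final String taskId) {
        successfullyProcessed.remove(taskId);
    }

    void clearSuccessfullyProcessed() {
        successfullyProcessed.clear();
    }
}
```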



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java:
##########
@@ -211,13 +209,36 @@ private RecordCollector createRecordCollector(final TaskId taskId,
         );
     }
 
+    /*
+     * TODO: we pass in the new input partitions to validate if they still match,
+     *       in the future we when we have fixed partitions -> tasks mapping,
+     *       we should always reuse the input partition and hence no need validations
+     */
     StreamTask createActiveTaskFromStandby(final StandbyTask standbyTask,

Review Comment:
   Could you please add a unit test for this method?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -480,6 +503,21 @@ private void closeAndRecycleTasks(final Map<Task, Set<TopicPartition>> tasksToRe
         }
     }
 
+    private void convertActiveToStandby(final StreamTask activeTask,
+                                        final Set<TopicPartition> partitions) {
+        activeTask.recycleAndConvert();

Review Comment:
   Could we do this inside `createStandbyTaskFromActive()`?
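A minimal sketch of folding the preparation step into the creator, so that `convertActiveToStandby()` no longer has to remember to call it first. All types are hypothetical stand-ins (`String` IDs and partitions), and `prepareForRecycle()` is the name proposed in a later comment of this review, not an existing method.

```java
import java.util.Set;

// Hypothetical, simplified types; not the Kafka Streams StreamTask/StandbyTask classes.
final class RecycleInCreatorSketch {

    static final class ActiveTask {
        final String id;
        private boolean preparedForRecycle = false;

        ActiveTask(final String id) {
            this.id = id;
        }

        // Stand-in for recycleAndConvert() / prepareForRecycle().
        void prepareForRecycle() {
            preparedForRecycle = true;
        }

        boolean isPreparedForRecycle() {
            return preparedForRecycle;
        }
    }

    static final class StandbyTask {
        final String id;
        final Set<String> inputPartitions;

        StandbyTask(final String id, final Set<String> inputPartitions) {
            this.id = id;
            this.inputPartitions = Set.copyOf(inputPartitions);
        }
    }

    static final class StandbyTaskCreator {
        // The creator prepares the active task itself, so every caller gets the
        // same ordering: prepare first, then build the standby from its state.
        StandbyTask createStandbyTaskFromActive(final ActiveTask activeTask,
                                                final Set<String> inputPartitions) {
            activeTask.prepareForRecycle();
            return new StandbyTask(activeTask.id, inputPartitions);
        }
    }
}
```

With this shape, `convertActiveToStandby()` shrinks to closing the task producer, calling the creator, and swapping the task in the registry; the symmetric change would apply to `createActiveTaskFromStandby()`.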



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -480,6 +503,21 @@ private void closeAndRecycleTasks(final Map<Task, Set<TopicPartition>> tasksToRe
         }
     }
 
+    private void convertActiveToStandby(final StreamTask activeTask,
+                                        final Set<TopicPartition> partitions) {
+        activeTask.recycleAndConvert();
+        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTask.id());
+        final StandbyTask standbyTask = standbyTaskCreator.createStandbyTaskFromActive(activeTask, partitions);
+        tasks.replaceActiveWithStandby(standbyTask);
+    }
+
+    private void convertStandbyToActive(final StandbyTask standbyTask,
+                                        final Set<TopicPartition> partitions) {
+        standbyTask.recycleAndConvert();

Review Comment:
   See my comment above in `convertActiveToStandby()`.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -544,7 +548,7 @@ public void updateInputPartitions(final Set<TopicPartition> topicPartitions, fin
     }
 
     @Override
-    public void closeCleanAndRecycleState() {
+    public void recycleAndConvert() {

Review Comment:
   This name is a bit misleading. Something like `prepareForRecycle()` or similar would be better, IMO.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -377,7 +390,17 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             throw first.getValue();
         }
 
-        tasks.createTasks(activeTasksToCreate, standbyTasksToCreate);
+        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+    }
+
+    private void createNewTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate) {
+        final Collection<Task> newActiveTasks = activeTasksToCreate.isEmpty() ?
+            Collections.emptySet() : activeTaskCreator.createTasks(mainConsumer, activeTasksToCreate);
+        final Collection<Task> newStandbyTask = standbyTasksToCreate.isEmpty() ?
+            Collections.emptySet() : standbyTaskCreator.createTasks(standbyTasksToCreate);

Review Comment:
   Why do you need the `isEmpty()` checks here? `ActiveTaskCreator#createTasks()` already returns an empty collection when `activeTasksToCreate` is empty, and the same is true for `StandbyTaskCreator#createTasks()`.
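To illustrate the point, here is a hypothetical stand-in for a `createTasks()` method that, like the one the comment describes, builds its result by iterating over the input map (`String` IDs and partitions replace the real types). Because an empty map yields an empty list, the ternary adds nothing.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; not the real ActiveTaskCreator/StandbyTaskCreator code.
final class NoEmptyCheckSketch {

    // Creates one (string) task per entry; an empty map produces an empty list.
    static Collection<String> createTasks(final Map<String, Set<String>> tasksToBeCreated) {
        final List<String> createdTasks = new ArrayList<>();
        for (final String taskId : tasksToBeCreated.keySet()) {
            createdTasks.add(taskId);
        }
        return createdTasks;
    }

    // createNewTasks() without the isEmpty() ternaries.
    static Collection<String> createNewTasks(final Map<String, Set<String>> activeTasksToCreate,
                                             final Map<String, Set<String>> standbyTasksToCreate) {
        final List<String> newTasks = new ArrayList<>(createTasks(activeTasksToCreate));
        newTasks.addAll(createTasks(standbyTasksToCreate));
        return newTasks;
    }
}
```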



-- 
This is an automated message from the Apache Git Service.