guozhangwang commented on code in PR #12439:
URL: https://github.com/apache/kafka/pull/12439#discussion_r930175351


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -493,7 +531,9 @@ boolean tryToCompleteRestoration(final long now, final 
java.util.function.Consum
         final List<Task> activeTasks = new LinkedList<>();
         for (final Task task : tasks.allTasks()) {
             try {
-                task.initializeIfNeeded();
+                if (task.initializeIfNeeded() && stateUpdater != null) {
+                    stateUpdater.add(task);
+                }

Review Comment:
   The motivation is that, if we add the newly created tasks to the state 
updater immediately, they will fail since the state updater expect their states 
to be `restoring / running`, i.e. after the tasks have already initialized.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to