cadonna commented on code in PR #12439:
URL: https://github.com/apache/kafka/pull/12439#discussion_r931357457


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -493,7 +531,9 @@ boolean tryToCompleteRestoration(final long now, final java.util.function.Consum
         final List<Task> activeTasks = new LinkedList<>();
         for (final Task task : tasks.allTasks()) {
             try {
-                task.initializeIfNeeded();
+                if (task.initializeIfNeeded() && stateUpdater != null) {
+                    stateUpdater.add(task);
+                }

Review Comment:
   We discussed this offline yesterday. For future reference: even if we 
initialized the tasks in `createNewTasks()` and did not run into failures in 
the state updater, we are concerned that the initialization would take too long 
because it also initializes the state stores. The assignment handling logic is 
executed during a rebalance, i.e., within a call to `Consumer#poll()`, so it 
must return quickly to avoid timeouts.
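
To illustrate the reasoning above, here is a minimal, self-contained sketch of 
the deferral pattern. All class names (`DeferredInitSketch`, `SketchTask`, 
`SketchStateUpdater`, `SketchTaskManager`) are hypothetical stand-ins, not the 
real Kafka Streams classes. The sketch only assumes what the diff shows: 
`initializeIfNeeded()` returns `true` when it actually initialized the task, 
and the state updater may be `null`.

```java
import java.util.ArrayList;
import java.util.List;

public class DeferredInitSketch {

    /** Hypothetical stand-in for a task; NOT the real Streams Task interface. */
    public static final class SketchTask {
        public final String id;
        private boolean initialized = false;

        public SketchTask(final String id) {
            this.id = id;
        }

        /** Returns true only on the call that actually performs the initialization. */
        public boolean initializeIfNeeded() {
            if (initialized) {
                return false;
            }
            // Potentially slow work (e.g., state store initialization) would go here.
            initialized = true;
            return true;
        }
    }

    /** Hypothetical stand-in for the state updater: it only records handed-over tasks. */
    public static final class SketchStateUpdater {
        public final List<SketchTask> added = new ArrayList<>();

        public void add(final SketchTask task) {
            added.add(task);
        }
    }

    /** Hypothetical stand-in for the task manager. */
    public static final class SketchTaskManager {
        private final List<SketchTask> tasks = new ArrayList<>();
        private final SketchStateUpdater stateUpdater; // may be null, as in the diff

        public SketchTaskManager(final SketchStateUpdater stateUpdater) {
            this.stateUpdater = stateUpdater;
        }

        /** Assumed to run during the rebalance: only cheap bookkeeping, no initialization. */
        public void handleAssignment(final List<String> taskIds) {
            for (final String id : taskIds) {
                tasks.add(new SketchTask(id));
            }
        }

        /** Assumed to run later, outside the rebalance: the slow initialization happens here. */
        public void tryToCompleteRestoration() {
            for (final SketchTask task : tasks) {
                if (task.initializeIfNeeded() && stateUpdater != null) {
                    stateUpdater.add(task);
                }
            }
        }
    }

    public static void main(final String[] args) {
        final SketchStateUpdater updater = new SketchStateUpdater();
        final SketchTaskManager manager = new SketchTaskManager(updater);
        final List<String> ids = new ArrayList<>();
        ids.add("0_0");
        ids.add("0_1");
        manager.handleAssignment(ids);
        System.out.println("after assignment: " + updater.added.size());
        manager.tryToCompleteRestoration();
        System.out.println("after restoration attempt: " + updater.added.size());
    }
}
```

In this sketch the assignment step never touches `initializeIfNeeded()`, so its 
cost does not depend on how long state store initialization takes. Because 
`initializeIfNeeded()` returns `true` only once per task, calling 
`tryToCompleteRestoration()` repeatedly does not add a task to the updater twice.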



