cadonna commented on code in PR #12795:
URL: https://github.com/apache/kafka/pull/12795#discussion_r1012934682
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -395,8 +395,10 @@ private void createNewTasks(final Map<TaskId, Set<TopicPartition>> activeTasksTo
tasks.addActiveTasks(newActiveTasks);
tasks.addStandbyTasks(newStandbyTask);
} else {
- tasks.addPendingTaskToInit(newActiveTasks);
- tasks.addPendingTaskToInit(newStandbyTask);
+ final Map<TaskId, RuntimeException> taskInitExceptions = new LinkedHashMap<>();
+ Stream.concat(newActiveTasks.stream(), newStandbyTask.stream())
+     .forEach(t -> addTaskToStateUpdater(t, taskInitExceptions));
Review Comment:
Yeah, we also considered initialising the tasks inside the state updater,
but we decided to do it outside so that we do not need to change the state of a
task inside the state updater. It seemed cleaner to us to keep lifecycle
management away from the state updater. The state updater should only update
the states.
What are the advantages of moving the initialisation into the state updater?
I see that we would not need to explicitly handle exceptions during
initialisation of tasks, since that would happen automatically within the state
updater. Is there something else?
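
To make the trade-off concrete, here is a minimal sketch of the "initialise outside the state updater" approach, under stated assumptions. `SketchTask`, `SketchStateUpdater`, `task`, and `initAndAdd` are hypothetical stand-ins and not the real Kafka Streams classes. Only the `taskInitExceptions` map and the `addTaskToStateUpdater` name come from the diff; the method body shown here is assumed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

final class InitOutsideStateUpdaterSketch {

    // Hypothetical stand-in for a Streams task; not the real Task interface.
    interface SketchTask {
        String id();
        void initializeIfNeeded();
    }

    // Hypothetical stand-in for the state updater. It never changes a task's
    // lifecycle state; it only records the tasks handed to it.
    static final class SketchStateUpdater {
        final List<String> added = new ArrayList<>();

        void add(final SketchTask task) {
            added.add(task.id());
        }
    }

    // Hypothetical factory for a task that optionally fails during initialisation.
    static SketchTask task(final String id, final boolean failOnInit) {
        return new SketchTask() {
            @Override
            public String id() {
                return id;
            }

            @Override
            public void initializeIfNeeded() {
                if (failOnInit) {
                    throw new IllegalStateException("init failed for " + id);
                }
            }
        };
    }

    // Assumed body: initialise on the caller's side, hand over only on success,
    // and record any failure per task instead of aborting the whole batch.
    static void addTaskToStateUpdater(final SketchTask task,
                                      final SketchStateUpdater updater,
                                      final Map<String, RuntimeException> taskInitExceptions) {
        try {
            task.initializeIfNeeded();
            updater.add(task);
        } catch (final RuntimeException e) {
            taskInitExceptions.put(task.id(), e);
        }
    }

    // Mirrors the Stream.concat(...).forEach(...) shape from the diff.
    static Map<String, RuntimeException> initAndAdd(final List<SketchTask> newActiveTasks,
                                                    final List<SketchTask> newStandbyTasks,
                                                    final SketchStateUpdater updater) {
        final Map<String, RuntimeException> taskInitExceptions = new LinkedHashMap<>();
        Stream.concat(newActiveTasks.stream(), newStandbyTasks.stream())
            .forEach(t -> addTaskToStateUpdater(t, updater, taskInitExceptions));
        return taskInitExceptions;
    }

    public static void main(final String[] args) {
        final SketchStateUpdater updater = new SketchStateUpdater();
        final Map<String, RuntimeException> errors = initAndAdd(
            List.of(task("0_0", false), task("0_1", true)),
            List.of(task("1_0", false)),
            updater);
        System.out.println("added: " + updater.added);     // prints "added: [0_0, 1_0]"
        System.out.println("failed: " + errors.keySet());  // prints "failed: [0_1]"
    }
}
```

In this shape the caller owns the try/catch, which is the explicit exception handling mentioned above. Moving initialisation into the state updater would shift that handling there, at the cost of the updater changing task state.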