lucasbru commented on code in PR #12795:
URL: https://github.com/apache/kafka/pull/12795#discussion_r1017943192
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -277,6 +294,9 @@ private void addTask(final Task task) {
}
}
}
+ if (task.state() == Task.State.CREATED) {
+ tasksToInitialize.offer(task);
+ }
Review Comment:
> With this order, you register the task's changelogs and state manager
before you initialize the task. Did you verify whether that causes any
side-effects?
I don't think it should make a difference, but I restored the original order in the
current version anyway.
> I think we also have a concurrency issue here. In case of a stateless
task, we add the task to the queue of restored tasks but keep a reference to
the task in the state updater for initialization. It could be that the main
thread reads the stateless task from the queue of restored tasks, but the state
updater thread still needs to initialize the task.
>
> So I think in `addTask()` we can only add the task to `tasksToInitialize`
and then initialize the task and either register it with the changelog reader or
add it to the queue of restored tasks in a different method similar to
`initializeTasksIfNeeded()` (maybe named differently) that does not hold the
lock on the input queue.
Yes, this is the solution I proposed yesterday, and we agreed to do it in
the call today. However, during implementation I realized that there are a lot
of complications related to `tasksToInitialize` - in many places we now have to
handle the tasks from both `updatingTasks` and `tasksToInitialize`, and it felt
like an over-complication to me.
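For comparison, the two-queue approach from the quoted suggestion might look roughly like the sketch below. All names here (`QueuedTask`, `TwoPhaseUpdater`, `drainTasksAndActions()`, and the `updatingTasks` list standing in for changelog registration) are hypothetical simplifications, not the real `DefaultStateUpdater` code. In this sketch the stateless task reaches `restoredTasks` only after `initialize()` has run.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a Streams task; not the real Task interface.
final class QueuedTask {
    enum State { CREATED, RESTORING }

    final String id;
    final boolean stateful;
    State state = State.CREATED;

    QueuedTask(final String id, final boolean stateful) {
        this.id = id;
        this.stateful = stateful;
    }

    void initialize() {
        state = State.RESTORING;
    }
}

// Hypothetical updater showing the extra tasksToInitialize queue.
final class TwoPhaseUpdater {
    private final ReentrantLock tasksAndActionsLock = new ReentrantLock();
    private final Queue<QueuedTask> tasksAndActions = new ArrayDeque<>();
    // Only touched by the updater thread, so it needs no lock.
    private final Queue<QueuedTask> tasksToInitialize = new ArrayDeque<>();
    final List<QueuedTask> updatingTasks = new ArrayList<>();
    final Queue<QueuedTask> restoredTasks = new LinkedBlockingQueue<>();

    // Main thread: enqueue a task for the updater thread.
    void add(final QueuedTask task) {
        tasksAndActionsLock.lock();
        try {
            tasksAndActions.add(task);
        } finally {
            tasksAndActionsLock.unlock();
        }
    }

    // Updater thread, phase 1: under the lock, tasks are only moved.
    void drainTasksAndActions() {
        tasksAndActionsLock.lock();
        try {
            QueuedTask task;
            while ((task = tasksAndActions.poll()) != null) {
                tasksToInitialize.add(task);
            }
        } finally {
            tasksAndActionsLock.unlock();
        }
    }

    // Updater thread, phase 2: initialize and hand over without holding the lock.
    void initializeTasksIfNeeded() {
        QueuedTask task;
        while ((task = tasksToInitialize.poll()) != null) {
            if (task.state == QueuedTask.State.CREATED) {
                task.initialize();
            }
            if (task.stateful) {
                updatingTasks.add(task);   // stand-in for changelog registration
            } else {
                restoredTasks.add(task);   // visible to the main thread only now
            }
        }
    }
}
```

The cost shows up between the two phases: a task then lives only in `tasksToInitialize`, in neither `updatingTasks` nor `restoredTasks`, so any method that looks up the updater's tasks would also have to check that extra queue.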
I propose an alternative solution to the problem: instead of keeping an
additional queue, we simply acquire and release the `tasksAndActions` lock
separately for every item of `tasksAndActions`, so that we can initialize the
task outside the critical section. This should solve all the problems we
discussed:
- Initialization does not block the main thread
- Stateless tasks are only returned to the main thread after initialization
has finished
- It's rather trivial to handle exceptions during initialization, because
we remove tasks from `tasksAndActions` one by one and do not have multiple
tasks in-flight.
- The order of initialization and changelog registration should be the same
as before, so we are safe in case changing the order would cause problems.
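A minimal sketch of the per-item locking idea, assuming hypothetical `LockedTask` and `PerItemLockUpdater` classes (not the real Kafka Streams types) and using an `updatingTasks` list as a stand-in for changelog registration. The lock on `tasksAndActions` is held only while polling a single item. Initialization, error handling and hand-over happen after it is released.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a Streams task; not the real Task interface.
final class LockedTask {
    enum State { CREATED, RESTORING }

    final String id;
    final boolean stateful;
    final boolean failOnInit; // lets the sketch simulate an initialization error
    State state = State.CREATED;

    LockedTask(final String id, final boolean stateful, final boolean failOnInit) {
        this.id = id;
        this.stateful = stateful;
        this.failOnInit = failOnInit;
    }

    void initializeIfNeeded() {
        if (state != State.CREATED) {
            return;
        }
        if (failOnInit) {
            throw new IllegalStateException("initialization failed for " + id);
        }
        state = State.RESTORING;
    }
}

// Hypothetical updater: one short critical section per item of tasksAndActions.
final class PerItemLockUpdater {
    private final ReentrantLock tasksAndActionsLock = new ReentrantLock();
    private final Queue<LockedTask> tasksAndActions = new ArrayDeque<>();
    final List<LockedTask> updatingTasks = new ArrayList<>();
    final Queue<LockedTask> restoredTasks = new LinkedBlockingQueue<>();
    final List<String> failedTaskIds = new ArrayList<>();

    // Main thread: enqueue a task for the updater thread.
    void add(final LockedTask task) {
        tasksAndActionsLock.lock();
        try {
            tasksAndActions.add(task);
        } finally {
            tasksAndActionsLock.unlock();
        }
    }

    // Take exactly one item while holding the lock, then release it.
    private LockedTask pollNext() {
        tasksAndActionsLock.lock();
        try {
            return tasksAndActions.poll();
        } finally {
            tasksAndActionsLock.unlock();
        }
    }

    // Updater thread: initialization runs with the lock released.
    void handleTasksAndActions() {
        LockedTask task;
        while ((task = pollNext()) != null) {
            try {
                task.initializeIfNeeded();
            } catch (final RuntimeException e) {
                // Only this task is in flight, so error handling stays local.
                failedTaskIds.add(task.id);
                continue;
            }
            // Same order as before: initialize first, then register.
            if (task.stateful) {
                updatingTasks.add(task);  // stand-in for changelog registration
            } else {
                restoredTasks.add(task);  // main thread sees it only after init
            }
        }
    }
}
```

Because `pollNext()` releases the lock before `initializeIfNeeded()` runs, the main thread can keep calling `add()` during a slow initialization. A failure affects only the single task that was just polled.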
> If we do this we can maybe also get rid of the verifications whether the
state updater is enabled to avoid registering the changelogs to the changelog
reader during initialization as done here:
>
>
https://github.com/apache/kafka/blob/a1f3c6d16061566a4f53c72a95e2679b8ee229e0/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L215
>
>
> and remove the explicit changelog registration in the state updater. Could
you check this?
To me, it seems cleaner to leave this as is. Changelogs are
registered and unregistered when tasks are added/removed. Initialization only
happens when the task is added _AND_ its state is CREATED. So the two
operations have different lifetimes: changelogs are only registered while the
task is owned by the state updater, but tasks remain initialized after being
returned to the main thread. We could attempt to change this behavior
(essentially, not unregistering changelogs when tasks are removed from the state
updater), but I suppose the current behavior was added for a reason. As long as
we want to keep this behavior, I think it is cleaner to handle both registration
and unregistration directly inside StateUpdater.
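The two lifetimes can be illustrated with a small sketch, again using hypothetical classes (`OwnedTask`, `OwnershipUpdater`) and a plain set of strings in place of the changelog reader. Initialization is guarded by the CREATED state and happens at most once. Changelog registration is tied to each add/remove pair.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a Streams task; not the real Task interface.
final class OwnedTask {
    enum State { CREATED, RESTORING }

    final Set<String> changelogs;
    State state = State.CREATED;
    int initializations = 0;

    OwnedTask(final Set<String> changelogs) {
        this.changelogs = changelogs;
    }

    void initialize() {
        initializations++;
        state = State.RESTORING;
    }
}

// Hypothetical updater contrasting the two lifetimes.
final class OwnershipUpdater {
    // Stand-in for the changelog reader's registered changelog partitions.
    final Set<String> registeredChangelogs = new HashSet<>();

    void addTask(final OwnedTask task) {
        // One-shot: only a task in state CREATED is initialized.
        if (task.state == OwnedTask.State.CREATED) {
            task.initialize();
        }
        // Per ownership period: registered every time the task is added.
        registeredChangelogs.addAll(task.changelogs);
    }

    OwnedTask removeTask(final OwnedTask task) {
        // Undone on removal; the task's state is deliberately left as is.
        registeredChangelogs.removeAll(task.changelogs);
        return task;
    }
}
```

Adding, removing and re-adding the same task registers its changelogs twice over its lifetime but initializes it only once. That asymmetry is why it seems natural to keep registration and unregistration together in the state updater.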