cadonna commented on code in PR #12554:
URL: https://github.com/apache/kafka/pull/12554#discussion_r954595559
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -565,6 +565,13 @@ public Set<Task> getTasks() {
return executeWithQueuesLocked(() ->
getStreamOfTasks().map(ReadOnlyTask::new).collect(Collectors.toSet()));
}
+ @Override
+ public boolean restoresActiveTasks() {
Review Comment:
I called this method `restoresActiveTasks()` instead of `hasActiveTasks()`
or `containsActiveTasks()` because there might also be active tasks in
restoration that are paused. IMO, we should not return `true` if the state
updater only has paused active tasks in restoration. If we did, we would not
transition the stream thread to `RUNNING` once all non-paused active tasks have
left the state updater.
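A minimal sketch of the semantics described above. It assumes a heavily simplified, hypothetical model (`RestorationTracker`, `SimpleStateUpdater`, and string task IDs are illustrative only, not the actual `DefaultStateUpdater` internals): the method returns `true` only while at least one active task in restoration is not paused.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified interface; NOT Kafka's actual StateUpdater.
interface RestorationTracker {
    // Intentionally abstract (no default): each implementation must decide
    // how paused tasks affect the result.
    boolean restoresActiveTasks();
}

// Hypothetical, simplified state updater used only to illustrate the semantics.
class SimpleStateUpdater implements RestorationTracker {
    // Active tasks currently in restoration (hypothetical String task IDs).
    private final Set<String> restoringActiveTasks = new HashSet<>();
    // Subset of restoringActiveTasks that is currently paused.
    private final Set<String> pausedTasks = new HashSet<>();

    void addActiveTask(String taskId) {
        restoringActiveTasks.add(taskId);
    }

    // Restoration finished: the task leaves the state updater.
    void completeRestoration(String taskId) {
        restoringActiveTasks.remove(taskId);
        pausedTasks.remove(taskId);
    }

    void pause(String taskId) {
        if (restoringActiveTasks.contains(taskId)) {
            pausedTasks.add(taskId);
        }
    }

    void resume(String taskId) {
        pausedTasks.remove(taskId);
    }

    @Override
    public boolean restoresActiveTasks() {
        // True only if at least one active task in restoration is NOT paused.
        for (String taskId : restoringActiveTasks) {
            if (!pausedTasks.contains(taskId)) {
                return true;
            }
        }
        return false;
    }
}
```

With only a paused task left in restoration, the method returns `false`, so the stream thread can move on; resuming that task makes it return `true` again.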
However, that would also mean that if all non-paused active tasks are
`RUNNING` (i.e. the stream thread is also `RUNNING`) and the paused active
tasks in restoration are unpaused, `restoresActiveTasks()` would again return
`true` (i.e. there are active tasks in restoration) without a rebalance in
between. Because of this case, we need to be careful not to switch the state of
the stream thread when the stream thread is `RUNNING` and
`restoresActiveTasks()` returns `true`.
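A rough sketch of the guard this implies, assuming a hypothetical, two-state model of the stream thread (`ThreadState`, `StreamThreadSketch`, and `maybeTransition` are illustrative names, not the real `StreamThread` code): the thread moves to `RUNNING` once no non-paused active task is restoring, and a later `true` from `restoresActiveTasks()` does not move it back.

```java
// Hypothetical, reduced set of stream thread states for illustration only.
enum ThreadState { PARTITIONS_ASSIGNED, RUNNING }

// Hypothetical sketch of the transition guard; not Kafka's StreamThread.
class StreamThreadSketch {
    private ThreadState state = ThreadState.PARTITIONS_ASSIGNED;

    ThreadState state() {
        return state;
    }

    // Called once per loop iteration with the current result of restoresActiveTasks().
    void maybeTransition(boolean restoresActiveTasks) {
        if (state == ThreadState.PARTITIONS_ASSIGNED && !restoresActiveTasks) {
            state = ThreadState.RUNNING;
        }
        // If the thread is already RUNNING, a resumed paused task can make
        // restoresActiveTasks() return true again without a rebalance.
        // In that case we deliberately leave the state unchanged.
    }
}
```

The design choice here is that only the transition into `RUNNING` depends on `restoresActiveTasks()`; leaving `RUNNING` would be driven by other events such as a rebalance.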
For all of these reasons, I do not think it is a good idea to provide a default
implementation for `restoresActiveTasks()`.