cadonna commented on code in PR #12554:
URL: https://github.com/apache/kafka/pull/12554#discussion_r954595559


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -565,6 +565,13 @@ public Set<Task> getTasks() {
         return executeWithQueuesLocked(() -> getStreamOfTasks().map(ReadOnlyTask::new).collect(Collectors.toSet()));
     }
 
+    @Override
+    public boolean restoresActiveTasks() {

Review Comment:
   I called this method `restoresActiveTasks()` instead of `hasActiveTasks()` 
or `containsActiveTasks()` because there might also be active tasks in 
restoration that are paused. IMO, we should not return `true` if the only 
active tasks in restoration in the state updater are paused. If we did, we 
would not transition the stream thread to `RUNNING` once all non-paused active 
tasks have left the state updater.
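
   A minimal sketch of the semantics described above, not the code in this 
PR. `RestorationSketch` and `SketchTask` are hypothetical stand-ins for the 
real task types, and the `paused` flag is an assumption about how a paused 
task would be represented:

```java
import java.util.List;

// Hypothetical, simplified stand-ins; these are NOT the real Kafka Streams types.
final class RestorationSketch {

    static final class SketchTask {
        final String id;
        final boolean active;  // true for an active task, false for a standby task
        final boolean paused;  // assumed flag: the task is paused while in restoration

        SketchTask(final String id, final boolean active, final boolean paused) {
            this.id = id;
            this.active = active;
            this.paused = paused;
        }
    }

    // Returns true only if at least one active task in the state updater
    // is not paused; paused active tasks and standby tasks are ignored.
    static boolean restoresActiveTasks(final List<SketchTask> tasksInStateUpdater) {
        return tasksInStateUpdater.stream()
            .anyMatch(task -> task.active && !task.paused);
    }
}
```

   With this definition, a state updater that only holds paused active tasks 
reports `false`, so the stream thread is not blocked from going to `RUNNING`.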
   
   However, that also means that if all non-paused active tasks are 
`RUNNING` (i.e., the stream thread is also `RUNNING`) and the paused active 
tasks in restoration are then resumed, `restoresActiveTasks()` would return 
`true` again (i.e., there are active tasks in restoration) without a 
rebalance in between. Because of this case, we need to be careful not to 
switch the state of the stream thread if the stream thread is `RUNNING` and 
`restoresActiveTasks()` returns `true`.
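
   The guard described above could look roughly like the following sketch. 
`ThreadStateSketch`, its two-value `State` enum, and `nextState()` are 
hypothetical simplifications for illustration, not the actual stream thread 
code:

```java
// Hypothetical, simplified model of the stream thread state handling;
// the real stream thread has more states and different transition code.
final class ThreadStateSketch {

    enum State { PARTITIONS_ASSIGNED, RUNNING }

    // Decides the next state from the current state and the result of
    // restoresActiveTasks() (passed in as a plain boolean here).
    static State nextState(final State current, final boolean restoresActiveTasks) {
        if (current == State.RUNNING) {
            // Resumed tasks can re-enter restoration without a rebalance,
            // so a RUNNING thread must not be switched back.
            return State.RUNNING;
        }
        // Not yet RUNNING: wait until no non-paused active task is restoring.
        return restoresActiveTasks ? current : State.RUNNING;
    }
}
```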
   
   For all of these reasons, I think it is not a good idea to provide a 
default implementation for `restoresActiveTasks()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
