This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 922d0d0e5cb MINOR: Do not check whether updating tasks exist in the waiting loop (#14791)
922d0d0e5cb is described below

commit 922d0d0e5cbc759c80a5464f88f3338026e56d0e
Author: Bruno Cadonna <[email protected]>
AuthorDate: Sat Nov 18 21:10:21 2023 +0100

    MINOR: Do not check whether updating tasks exist in the waiting loop (#14791)
    
    The state updater waits on a condition variable if no tasks exist that
    need to be updated. The condition variable is wrapped by a loop to
    account for spurious wake-ups. The check whether updating tasks exist is
    done in the condition of the loop. Actually, the state updater thread can
    change whether updating tasks exist, but since the state updater thread
    is waiting for the condition variable, the check for the existence of
    updating tasks will always return the same value as long as th [...]
    
    This commit moves the check before the loop, which also makes the usage
    of mocks more robust since the processing becomes more deterministic.
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../apache/kafka/streams/processor/internals/DefaultStateUpdater.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index c2872caee5b..e5a6aff47c9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -306,11 +306,13 @@ public class DefaultStateUpdater implements StateUpdater {
 
         private void waitIfAllChangelogsCompletelyRead() {
             tasksAndActionsLock.lock();
+            final boolean noTasksToUpdate = changelogReader.allChangelogsCompleted() || updatingTasks.isEmpty();
             try {
                 while (isRunning.get() &&
-                    (changelogReader.allChangelogsCompleted() || updatingTasks.isEmpty()) &&
+                    noTasksToUpdate &&
                     tasksAndActions.isEmpty() &&
                     !isTopologyResumed.get()) {
+
                     isIdle.set(true);
                     tasksAndActionsCondition.await();
                 }
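
The pattern described in the commit message can be illustrated with a small, self-contained sketch. It is not the Kafka implementation: the class HoistedWaitSketch and all of its members (ownTasks, pendingActions, enqueue, shutdown) are hypothetical names chosen for illustration. It assumes that one part of the wait condition can only change on the waiting thread itself, so that part is evaluated once before the loop, while the parts that other threads can change stay in the loop condition to guard against spurious wake-ups.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch, not Kafka code: a worker that waits on a condition
// variable and hoists the part of the condition only it can change.
public class HoistedWaitSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition changed = lock.newCondition();
    private final AtomicBoolean running = new AtomicBoolean(true);

    // Filled by other threads, always under the lock.
    private final Queue<String> pendingActions = new ArrayDeque<>();
    // Assumption: only the worker thread ever modifies this collection.
    private final Queue<String> ownTasks = new ArrayDeque<>();

    // Called by the worker thread only.
    public void addOwnTask(final String task) {
        ownTasks.add(task);
    }

    // Called by other threads to hand work to the worker and wake it up.
    public void enqueue(final String action) {
        lock.lock();
        try {
            pendingActions.add(action);
            changed.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Called by other threads to stop the worker and wake it up.
    public void shutdown() {
        lock.lock();
        try {
            running.set(false);
            changed.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Called by the worker thread. Returns true if it had to wait at least once.
    public boolean waitIfNothingToDo() throws InterruptedException {
        boolean waited = false;
        lock.lock();
        try {
            // Evaluated once: the worker cannot change ownTasks while it is
            // blocked in await(), so re-checking it in the loop adds nothing.
            final boolean noOwnTasks = ownTasks.isEmpty();
            // The loop guards against spurious wake-ups for the parts of the
            // condition that other threads can change.
            while (running.get() && noOwnTasks && pendingActions.isEmpty()) {
                waited = true;
                changed.await();
            }
        } finally {
            lock.unlock();
        }
        return waited;
    }
}
```

Evaluating the worker-owned part once also means a test double for that state is queried exactly once per call, regardless of how many times the loop wakes up, which is the kind of determinism the commit message refers to.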
