This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 922d0d0e5cb MINOR: Do not check whether updating tasks exist in the
waiting loop (#14791)
922d0d0e5cb is described below
commit 922d0d0e5cbc759c80a5464f88f3338026e56d0e
Author: Bruno Cadonna <[email protected]>
AuthorDate: Sat Nov 18 21:10:21 2023 +0100
MINOR: Do not check whether updating tasks exist in the waiting loop
(#14791)
The state updater waits on a condition variable if no tasks exist that need
to be updated. The condition variable is wrapped by a loop to account for
spurious wake-ups. The check whether updating tasks exist is done in the
condition of the loop. Actually, the state updater thread can change whether
updating tasks exists, but since the state updater thread is waiting for the
condition variable the check for the existence of updating tasks will always
return the same value as long as th [...]
This commit moves check before the loop making also the usage of mocks more
robust since the processing becomes more deterministic.
Reviewers: Lucas Brutschy <[email protected]>
---
.../apache/kafka/streams/processor/internals/DefaultStateUpdater.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index c2872caee5b..e5a6aff47c9 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -306,11 +306,13 @@ public class DefaultStateUpdater implements StateUpdater {
private void waitIfAllChangelogsCompletelyRead() {
tasksAndActionsLock.lock();
+ final boolean noTasksToUpdate =
changelogReader.allChangelogsCompleted() || updatingTasks.isEmpty();
try {
while (isRunning.get() &&
- (changelogReader.allChangelogsCompleted() ||
updatingTasks.isEmpty()) &&
+ noTasksToUpdate &&
tasksAndActions.isEmpty() &&
!isTopologyResumed.get()) {
+
isIdle.set(true);
tasksAndActionsCondition.await();
}