This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 7f9cf895d0d KAFKA-17402: DefaultStateUpdated should transite task 
atomically (#18607)
7f9cf895d0d is described below

commit 7f9cf895d0ddc64474aa9164a4e1b6caa188c86d
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Jan 21 12:33:48 2025 -0800

    KAFKA-17402: DefaultStateUpdated should transite task atomically (#18607)
    
    Reviewers: Bruno Cadonna <[email protected]>, Lucas Brutschy 
<[email protected]>
---
 .../apache/kafka/streams/processor/internals/DefaultStateUpdater.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index addef5a9f15..bcb76966f6e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -673,7 +673,6 @@ public class DefaultStateUpdater implements StateUpdater {
                 measureCheckpointLatency(() -> task.maybeCheckpoint(true));
                 changelogReader.unregister(changelogPartitions);
                 addToRestoredTasks(task);
-                updatingTasks.remove(task.id());
                 log.info("Stateful active task " + task.id() + " completed 
restoration");
                 transitToUpdateStandbysIfOnlyStandbysLeft();
             }
@@ -689,6 +688,7 @@ public class DefaultStateUpdater implements StateUpdater {
             restoredActiveTasksLock.lock();
             try {
                 restoredActiveTasks.add(task);
+                updatingTasks.remove(task.id());
                 log.debug("Active task " + task.id() + " was added to the 
restored tasks");
                 restoredActiveTasksCondition.signalAll();
             } finally {

Reply via email to