This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 9f64ef255b4 KAFKA-17402: DefaultStateUpdated should transite task 
atomically (#18607)
9f64ef255b4 is described below

commit 9f64ef255b4d1f9b3c5bec6fc5718e2ba28bbe1a
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Jan 21 12:33:48 2025 -0800

    KAFKA-17402: DefaultStateUpdated should transite task atomically (#18607)
    
    Reviewers: Bruno Cadonna <[email protected]>, Lucas Brutschy 
<[email protected]>
---
 .../apache/kafka/streams/processor/internals/DefaultStateUpdater.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 3ab94f689f7..d4d90246ca7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -675,7 +675,6 @@ public class DefaultStateUpdater implements StateUpdater {
                 measureCheckpointLatency(() -> task.maybeCheckpoint(true));
                 changelogReader.unregister(changelogPartitions);
                 addToRestoredTasks(task);
-                updatingTasks.remove(task.id());
                 log.info("Stateful active task " + task.id() + " completed 
restoration");
                 transitToUpdateStandbysIfOnlyStandbysLeft();
             }
@@ -691,6 +690,7 @@ public class DefaultStateUpdater implements StateUpdater {
             restoredActiveTasksLock.lock();
             try {
                 restoredActiveTasks.add(task);
+                updatingTasks.remove(task.id());
                 log.debug("Active task " + task.id() + " was added to the 
restored tasks");
                 restoredActiveTasksCondition.signalAll();
             } finally {

Reply via email to