This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new 9f64ef255b4 KAFKA-17402: DefaultStateUpdater should transition task
atomically (#18607)
9f64ef255b4 is described below
commit 9f64ef255b4d1f9b3c5bec6fc5718e2ba28bbe1a
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Jan 21 12:33:48 2025 -0800
KAFKA-17402: DefaultStateUpdater should transition task atomically (#18607)
Reviewers: Bruno Cadonna <[email protected]>, Lucas Brutschy
<[email protected]>
---
.../apache/kafka/streams/processor/internals/DefaultStateUpdater.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 3ab94f689f7..d4d90246ca7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -675,7 +675,6 @@ public class DefaultStateUpdater implements StateUpdater {
measureCheckpointLatency(() -> task.maybeCheckpoint(true));
changelogReader.unregister(changelogPartitions);
addToRestoredTasks(task);
- updatingTasks.remove(task.id());
log.info("Stateful active task " + task.id() + " completed
restoration");
transitToUpdateStandbysIfOnlyStandbysLeft();
}
@@ -691,6 +690,7 @@ public class DefaultStateUpdater implements StateUpdater {
restoredActiveTasksLock.lock();
try {
restoredActiveTasks.add(task);
+ updatingTasks.remove(task.id());
log.debug("Active task " + task.id() + " was added to the
restored tasks");
restoredActiveTasksCondition.signalAll();
} finally {