This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 7f9cf895d0d KAFKA-17402: DefaultStateUpdated should transite task atomically (#18607)
7f9cf895d0d is described below
commit 7f9cf895d0ddc64474aa9164a4e1b6caa188c86d
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Jan 21 12:33:48 2025 -0800
KAFKA-17402: DefaultStateUpdated should transite task atomically (#18607)
Reviewers: Bruno Cadonna <[email protected]>, Lucas Brutschy <[email protected]>
---
.../apache/kafka/streams/processor/internals/DefaultStateUpdater.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index addef5a9f15..bcb76966f6e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -673,7 +673,6 @@ public class DefaultStateUpdater implements StateUpdater {
measureCheckpointLatency(() -> task.maybeCheckpoint(true));
changelogReader.unregister(changelogPartitions);
addToRestoredTasks(task);
- updatingTasks.remove(task.id());
log.info("Stateful active task " + task.id() + " completed
restoration");
transitToUpdateStandbysIfOnlyStandbysLeft();
}
@@ -689,6 +688,7 @@ public class DefaultStateUpdater implements StateUpdater {
restoredActiveTasksLock.lock();
try {
restoredActiveTasks.add(task);
+ updatingTasks.remove(task.id());
log.debug("Active task " + task.id() + " was added to the
restored tasks");
restoredActiveTasksCondition.signalAll();
} finally {