This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push:
new 8609ce97446 KAFKA-17402: DefaultStateUpdated should transite task
atomically (#18607)
8609ce97446 is described below
commit 8609ce9744611fa43bdd0a35e6ff0497c066528d
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Jan 21 12:33:48 2025 -0800
KAFKA-17402: DefaultStateUpdated should transite task atomically (#18607)
Reviewers: Bruno Cadonna <[email protected]>, Lucas Brutschy
<[email protected]>
---
.../apache/kafka/streams/processor/internals/DefaultStateUpdater.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 2fc7b537c8d..fb37b26fbf8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -674,7 +674,6 @@ public class DefaultStateUpdater implements StateUpdater {
measureCheckpointLatency(() -> task.maybeCheckpoint(true));
changelogReader.unregister(changelogPartitions);
addToRestoredTasks(task);
- updatingTasks.remove(task.id());
log.info("Stateful active task " + task.id() + " completed
restoration");
transitToUpdateStandbysIfOnlyStandbysLeft();
}
@@ -690,6 +689,7 @@ public class DefaultStateUpdater implements StateUpdater {
restoredActiveTasksLock.lock();
try {
restoredActiveTasks.add(task);
+ updatingTasks.remove(task.id());
log.debug("Active task " + task.id() + " was added to the
restored tasks");
restoredActiveTasksCondition.signalAll();
} finally {