This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 3311b9a5d2a KAFKA-18168: Adding checkpointing for GlobalKTable during
restoration and closing (#18752)
3311b9a5d2a is described below
commit 3311b9a5d2a2f728072ca6ca367772b5f16072bd
Author: Janindu Pathirana <[email protected]>
AuthorDate: Mon Mar 3 01:46:48 2025 +0530
KAFKA-18168: Adding checkpointing for GlobalKTable during restoration and
closing (#18752)
To address the issue of no checkpoint file being created during the
restoring and closing process, this change calls the
GlobalStateUpdateTask.flushState() method in the
GlobalStateUpdateTask.initialize() and GlobalStateUpdateTask.close()
methods (in close(), only when the state store is not being wiped).
This flushes the state and creates a checkpoint file, thereby
avoiding the need to completely restore the entire state.
Reviewers: Alieh Saeedi <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../processor/internals/GlobalStateUpdateTask.java | 4 +++
.../processor/internals/GlobalStateTaskTest.java | 33 ++++++++++++++++++++++
2 files changed, 37 insertions(+)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 12a6beedbcd..ea0359225de 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -96,6 +96,7 @@ public class GlobalStateUpdateTask implements
GlobalStateMaintainer {
}
initTopology();
processorContext.initialize();
+ flushState();
lastFlush = time.milliseconds();
return stateMgr.changelogOffsets();
}
@@ -138,6 +139,9 @@ public class GlobalStateUpdateTask implements
GlobalStateMaintainer {
}
public void close(final boolean wipeStateStore) throws IOException {
+ if (!wipeStateStore) {
+ flushState();
+ }
stateMgr.close();
if (wipeStateStore) {
try {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index 24b09024d2f..5aa3a248b87 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -254,6 +254,10 @@ public class GlobalStateTaskTest {
globalStateTask.initialize();
globalStateTask.update(record(topic1, 1, currentOffsetT1 + 9000L,
"foo".getBytes(), "foo".getBytes()));
time.sleep(flushInterval); // flush interval elapsed
+
+ stateMgr.checkpointWritten = false;
+ stateMgr.flushed = false;
+
globalStateTask.maybeCheckpoint();
assertEquals(offsets, stateMgr.changelogOffsets());
@@ -269,6 +273,10 @@ public class GlobalStateTaskTest {
globalStateTask.update(record(topic1, 1, currentOffsetT1 + 10000L,
"foo".getBytes(), "foo".getBytes()));
time.sleep(flushInterval / 2);
+
+ stateMgr.checkpointWritten = false;
+ stateMgr.flushed = false;
+
globalStateTask.maybeCheckpoint();
assertEquals(offsets, stateMgr.changelogOffsets());
@@ -288,6 +296,10 @@ public class GlobalStateTaskTest {
// 10000 records received since last flush => do not flush
globalStateTask.update(record(topic1, 1, currentOffsetT1 + 9999L,
"foo".getBytes(), "foo".getBytes()));
+
+ stateMgr.checkpointWritten = false;
+ stateMgr.flushed = false;
+
globalStateTask.maybeCheckpoint();
assertEquals(offsets, stateMgr.changelogOffsets());
@@ -333,4 +345,25 @@ public class GlobalStateTaskTest {
globalStateTask.close(true);
assertFalse(stateMgr.baseDir().exists());
}
+
+ @Test
+ public void shouldCheckpointDuringInitialization() {
+ globalStateTask.initialize();
+
+ assertTrue(stateMgr.checkpointWritten);
+ assertTrue(stateMgr.flushed);
+ }
+
+ @Test
+ public void shouldCheckpointDuringClose() throws Exception {
+ globalStateTask.initialize();
+
+ stateMgr.checkpointWritten = false;
+ stateMgr.flushed = false;
+
+ globalStateTask.close(false);
+
+ assertTrue(stateMgr.checkpointWritten);
+ assertTrue(stateMgr.flushed);
+ }
}