This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new b84618a143d KAFKA-18168: Adding checkpointing for GlobalKTable during
restoration and closing (#18752)
b84618a143d is described below
commit b84618a143da13dc17c5de98a818c5028764e7ce
Author: Janindu Pathirana <[email protected]>
AuthorDate: Mon Mar 3 01:46:48 2025 +0530
KAFKA-18168: Adding checkpointing for GlobalKTable during restoration and
closing (#18752)
Previously, no checkpoint file was created during restoration or when the
task was closed. To fix this, GlobalStateUpdateTask.initialize() and
GlobalStateUpdateTask.close() now call GlobalStateUpdateTask.flushState()
(close() does so only when the state store is not being wiped). This
flushes the state and writes a checkpoint file, thereby avoiding the need
to restore the entire state from scratch.
Reviewers: Alieh Saeedi <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../processor/internals/GlobalStateUpdateTask.java | 4 +++
.../processor/internals/GlobalStateTaskTest.java | 33 ++++++++++++++++++++++
2 files changed, 37 insertions(+)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 12a6beedbcd..ea0359225de 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -96,6 +96,7 @@ public class GlobalStateUpdateTask implements
GlobalStateMaintainer {
}
initTopology();
processorContext.initialize();
+ flushState();
lastFlush = time.milliseconds();
return stateMgr.changelogOffsets();
}
@@ -138,6 +139,9 @@ public class GlobalStateUpdateTask implements
GlobalStateMaintainer {
}
public void close(final boolean wipeStateStore) throws IOException {
+ if (!wipeStateStore) {
+ flushState();
+ }
stateMgr.close();
if (wipeStateStore) {
try {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index 44839f5ddb3..d839644c703 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -255,6 +255,10 @@ public class GlobalStateTaskTest {
globalStateTask.initialize();
globalStateTask.update(record(topic1, 1, currentOffsetT1 + 9000L,
"foo".getBytes(), "foo".getBytes()));
time.sleep(flushInterval); // flush interval elapsed
+
+ stateMgr.checkpointWritten = false;
+ stateMgr.flushed = false;
+
globalStateTask.maybeCheckpoint();
assertEquals(offsets, stateMgr.changelogOffsets());
@@ -270,6 +274,10 @@ public class GlobalStateTaskTest {
globalStateTask.update(record(topic1, 1, currentOffsetT1 + 10000L,
"foo".getBytes(), "foo".getBytes()));
time.sleep(flushInterval / 2);
+
+ stateMgr.checkpointWritten = false;
+ stateMgr.flushed = false;
+
globalStateTask.maybeCheckpoint();
assertEquals(offsets, stateMgr.changelogOffsets());
@@ -289,6 +297,10 @@ public class GlobalStateTaskTest {
// 10000 records received since last flush => do not flush
globalStateTask.update(record(topic1, 1, currentOffsetT1 + 9999L,
"foo".getBytes(), "foo".getBytes()));
+
+ stateMgr.checkpointWritten = false;
+ stateMgr.flushed = false;
+
globalStateTask.maybeCheckpoint();
assertEquals(offsets, stateMgr.changelogOffsets());
@@ -334,4 +346,25 @@ public class GlobalStateTaskTest {
globalStateTask.close(true);
assertFalse(stateMgr.baseDir().exists());
}
+
+ @Test
+ public void shouldCheckpointDuringInitialization() {
+ globalStateTask.initialize();
+
+ assertTrue(stateMgr.checkpointWritten);
+ assertTrue(stateMgr.flushed);
+ }
+
+ @Test
+ public void shouldCheckpointDuringClose() throws Exception {
+ globalStateTask.initialize();
+
+ stateMgr.checkpointWritten = false;
+ stateMgr.flushed = false;
+
+ globalStateTask.close(false);
+
+ assertTrue(stateMgr.checkpointWritten);
+ assertTrue(stateMgr.flushed);
+ }
}