This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 3311b9a5d2a KAFKA-18168: Adding checkpointing for GlobalKTable during restoration and closing (#18752)
3311b9a5d2a is described below

commit 3311b9a5d2a2f728072ca6ca367772b5f16072bd
Author: Janindu Pathirana <[email protected]>
AuthorDate: Mon Mar 3 01:46:48 2025 +0530

    KAFKA-18168: Adding checkpointing for GlobalKTable during restoration and closing (#18752)
    
    To address the issue of no checkpoint file being created during the
    restoring and closing process, call the
    GlobalStateUpdateTask.flushState() method in the
    GlobalStateUpdateTask.initialize() and GlobalStateUpdateTask.close()
    methods. This flushes the state and creates a checkpoint file, thereby
    avoiding the need to completely restore the entire state.
    
    Reviewers: Alieh Saeedi <[email protected]>, Matthias J. Sax <[email protected]>
---
 .../processor/internals/GlobalStateUpdateTask.java |  4 +++
 .../processor/internals/GlobalStateTaskTest.java   | 33 ++++++++++++++++++++++
 2 files changed, 37 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 12a6beedbcd..ea0359225de 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -96,6 +96,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
         }
         initTopology();
         processorContext.initialize();
+        flushState();
         lastFlush = time.milliseconds();
         return stateMgr.changelogOffsets();
     }
@@ -138,6 +139,9 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
     }
 
     public void close(final boolean wipeStateStore) throws IOException {
+        if (!wipeStateStore) {
+            flushState();
+        }
         stateMgr.close();
         if (wipeStateStore) {
             try {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index 24b09024d2f..5aa3a248b87 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -254,6 +254,10 @@ public class GlobalStateTaskTest {
         globalStateTask.initialize();
         globalStateTask.update(record(topic1, 1, currentOffsetT1 + 9000L, "foo".getBytes(), "foo".getBytes()));
         time.sleep(flushInterval); // flush interval elapsed
+
+        stateMgr.checkpointWritten = false;
+        stateMgr.flushed = false;
+
         globalStateTask.maybeCheckpoint();
 
         assertEquals(offsets, stateMgr.changelogOffsets());
@@ -269,6 +273,10 @@ public class GlobalStateTaskTest {
         globalStateTask.update(record(topic1, 1, currentOffsetT1 + 10000L, "foo".getBytes(), "foo".getBytes()));
 
         time.sleep(flushInterval / 2);
+
+        stateMgr.checkpointWritten = false;
+        stateMgr.flushed = false;
+
         globalStateTask.maybeCheckpoint();
 
         assertEquals(offsets, stateMgr.changelogOffsets());
@@ -288,6 +296,10 @@ public class GlobalStateTaskTest {
 
         // 10000 records received since last flush => do not flush
         globalStateTask.update(record(topic1, 1, currentOffsetT1 + 9999L, "foo".getBytes(), "foo".getBytes()));
+
+        stateMgr.checkpointWritten = false;
+        stateMgr.flushed = false;
+
         globalStateTask.maybeCheckpoint();
 
         assertEquals(offsets, stateMgr.changelogOffsets());
@@ -333,4 +345,25 @@ public class GlobalStateTaskTest {
         globalStateTask.close(true);
         assertFalse(stateMgr.baseDir().exists());
     }
+
+    @Test
+    public void shouldCheckpointDuringInitialization() {
+        globalStateTask.initialize();
+
+        assertTrue(stateMgr.checkpointWritten);
+        assertTrue(stateMgr.flushed);
+    }
+
+    @Test
+    public void shouldCheckpointDuringClose() throws Exception {
+        globalStateTask.initialize();
+
+        stateMgr.checkpointWritten = false;
+        stateMgr.flushed = false;
+
+        globalStateTask.close(false);
+
+        assertTrue(stateMgr.checkpointWritten);
+        assertTrue(stateMgr.flushed);
+    }
 }

Reply via email to