This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f837d1599 [flink][cdc] Keep table-aware split type during checkpoint (#8093)
0f837d1599 is described below

commit 0f837d1599d867eafc021d8e4a55b0084866e59b
Author: QuakeWang <[email protected]>
AuthorDate: Wed Jun 3 14:12:01 2026 +0800

    [flink][cdc] Keep table-aware split type during checkpoint (#8093)
    
    `TableAwareFileStoreSourceSplit` extends `FileStoreSourceSplit`, but it
    inherited `updateWithRecordsToSkip` from the parent class. During reader
    checkpointing, `FileStoreSourceSplitState.toSourceSplit()` calls that
    method, which returned a plain `FileStoreSourceSplit` and dropped the
    CDC table metadata. As a result, CDC reader state restoration could no
    longer cast the split back to `TableAwareFileStoreSourceSplit`.
    
    This PR overrides `updateWithRecordsToSkip` in
    `TableAwareFileStoreSourceSplit` so checkpointed active splits keep
    their table-aware type and preserve `identifier`, `lastSchemaId`, and
    `schemaId`.
---
 .../cdc/source/TableAwareFileStoreSourceSplit.java |  6 +++
 ...bleAwareFileStoreSourceSplitSerializerTest.java | 50 +++++++++++++++++-----
 2 files changed, 45 insertions(+), 11 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/TableAwareFileStoreSourceSplit.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/TableAwareFileStoreSourceSplit.java
index ed84e14133..aa05922520 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/TableAwareFileStoreSourceSplit.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/TableAwareFileStoreSourceSplit.java
@@ -69,6 +69,12 @@ public class TableAwareFileStoreSourceSplit extends 
FileStoreSourceSplit {
         return schemaId;
     }
 
+    @Override
+    public TableAwareFileStoreSourceSplit updateWithRecordsToSkip(long 
recordsToSkip) {
+        return new TableAwareFileStoreSourceSplit(
+                splitId(), split(), recordsToSkip, identifier, lastSchemaId, 
schemaId);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (!(o instanceof TableAwareFileStoreSourceSplit)) {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/TableAwareFileStoreSourceSplitSerializerTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/TableAwareFileStoreSourceSplitSerializerTest.java
index 901c7d0b1a..79ccabf518 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/TableAwareFileStoreSourceSplitSerializerTest.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/TableAwareFileStoreSourceSplitSerializerTest.java
@@ -19,8 +19,11 @@
 package org.apache.paimon.flink.pipeline.cdc.source;
 
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.source.FileStoreSourceSplit;
+import org.apache.paimon.flink.source.FileStoreSourceSplitState;
 import org.apache.paimon.table.source.DataSplit;
 
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
@@ -37,18 +40,9 @@ public class TableAwareFileStoreSourceSplitSerializerTest {
     @Test
     public void test() throws Exception {
         Identifier identifier = Identifier.create("test_database", 
"test_table");
-        DataSplit dataSplit =
-                DataSplit.builder()
-                        .withSnapshot(1)
-                        .withPartition(row(1))
-                        .withBucket(2)
-                        .withDataFiles(Arrays.asList(newFile(0), newFile(1)))
-                        .isStreaming(false)
-                        .rawConvertible(false)
-                        .withBucketPath("/temp/2") // not used
-                        .build();
         TableAwareFileStoreSourceSplit split =
-                new TableAwareFileStoreSourceSplit("split-1", dataSplit, 0L, 
identifier, null, 1L);
+                new TableAwareFileStoreSourceSplit(
+                        "split-1", newDataSplit(), 0L, identifier, null, 1L);
 
         TableAwareFileStoreSourceSplit.Serializer serializer =
                 new TableAwareFileStoreSourceSplit.Serializer();
@@ -57,4 +51,38 @@ public class TableAwareFileStoreSourceSplitSerializerTest {
                 serializer.deserialize(serializer.getVersion(), serialized);
         assertThat(deserialized).isEqualTo(split);
     }
+
+    @Test
+    public void testUpdateWithRecordsToSkipKeepsTableAwareSplit() {
+        Identifier identifier = Identifier.create("test_database", 
"test_table");
+        DataSplit dataSplit = newDataSplit();
+        TableAwareFileStoreSourceSplit split =
+                new TableAwareFileStoreSourceSplit("split-1", dataSplit, 0L, 
identifier, 1L, 2L);
+        FileStoreSourceSplitState state = new FileStoreSourceSplitState(split);
+
+        state.setPosition(new RecordAndPosition<>(null, 
RecordAndPosition.NO_OFFSET, 10L));
+
+        FileStoreSourceSplit restored = state.toSourceSplit();
+        
assertThat(restored).isInstanceOf(TableAwareFileStoreSourceSplit.class);
+        TableAwareFileStoreSourceSplit tableAwareRestored =
+                (TableAwareFileStoreSourceSplit) restored;
+        assertThat(tableAwareRestored.splitId()).isEqualTo(split.splitId());
+        assertThat(tableAwareRestored.split()).isEqualTo(split.split());
+        assertThat(tableAwareRestored.recordsToSkip()).isEqualTo(10L);
+        assertThat(tableAwareRestored.getIdentifier()).isEqualTo(identifier);
+        assertThat(tableAwareRestored.getLastSchemaId()).isEqualTo(1L);
+        assertThat(tableAwareRestored.getSchemaId()).isEqualTo(2L);
+    }
+
+    private static DataSplit newDataSplit() {
+        return DataSplit.builder()
+                .withSnapshot(1)
+                .withPartition(row(1))
+                .withBucket(2)
+                .withDataFiles(Arrays.asList(newFile(0), newFile(1)))
+                .isStreaming(false)
+                .rawConvertible(false)
+                .withBucketPath("/temp/2") // not used
+                .build();
+    }
 }

Reply via email to