This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0f837d1599 [flink][cdc] Keep table-aware split type during checkpoint
(#8093)
0f837d1599 is described below
commit 0f837d1599d867eafc021d8e4a55b0084866e59b
Author: QuakeWang <[email protected]>
AuthorDate: Wed Jun 3 14:12:01 2026 +0800
[flink][cdc] Keep table-aware split type during checkpoint (#8093)
`TableAwareFileStoreSourceSplit` extends `FileStoreSourceSplit`, but it
inherited `updateWithRecordsToSkip` from the parent class without
overriding it. During reader checkpointing,
`FileStoreSourceSplitState.toSourceSplit()` calls that method, which
returned a plain `FileStoreSourceSplit`. This dropped the CDC table
metadata and caused CDC reader state restoration to fail when it cast
the split back to `TableAwareFileStoreSourceSplit`.
This PR overrides `updateWithRecordsToSkip` in
`TableAwareFileStoreSourceSplit` so checkpointed active splits keep
their table-aware type and preserve `identifier`, `lastSchemaId`, and
`schemaId`.
---
.../cdc/source/TableAwareFileStoreSourceSplit.java | 6 +++
...bleAwareFileStoreSourceSplitSerializerTest.java | 50 +++++++++++++++++-----
2 files changed, 45 insertions(+), 11 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/TableAwareFileStoreSourceSplit.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/TableAwareFileStoreSourceSplit.java
index ed84e14133..aa05922520 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/TableAwareFileStoreSourceSplit.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/TableAwareFileStoreSourceSplit.java
@@ -69,6 +69,12 @@ public class TableAwareFileStoreSourceSplit extends
FileStoreSourceSplit {
return schemaId;
}
+ @Override
+ public TableAwareFileStoreSourceSplit updateWithRecordsToSkip(long
recordsToSkip) {
+ return new TableAwareFileStoreSourceSplit(
+ splitId(), split(), recordsToSkip, identifier, lastSchemaId,
schemaId);
+ }
+
@Override
public boolean equals(Object o) {
if (!(o instanceof TableAwareFileStoreSourceSplit)) {
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/TableAwareFileStoreSourceSplitSerializerTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/TableAwareFileStoreSourceSplitSerializerTest.java
index 901c7d0b1a..79ccabf518 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/TableAwareFileStoreSourceSplitSerializerTest.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/TableAwareFileStoreSourceSplitSerializerTest.java
@@ -19,8 +19,11 @@
package org.apache.paimon.flink.pipeline.cdc.source;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.source.FileStoreSourceSplit;
+import org.apache.paimon.flink.source.FileStoreSourceSplitState;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
@@ -37,18 +40,9 @@ public class TableAwareFileStoreSourceSplitSerializerTest {
@Test
public void test() throws Exception {
Identifier identifier = Identifier.create("test_database",
"test_table");
- DataSplit dataSplit =
- DataSplit.builder()
- .withSnapshot(1)
- .withPartition(row(1))
- .withBucket(2)
- .withDataFiles(Arrays.asList(newFile(0), newFile(1)))
- .isStreaming(false)
- .rawConvertible(false)
- .withBucketPath("/temp/2") // not used
- .build();
TableAwareFileStoreSourceSplit split =
- new TableAwareFileStoreSourceSplit("split-1", dataSplit, 0L,
identifier, null, 1L);
+ new TableAwareFileStoreSourceSplit(
+ "split-1", newDataSplit(), 0L, identifier, null, 1L);
TableAwareFileStoreSourceSplit.Serializer serializer =
new TableAwareFileStoreSourceSplit.Serializer();
@@ -57,4 +51,38 @@ public class TableAwareFileStoreSourceSplitSerializerTest {
serializer.deserialize(serializer.getVersion(), serialized);
assertThat(deserialized).isEqualTo(split);
}
+
+ @Test
+ public void testUpdateWithRecordsToSkipKeepsTableAwareSplit() {
+ Identifier identifier = Identifier.create("test_database",
"test_table");
+ DataSplit dataSplit = newDataSplit();
+ TableAwareFileStoreSourceSplit split =
+ new TableAwareFileStoreSourceSplit("split-1", dataSplit, 0L,
identifier, 1L, 2L);
+ FileStoreSourceSplitState state = new FileStoreSourceSplitState(split);
+
+ state.setPosition(new RecordAndPosition<>(null,
RecordAndPosition.NO_OFFSET, 10L));
+
+ FileStoreSourceSplit restored = state.toSourceSplit();
+
assertThat(restored).isInstanceOf(TableAwareFileStoreSourceSplit.class);
+ TableAwareFileStoreSourceSplit tableAwareRestored =
+ (TableAwareFileStoreSourceSplit) restored;
+ assertThat(tableAwareRestored.splitId()).isEqualTo(split.splitId());
+ assertThat(tableAwareRestored.split()).isEqualTo(split.split());
+ assertThat(tableAwareRestored.recordsToSkip()).isEqualTo(10L);
+ assertThat(tableAwareRestored.getIdentifier()).isEqualTo(identifier);
+ assertThat(tableAwareRestored.getLastSchemaId()).isEqualTo(1L);
+ assertThat(tableAwareRestored.getSchemaId()).isEqualTo(2L);
+ }
+
+ private static DataSplit newDataSplit() {
+ return DataSplit.builder()
+ .withSnapshot(1)
+ .withPartition(row(1))
+ .withBucket(2)
+ .withDataFiles(Arrays.asList(newFile(0), newFile(1)))
+ .isStreaming(false)
+ .rawConvertible(false)
+ .withBucketPath("/temp/2") // not used
+ .build();
+ }
}