This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 773f942441 [flink][cdc] Preserve schema id in source checkpoint (#8163)
773f942441 is described below

commit 773f942441aa6b4a51975bb9190ddda84c83a448
Author: QuakeWang <[email protected]>
AuthorDate: Tue Jun 9 13:58:45 2026 +0800

    [flink][cdc] Preserve schema id in source checkpoint (#8163)
    
    The CDC source enumerator checkpoint currently stores pending splits and
    the per-table next snapshot id, but it does not persist the last observed
    schema id for each table.
    
    After a restore, the next generated split may use `lastSchemaId = null`
    even though the table had already emitted splits with an earlier schema.
    In that case, schema diff generation treats the current schema as a
    create-table event instead of the expected schema evolution event, so
    downstream schema changes can be lost for existing target tables.
    
    This PR adds explicit per-table checkpoint progress for the CDC source
    enumerator:
    
    - stores both `nextSnapshotId` and `schemaId`
    - restores `TableStatus.schemaId` together with the stream scan position
    - upgrades `CDCCheckpoint.Serializer` to v2
    - keeps v1 checkpoint compatibility, restoring the missing `schemaId` as
    `null`
    - writes the nested split serializer version in the new checkpoint
    format
---
 .../cdc/source/enumerator/CDCCheckpoint.java       | 135 +++++++++++++++--
 .../cdc/source/enumerator/CDCSourceEnumerator.java |  68 +++++++--
 .../enumerator/CDCCheckpointSerializerTest.java    |  64 +++++++-
 .../source/enumerator/CDCSourceEnumeratorTest.java | 163 +++++++++++++++++++--
 4 files changed, 390 insertions(+), 40 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCCheckpoint.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCCheckpoint.java
index b0e41fd922..1d3f959f27 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCCheckpoint.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCCheckpoint.java
@@ -26,6 +26,8 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 
+import javax.annotation.Nullable;
+
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -42,13 +44,13 @@ import java.util.Objects;
 public class CDCCheckpoint {
     private final Collection<TableAwareFileStoreSourceSplit> splits;
 
-    private final Map<Identifier, Long> currentSnapshotIdMap;
+    private final Map<Identifier, TableProgress> tableProgressMap;
 
     public CDCCheckpoint(
             Collection<TableAwareFileStoreSourceSplit> splits,
-            Map<Identifier, Long> currentSnapshotIdMap) {
+            Map<Identifier, TableProgress> tableProgressMap) {
         this.splits = splits;
-        this.currentSnapshotIdMap = currentSnapshotIdMap;
+        this.tableProgressMap = tableProgressMap;
     }
 
     public Collection<TableAwareFileStoreSourceSplit> getSplits() {
@@ -56,12 +58,20 @@ public class CDCCheckpoint {
     }
 
     public Map<Identifier, Long> getCurrentSnapshotIdMap() {
+        Map<Identifier, Long> currentSnapshotIdMap = new HashMap<>();
+        for (Map.Entry<Identifier, TableProgress> entry : 
tableProgressMap.entrySet()) {
+            currentSnapshotIdMap.put(entry.getKey(), 
entry.getValue().nextSnapshotId());
+        }
         return currentSnapshotIdMap;
     }
 
+    public Map<Identifier, TableProgress> getTableProgressMap() {
+        return tableProgressMap;
+    }
+
     @Override
     public int hashCode() {
-        return Objects.hash(splits, currentSnapshotIdMap);
+        return Objects.hash(splits, tableProgressMap);
     }
 
     @Override
@@ -72,17 +82,67 @@ public class CDCCheckpoint {
 
         CDCCheckpoint cdcCheckpoint = (CDCCheckpoint) obj;
         return Objects.equals(cdcCheckpoint.splits, splits)
-                && Objects.equals(cdcCheckpoint.currentSnapshotIdMap, 
currentSnapshotIdMap);
+                && Objects.equals(cdcCheckpoint.tableProgressMap, 
tableProgressMap);
+    }
+
+    /** Per-table progress stored in CDC enumerator checkpoints. */
+    public static class TableProgress {
+        @Nullable private final Long nextSnapshotId;
+        @Nullable private final Long schemaId;
+
+        public TableProgress(@Nullable Long nextSnapshotId, @Nullable Long 
schemaId) {
+            this.nextSnapshotId = nextSnapshotId;
+            this.schemaId = schemaId;
+        }
+
+        @Nullable
+        public Long nextSnapshotId() {
+            return nextSnapshotId;
+        }
+
+        @Nullable
+        public Long schemaId() {
+            return schemaId;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(nextSnapshotId, schemaId);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof TableProgress)) {
+                return false;
+            }
+
+            TableProgress that = (TableProgress) obj;
+            return Objects.equals(nextSnapshotId, that.nextSnapshotId)
+                    && Objects.equals(schemaId, that.schemaId);
+        }
+
+        @Override
+        public String toString() {
+            return "TableProgress{"
+                    + "nextSnapshotId="
+                    + nextSnapshotId
+                    + ", schemaId="
+                    + schemaId
+                    + '}';
+        }
     }
 
     /** {@link SimpleVersionedSerializer} for {@link CDCCheckpoint}. */
     public static class Serializer implements 
SimpleVersionedSerializer<CDCCheckpoint> {
+        private static final int VERSION_1 = 1;
+        private static final int VERSION_2 = 2;
+
         private final 
SimpleVersionedSerializer<TableAwareFileStoreSourceSplit> splitSerializer =
                 new TableAwareFileStoreSourceSplit.Serializer();
 
         @Override
         public int getVersion() {
-            return 1;
+            return VERSION_2;
         }
 
         @Override
@@ -93,14 +153,17 @@ public class CDCCheckpoint {
             view.writeInt(checkpoint.splits.size());
             for (TableAwareFileStoreSourceSplit split : checkpoint.splits) {
                 byte[] bytes = splitSerializer.serialize(split);
+                view.writeInt(splitSerializer.getVersion());
                 view.writeInt(bytes.length);
                 view.write(bytes);
             }
 
-            view.writeInt(checkpoint.currentSnapshotIdMap.size());
-            for (Map.Entry<Identifier, Long> entry : 
checkpoint.currentSnapshotIdMap.entrySet()) {
+            view.writeInt(checkpoint.tableProgressMap.size());
+            for (Map.Entry<Identifier, TableProgress> entry :
+                    checkpoint.tableProgressMap.entrySet()) {
                 view.writeUTF(JsonSerdeUtil.toJson(entry.getKey()));
-                view.writeLong(entry.getValue());
+                writeNullableLong(view, entry.getValue().nextSnapshotId());
+                writeNullableLong(view, entry.getValue().schemaId());
             }
 
             return out.toByteArray();
@@ -108,26 +171,72 @@ public class CDCCheckpoint {
 
         @Override
         public CDCCheckpoint deserialize(int version, byte[] serialized) 
throws IOException {
+            if (version == VERSION_1) {
+                return deserializeV1(serialized);
+            }
+            if (version != VERSION_2) {
+                throw new IOException("Unsupported CDC checkpoint version: " + 
version);
+            }
+
             DataInputDeserializer view = new DataInputDeserializer(serialized);
 
             int splitNumber = view.readInt();
             List<TableAwareFileStoreSourceSplit> splits = new 
ArrayList<>(splitNumber);
             for (int i = 0; i < splitNumber; i++) {
+                int splitVersion = view.readInt();
                 int byteNumber = view.readInt();
                 byte[] bytes = new byte[byteNumber];
                 view.readFully(bytes);
-                splits.add(splitSerializer.deserialize(version, bytes));
+                splits.add(splitSerializer.deserialize(splitVersion, bytes));
+            }
+
+            int tableProgressMapSize = view.readInt();
+            Map<Identifier, TableProgress> tableProgressMap = new 
HashMap<>(tableProgressMapSize);
+            for (int i = 0; i < tableProgressMapSize; i++) {
+                Identifier identifier = JsonSerdeUtil.fromJson(view.readUTF(), 
Identifier.class);
+                Long nextSnapshotId = readNullableLong(view);
+                Long schemaId = readNullableLong(view);
+                tableProgressMap.put(identifier, new 
TableProgress(nextSnapshotId, schemaId));
+            }
+
+            return new CDCCheckpoint(splits, tableProgressMap);
+        }
+
+        private CDCCheckpoint deserializeV1(byte[] serialized) throws 
IOException {
+            DataInputDeserializer view = new DataInputDeserializer(serialized);
+
+            int splitNumber = view.readInt();
+            List<TableAwareFileStoreSourceSplit> splits = new 
ArrayList<>(splitNumber);
+            for (int i = 0; i < splitNumber; i++) {
+                int byteNumber = view.readInt();
+                byte[] bytes = new byte[byteNumber];
+                view.readFully(bytes);
+                splits.add(splitSerializer.deserialize(VERSION_1, bytes));
             }
 
             int currentSnapshotIdMapSize = view.readInt();
-            Map<Identifier, Long> currentSnapshotIdMap = new 
HashMap<>(currentSnapshotIdMapSize);
+            Map<Identifier, TableProgress> tableProgressMap =
+                    new HashMap<>(currentSnapshotIdMapSize);
             for (int i = 0; i < currentSnapshotIdMapSize; i++) {
                 Identifier identifier = JsonSerdeUtil.fromJson(view.readUTF(), 
Identifier.class);
                 long currentSnapshotId = view.readLong();
-                currentSnapshotIdMap.put(identifier, currentSnapshotId);
+                tableProgressMap.put(identifier, new 
TableProgress(currentSnapshotId, null));
             }
 
-            return new CDCCheckpoint(splits, currentSnapshotIdMap);
+            return new CDCCheckpoint(splits, tableProgressMap);
+        }
+
+        private void writeNullableLong(DataOutputViewStreamWrapper view, 
@Nullable Long value)
+                throws IOException {
+            view.writeBoolean(value != null);
+            if (value != null) {
+                view.writeLong(value);
+            }
+        }
+
+        @Nullable
+        private Long readNullableLong(DataInputDeserializer view) throws 
IOException {
+            return view.readBoolean() ? view.readLong() : null;
         }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCSourceEnumerator.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCSourceEnumerator.java
index c8789bf30e..5e62aeda29 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCSourceEnumerator.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCSourceEnumerator.java
@@ -25,6 +25,7 @@ import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
 import 
org.apache.paimon.flink.pipeline.cdc.source.TableAwareFileStoreSourceSplit;
+import 
org.apache.paimon.flink.pipeline.cdc.source.enumerator.CDCCheckpoint.TableProgress;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.DataSplit;
@@ -57,6 +58,7 @@ import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
@@ -118,7 +120,9 @@ public class CDCSourceEnumerator
         this.splitIdGenerator = new SplitIdGenerator();
 
         if (checkpoint != null) {
-            for (Identifier identifier : 
checkpoint.getCurrentSnapshotIdMap().keySet()) {
+            for (Map.Entry<Identifier, TableProgress> entry :
+                    checkpoint.getTableProgressMap().entrySet()) {
+                Identifier identifier = entry.getKey();
                 Table table;
                 try {
                     table = catalog.getTable(identifier);
@@ -139,13 +143,14 @@ public class CDCSourceEnumerator
                 }
 
                 TableStatus tableStatus = new TableStatus(context, 
(FileStoreTable) table);
-                long currentSnapshotId = 
checkpoint.getCurrentSnapshotIdMap().get(identifier);
-                tableStatus.restore(currentSnapshotId);
+                TableProgress tableProgress = entry.getValue();
+                tableStatus.restore(tableProgress.nextSnapshotId(), 
tableProgress.schemaId());
                 tableStatusMap.put(identifier, tableStatus);
                 LOG.info(
-                        "Restoring state for table {}. Next snapshot id: {}",
+                        "Restoring state for table {}. Next snapshot id: {}, 
schema id: {}",
                         identifier,
-                        currentSnapshotId);
+                        tableProgress.nextSnapshotId(),
+                        tableProgress.schemaId());
             }
 
             addSplits(checkpoint.getSplits());
@@ -205,11 +210,13 @@ public class CDCSourceEnumerator
     public CDCCheckpoint snapshotState(long checkpointId) {
         Collection<TableAwareFileStoreSourceSplit> splits = 
splitAssigner.remainingSplits();
 
-        Map<Identifier, Long> nextSnapshotIdMap = new HashMap<>();
+        Map<Identifier, TableProgress> tableProgressMap = new HashMap<>();
         for (Map.Entry<Identifier, TableStatus> entry : 
tableStatusMap.entrySet()) {
-            nextSnapshotIdMap.put(entry.getKey(), 
entry.getValue().nextSnapshotId);
+            tableProgressMap.put(
+                    entry.getKey(),
+                    new TableProgress(entry.getValue().nextSnapshotId, 
entry.getValue().schemaId));
         }
-        final CDCCheckpoint checkpoint = new CDCCheckpoint(splits, 
nextSnapshotIdMap);
+        final CDCCheckpoint checkpoint = new CDCCheckpoint(splits, 
tableProgressMap);
 
         LOG.debug("Source Checkpoint is {}", checkpoint);
         return checkpoint;
@@ -271,7 +278,8 @@ public class CDCSourceEnumerator
         FileStoreTable table = 
tableStatusMap.get(tableAwarePlan.identifier).table;
         List<TableAwareFileStoreSourceSplit> splits = new ArrayList<>();
         for (Split split : plan.splits()) {
-            Long lastSchemaId = 
tableStatusMap.get(tableAwarePlan.identifier).schemaId;
+            TableStatus tableStatus = 
tableStatusMap.get(tableAwarePlan.identifier);
+            Long lastSchemaId = tableStatus.schemaId;
             TableAwareFileStoreSourceSplit tableAwareFileStoreSourceSplit =
                     toTableAwareSplit(
                             splitIdGenerator.getNextId(),
@@ -279,8 +287,20 @@ public class CDCSourceEnumerator
                             table,
                             tableAwarePlan.identifier,
                             lastSchemaId);
-            tableStatusMap.get(tableAwarePlan.identifier).schemaId =
-                    tableAwareFileStoreSourceSplit.getSchemaId();
+            Long consumedLastSchemaId =
+                    tableStatus.consumeLastSchemaId(
+                            lastSchemaId, 
tableAwareFileStoreSourceSplit.getSchemaId());
+            if (!Objects.equals(consumedLastSchemaId, lastSchemaId)) {
+                tableAwareFileStoreSourceSplit =
+                        new TableAwareFileStoreSourceSplit(
+                                tableAwareFileStoreSourceSplit.splitId(),
+                                tableAwareFileStoreSourceSplit.split(),
+                                tableAwareFileStoreSourceSplit.recordsToSkip(),
+                                tableAwareFileStoreSourceSplit.getIdentifier(),
+                                consumedLastSchemaId,
+                                tableAwareFileStoreSourceSplit.getSchemaId());
+            }
+            tableStatus.schemaId = 
tableAwareFileStoreSourceSplit.getSchemaId();
             splits.add(tableAwareFileStoreSourceSplit);
         }
 
@@ -391,7 +411,16 @@ public class CDCSourceEnumerator
 
     @VisibleForTesting
     void setScan(Identifier identifier, FileStoreTable table, 
StreamDataTableScan scan) {
-        tableStatusMap.put(identifier, new TableStatus(table, scan));
+        TableStatus previous = tableStatusMap.get(identifier);
+        TableStatus current = new TableStatus(table, scan);
+        if (previous != null) {
+            current.schemaId = previous.schemaId;
+            current.subtaskId = previous.subtaskId;
+            current.nextSnapshotId = previous.nextSnapshotId;
+            current.schemaRestored = previous.schemaRestored;
+            current.scan.restore(previous.nextSnapshotId);
+        }
+        tableStatusMap.put(identifier, current);
     }
 
     private static class TableAwarePlan {
@@ -412,6 +441,7 @@ public class CDCSourceEnumerator
         private Long schemaId;
         private Integer subtaskId;
         private Long nextSnapshotId;
+        private boolean schemaRestored;
 
         private TableStatus(SplitEnumeratorContext<?> context, FileStoreTable 
table) {
             this.table = table;
@@ -427,12 +457,24 @@ public class CDCSourceEnumerator
             this.scan = scan;
         }
 
-        private void restore(Long nextSnapshotId) {
+        private void restore(Long nextSnapshotId, @Nullable Long schemaId) {
             this.subtaskId = null;
             this.nextSnapshotId = nextSnapshotId;
+            this.schemaId = schemaId;
+            this.schemaRestored = schemaId != null;
             this.scan.restore(nextSnapshotId);
         }
 
+        private @Nullable Long consumeLastSchemaId(
+                @Nullable Long lastSchemaId, long currentSchemaId) {
+            if (schemaRestored && Objects.equals(lastSchemaId, 
currentSchemaId)) {
+                schemaRestored = false;
+                return null;
+            }
+            schemaRestored = false;
+            return lastSchemaId;
+        }
+
         @Nullable
         private SplitEnumeratorMetricGroup 
metricGroup(SplitEnumeratorContext<?> context) {
             try {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCCheckpointSerializerTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCCheckpointSerializerTest.java
index 84906edd73..38388b12cf 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCCheckpointSerializerTest.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCCheckpointSerializerTest.java
@@ -20,10 +20,15 @@ package 
org.apache.paimon.flink.pipeline.cdc.source.enumerator;
 
 import org.apache.paimon.catalog.Identifier;
 import 
org.apache.paimon.flink.pipeline.cdc.source.TableAwareFileStoreSourceSplit;
+import 
org.apache.paimon.flink.pipeline.cdc.source.enumerator.CDCCheckpoint.TableProgress;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.JsonSerdeUtil;
 
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.junit.jupiter.api.Test;
 
+import java.io.ByteArrayOutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -37,8 +42,9 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Tests for {@link CDCCheckpoint.Serializer}. */
 public class CDCCheckpointSerializerTest {
     @Test
-    public void test() throws Exception {
+    public void testVersion2RoundTrip() throws Exception {
         Identifier identifier = Identifier.create("test_database", 
"test_table");
+        Identifier identifier2 = Identifier.create("test_database", 
"test_table2");
         List<TableAwareFileStoreSourceSplit> splits = new ArrayList<>();
         DataSplit dataSplit =
                 DataSplit.builder()
@@ -52,10 +58,11 @@ public class CDCCheckpointSerializerTest {
                         .build();
         splits.add(new TableAwareFileStoreSourceSplit("1", dataSplit, 0, 
identifier, null, 1L));
 
-        Map<Identifier, Long> nextSnapshotIdMap = new HashMap<>();
-        nextSnapshotIdMap.put(identifier, 3L);
+        Map<Identifier, TableProgress> tableProgressMap = new HashMap<>();
+        tableProgressMap.put(identifier, new TableProgress(3L, 1L));
+        tableProgressMap.put(identifier2, new TableProgress(null, null));
 
-        CDCCheckpoint checkpoint = new CDCCheckpoint(splits, 
nextSnapshotIdMap);
+        CDCCheckpoint checkpoint = new CDCCheckpoint(splits, tableProgressMap);
 
         CDCCheckpoint.Serializer serializer = new CDCCheckpoint.Serializer();
         CDCCheckpoint newCheckpoint =
@@ -63,4 +70,53 @@ public class CDCCheckpointSerializerTest {
 
         assertThat(newCheckpoint).isEqualTo(checkpoint);
     }
+
+    @Test
+    public void testDeserializeVersion1() throws Exception {
+        Identifier identifier = Identifier.create("test_database", 
"test_table");
+        List<TableAwareFileStoreSourceSplit> splits = new ArrayList<>();
+        DataSplit dataSplit =
+                DataSplit.builder()
+                        .withSnapshot(1)
+                        .withPartition(row(1))
+                        .withBucket(2)
+                        .withDataFiles(Arrays.asList(newFile(0), newFile(1)))
+                        .isStreaming(false)
+                        .rawConvertible(false)
+                        .withBucketPath("/temp/2") // not used
+                        .build();
+        TableAwareFileStoreSourceSplit split =
+                new TableAwareFileStoreSourceSplit("1", dataSplit, 0, 
identifier, null, 1L);
+        splits.add(split);
+
+        byte[] version1Bytes = serializeVersion1(splits, identifier, 3L);
+
+        CDCCheckpoint.Serializer serializer = new CDCCheckpoint.Serializer();
+        CDCCheckpoint checkpoint = serializer.deserialize(1, version1Bytes);
+
+        assertThat(checkpoint.getSplits()).containsExactly(split);
+        assertThat(checkpoint.getTableProgressMap())
+                .containsEntry(identifier, new TableProgress(3L, null));
+    }
+
+    private byte[] serializeVersion1(
+            List<TableAwareFileStoreSourceSplit> splits, Identifier 
identifier, long nextSnapshotId)
+            throws Exception {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper view = new 
DataOutputViewStreamWrapper(out);
+        SimpleVersionedSerializer<TableAwareFileStoreSourceSplit> 
splitSerializer =
+                new TableAwareFileStoreSourceSplit.Serializer();
+
+        view.writeInt(splits.size());
+        for (TableAwareFileStoreSourceSplit split : splits) {
+            byte[] bytes = splitSerializer.serialize(split);
+            view.writeInt(bytes.length);
+            view.write(bytes);
+        }
+
+        view.writeInt(1);
+        view.writeUTF(JsonSerdeUtil.toJson(identifier));
+        view.writeLong(nextSnapshotId);
+        return out.toByteArray();
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCSourceEnumeratorTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCSourceEnumeratorTest.java
index 70d2505554..8d5f227ef3 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCSourceEnumeratorTest.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCSourceEnumeratorTest.java
@@ -25,6 +25,7 @@ import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.pipeline.cdc.CDCOptions;
 import 
org.apache.paimon.flink.pipeline.cdc.source.TableAwareFileStoreSourceSplit;
+import 
org.apache.paimon.flink.pipeline.cdc.source.enumerator.CDCCheckpoint.TableProgress;
 import org.apache.paimon.flink.source.FileSplitEnumeratorTestBase;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.options.Options;
@@ -400,6 +401,110 @@ public class CDCSourceEnumeratorTest
         assertThat(checkpoint.getSplits()).containsExactly(splits.get(1));
     }
 
+    @Test
+    public void testRestoreKeepsLastSchemaIdForNewSplits() throws Exception {
+        Identifier identifier = Identifier.create(DATABASE, TABLE + 0);
+        FileStoreTable fileStoreTable = (FileStoreTable) 
catalog.getTable(identifier);
+
+        TreeMap<Long, TableScan.Plan> firstResults = new TreeMap<>();
+        MockScan firstScan = new MockScan(firstResults);
+        final TestingSplitEnumeratorContext<TableAwareFileStoreSourceSplit> 
firstContext =
+                getSplitEnumeratorContext(1);
+        CDCSourceEnumerator firstEnumerator =
+                new Builder()
+                        .setSplitEnumeratorContext(firstContext)
+                        .setInitialSplits(Collections.emptyList())
+                        .setDiscoveryInterval(1)
+                        .setTable(TABLE + 0)
+                        .setScan(firstScan)
+                        .setSchemaIdFromSnapshot(true)
+                        .build();
+        firstEnumerator.start();
+
+        DataSplit firstSplit = createDataSplit(1, 0, Collections.emptyList());
+        firstResults.put(1L, new 
DataFilePlan(Collections.singletonList(firstSplit)));
+        firstContext.triggerAllActions();
+        firstEnumerator.handleSplitRequest(0, "test-host");
+
+        CDCCheckpoint checkpoint = firstEnumerator.snapshotState(1L);
+        assertThat(checkpoint.getTableProgressMap())
+                .containsEntry(identifier, new TableProgress(2L, 1L));
+        assertThat(checkpoint.getSplits()).isEmpty();
+
+        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+        MockScan scan = new MockScan(results);
+        final TestingSplitEnumeratorContext<TableAwareFileStoreSourceSplit> 
context =
+                getSplitEnumeratorContext(1);
+        CDCSourceEnumerator enumerator =
+                new Builder()
+                        .setSplitEnumeratorContext(context)
+                        .setInitialSplits(Collections.emptyList())
+                        .setDiscoveryInterval(1)
+                        .setTable(TABLE + 0)
+                        .setCheckpoint(checkpoint)
+                        .setSchemaIdFromSnapshot(true)
+                        .build();
+        enumerator.setScan(identifier, fileStoreTable, scan);
+        enumerator.start();
+
+        DataSplit split = createDataSplit(2, 0, Collections.emptyList());
+        results.put(3L, new DataFilePlan(Collections.singletonList(split)));
+        context.triggerAllActions();
+
+        enumerator.handleSplitRequest(0, "test-host");
+        Map<
+                        Integer,
+                        TestingSplitEnumeratorContext.SplitAssignmentState<
+                                TableAwareFileStoreSourceSplit>>
+                assignments = context.getSplitAssignments();
+        assertThat(assignments).containsOnlyKeys(0);
+        TableAwareFileStoreSourceSplit assignedSplit =
+                assignments.get(0).getAssignedSplits().get(0);
+        assertThat(assignedSplit.getLastSchemaId()).isEqualTo(1L);
+        assertThat(assignedSplit.getSchemaId()).isEqualTo(2L);
+    }
+
+    @Test
+    public void testRestoreBootstrapsSchemaWhenSchemaIdIsUnchanged() throws 
Exception {
+        Identifier identifier = Identifier.create(DATABASE, TABLE + 0);
+        FileStoreTable fileStoreTable = (FileStoreTable) 
catalog.getTable(identifier);
+        CDCCheckpoint checkpoint =
+                new CDCCheckpoint(
+                        Collections.emptyList(),
+                        Collections.singletonMap(identifier, new 
TableProgress(2L, 1L)));
+
+        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+        MockScan scan = new MockScan(results);
+        final TestingSplitEnumeratorContext<TableAwareFileStoreSourceSplit> 
context =
+                getSplitEnumeratorContext(1);
+        CDCSourceEnumerator enumerator =
+                new Builder()
+                        .setSplitEnumeratorContext(context)
+                        .setInitialSplits(Collections.emptyList())
+                        .setDiscoveryInterval(1)
+                        .setTable(TABLE + 0)
+                        .setCheckpoint(checkpoint)
+                        .build();
+        enumerator.setScan(identifier, fileStoreTable, scan);
+        enumerator.start();
+
+        DataSplit split = createDataSplit(2, 0, Collections.emptyList());
+        results.put(2L, new DataFilePlan(Collections.singletonList(split)));
+        context.triggerAllActions();
+
+        enumerator.handleSplitRequest(0, "test-host");
+        Map<
+                        Integer,
+                        TestingSplitEnumeratorContext.SplitAssignmentState<
+                                TableAwareFileStoreSourceSplit>>
+                assignments = context.getSplitAssignments();
+        assertThat(assignments).containsOnlyKeys(0);
+        TableAwareFileStoreSourceSplit assignedSplit =
+                assignments.get(0).getAssignedSplits().get(0);
+        assertThat(assignedSplit.getLastSchemaId()).isNull();
+        assertThat(assignedSplit.getSchemaId()).isEqualTo(1L);
+    }
+
     private class Builder {
         protected SplitEnumeratorContext<TableAwareFileStoreSourceSplit> 
context;
         protected Collection<TableAwareFileStoreSourceSplit> initialSplits =
@@ -410,6 +515,9 @@ public class CDCSourceEnumeratorTest
         protected boolean unawareBucket = false;
 
         protected int splitMaxPerTask = 10;
+        protected CDCCheckpoint checkpoint;
+        protected boolean schemaIdFromSnapshot = false;
+        protected String table;
 
         public Builder setSplitEnumeratorContext(
                 SplitEnumeratorContext<TableAwareFileStoreSourceSplit> 
context) {
@@ -442,17 +550,39 @@ public class CDCSourceEnumeratorTest
             return this;
         }
 
+        public Builder setCheckpoint(CDCCheckpoint checkpoint) {
+            this.checkpoint = checkpoint;
+            return this;
+        }
+
+        public Builder setSchemaIdFromSnapshot(boolean schemaIdFromSnapshot) {
+            this.schemaIdFromSnapshot = schemaIdFromSnapshot;
+            return this;
+        }
+
+        public Builder setTable(String table) {
+            this.table = table;
+            return this;
+        }
+
         public CDCSourceEnumerator build() {
             Options options = new Options();
             options.setString("warehouse", 
warehouseFolder.toAbsolutePath().toString());
             options.set(CoreOptions.SCAN_MAX_SPLITS_PER_TASK, splitMaxPerTask);
             Configuration cdcConfig = new Configuration();
             cdcConfig.set(toCDCOption(CDCOptions.DATABASE), DATABASE);
+            if (table != null) {
+                cdcConfig.set(toCDCOption(CDCOptions.TABLE), table);
+            }
 
-            Map<Identifier, Long> nextSnapshotIdMap = new HashMap<>();
+            Map<Identifier, TableProgress> tableProgressMap = new HashMap<>();
             for (TableAwareFileStoreSourceSplit split : initialSplits) {
-                nextSnapshotIdMap.put(split.getIdentifier(), 1L);
+                tableProgressMap.put(split.getIdentifier(), new 
TableProgress(1L, null));
             }
+            CDCCheckpoint checkpoint =
+                    this.checkpoint == null
+                            ? new CDCCheckpoint(initialSplits, 
tableProgressMap)
+                            : this.checkpoint;
 
             TestCDCSourceEnumerator enumerator;
             try {
@@ -463,17 +593,25 @@ public class CDCSourceEnumeratorTest
                                 discoveryInterval,
                                 CatalogContext.create(options),
                                 cdcConfig,
-                                new CDCCheckpoint(initialSplits, 
nextSnapshotIdMap));
+                                checkpoint,
+                                schemaIdFromSnapshot);
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
             if (scan != null) {
                 try {
-                    for (String tableName : catalog.listTables(DATABASE)) {
-                        FileStoreTable table =
-                                (FileStoreTable)
-                                        
catalog.getTable(Identifier.create(DATABASE, tableName));
-                        enumerator.setScan(Identifier.create(DATABASE, 
tableName), table, scan);
+                    if (table != null) {
+                        Identifier identifier = Identifier.create(DATABASE, 
table);
+                        FileStoreTable fileStoreTable =
+                                (FileStoreTable) catalog.getTable(identifier);
+                        enumerator.setScan(identifier, fileStoreTable, scan);
+                    } else {
+                        for (String tableName : catalog.listTables(DATABASE)) {
+                            Identifier identifier = 
Identifier.create(DATABASE, tableName);
+                            FileStoreTable fileStoreTable =
+                                    (FileStoreTable) 
catalog.getTable(identifier);
+                            enumerator.setScan(identifier, fileStoreTable, 
scan);
+                        }
                     }
                 } catch (Catalog.DatabaseNotExistException | 
Catalog.TableNotExistException e) {
                     throw new RuntimeException(e);
@@ -511,10 +649,14 @@ public class CDCSourceEnumeratorTest
                 long discoveryInterval,
                 CatalogContext catalogContext,
                 Configuration cdcConfig,
-                @Nullable CDCCheckpoint checkpoint) {
+                @Nullable CDCCheckpoint checkpoint,
+                boolean schemaIdFromSnapshot) {
             super(context, flinkConfig, discoveryInterval, catalogContext, 
cdcConfig, checkpoint);
+            this.schemaIdFromSnapshot = schemaIdFromSnapshot;
         }
 
+        private final boolean schemaIdFromSnapshot;
+
         @Override
         protected TableAwareFileStoreSourceSplit toTableAwareSplit(
                 String splitId,
@@ -523,8 +665,9 @@ public class CDCSourceEnumeratorTest
                 Identifier identifier,
                 @Nullable Long lastSchemaId) {
             Preconditions.checkState(split instanceof DataSplit);
+            long schemaId = schemaIdFromSnapshot ? ((DataSplit) 
split).snapshotId() : 1L;
             return new TableAwareFileStoreSourceSplit(
-                    splitId, split, 0, identifier, lastSchemaId, 1L);
+                    splitId, split, 0, identifier, lastSchemaId, schemaId);
         }
     }
 }


Reply via email to