This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 773f942441 [flink][cdc] Preserve schema id in source checkpoint (#8163)
773f942441 is described below
commit 773f942441aa6b4a51975bb9190ddda84c83a448
Author: QuakeWang <[email protected]>
AuthorDate: Tue Jun 9 13:58:45 2026 +0800
[flink][cdc] Preserve schema id in source checkpoint (#8163)
The CDC source enumerator checkpoint currently stores pending splits and
the per-table next snapshot id, but it does not persist the last observed
schema id for each table.
After a restore, the next generated split may use `lastSchemaId = null`
even though the table had already emitted splits with an earlier schema.
In that case, schema diff generation treats the current schema as a
create-table event instead of the expected schema-evolution event, so
downstream schema changes can be lost for existing target tables.
This PR adds explicit per-table checkpoint progress for the CDC source
enumerator:
- stores both `nextSnapshotId` and `schemaId`
- restores `TableStatus.schemaId` together with the stream scan position
- upgrades `CDCCheckpoint.Serializer` to v2
- keeps compatibility with v1 checkpoints, restoring the missing
`schemaId` as `null`
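- writes the nested split serializer version in the new checkpoint
format, so that splits are deserialized with their own serializer
version rather than the checkpoint's version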
---
.../cdc/source/enumerator/CDCCheckpoint.java | 135 +++++++++++++++--
.../cdc/source/enumerator/CDCSourceEnumerator.java | 68 +++++++--
.../enumerator/CDCCheckpointSerializerTest.java | 64 +++++++-
.../source/enumerator/CDCSourceEnumeratorTest.java | 163 +++++++++++++++++++--
4 files changed, 390 insertions(+), 40 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCCheckpoint.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCCheckpoint.java
index b0e41fd922..1d3f959f27 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCCheckpoint.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCCheckpoint.java
@@ -26,6 +26,8 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import javax.annotation.Nullable;
+
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
@@ -42,13 +44,13 @@ import java.util.Objects;
public class CDCCheckpoint {
private final Collection<TableAwareFileStoreSourceSplit> splits;
- private final Map<Identifier, Long> currentSnapshotIdMap;
+ private final Map<Identifier, TableProgress> tableProgressMap;
public CDCCheckpoint(
Collection<TableAwareFileStoreSourceSplit> splits,
- Map<Identifier, Long> currentSnapshotIdMap) {
+ Map<Identifier, TableProgress> tableProgressMap) {
this.splits = splits;
- this.currentSnapshotIdMap = currentSnapshotIdMap;
+ this.tableProgressMap = tableProgressMap;
}
public Collection<TableAwareFileStoreSourceSplit> getSplits() {
@@ -56,12 +58,20 @@ public class CDCCheckpoint {
}
public Map<Identifier, Long> getCurrentSnapshotIdMap() {
+ Map<Identifier, Long> currentSnapshotIdMap = new HashMap<>();
+ for (Map.Entry<Identifier, TableProgress> entry :
tableProgressMap.entrySet()) {
+ currentSnapshotIdMap.put(entry.getKey(),
entry.getValue().nextSnapshotId());
+ }
return currentSnapshotIdMap;
}
+ public Map<Identifier, TableProgress> getTableProgressMap() {
+ return tableProgressMap;
+ }
+
@Override
public int hashCode() {
- return Objects.hash(splits, currentSnapshotIdMap);
+ return Objects.hash(splits, tableProgressMap);
}
@Override
@@ -72,17 +82,67 @@ public class CDCCheckpoint {
CDCCheckpoint cdcCheckpoint = (CDCCheckpoint) obj;
return Objects.equals(cdcCheckpoint.splits, splits)
- && Objects.equals(cdcCheckpoint.currentSnapshotIdMap,
currentSnapshotIdMap);
+ && Objects.equals(cdcCheckpoint.tableProgressMap,
tableProgressMap);
+ }
+
+ /** Per-table progress stored in CDC enumerator checkpoints. */
+ public static class TableProgress {
+ @Nullable private final Long nextSnapshotId;
+ @Nullable private final Long schemaId;
+
+ public TableProgress(@Nullable Long nextSnapshotId, @Nullable Long
schemaId) {
+ this.nextSnapshotId = nextSnapshotId;
+ this.schemaId = schemaId;
+ }
+
+ @Nullable
+ public Long nextSnapshotId() {
+ return nextSnapshotId;
+ }
+
+ @Nullable
+ public Long schemaId() {
+ return schemaId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(nextSnapshotId, schemaId);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof TableProgress)) {
+ return false;
+ }
+
+ TableProgress that = (TableProgress) obj;
+ return Objects.equals(nextSnapshotId, that.nextSnapshotId)
+ && Objects.equals(schemaId, that.schemaId);
+ }
+
+ @Override
+ public String toString() {
+ return "TableProgress{"
+ + "nextSnapshotId="
+ + nextSnapshotId
+ + ", schemaId="
+ + schemaId
+ + '}';
+ }
}
/** {@link SimpleVersionedSerializer} for {@link CDCCheckpoint}. */
public static class Serializer implements
SimpleVersionedSerializer<CDCCheckpoint> {
+ private static final int VERSION_1 = 1;
+ private static final int VERSION_2 = 2;
+
private final
SimpleVersionedSerializer<TableAwareFileStoreSourceSplit> splitSerializer =
new TableAwareFileStoreSourceSplit.Serializer();
@Override
public int getVersion() {
- return 1;
+ return VERSION_2;
}
@Override
@@ -93,14 +153,17 @@ public class CDCCheckpoint {
view.writeInt(checkpoint.splits.size());
for (TableAwareFileStoreSourceSplit split : checkpoint.splits) {
byte[] bytes = splitSerializer.serialize(split);
+ view.writeInt(splitSerializer.getVersion());
view.writeInt(bytes.length);
view.write(bytes);
}
- view.writeInt(checkpoint.currentSnapshotIdMap.size());
- for (Map.Entry<Identifier, Long> entry :
checkpoint.currentSnapshotIdMap.entrySet()) {
+ view.writeInt(checkpoint.tableProgressMap.size());
+ for (Map.Entry<Identifier, TableProgress> entry :
+ checkpoint.tableProgressMap.entrySet()) {
view.writeUTF(JsonSerdeUtil.toJson(entry.getKey()));
- view.writeLong(entry.getValue());
+ writeNullableLong(view, entry.getValue().nextSnapshotId());
+ writeNullableLong(view, entry.getValue().schemaId());
}
return out.toByteArray();
@@ -108,26 +171,72 @@ public class CDCCheckpoint {
@Override
public CDCCheckpoint deserialize(int version, byte[] serialized)
throws IOException {
+ if (version == VERSION_1) {
+ return deserializeV1(serialized);
+ }
+ if (version != VERSION_2) {
+ throw new IOException("Unsupported CDC checkpoint version: " +
version);
+ }
+
DataInputDeserializer view = new DataInputDeserializer(serialized);
int splitNumber = view.readInt();
List<TableAwareFileStoreSourceSplit> splits = new
ArrayList<>(splitNumber);
for (int i = 0; i < splitNumber; i++) {
+ int splitVersion = view.readInt();
int byteNumber = view.readInt();
byte[] bytes = new byte[byteNumber];
view.readFully(bytes);
- splits.add(splitSerializer.deserialize(version, bytes));
+ splits.add(splitSerializer.deserialize(splitVersion, bytes));
+ }
+
+ int tableProgressMapSize = view.readInt();
+ Map<Identifier, TableProgress> tableProgressMap = new
HashMap<>(tableProgressMapSize);
+ for (int i = 0; i < tableProgressMapSize; i++) {
+ Identifier identifier = JsonSerdeUtil.fromJson(view.readUTF(),
Identifier.class);
+ Long nextSnapshotId = readNullableLong(view);
+ Long schemaId = readNullableLong(view);
+ tableProgressMap.put(identifier, new
TableProgress(nextSnapshotId, schemaId));
+ }
+
+ return new CDCCheckpoint(splits, tableProgressMap);
+ }
+
+ private CDCCheckpoint deserializeV1(byte[] serialized) throws
IOException {
+ DataInputDeserializer view = new DataInputDeserializer(serialized);
+
+ int splitNumber = view.readInt();
+ List<TableAwareFileStoreSourceSplit> splits = new
ArrayList<>(splitNumber);
+ for (int i = 0; i < splitNumber; i++) {
+ int byteNumber = view.readInt();
+ byte[] bytes = new byte[byteNumber];
+ view.readFully(bytes);
+ splits.add(splitSerializer.deserialize(VERSION_1, bytes));
}
int currentSnapshotIdMapSize = view.readInt();
- Map<Identifier, Long> currentSnapshotIdMap = new
HashMap<>(currentSnapshotIdMapSize);
+ Map<Identifier, TableProgress> tableProgressMap =
+ new HashMap<>(currentSnapshotIdMapSize);
for (int i = 0; i < currentSnapshotIdMapSize; i++) {
Identifier identifier = JsonSerdeUtil.fromJson(view.readUTF(),
Identifier.class);
long currentSnapshotId = view.readLong();
- currentSnapshotIdMap.put(identifier, currentSnapshotId);
+ tableProgressMap.put(identifier, new
TableProgress(currentSnapshotId, null));
}
- return new CDCCheckpoint(splits, currentSnapshotIdMap);
+ return new CDCCheckpoint(splits, tableProgressMap);
+ }
+
+ private void writeNullableLong(DataOutputViewStreamWrapper view,
@Nullable Long value)
+ throws IOException {
+ view.writeBoolean(value != null);
+ if (value != null) {
+ view.writeLong(value);
+ }
+ }
+
+ @Nullable
+ private Long readNullableLong(DataInputDeserializer view) throws
IOException {
+ return view.readBoolean() ? view.readLong() : null;
}
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCSourceEnumerator.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCSourceEnumerator.java
index c8789bf30e..5e62aeda29 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCSourceEnumerator.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCSourceEnumerator.java
@@ -25,6 +25,7 @@ import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
import
org.apache.paimon.flink.pipeline.cdc.source.TableAwareFileStoreSourceSplit;
+import
org.apache.paimon.flink.pipeline.cdc.source.enumerator.CDCCheckpoint.TableProgress;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.DataSplit;
@@ -57,6 +58,7 @@ import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
@@ -118,7 +120,9 @@ public class CDCSourceEnumerator
this.splitIdGenerator = new SplitIdGenerator();
if (checkpoint != null) {
- for (Identifier identifier :
checkpoint.getCurrentSnapshotIdMap().keySet()) {
+ for (Map.Entry<Identifier, TableProgress> entry :
+ checkpoint.getTableProgressMap().entrySet()) {
+ Identifier identifier = entry.getKey();
Table table;
try {
table = catalog.getTable(identifier);
@@ -139,13 +143,14 @@ public class CDCSourceEnumerator
}
TableStatus tableStatus = new TableStatus(context,
(FileStoreTable) table);
- long currentSnapshotId =
checkpoint.getCurrentSnapshotIdMap().get(identifier);
- tableStatus.restore(currentSnapshotId);
+ TableProgress tableProgress = entry.getValue();
+ tableStatus.restore(tableProgress.nextSnapshotId(),
tableProgress.schemaId());
tableStatusMap.put(identifier, tableStatus);
LOG.info(
- "Restoring state for table {}. Next snapshot id: {}",
+ "Restoring state for table {}. Next snapshot id: {},
schema id: {}",
identifier,
- currentSnapshotId);
+ tableProgress.nextSnapshotId(),
+ tableProgress.schemaId());
}
addSplits(checkpoint.getSplits());
@@ -205,11 +210,13 @@ public class CDCSourceEnumerator
public CDCCheckpoint snapshotState(long checkpointId) {
Collection<TableAwareFileStoreSourceSplit> splits =
splitAssigner.remainingSplits();
- Map<Identifier, Long> nextSnapshotIdMap = new HashMap<>();
+ Map<Identifier, TableProgress> tableProgressMap = new HashMap<>();
for (Map.Entry<Identifier, TableStatus> entry :
tableStatusMap.entrySet()) {
- nextSnapshotIdMap.put(entry.getKey(),
entry.getValue().nextSnapshotId);
+ tableProgressMap.put(
+ entry.getKey(),
+ new TableProgress(entry.getValue().nextSnapshotId,
entry.getValue().schemaId));
}
- final CDCCheckpoint checkpoint = new CDCCheckpoint(splits,
nextSnapshotIdMap);
+ final CDCCheckpoint checkpoint = new CDCCheckpoint(splits,
tableProgressMap);
LOG.debug("Source Checkpoint is {}", checkpoint);
return checkpoint;
@@ -271,7 +278,8 @@ public class CDCSourceEnumerator
FileStoreTable table =
tableStatusMap.get(tableAwarePlan.identifier).table;
List<TableAwareFileStoreSourceSplit> splits = new ArrayList<>();
for (Split split : plan.splits()) {
- Long lastSchemaId =
tableStatusMap.get(tableAwarePlan.identifier).schemaId;
+ TableStatus tableStatus =
tableStatusMap.get(tableAwarePlan.identifier);
+ Long lastSchemaId = tableStatus.schemaId;
TableAwareFileStoreSourceSplit tableAwareFileStoreSourceSplit =
toTableAwareSplit(
splitIdGenerator.getNextId(),
@@ -279,8 +287,20 @@ public class CDCSourceEnumerator
table,
tableAwarePlan.identifier,
lastSchemaId);
- tableStatusMap.get(tableAwarePlan.identifier).schemaId =
- tableAwareFileStoreSourceSplit.getSchemaId();
+ Long consumedLastSchemaId =
+ tableStatus.consumeLastSchemaId(
+ lastSchemaId,
tableAwareFileStoreSourceSplit.getSchemaId());
+ if (!Objects.equals(consumedLastSchemaId, lastSchemaId)) {
+ tableAwareFileStoreSourceSplit =
+ new TableAwareFileStoreSourceSplit(
+ tableAwareFileStoreSourceSplit.splitId(),
+ tableAwareFileStoreSourceSplit.split(),
+ tableAwareFileStoreSourceSplit.recordsToSkip(),
+ tableAwareFileStoreSourceSplit.getIdentifier(),
+ consumedLastSchemaId,
+ tableAwareFileStoreSourceSplit.getSchemaId());
+ }
+ tableStatus.schemaId =
tableAwareFileStoreSourceSplit.getSchemaId();
splits.add(tableAwareFileStoreSourceSplit);
}
@@ -391,7 +411,16 @@ public class CDCSourceEnumerator
@VisibleForTesting
void setScan(Identifier identifier, FileStoreTable table,
StreamDataTableScan scan) {
- tableStatusMap.put(identifier, new TableStatus(table, scan));
+ TableStatus previous = tableStatusMap.get(identifier);
+ TableStatus current = new TableStatus(table, scan);
+ if (previous != null) {
+ current.schemaId = previous.schemaId;
+ current.subtaskId = previous.subtaskId;
+ current.nextSnapshotId = previous.nextSnapshotId;
+ current.schemaRestored = previous.schemaRestored;
+ current.scan.restore(previous.nextSnapshotId);
+ }
+ tableStatusMap.put(identifier, current);
}
private static class TableAwarePlan {
@@ -412,6 +441,7 @@ public class CDCSourceEnumerator
private Long schemaId;
private Integer subtaskId;
private Long nextSnapshotId;
+ private boolean schemaRestored;
private TableStatus(SplitEnumeratorContext<?> context, FileStoreTable
table) {
this.table = table;
@@ -427,12 +457,24 @@ public class CDCSourceEnumerator
this.scan = scan;
}
- private void restore(Long nextSnapshotId) {
+ private void restore(Long nextSnapshotId, @Nullable Long schemaId) {
this.subtaskId = null;
this.nextSnapshotId = nextSnapshotId;
+ this.schemaId = schemaId;
+ this.schemaRestored = schemaId != null;
this.scan.restore(nextSnapshotId);
}
+ private @Nullable Long consumeLastSchemaId(
+ @Nullable Long lastSchemaId, long currentSchemaId) {
+ if (schemaRestored && Objects.equals(lastSchemaId,
currentSchemaId)) {
+ schemaRestored = false;
+ return null;
+ }
+ schemaRestored = false;
+ return lastSchemaId;
+ }
+
@Nullable
private SplitEnumeratorMetricGroup
metricGroup(SplitEnumeratorContext<?> context) {
try {
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCCheckpointSerializerTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCCheckpointSerializerTest.java
index 84906edd73..38388b12cf 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCCheckpointSerializerTest.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCCheckpointSerializerTest.java
@@ -20,10 +20,15 @@ package
org.apache.paimon.flink.pipeline.cdc.source.enumerator;
import org.apache.paimon.catalog.Identifier;
import
org.apache.paimon.flink.pipeline.cdc.source.TableAwareFileStoreSourceSplit;
+import
org.apache.paimon.flink.pipeline.cdc.source.enumerator.CDCCheckpoint.TableProgress;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.JsonSerdeUtil;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.junit.jupiter.api.Test;
+import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -37,8 +42,9 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link CDCCheckpoint.Serializer}. */
public class CDCCheckpointSerializerTest {
@Test
- public void test() throws Exception {
+ public void testVersion2RoundTrip() throws Exception {
Identifier identifier = Identifier.create("test_database",
"test_table");
+ Identifier identifier2 = Identifier.create("test_database",
"test_table2");
List<TableAwareFileStoreSourceSplit> splits = new ArrayList<>();
DataSplit dataSplit =
DataSplit.builder()
@@ -52,10 +58,11 @@ public class CDCCheckpointSerializerTest {
.build();
splits.add(new TableAwareFileStoreSourceSplit("1", dataSplit, 0,
identifier, null, 1L));
- Map<Identifier, Long> nextSnapshotIdMap = new HashMap<>();
- nextSnapshotIdMap.put(identifier, 3L);
+ Map<Identifier, TableProgress> tableProgressMap = new HashMap<>();
+ tableProgressMap.put(identifier, new TableProgress(3L, 1L));
+ tableProgressMap.put(identifier2, new TableProgress(null, null));
- CDCCheckpoint checkpoint = new CDCCheckpoint(splits,
nextSnapshotIdMap);
+ CDCCheckpoint checkpoint = new CDCCheckpoint(splits, tableProgressMap);
CDCCheckpoint.Serializer serializer = new CDCCheckpoint.Serializer();
CDCCheckpoint newCheckpoint =
@@ -63,4 +70,53 @@ public class CDCCheckpointSerializerTest {
assertThat(newCheckpoint).isEqualTo(checkpoint);
}
+
+ @Test
+ public void testDeserializeVersion1() throws Exception {
+ Identifier identifier = Identifier.create("test_database",
"test_table");
+ List<TableAwareFileStoreSourceSplit> splits = new ArrayList<>();
+ DataSplit dataSplit =
+ DataSplit.builder()
+ .withSnapshot(1)
+ .withPartition(row(1))
+ .withBucket(2)
+ .withDataFiles(Arrays.asList(newFile(0), newFile(1)))
+ .isStreaming(false)
+ .rawConvertible(false)
+ .withBucketPath("/temp/2") // not used
+ .build();
+ TableAwareFileStoreSourceSplit split =
+ new TableAwareFileStoreSourceSplit("1", dataSplit, 0,
identifier, null, 1L);
+ splits.add(split);
+
+ byte[] version1Bytes = serializeVersion1(splits, identifier, 3L);
+
+ CDCCheckpoint.Serializer serializer = new CDCCheckpoint.Serializer();
+ CDCCheckpoint checkpoint = serializer.deserialize(1, version1Bytes);
+
+ assertThat(checkpoint.getSplits()).containsExactly(split);
+ assertThat(checkpoint.getTableProgressMap())
+ .containsEntry(identifier, new TableProgress(3L, null));
+ }
+
+ private byte[] serializeVersion1(
+ List<TableAwareFileStoreSourceSplit> splits, Identifier
identifier, long nextSnapshotId)
+ throws Exception {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ DataOutputViewStreamWrapper view = new
DataOutputViewStreamWrapper(out);
+ SimpleVersionedSerializer<TableAwareFileStoreSourceSplit>
splitSerializer =
+ new TableAwareFileStoreSourceSplit.Serializer();
+
+ view.writeInt(splits.size());
+ for (TableAwareFileStoreSourceSplit split : splits) {
+ byte[] bytes = splitSerializer.serialize(split);
+ view.writeInt(bytes.length);
+ view.write(bytes);
+ }
+
+ view.writeInt(1);
+ view.writeUTF(JsonSerdeUtil.toJson(identifier));
+ view.writeLong(nextSnapshotId);
+ return out.toByteArray();
+ }
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCSourceEnumeratorTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCSourceEnumeratorTest.java
index 70d2505554..8d5f227ef3 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCSourceEnumeratorTest.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCSourceEnumeratorTest.java
@@ -25,6 +25,7 @@ import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.pipeline.cdc.CDCOptions;
import
org.apache.paimon.flink.pipeline.cdc.source.TableAwareFileStoreSourceSplit;
+import
org.apache.paimon.flink.pipeline.cdc.source.enumerator.CDCCheckpoint.TableProgress;
import org.apache.paimon.flink.source.FileSplitEnumeratorTestBase;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.options.Options;
@@ -400,6 +401,110 @@ public class CDCSourceEnumeratorTest
assertThat(checkpoint.getSplits()).containsExactly(splits.get(1));
}
+ @Test
+ public void testRestoreKeepsLastSchemaIdForNewSplits() throws Exception {
+ Identifier identifier = Identifier.create(DATABASE, TABLE + 0);
+ FileStoreTable fileStoreTable = (FileStoreTable)
catalog.getTable(identifier);
+
+ TreeMap<Long, TableScan.Plan> firstResults = new TreeMap<>();
+ MockScan firstScan = new MockScan(firstResults);
+ final TestingSplitEnumeratorContext<TableAwareFileStoreSourceSplit>
firstContext =
+ getSplitEnumeratorContext(1);
+ CDCSourceEnumerator firstEnumerator =
+ new Builder()
+ .setSplitEnumeratorContext(firstContext)
+ .setInitialSplits(Collections.emptyList())
+ .setDiscoveryInterval(1)
+ .setTable(TABLE + 0)
+ .setScan(firstScan)
+ .setSchemaIdFromSnapshot(true)
+ .build();
+ firstEnumerator.start();
+
+ DataSplit firstSplit = createDataSplit(1, 0, Collections.emptyList());
+ firstResults.put(1L, new
DataFilePlan(Collections.singletonList(firstSplit)));
+ firstContext.triggerAllActions();
+ firstEnumerator.handleSplitRequest(0, "test-host");
+
+ CDCCheckpoint checkpoint = firstEnumerator.snapshotState(1L);
+ assertThat(checkpoint.getTableProgressMap())
+ .containsEntry(identifier, new TableProgress(2L, 1L));
+ assertThat(checkpoint.getSplits()).isEmpty();
+
+ TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+ MockScan scan = new MockScan(results);
+ final TestingSplitEnumeratorContext<TableAwareFileStoreSourceSplit>
context =
+ getSplitEnumeratorContext(1);
+ CDCSourceEnumerator enumerator =
+ new Builder()
+ .setSplitEnumeratorContext(context)
+ .setInitialSplits(Collections.emptyList())
+ .setDiscoveryInterval(1)
+ .setTable(TABLE + 0)
+ .setCheckpoint(checkpoint)
+ .setSchemaIdFromSnapshot(true)
+ .build();
+ enumerator.setScan(identifier, fileStoreTable, scan);
+ enumerator.start();
+
+ DataSplit split = createDataSplit(2, 0, Collections.emptyList());
+ results.put(3L, new DataFilePlan(Collections.singletonList(split)));
+ context.triggerAllActions();
+
+ enumerator.handleSplitRequest(0, "test-host");
+ Map<
+ Integer,
+ TestingSplitEnumeratorContext.SplitAssignmentState<
+ TableAwareFileStoreSourceSplit>>
+ assignments = context.getSplitAssignments();
+ assertThat(assignments).containsOnlyKeys(0);
+ TableAwareFileStoreSourceSplit assignedSplit =
+ assignments.get(0).getAssignedSplits().get(0);
+ assertThat(assignedSplit.getLastSchemaId()).isEqualTo(1L);
+ assertThat(assignedSplit.getSchemaId()).isEqualTo(2L);
+ }
+
+ @Test
+ public void testRestoreBootstrapsSchemaWhenSchemaIdIsUnchanged() throws
Exception {
+ Identifier identifier = Identifier.create(DATABASE, TABLE + 0);
+ FileStoreTable fileStoreTable = (FileStoreTable)
catalog.getTable(identifier);
+ CDCCheckpoint checkpoint =
+ new CDCCheckpoint(
+ Collections.emptyList(),
+ Collections.singletonMap(identifier, new
TableProgress(2L, 1L)));
+
+ TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+ MockScan scan = new MockScan(results);
+ final TestingSplitEnumeratorContext<TableAwareFileStoreSourceSplit>
context =
+ getSplitEnumeratorContext(1);
+ CDCSourceEnumerator enumerator =
+ new Builder()
+ .setSplitEnumeratorContext(context)
+ .setInitialSplits(Collections.emptyList())
+ .setDiscoveryInterval(1)
+ .setTable(TABLE + 0)
+ .setCheckpoint(checkpoint)
+ .build();
+ enumerator.setScan(identifier, fileStoreTable, scan);
+ enumerator.start();
+
+ DataSplit split = createDataSplit(2, 0, Collections.emptyList());
+ results.put(2L, new DataFilePlan(Collections.singletonList(split)));
+ context.triggerAllActions();
+
+ enumerator.handleSplitRequest(0, "test-host");
+ Map<
+ Integer,
+ TestingSplitEnumeratorContext.SplitAssignmentState<
+ TableAwareFileStoreSourceSplit>>
+ assignments = context.getSplitAssignments();
+ assertThat(assignments).containsOnlyKeys(0);
+ TableAwareFileStoreSourceSplit assignedSplit =
+ assignments.get(0).getAssignedSplits().get(0);
+ assertThat(assignedSplit.getLastSchemaId()).isNull();
+ assertThat(assignedSplit.getSchemaId()).isEqualTo(1L);
+ }
+
private class Builder {
protected SplitEnumeratorContext<TableAwareFileStoreSourceSplit>
context;
protected Collection<TableAwareFileStoreSourceSplit> initialSplits =
@@ -410,6 +515,9 @@ public class CDCSourceEnumeratorTest
protected boolean unawareBucket = false;
protected int splitMaxPerTask = 10;
+ protected CDCCheckpoint checkpoint;
+ protected boolean schemaIdFromSnapshot = false;
+ protected String table;
public Builder setSplitEnumeratorContext(
SplitEnumeratorContext<TableAwareFileStoreSourceSplit>
context) {
@@ -442,17 +550,39 @@ public class CDCSourceEnumeratorTest
return this;
}
+ public Builder setCheckpoint(CDCCheckpoint checkpoint) {
+ this.checkpoint = checkpoint;
+ return this;
+ }
+
+ public Builder setSchemaIdFromSnapshot(boolean schemaIdFromSnapshot) {
+ this.schemaIdFromSnapshot = schemaIdFromSnapshot;
+ return this;
+ }
+
+ public Builder setTable(String table) {
+ this.table = table;
+ return this;
+ }
+
public CDCSourceEnumerator build() {
Options options = new Options();
options.setString("warehouse",
warehouseFolder.toAbsolutePath().toString());
options.set(CoreOptions.SCAN_MAX_SPLITS_PER_TASK, splitMaxPerTask);
Configuration cdcConfig = new Configuration();
cdcConfig.set(toCDCOption(CDCOptions.DATABASE), DATABASE);
+ if (table != null) {
+ cdcConfig.set(toCDCOption(CDCOptions.TABLE), table);
+ }
- Map<Identifier, Long> nextSnapshotIdMap = new HashMap<>();
+ Map<Identifier, TableProgress> tableProgressMap = new HashMap<>();
for (TableAwareFileStoreSourceSplit split : initialSplits) {
- nextSnapshotIdMap.put(split.getIdentifier(), 1L);
+ tableProgressMap.put(split.getIdentifier(), new
TableProgress(1L, null));
}
+ CDCCheckpoint checkpoint =
+ this.checkpoint == null
+ ? new CDCCheckpoint(initialSplits,
tableProgressMap)
+ : this.checkpoint;
TestCDCSourceEnumerator enumerator;
try {
@@ -463,17 +593,25 @@ public class CDCSourceEnumeratorTest
discoveryInterval,
CatalogContext.create(options),
cdcConfig,
- new CDCCheckpoint(initialSplits,
nextSnapshotIdMap));
+ checkpoint,
+ schemaIdFromSnapshot);
} catch (Exception e) {
throw new RuntimeException(e);
}
if (scan != null) {
try {
- for (String tableName : catalog.listTables(DATABASE)) {
- FileStoreTable table =
- (FileStoreTable)
-
catalog.getTable(Identifier.create(DATABASE, tableName));
- enumerator.setScan(Identifier.create(DATABASE,
tableName), table, scan);
+ if (table != null) {
+ Identifier identifier = Identifier.create(DATABASE,
table);
+ FileStoreTable fileStoreTable =
+ (FileStoreTable) catalog.getTable(identifier);
+ enumerator.setScan(identifier, fileStoreTable, scan);
+ } else {
+ for (String tableName : catalog.listTables(DATABASE)) {
+ Identifier identifier =
Identifier.create(DATABASE, tableName);
+ FileStoreTable fileStoreTable =
+ (FileStoreTable)
catalog.getTable(identifier);
+ enumerator.setScan(identifier, fileStoreTable,
scan);
+ }
}
} catch (Catalog.DatabaseNotExistException |
Catalog.TableNotExistException e) {
throw new RuntimeException(e);
@@ -511,10 +649,14 @@ public class CDCSourceEnumeratorTest
long discoveryInterval,
CatalogContext catalogContext,
Configuration cdcConfig,
- @Nullable CDCCheckpoint checkpoint) {
+ @Nullable CDCCheckpoint checkpoint,
+ boolean schemaIdFromSnapshot) {
super(context, flinkConfig, discoveryInterval, catalogContext,
cdcConfig, checkpoint);
+ this.schemaIdFromSnapshot = schemaIdFromSnapshot;
}
+ private final boolean schemaIdFromSnapshot;
+
@Override
protected TableAwareFileStoreSourceSplit toTableAwareSplit(
String splitId,
@@ -523,8 +665,9 @@ public class CDCSourceEnumeratorTest
Identifier identifier,
@Nullable Long lastSchemaId) {
Preconditions.checkState(split instanceof DataSplit);
+ long schemaId = schemaIdFromSnapshot ? ((DataSplit)
split).snapshotId() : 1L;
return new TableAwareFileStoreSourceSplit(
- splitId, split, 0, identifier, lastSchemaId, 1L);
+ splitId, split, 0, identifier, lastSchemaId, schemaId);
}
}
}