This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6e14824885 [flink][cdc] Fix schema event position in source reader (#8099)
6e14824885 is described below
commit 6e14824885462f0fdb9db33361e67aae51d30fa6
Author: QuakeWang <[email protected]>
AuthorDate: Wed Jun 3 21:45:56 2026 +0800
[flink][cdc] Fix schema event position in source reader (#8099)
---
.../cdc/source/reader/CDCSourceSplitReader.java | 60 ++++----
.../source/reader/CDCSourceSplitReaderTest.java | 166 ++++++++++++++++++++-
2 files changed, 191 insertions(+), 35 deletions(-)
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceSplitReader.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceSplitReader.java
index 48c0ef82f2..9aeb48567d 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceSplitReader.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceSplitReader.java
@@ -76,8 +76,9 @@ public class CDCSourceSplitReader
private TableReaderInfo currentTableReaderInfo;
@Nullable private LazyRecordReader currentReader;
@Nullable private String currentSplitId;
- private long currentNumRead;
+ private long currentDataRowsRead;
private RecordIterator<InternalRow> currentFirstBatch;
+ private final Queue<SchemaChangeEvent> currentSchemaChangeEvents = new
LinkedList<>();
private boolean paused;
private final AtomicBoolean wakeup;
@@ -183,6 +184,7 @@ public class CDCSourceSplitReader
@Override
public void close() throws Exception {
+ currentSchemaChangeEvents.clear();
if (currentReader != null) {
if (currentReader.lazyRecordReader != null) {
currentReader.lazyRecordReader.close();
@@ -214,13 +216,17 @@ public class CDCSourceSplitReader
List<SchemaChangeEvent> schemaChangeEvents =
tableManager.generateSchemaChangeEventList(
identifier, nextSplit.getLastSchemaId(),
nextSplit.getSchemaId());
- currentTableReaderInfo = new TableReaderInfo(identifier, tableSchema,
schemaChangeEvents);
+ currentTableReaderInfo = new TableReaderInfo(identifier, tableSchema);
currentSplitId = nextSplit.splitId();
currentReader = createLazyRecordReader(nextSplit.split());
- currentNumRead = nextSplit.recordsToSkip();
+ currentDataRowsRead = nextSplit.recordsToSkip();
+ currentSchemaChangeEvents.clear();
+ if (currentDataRowsRead == 0) {
+ currentSchemaChangeEvents.addAll(schemaChangeEvents);
+ }
- if (currentNumRead > 0) {
- seek(currentNumRead);
+ if (currentDataRowsRead > 0) {
+ seek(currentDataRowsRead);
}
}
@@ -257,6 +263,7 @@ public class CDCSourceSplitReader
currentReader = null;
}
+ currentSchemaChangeEvents.clear();
final CDCRecordsWithSplitIds finishRecords =
CDCRecordsWithSplitIds.finishedSplit(currentSplitId);
currentSplitId = null;
@@ -271,33 +278,24 @@ public class CDCSourceSplitReader
new MutableRecordAndPosition<>();
private TableReaderInfo tableReaderInfo;
- private final Queue<SchemaChangeEvent> schemaChangeEventList = new
LinkedList<>();
public FileStoreRecordIterator replace(
RecordIterator<InternalRow> iterator, TableReaderInfo
tableReaderInfo) {
this.iterator = iterator;
- this.recordAndPosition.set(null, RecordAndPosition.NO_OFFSET,
currentNumRead);
+ this.recordAndPosition.set(null, RecordAndPosition.NO_OFFSET,
currentDataRowsRead);
this.tableReaderInfo = tableReaderInfo;
-
this.schemaChangeEventList.addAll(tableReaderInfo.schemaChangeEvents);
return this;
}
@Nullable
@Override
public RecordAndPosition<Event> next() {
- Event event = nextEvent();
- if (event == null) {
- return null;
- }
-
- recordAndPosition.setNext(event);
- currentNumRead++;
- return recordAndPosition;
- }
-
- private Event nextEvent() {
- if (!schemaChangeEventList.isEmpty()) {
- return schemaChangeEventList.poll();
+ if (!currentSchemaChangeEvents.isEmpty()) {
+ recordAndPosition.set(
+ currentSchemaChangeEvents.poll(),
+ RecordAndPosition.NO_OFFSET,
+ currentDataRowsRead);
+ return recordAndPosition;
}
InternalRow row;
@@ -310,11 +308,14 @@ public class CDCSourceSplitReader
return null;
}
- return convertRowToDataChangeEvent(
- tableReaderInfo.tableId,
- row,
- tableReaderInfo.fieldGetters,
- tableReaderInfo.generator);
+ recordAndPosition.setNext(
+ convertRowToDataChangeEvent(
+ tableReaderInfo.tableId,
+ row,
+ tableReaderInfo.fieldGetters,
+ tableReaderInfo.generator));
+ currentDataRowsRead++;
+ return recordAndPosition;
}
@Override
@@ -358,19 +359,14 @@ public class CDCSourceSplitReader
private final Identifier identifier;
private final TableId tableId;
private final TableSchema currentSchema;
- private final List<SchemaChangeEvent> schemaChangeEvents;
private final BinaryRecordDataGenerator generator;
private final List<InternalRow.FieldGetter> fieldGetters;
- private TableReaderInfo(
- Identifier identifier,
- TableSchema currentSchema,
- List<SchemaChangeEvent> schemaChangeEvents) {
+ private TableReaderInfo(Identifier identifier, TableSchema
currentSchema) {
this.identifier = identifier;
this.tableId = TableId.tableId(identifier.getDatabaseName(),
identifier.getTableName());
this.currentSchema = currentSchema;
- this.schemaChangeEvents = schemaChangeEvents;
org.apache.flink.cdc.common.schema.Schema currentCDCSchema =
convertPaimonSchemaToFlinkCDCSchema(currentSchema);
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceSplitReaderTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceSplitReaderTest.java
index e12ff8f19e..82199811b5 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceSplitReaderTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceSplitReaderTest.java
@@ -42,9 +42,13 @@ import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.utils.RecordWriter;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.OperationType;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
@@ -62,6 +66,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -144,8 +149,15 @@ public class CDCSourceSplitReaderTest {
}
private CDCSourceSplitReader createReader(TableRead tableRead) {
+ return createReader(tableRead, Collections.emptyList());
+ }
+
+ private CDCSourceSplitReader createReader(
+ TableRead tableRead, List<SchemaChangeEvent> schemaChangeEvents) {
return new TestCDCSourceSplitReader(
- new FileStoreSourceReaderMetrics(new DummyMetricGroup()),
tableRead);
+ new FileStoreSourceReaderMetrics(new DummyMetricGroup()),
+ tableRead,
+ schemaChangeEvents);
}
private void innerTestOnce(int skip) throws Exception {
@@ -251,6 +263,92 @@ public class CDCSourceSplitReaderTest {
reader.close();
}
+ @Test
+ public void testSchemaChangeEventOnlyEmittedOnceInMultipleBatchSplit()
throws Exception {
+ TestChangelogDataReadWrite rw = new
TestChangelogDataReadWrite(tablePath);
+ CDCSourceSplitReader reader = createReader(rw.createReadWithKey(),
schemaChangeEvents());
+
+ List<Tuple2<Long, Long>> input1 = kvs();
+ List<DataFileMeta> files = rw.writeFiles(row(1), 0, input1);
+
+ List<Tuple2<Long, Long>> input2 = kvs(6);
+ List<DataFileMeta> files2 = rw.writeFiles(row(1), 0, input2);
+ files.addAll(files2);
+
+ assignSplit(reader, newSourceSplit("id1", row(1), 0, files));
+
+ RecordsWithSplitIds<BulkFormat.RecordIterator<Event>> records =
reader.fetch();
+ assertThat(readEventTypes(records, "id1"))
+ .containsExactly(
+ SchemaChangeEvent.class,
+ DataChangeEvent.class,
+ DataChangeEvent.class,
+ DataChangeEvent.class,
+ DataChangeEvent.class,
+ DataChangeEvent.class,
+ DataChangeEvent.class);
+
+ records = reader.fetch();
+ assertThat(readEventTypes(records, "id1"))
+ .containsExactly(
+ DataChangeEvent.class,
+ DataChangeEvent.class,
+ DataChangeEvent.class,
+ DataChangeEvent.class,
+ DataChangeEvent.class,
+ DataChangeEvent.class);
+
+ records = reader.fetch();
+ assertRecords(records, "id1", "id1", 0, null);
+
+ reader.close();
+ }
+
+ @Test
+ public void testSchemaChangeEventDoesNotAdvanceRecordsToSkip() throws
Exception {
+ TestChangelogDataReadWrite rw = new
TestChangelogDataReadWrite(tablePath);
+ CDCSourceSplitReader reader = createReader(rw.createReadWithKey(),
schemaChangeEvents());
+
+ List<Tuple2<Long, Long>> input = kvs();
+ List<DataFileMeta> files = rw.writeFiles(row(1), 0, input);
+
+ assignSplit(reader, newSourceSplit("id1", row(1), 0, files));
+
+ RecordsWithSplitIds<BulkFormat.RecordIterator<Event>> records =
reader.fetch();
+ assertThat(readRecordSkipCounts(records, "id1"))
+ .containsExactly(0L, 1L, 2L, 3L, 4L, 5L, 6L);
+
+ reader.close();
+ }
+
+ @Test
+ public void testRestoreWithSchemaChangeEventsDoesNotReemitSchemaEvent()
throws Exception {
+ TestChangelogDataReadWrite rw = new
TestChangelogDataReadWrite(tablePath);
+ CDCSourceSplitReader reader = createReader(rw.createReadWithKey(),
schemaChangeEvents());
+
+ List<Tuple2<Long, Long>> input1 = kvs();
+ List<DataFileMeta> files = rw.writeFiles(row(1), 0, input1);
+
+ List<Tuple2<Long, Long>> input2 = kvs(6);
+ List<DataFileMeta> files2 = rw.writeFiles(row(1), 0, input2);
+ files.addAll(files2);
+
+ assignSplit(reader, newSourceSplit("id1", row(1), 0, files,
input1.size()));
+
+ RecordsWithSplitIds<BulkFormat.RecordIterator<Event>> records =
reader.fetch();
+ assertRecords(records, null, "id1", input1.size(),
Collections.emptyList());
+
+ records = reader.fetch();
+ assertRecords(
+ records,
+ null,
+ "id1",
+ input1.size(),
+ input2.stream().map(t -> t.f1).collect(Collectors.toList()));
+
+ reader.close();
+ }
+
@Test
public void testRestore() throws Exception {
TestChangelogDataReadWrite rw = new
TestChangelogDataReadWrite(tablePath);
@@ -456,6 +554,53 @@ public class CDCSourceSplitReaderTest {
return result;
}
+ private List<Class<?>> readEventTypes(
+ RecordsWithSplitIds<RecordIterator<Event>> records, String
nextSplit) {
+ assertThat(records.finishedSplits()).isEmpty();
+ assertThat(records.nextSplit()).isEqualTo(nextSplit);
+ List<Class<?>> result = new ArrayList<>();
+ RecordIterator<Event> iterator;
+ while ((iterator = records.nextRecordFromSplit()) != null) {
+ RecordAndPosition<Event> record;
+ while ((record = iterator.next()) != null) {
+ result.add(
+ record.getRecord() instanceof SchemaChangeEvent
+ ? SchemaChangeEvent.class
+ : DataChangeEvent.class);
+ }
+ }
+ records.recycle();
+ return result;
+ }
+
+ private List<Long> readRecordSkipCounts(
+ RecordsWithSplitIds<RecordIterator<Event>> records, String
nextSplit) {
+ assertThat(records.finishedSplits()).isEmpty();
+ assertThat(records.nextSplit()).isEqualTo(nextSplit);
+ List<Long> result = new ArrayList<>();
+ RecordIterator<Event> iterator;
+ while ((iterator = records.nextRecordFromSplit()) != null) {
+ RecordAndPosition<Event> record;
+ while ((record = iterator.next()) != null) {
+ result.add(record.getRecordSkipCount());
+ }
+ }
+ records.recycle();
+ return result;
+ }
+
+ private List<SchemaChangeEvent> schemaChangeEvents() {
+ return Collections.singletonList(
+ new AddColumnEvent(
+ TableId.tableId(DATABASE, TABLE),
+ Arrays.asList(
+ AddColumnEvent.last(
+ Column.physicalColumn(
+ "extra",
+
org.apache.flink.cdc.common.types.DataTypes
+ .BIGINT())))));
+ }
+
private List<Tuple2<Long, Long>> kvs() {
return kvs(0);
}
@@ -524,8 +669,11 @@ public class CDCSourceSplitReaderTest {
private static class TestCDCSourceSplitReader extends CDCSourceSplitReader
{
private final TableRead tableRead;
- public TestCDCSourceSplitReader(FileStoreSourceReaderMetrics metrics,
TableRead tableRead) {
- super(metrics, new TestTableManager(tableRead));
+ public TestCDCSourceSplitReader(
+ FileStoreSourceReaderMetrics metrics,
+ TableRead tableRead,
+ List<SchemaChangeEvent> schemaChangeEvents) {
+ super(metrics, new TestTableManager(tableRead,
schemaChangeEvents));
this.tableRead = tableRead;
}
@@ -555,10 +703,16 @@ public class CDCSourceSplitReaderTest {
private static class TestTableManager extends CDCSource.TableManager {
private final TableRead tableRead;
+ private final List<SchemaChangeEvent> schemaChangeEvents;
public TestTableManager(TableRead tableRead) {
+ this(tableRead, Collections.emptyList());
+ }
+
+ public TestTableManager(TableRead tableRead, List<SchemaChangeEvent>
schemaChangeEvents) {
super(null, null, null);
this.tableRead = tableRead;
+ this.schemaChangeEvents = schemaChangeEvents;
}
@Override
@@ -571,5 +725,11 @@ public class CDCSourceSplitReaderTest {
public TableRead getTableRead(Identifier identifier, TableSchema
schema) {
return tableRead;
}
+
+ @Override
+ public List<SchemaChangeEvent> generateSchemaChangeEventList(
+ Identifier identifier, @Nullable Long lastSchemaId, long
schemaId) {
+ return schemaChangeEvents;
+ }
}
}