This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e14824885 [flink][cdc] Fix schema event position in source reader (#8099)
6e14824885 is described below

commit 6e14824885462f0fdb9db33361e67aae51d30fa6
Author: QuakeWang <[email protected]>
AuthorDate: Wed Jun 3 21:45:56 2026 +0800

    [flink][cdc] Fix schema event position in source reader (#8099)
---
 .../cdc/source/reader/CDCSourceSplitReader.java    |  60 ++++----
 .../source/reader/CDCSourceSplitReaderTest.java    | 166 ++++++++++++++++++++-
 2 files changed, 191 insertions(+), 35 deletions(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceSplitReader.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceSplitReader.java
index 48c0ef82f2..9aeb48567d 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceSplitReader.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceSplitReader.java
@@ -76,8 +76,9 @@ public class CDCSourceSplitReader
     private TableReaderInfo currentTableReaderInfo;
     @Nullable private LazyRecordReader currentReader;
     @Nullable private String currentSplitId;
-    private long currentNumRead;
+    private long currentDataRowsRead;
     private RecordIterator<InternalRow> currentFirstBatch;
+    private final Queue<SchemaChangeEvent> currentSchemaChangeEvents = new 
LinkedList<>();
 
     private boolean paused;
     private final AtomicBoolean wakeup;
@@ -183,6 +184,7 @@ public class CDCSourceSplitReader
 
     @Override
     public void close() throws Exception {
+        currentSchemaChangeEvents.clear();
         if (currentReader != null) {
             if (currentReader.lazyRecordReader != null) {
                 currentReader.lazyRecordReader.close();
@@ -214,13 +216,17 @@ public class CDCSourceSplitReader
         List<SchemaChangeEvent> schemaChangeEvents =
                 tableManager.generateSchemaChangeEventList(
                         identifier, nextSplit.getLastSchemaId(), 
nextSplit.getSchemaId());
-        currentTableReaderInfo = new TableReaderInfo(identifier, tableSchema, 
schemaChangeEvents);
+        currentTableReaderInfo = new TableReaderInfo(identifier, tableSchema);
         currentSplitId = nextSplit.splitId();
         currentReader = createLazyRecordReader(nextSplit.split());
-        currentNumRead = nextSplit.recordsToSkip();
+        currentDataRowsRead = nextSplit.recordsToSkip();
+        currentSchemaChangeEvents.clear();
+        if (currentDataRowsRead == 0) {
+            currentSchemaChangeEvents.addAll(schemaChangeEvents);
+        }
 
-        if (currentNumRead > 0) {
-            seek(currentNumRead);
+        if (currentDataRowsRead > 0) {
+            seek(currentDataRowsRead);
         }
     }
 
@@ -257,6 +263,7 @@ public class CDCSourceSplitReader
             currentReader = null;
         }
 
+        currentSchemaChangeEvents.clear();
         final CDCRecordsWithSplitIds finishRecords =
                 CDCRecordsWithSplitIds.finishedSplit(currentSplitId);
         currentSplitId = null;
@@ -271,33 +278,24 @@ public class CDCSourceSplitReader
                 new MutableRecordAndPosition<>();
 
         private TableReaderInfo tableReaderInfo;
-        private final Queue<SchemaChangeEvent> schemaChangeEventList = new 
LinkedList<>();
 
         public FileStoreRecordIterator replace(
                 RecordIterator<InternalRow> iterator, TableReaderInfo 
tableReaderInfo) {
             this.iterator = iterator;
-            this.recordAndPosition.set(null, RecordAndPosition.NO_OFFSET, 
currentNumRead);
+            this.recordAndPosition.set(null, RecordAndPosition.NO_OFFSET, 
currentDataRowsRead);
             this.tableReaderInfo = tableReaderInfo;
-            
this.schemaChangeEventList.addAll(tableReaderInfo.schemaChangeEvents);
             return this;
         }
 
         @Nullable
         @Override
         public RecordAndPosition<Event> next() {
-            Event event = nextEvent();
-            if (event == null) {
-                return null;
-            }
-
-            recordAndPosition.setNext(event);
-            currentNumRead++;
-            return recordAndPosition;
-        }
-
-        private Event nextEvent() {
-            if (!schemaChangeEventList.isEmpty()) {
-                return schemaChangeEventList.poll();
+            if (!currentSchemaChangeEvents.isEmpty()) {
+                recordAndPosition.set(
+                        currentSchemaChangeEvents.poll(),
+                        RecordAndPosition.NO_OFFSET,
+                        currentDataRowsRead);
+                return recordAndPosition;
             }
 
             InternalRow row;
@@ -310,11 +308,14 @@ public class CDCSourceSplitReader
                 return null;
             }
 
-            return convertRowToDataChangeEvent(
-                    tableReaderInfo.tableId,
-                    row,
-                    tableReaderInfo.fieldGetters,
-                    tableReaderInfo.generator);
+            recordAndPosition.setNext(
+                    convertRowToDataChangeEvent(
+                            tableReaderInfo.tableId,
+                            row,
+                            tableReaderInfo.fieldGetters,
+                            tableReaderInfo.generator));
+            currentDataRowsRead++;
+            return recordAndPosition;
         }
 
         @Override
@@ -358,19 +359,14 @@ public class CDCSourceSplitReader
         private final Identifier identifier;
         private final TableId tableId;
         private final TableSchema currentSchema;
-        private final List<SchemaChangeEvent> schemaChangeEvents;
         private final BinaryRecordDataGenerator generator;
         private final List<InternalRow.FieldGetter> fieldGetters;
 
-        private TableReaderInfo(
-                Identifier identifier,
-                TableSchema currentSchema,
-                List<SchemaChangeEvent> schemaChangeEvents) {
+        private TableReaderInfo(Identifier identifier, TableSchema 
currentSchema) {
             this.identifier = identifier;
             this.tableId = TableId.tableId(identifier.getDatabaseName(), 
identifier.getTableName());
 
             this.currentSchema = currentSchema;
-            this.schemaChangeEvents = schemaChangeEvents;
 
             org.apache.flink.cdc.common.schema.Schema currentCDCSchema =
                     convertPaimonSchemaToFlinkCDCSchema(currentSchema);
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceSplitReaderTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceSplitReaderTest.java
index e12ff8f19e..82199811b5 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceSplitReaderTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceSplitReaderTest.java
@@ -42,9 +42,13 @@ import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.utils.RecordWriter;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.OperationType;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import 
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
@@ -62,6 +66,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -144,8 +149,15 @@ public class CDCSourceSplitReaderTest {
     }
 
     private CDCSourceSplitReader createReader(TableRead tableRead) {
+        return createReader(tableRead, Collections.emptyList());
+    }
+
+    private CDCSourceSplitReader createReader(
+            TableRead tableRead, List<SchemaChangeEvent> schemaChangeEvents) {
         return new TestCDCSourceSplitReader(
-                new FileStoreSourceReaderMetrics(new DummyMetricGroup()), 
tableRead);
+                new FileStoreSourceReaderMetrics(new DummyMetricGroup()),
+                tableRead,
+                schemaChangeEvents);
     }
 
     private void innerTestOnce(int skip) throws Exception {
@@ -251,6 +263,92 @@ public class CDCSourceSplitReaderTest {
         reader.close();
     }
 
+    @Test
+    public void testSchemaChangeEventOnlyEmittedOnceInMultipleBatchSplit() 
throws Exception {
+        TestChangelogDataReadWrite rw = new 
TestChangelogDataReadWrite(tablePath);
+        CDCSourceSplitReader reader = createReader(rw.createReadWithKey(), 
schemaChangeEvents());
+
+        List<Tuple2<Long, Long>> input1 = kvs();
+        List<DataFileMeta> files = rw.writeFiles(row(1), 0, input1);
+
+        List<Tuple2<Long, Long>> input2 = kvs(6);
+        List<DataFileMeta> files2 = rw.writeFiles(row(1), 0, input2);
+        files.addAll(files2);
+
+        assignSplit(reader, newSourceSplit("id1", row(1), 0, files));
+
+        RecordsWithSplitIds<BulkFormat.RecordIterator<Event>> records = 
reader.fetch();
+        assertThat(readEventTypes(records, "id1"))
+                .containsExactly(
+                        SchemaChangeEvent.class,
+                        DataChangeEvent.class,
+                        DataChangeEvent.class,
+                        DataChangeEvent.class,
+                        DataChangeEvent.class,
+                        DataChangeEvent.class,
+                        DataChangeEvent.class);
+
+        records = reader.fetch();
+        assertThat(readEventTypes(records, "id1"))
+                .containsExactly(
+                        DataChangeEvent.class,
+                        DataChangeEvent.class,
+                        DataChangeEvent.class,
+                        DataChangeEvent.class,
+                        DataChangeEvent.class,
+                        DataChangeEvent.class);
+
+        records = reader.fetch();
+        assertRecords(records, "id1", "id1", 0, null);
+
+        reader.close();
+    }
+
+    @Test
+    public void testSchemaChangeEventDoesNotAdvanceRecordsToSkip() throws 
Exception {
+        TestChangelogDataReadWrite rw = new 
TestChangelogDataReadWrite(tablePath);
+        CDCSourceSplitReader reader = createReader(rw.createReadWithKey(), 
schemaChangeEvents());
+
+        List<Tuple2<Long, Long>> input = kvs();
+        List<DataFileMeta> files = rw.writeFiles(row(1), 0, input);
+
+        assignSplit(reader, newSourceSplit("id1", row(1), 0, files));
+
+        RecordsWithSplitIds<BulkFormat.RecordIterator<Event>> records = 
reader.fetch();
+        assertThat(readRecordSkipCounts(records, "id1"))
+                .containsExactly(0L, 1L, 2L, 3L, 4L, 5L, 6L);
+
+        reader.close();
+    }
+
+    @Test
+    public void testRestoreWithSchemaChangeEventsDoesNotReemitSchemaEvent() 
throws Exception {
+        TestChangelogDataReadWrite rw = new 
TestChangelogDataReadWrite(tablePath);
+        CDCSourceSplitReader reader = createReader(rw.createReadWithKey(), 
schemaChangeEvents());
+
+        List<Tuple2<Long, Long>> input1 = kvs();
+        List<DataFileMeta> files = rw.writeFiles(row(1), 0, input1);
+
+        List<Tuple2<Long, Long>> input2 = kvs(6);
+        List<DataFileMeta> files2 = rw.writeFiles(row(1), 0, input2);
+        files.addAll(files2);
+
+        assignSplit(reader, newSourceSplit("id1", row(1), 0, files, 
input1.size()));
+
+        RecordsWithSplitIds<BulkFormat.RecordIterator<Event>> records = 
reader.fetch();
+        assertRecords(records, null, "id1", input1.size(), 
Collections.emptyList());
+
+        records = reader.fetch();
+        assertRecords(
+                records,
+                null,
+                "id1",
+                input1.size(),
+                input2.stream().map(t -> t.f1).collect(Collectors.toList()));
+
+        reader.close();
+    }
+
     @Test
     public void testRestore() throws Exception {
         TestChangelogDataReadWrite rw = new 
TestChangelogDataReadWrite(tablePath);
@@ -456,6 +554,53 @@ public class CDCSourceSplitReaderTest {
         return result;
     }
 
+    private List<Class<?>> readEventTypes(
+            RecordsWithSplitIds<RecordIterator<Event>> records, String 
nextSplit) {
+        assertThat(records.finishedSplits()).isEmpty();
+        assertThat(records.nextSplit()).isEqualTo(nextSplit);
+        List<Class<?>> result = new ArrayList<>();
+        RecordIterator<Event> iterator;
+        while ((iterator = records.nextRecordFromSplit()) != null) {
+            RecordAndPosition<Event> record;
+            while ((record = iterator.next()) != null) {
+                result.add(
+                        record.getRecord() instanceof SchemaChangeEvent
+                                ? SchemaChangeEvent.class
+                                : DataChangeEvent.class);
+            }
+        }
+        records.recycle();
+        return result;
+    }
+
+    private List<Long> readRecordSkipCounts(
+            RecordsWithSplitIds<RecordIterator<Event>> records, String 
nextSplit) {
+        assertThat(records.finishedSplits()).isEmpty();
+        assertThat(records.nextSplit()).isEqualTo(nextSplit);
+        List<Long> result = new ArrayList<>();
+        RecordIterator<Event> iterator;
+        while ((iterator = records.nextRecordFromSplit()) != null) {
+            RecordAndPosition<Event> record;
+            while ((record = iterator.next()) != null) {
+                result.add(record.getRecordSkipCount());
+            }
+        }
+        records.recycle();
+        return result;
+    }
+
+    private List<SchemaChangeEvent> schemaChangeEvents() {
+        return Collections.singletonList(
+                new AddColumnEvent(
+                        TableId.tableId(DATABASE, TABLE),
+                        Arrays.asList(
+                                AddColumnEvent.last(
+                                        Column.physicalColumn(
+                                                "extra",
+                                                
org.apache.flink.cdc.common.types.DataTypes
+                                                        .BIGINT())))));
+    }
+
     private List<Tuple2<Long, Long>> kvs() {
         return kvs(0);
     }
@@ -524,8 +669,11 @@ public class CDCSourceSplitReaderTest {
     private static class TestCDCSourceSplitReader extends CDCSourceSplitReader 
{
         private final TableRead tableRead;
 
-        public TestCDCSourceSplitReader(FileStoreSourceReaderMetrics metrics, 
TableRead tableRead) {
-            super(metrics, new TestTableManager(tableRead));
+        public TestCDCSourceSplitReader(
+                FileStoreSourceReaderMetrics metrics,
+                TableRead tableRead,
+                List<SchemaChangeEvent> schemaChangeEvents) {
+            super(metrics, new TestTableManager(tableRead, 
schemaChangeEvents));
             this.tableRead = tableRead;
         }
 
@@ -555,10 +703,16 @@ public class CDCSourceSplitReaderTest {
 
     private static class TestTableManager extends CDCSource.TableManager {
         private final TableRead tableRead;
+        private final List<SchemaChangeEvent> schemaChangeEvents;
 
         public TestTableManager(TableRead tableRead) {
+            this(tableRead, Collections.emptyList());
+        }
+
+        public TestTableManager(TableRead tableRead, List<SchemaChangeEvent> 
schemaChangeEvents) {
             super(null, null, null);
             this.tableRead = tableRead;
+            this.schemaChangeEvents = schemaChangeEvents;
         }
 
         @Override
@@ -571,5 +725,11 @@ public class CDCSourceSplitReaderTest {
         public TableRead getTableRead(Identifier identifier, TableSchema 
schema) {
             return tableRead;
         }
+
+        @Override
+        public List<SchemaChangeEvent> generateSchemaChangeEventList(
+                Identifier identifier, @Nullable Long lastSchemaId, long 
schemaId) {
+            return schemaChangeEvents;
+        }
     }
 }

Reply via email to