This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2db2b8fa13 [core] Code Refactor for Data Evolution
2db2b8fa13 is described below

commit 2db2b8fa136c806ae716a60f723630613093c15e
Author: JingsongLi <jingsongl...@gmail.com>
AuthorDate: Sat Aug 9 10:56:18 2025 +0800

    [core] Code Refactor for Data Evolution
---
 .../main/java/org/apache/paimon/CoreOptions.java   |   2 +-
 .../java/org/apache/paimon/schema/TableSchema.java |  16 +--
 ...ileReader.java => DataEvolutionFileReader.java} |  33 +++---
 .../org/apache/paimon/AppendOnlyFileStore.java     |   8 +-
 .../org/apache/paimon/append/AppendOnlyWriter.java |   8 +-
 .../paimon/operation/BaseAppendFileStoreWrite.java |  13 +-
 ...eSplitRead.java => DataEvolutionSplitRead.java} |  12 +-
 .../org/apache/paimon/schema/SchemaValidation.java |   2 +-
 .../paimon/table/AppendOnlyFileStoreTable.java     |  25 +++-
 .../apache/paimon/table/sink/BatchTableWrite.java  |   5 +-
 .../paimon/table/source/AppendTableRead.java       |  36 ++----
 .../paimon/table/source/KeyValueTableRead.java     |  16 +--
 .../AppendTableRawFileSplitReadProvider.java       |   7 +-
 ...er.java => DataEvolutionSplitReadProvider.java} |  54 +++++----
 .../IncrementalChangelogReadProvider.java          |  15 +--
 .../splitread/IncrementalDiffReadProvider.java     |  15 +--
 .../splitread/MergeFieldSplitReadProvider.java     |  78 ------------
 .../splitread/MergeFileSplitReadProvider.java      |  15 +--
 .../PrimaryKeyTableRawFileSplitReadProvider.java   |   7 +-
 .../source/splitread/RawFileSplitReadProvider.java |  16 +--
 ...SplitReadProvider.java => SplitReadConfig.java} |  11 +-
 .../table/source/splitread/SplitReadProvider.java  |   5 +-
 .../apache/paimon/append/AppendOnlyWriterTest.java |   2 +-
 .../apache/paimon/format/FileFormatSuffixTest.java |   2 +-
 ...eFieldTest.java => DataEvolutionTableTest.java} |  13 +-
 .../DataEvolutionSplitReadProviderTest.java        | 132 +++++++++++++++++++++
 .../AppendPreCommitCompactWorkerOperator.java      |   2 +-
 27 files changed, 288 insertions(+), 262 deletions(-)

diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index a464e6dba6..32eff542a4 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2901,7 +2901,7 @@ public class CoreOptions implements Serializable {
         return options.get(ROW_TRACKING_ENABLED);
     }
 
-    public boolean dataElolutionEnabled() {
+    public boolean dataEvolutionEnabled() {
         return options.get(DATA_EVOLUTION_ENABLED);
     }
 
diff --git a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java 
b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
index 0680a65675..6e012c016b 100644
--- a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
+++ b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
@@ -274,23 +274,11 @@ public class TableSchema implements Serializable {
         if (writeCols == null || writeCols.isEmpty()) {
             return this;
         }
-        Map<String, DataField> fieldMap =
-                fields.stream()
-                        .collect(Collectors.toMap(DataField::name, field -> 
field, (a, b) -> a));
-        List<DataField> fields = new ArrayList<>();
-        for (String fieldId : writeCols) {
-            DataField dataField = fieldMap.get(fieldId);
-            if (dataField == null) {
-                throw new RuntimeException(
-                        String.format(
-                                "Projecting field %s, but not found in schema 
%s.", fieldId, this));
-            }
-            fields.add(dataField);
-        }
+
         return new TableSchema(
                 version,
                 id,
-                fields,
+                new RowType(fields).project(writeCols).getFields(),
                 highestFieldId,
                 partitionKeys,
                 primaryKeys,
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/reader/CompoundFileReader.java 
b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java
similarity index 89%
rename from 
paimon-common/src/main/java/org/apache/paimon/reader/CompoundFileReader.java
rename to 
paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java
index 65748f5161..9c75f0e114 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/reader/CompoundFileReader.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java
@@ -59,13 +59,13 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
  *
  * </pre>
  */
-public class CompoundFileReader implements RecordReader<InternalRow> {
+public class DataEvolutionFileReader implements RecordReader<InternalRow> {
 
     private final int[] rowOffsets;
     private final int[] fieldOffsets;
     private final RecordReader<InternalRow>[] innerReaders;
 
-    public CompoundFileReader(
+    public DataEvolutionFileReader(
             int[] rowOffsets, int[] fieldOffsets, RecordReader<InternalRow>[] 
readers) {
         checkArgument(rowOffsets != null, "Row offsets must not be null");
         checkArgument(fieldOffsets != null, "Field offsets must not be null");
@@ -82,17 +82,17 @@ public class CompoundFileReader implements 
RecordReader<InternalRow> {
     @Override
     @Nullable
     public RecordIterator<InternalRow> readBatch() throws IOException {
-        CompoundRecordIterator compundFileRecordIterator =
-                new CompoundRecordIterator(innerReaders.length, rowOffsets, 
fieldOffsets);
+        DataEvolutionIterator iterator =
+                new DataEvolutionIterator(innerReaders.length, rowOffsets, 
fieldOffsets);
         for (int i = 0; i < innerReaders.length; i++) {
             RecordIterator<InternalRow> batch = innerReaders[i].readBatch();
             if (batch == null && !(innerReaders[i] instanceof 
EmptyFileRecordReader)) {
                 return null;
             }
-            compundFileRecordIterator.compound(i, batch);
+            iterator.set(i, batch);
         }
 
-        return compundFileRecordIterator;
+        return iterator;
     }
 
     @Override
@@ -105,20 +105,21 @@ public class CompoundFileReader implements 
RecordReader<InternalRow> {
     }
 
     /** The batch which is made up by several batches. */
-    public static class CompoundRecordIterator implements 
RecordIterator<InternalRow> {
+    private static class DataEvolutionIterator implements 
RecordIterator<InternalRow> {
 
-        private final CompundInternalRow compundInternalRow;
+        private final DataEvolutionRow dataEvolutionRow;
         private final RecordIterator<InternalRow>[] iterators;
 
-        public CompoundRecordIterator(
+        private DataEvolutionIterator(
                 int rowNumber,
                 int[] rowOffsets,
                 int[] fieldOffsets) { // Initialize with empty arrays, will be 
set later
-            this.compundInternalRow = new CompundInternalRow(rowNumber, 
rowOffsets, fieldOffsets);
+            this.dataEvolutionRow = new DataEvolutionRow(rowNumber, 
rowOffsets, fieldOffsets);
+            //noinspection unchecked
             this.iterators = new RecordIterator[rowNumber];
         }
 
-        public void compound(int i, RecordIterator<InternalRow> iterator) {
+        public void set(int i, RecordIterator<InternalRow> iterator) {
             iterators[i] = iterator;
         }
 
@@ -131,10 +132,10 @@ public class CompoundFileReader implements 
RecordReader<InternalRow> {
                     if (next == null) {
                         return null;
                     }
-                    compundInternalRow.setRow(i, next);
+                    dataEvolutionRow.setRow(i, next);
                 }
             }
-            return compundInternalRow;
+            return dataEvolutionRow;
         }
 
         @Override
@@ -148,19 +149,19 @@ public class CompoundFileReader implements 
RecordReader<InternalRow> {
     }
 
     /** The row which is made up by several rows. */
-    public static class CompundInternalRow implements InternalRow {
+    private static class DataEvolutionRow implements InternalRow {
 
         private final InternalRow[] rows;
         private final int[] rowOffsets;
         private final int[] fieldOffsets;
 
-        public CompundInternalRow(int rowNumber, int[] rowOffsets, int[] 
fieldOffsets) {
+        private DataEvolutionRow(int rowNumber, int[] rowOffsets, int[] 
fieldOffsets) {
             this.rows = new InternalRow[rowNumber];
             this.rowOffsets = rowOffsets;
             this.fieldOffsets = fieldOffsets;
         }
 
-        public void setRow(int pos, InternalRow row) {
+        private void setRow(int pos, InternalRow row) {
             if (pos >= rows.length) {
                 throw new IndexOutOfBoundsException(
                         "Position " + pos + " is out of bounds for rows size " 
+ rows.length);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 7f0a6b5f19..be3d1662c1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -27,7 +27,7 @@ import org.apache.paimon.operation.AppendOnlyFileStoreScan;
 import org.apache.paimon.operation.BaseAppendFileStoreWrite;
 import org.apache.paimon.operation.BucketSelectConverter;
 import org.apache.paimon.operation.BucketedAppendFileStoreWrite;
-import org.apache.paimon.operation.FieldMergeSplitRead;
+import org.apache.paimon.operation.DataEvolutionSplitRead;
 import org.apache.paimon.operation.RawFileSplitRead;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.SchemaManager;
@@ -86,12 +86,12 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
                 options.rowTrackingEnabled());
     }
 
-    public FieldMergeSplitRead newFieldMergeRead() {
-        if (!options.dataElolutionEnabled()) {
+    public DataEvolutionSplitRead newDataEvolutionRead() {
+        if (!options.dataEvolutionEnabled()) {
             throw new IllegalStateException(
                     "Field merge read is only supported when 
data-evolution.enabled is true.");
         }
-        return new FieldMergeSplitRead(
+        return new DataEvolutionSplitRead(
                 fileIO,
                 schemaManager,
                 schema,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 646ef382a4..f8a074cab8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -69,8 +69,8 @@ public class AppendOnlyWriter implements BatchRecordWriter, 
MemoryOwner {
     private final long schemaId;
     private final FileFormat fileFormat;
     private final long targetFileSize;
-    private final RowType rowType;
     private final RowType writeSchema;
+    @Nullable private final List<String> writeCols;
     private final DataFilePathFactory pathFactory;
     private final CompactManager compactManager;
     private final IOFunction<List<DataFileMeta>, 
RecordReaderIterator<InternalRow>> dataFileRead;
@@ -99,8 +99,8 @@ public class AppendOnlyWriter implements BatchRecordWriter, 
MemoryOwner {
             long schemaId,
             FileFormat fileFormat,
             long targetFileSize,
-            RowType rowType,
             RowType writeSchema,
+            @Nullable List<String> writeCols,
             long maxSequenceNumber,
             CompactManager compactManager,
             IOFunction<List<DataFileMeta>, RecordReaderIterator<InternalRow>> 
dataFileRead,
@@ -120,8 +120,8 @@ public class AppendOnlyWriter implements BatchRecordWriter, 
MemoryOwner {
         this.schemaId = schemaId;
         this.fileFormat = fileFormat;
         this.targetFileSize = targetFileSize;
-        this.rowType = rowType;
         this.writeSchema = writeSchema;
+        this.writeCols = writeCols;
         this.pathFactory = pathFactory;
         this.compactManager = compactManager;
         this.dataFileRead = dataFileRead;
@@ -304,7 +304,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, 
MemoryOwner {
                 FileSource.APPEND,
                 asyncFileWrite,
                 statsDenseStore,
-                writeSchema.equals(rowType) ? null : 
writeSchema.getFieldNames());
+                writeCols);
     }
 
     private void trySyncLatestCompaction(boolean blocking)
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
index eb2f1f669c..dca05a6522 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
@@ -75,6 +75,7 @@ public abstract class BaseAppendFileStoreWrite extends 
MemoryFileStoreWrite<Inte
     private final RowType rowType;
 
     private RowType writeType;
+    private @Nullable List<String> writeCols;
     private boolean forceBufferSpill = false;
 
     public BaseAppendFileStoreWrite(
@@ -95,6 +96,7 @@ public abstract class BaseAppendFileStoreWrite extends 
MemoryFileStoreWrite<Inte
         this.schemaId = schemaId;
         this.rowType = rowType;
         this.writeType = rowType;
+        this.writeCols = null;
         this.fileFormat = fileFormat(options);
         this.pathFactory = pathFactory;
 
@@ -116,8 +118,8 @@ public abstract class BaseAppendFileStoreWrite extends 
MemoryFileStoreWrite<Inte
                 schemaId,
                 fileFormat,
                 options.targetFileSize(false),
-                rowType,
                 writeType,
+                writeCols,
                 restoredMaxSeqNumber,
                 getCompactManager(partition, bucket, restoredFiles, 
compactExecutor, dvMaintainer),
                 // it is only for new files, no dv
@@ -139,6 +141,15 @@ public abstract class BaseAppendFileStoreWrite extends 
MemoryFileStoreWrite<Inte
     @Override
     public void withWriteType(RowType writeType) {
         this.writeType = writeType;
+        int fullCount = rowType.getFieldCount();
+        List<String> fullNames = rowType.getFieldNames();
+        this.writeCols = writeType.getFieldNames();
+        // optimize writeCols to null in following cases:
+        // 1. writeType contains all columns
+        // 2. writeType contains all columns and append _ROW_ID cols
+        if (writeCols.size() >= fullCount && writeCols.subList(0, 
fullCount).equals(fullNames)) {
+            writeCols = null;
+        }
     }
 
     private SimpleColStatsCollector.Factory[] statsCollectors() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FieldMergeSplitRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
similarity index 96%
rename from 
paimon-core/src/main/java/org/apache/paimon/operation/FieldMergeSplitRead.java
rename to 
paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index 94b0d339ec..af1e5b767f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FieldMergeSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -27,7 +27,7 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
-import org.apache.paimon.reader.CompoundFileReader;
+import org.apache.paimon.reader.DataEvolutionFileReader;
 import org.apache.paimon.reader.EmptyFileRecordReader;
 import org.apache.paimon.reader.ReaderSupplier;
 import org.apache.paimon.reader.RecordReader;
@@ -56,9 +56,9 @@ import static java.lang.String.format;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** A {@link SplitRead} to read raw file directly from {@link DataSplit}. */
-public class FieldMergeSplitRead extends RawFileSplitRead {
+public class DataEvolutionSplitRead extends RawFileSplitRead {
 
-    public FieldMergeSplitRead(
+    public DataEvolutionSplitRead(
             FileIO fileIO,
             SchemaManager schemaManager,
             TableSchema schema,
@@ -106,7 +106,7 @@ public class FieldMergeSplitRead extends RawFileSplitRead {
             } else {
                 suppliers.add(
                         () ->
-                                createCompoundFileReader(
+                                createFileReader(
                                         needMergeFiles,
                                         partition,
                                         dataFilePathFactory,
@@ -118,7 +118,7 @@ public class FieldMergeSplitRead extends RawFileSplitRead {
         return ConcatRecordReader.create(suppliers);
     }
 
-    private CompoundFileReader createCompoundFileReader(
+    private DataEvolutionFileReader createFileReader(
             List<DataFileMeta> needMergeFiles,
             BinaryRow partition,
             DataFilePathFactory dataFilePathFactory,
@@ -222,6 +222,6 @@ public class FieldMergeSplitRead extends RawFileSplitRead {
                                 allReadFields.get(i)));
             }
         }
-        return new CompoundFileReader(rowOffsets, fieldOffsets, 
fileRecordReaders);
+        return new DataEvolutionFileReader(rowOffsets, fieldOffsets, 
fileRecordReaders);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 82e62896df..06d7d4117f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -650,7 +650,7 @@ public class SchemaValidation {
     }
 
     private static void validateDataEvolution(CoreOptions options) {
-        if (options.dataElolutionEnabled()) {
+        if (options.dataEvolutionEnabled()) {
             checkArgument(
                     options.rowTrackingEnabled(),
                     "Data evolution config must enabled with 
row-tracking.enabled");
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 9f59da690e..367eeb714b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -35,11 +35,18 @@ import org.apache.paimon.table.source.AppendTableRead;
 import org.apache.paimon.table.source.DataEvolutionSplitGenerator;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.SplitGenerator;
+import 
org.apache.paimon.table.source.splitread.AppendTableRawFileSplitReadProvider;
+import org.apache.paimon.table.source.splitread.DataEvolutionSplitReadProvider;
+import org.apache.paimon.table.source.splitread.SplitReadConfig;
+import org.apache.paimon.table.source.splitread.SplitReadProvider;
 import org.apache.paimon.utils.Preconditions;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.function.BiConsumer;
+import java.util.function.Function;
 
 /** {@link FileStoreTable} for append table. */
 public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
@@ -82,7 +89,7 @@ public class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
     protected SplitGenerator splitGenerator() {
         long targetSplitSize = store().options().splitTargetSize();
         long openFileCost = store().options().splitOpenFileCost();
-        return coreOptions().dataElolutionEnabled()
+        return coreOptions().dataEvolutionEnabled()
                 ? new DataEvolutionSplitGenerator(targetSplitSize, 
openFileCost)
                 : new AppendOnlySplitGenerator(targetSplitSize, openFileCost, 
bucketMode());
     }
@@ -99,11 +106,17 @@ public class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
 
     @Override
     public InnerTableRead newRead() {
-        return new AppendTableRead(
-                () -> store().newRead(),
-                () -> store().newFieldMergeRead(),
-                schema(),
-                coreOptions());
+        List<Function<SplitReadConfig, SplitReadProvider>> providerFactories = 
new ArrayList<>();
+        if (coreOptions().dataEvolutionEnabled()) {
+            // add data evolution first
+            providerFactories.add(
+                    config ->
+                            new DataEvolutionSplitReadProvider(
+                                    () -> store().newDataEvolutionRead(), 
config));
+        }
+        providerFactories.add(
+                config -> new AppendTableRawFileSplitReadProvider(() -> 
store().newRead(), config));
+        return new AppendTableRead(providerFactories, schema());
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableWrite.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableWrite.java
index d8616d27f5..416bb58a30 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableWrite.java
@@ -38,6 +38,9 @@ public interface BatchTableWrite extends TableWrite {
      */
     List<CommitMessage> prepareCommit() throws Exception;
 
-    /** Specified the write rowType. */
+    /**
+     * Specified the writing rowType, currently only work for table without 
primary key and row
+     * tracking enabled.
+     */
     BatchTableWrite withWriteType(RowType writeType);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
index 063f09e841..83e26301ea 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
@@ -18,17 +18,13 @@
 
 package org.apache.paimon.table.source;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.operation.FieldMergeSplitRead;
 import org.apache.paimon.operation.MergeFileSplitRead;
-import org.apache.paimon.operation.RawFileSplitRead;
 import org.apache.paimon.operation.SplitRead;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.TableSchema;
-import 
org.apache.paimon.table.source.splitread.AppendTableRawFileSplitReadProvider;
-import org.apache.paimon.table.source.splitread.MergeFieldSplitReadProvider;
+import org.apache.paimon.table.source.splitread.SplitReadConfig;
 import org.apache.paimon.table.source.splitread.SplitReadProvider;
 import org.apache.paimon.types.RowType;
 
@@ -37,7 +33,8 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.function.Supplier;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * An abstraction layer above {@link MergeFileSplitRead} to provide reading of 
{@link InternalRow}.
@@ -50,33 +47,26 @@ public final class AppendTableRead extends 
AbstractDataTableRead {
     private Predicate predicate = null;
 
     public AppendTableRead(
-            Supplier<RawFileSplitRead> batchRawReadSupplier,
-            Supplier<FieldMergeSplitRead> fieldMergeSplitReadSupplier,
-            TableSchema schema,
-            CoreOptions coreOptions) {
+            List<Function<SplitReadConfig, SplitReadProvider>> 
providerFactories,
+            TableSchema schema) {
         super(schema);
-        this.readProviders = new ArrayList<>();
-        if (coreOptions.dataElolutionEnabled()) {
-            // MergeFieldSplitReadProvider is used to read the field merge 
split
-            readProviders.add(
-                    new MergeFieldSplitReadProvider(
-                            fieldMergeSplitReadSupplier, this::assignValues));
-        }
-        readProviders.add(
-                new AppendTableRawFileSplitReadProvider(batchRawReadSupplier, 
this::assignValues));
+        this.readProviders =
+                providerFactories.stream()
+                        .map(factory -> factory.apply(this::config))
+                        .collect(Collectors.toList());
     }
 
     private List<SplitRead<InternalRow>> initialized() {
         List<SplitRead<InternalRow>> readers = new ArrayList<>();
         for (SplitReadProvider readProvider : readProviders) {
-            if (readProvider.initialized()) {
-                readers.add(readProvider.getOrCreate());
+            if (readProvider.get().initialized()) {
+                readers.add(readProvider.get().get());
             }
         }
         return readers;
     }
 
-    private void assignValues(SplitRead<InternalRow> read) {
+    private void config(SplitRead<InternalRow> read) {
         if (readType != null) {
             read = read.withReadType(readType);
         }
@@ -101,7 +91,7 @@ public final class AppendTableRead extends 
AbstractDataTableRead {
         DataSplit dataSplit = (DataSplit) split;
         for (SplitReadProvider readProvider : readProviders) {
             if (readProvider.match(dataSplit, false)) {
-                return readProvider.getOrCreate().createReader(dataSplit);
+                return readProvider.get().get().createReader(dataSplit);
             }
         }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index 4f6c6515f7..c4bede61d8 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -63,23 +63,23 @@ public final class KeyValueTableRead extends 
AbstractDataTableRead {
         this.readProviders =
                 Arrays.asList(
                         new PrimaryKeyTableRawFileSplitReadProvider(
-                                batchRawReadSupplier, this::assignValues),
-                        new MergeFileSplitReadProvider(mergeReadSupplier, 
this::assignValues),
-                        new 
IncrementalChangelogReadProvider(mergeReadSupplier, this::assignValues),
-                        new IncrementalDiffReadProvider(mergeReadSupplier, 
this::assignValues));
+                                batchRawReadSupplier, this::config),
+                        new MergeFileSplitReadProvider(mergeReadSupplier, 
this::config),
+                        new 
IncrementalChangelogReadProvider(mergeReadSupplier, this::config),
+                        new IncrementalDiffReadProvider(mergeReadSupplier, 
this::config));
     }
 
     private List<SplitRead<InternalRow>> initialized() {
         List<SplitRead<InternalRow>> readers = new ArrayList<>();
         for (SplitReadProvider readProvider : readProviders) {
-            if (readProvider.initialized()) {
-                readers.add(readProvider.getOrCreate());
+            if (readProvider.get().initialized()) {
+                readers.add(readProvider.get().get());
             }
         }
         return readers;
     }
 
-    private void assignValues(SplitRead<InternalRow> read) {
+    private void config(SplitRead<InternalRow> read) {
         if (forceKeepDelete) {
             read = read.forceKeepDelete();
         }
@@ -121,7 +121,7 @@ public final class KeyValueTableRead extends 
AbstractDataTableRead {
         DataSplit dataSplit = (DataSplit) split;
         for (SplitReadProvider readProvider : readProviders) {
             if (readProvider.match(dataSplit, forceKeepDelete)) {
-                return readProvider.getOrCreate().createReader(dataSplit);
+                return readProvider.get().get().createReader(dataSplit);
             }
         }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/AppendTableRawFileSplitReadProvider.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/AppendTableRawFileSplitReadProvider.java
index 4dd85e839d..4c6bf86008 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/AppendTableRawFileSplitReadProvider.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/AppendTableRawFileSplitReadProvider.java
@@ -18,20 +18,17 @@
 
 package org.apache.paimon.table.source.splitread;
 
-import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.operation.RawFileSplitRead;
-import org.apache.paimon.operation.SplitRead;
 import org.apache.paimon.table.source.DataSplit;
 
-import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 /** Raw file split read for all append table. */
 public class AppendTableRawFileSplitReadProvider extends 
RawFileSplitReadProvider {
 
     public AppendTableRawFileSplitReadProvider(
-            Supplier<RawFileSplitRead> supplier, 
Consumer<SplitRead<InternalRow>> valuesAssigner) {
-        super(supplier, valuesAssigner);
+            Supplier<RawFileSplitRead> supplier, SplitReadConfig 
splitReadConfig) {
+        super(supplier, splitReadConfig);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java
similarity index 51%
copy from 
paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java
copy to 
paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java
index a335a7c030..bcf2e9fa1b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java
@@ -18,48 +18,56 @@
 
 package org.apache.paimon.table.source.splitread;
 
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.operation.MergeFileSplitRead;
-import org.apache.paimon.operation.SplitRead;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.operation.DataEvolutionSplitRead;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.utils.LazyField;
 
-import java.util.function.Consumer;
+import java.util.List;
 import java.util.function.Supplier;
 
-/** A {@link SplitReadProvider} to batch incremental diff read. */
-public class IncrementalDiffReadProvider implements SplitReadProvider {
+/** A {@link SplitReadProvider} to create {@link DataEvolutionSplitRead}. */
+public class DataEvolutionSplitReadProvider implements SplitReadProvider {
 
-    private final LazyField<SplitRead<InternalRow>> splitRead;
+    private final LazyField<DataEvolutionSplitRead> splitRead;
 
-    public IncrementalDiffReadProvider(
-            Supplier<MergeFileSplitRead> supplier,
-            Consumer<SplitRead<InternalRow>> valuesAssigner) {
+    public DataEvolutionSplitReadProvider(
+            Supplier<DataEvolutionSplitRead> supplier, SplitReadConfig 
splitReadConfig) {
         this.splitRead =
                 new LazyField<>(
                         () -> {
-                            SplitRead<InternalRow> read = create(supplier);
-                            valuesAssigner.accept(read);
+                            DataEvolutionSplitRead read = supplier.get();
+                            splitReadConfig.config(read);
                             return read;
                         });
     }
 
-    private SplitRead<InternalRow> create(Supplier<MergeFileSplitRead> 
supplier) {
-        return new IncrementalDiffSplitRead(supplier.get());
-    }
-
     @Override
     public boolean match(DataSplit split, boolean forceKeepDelete) {
-        return !split.beforeFiles().isEmpty() && !split.isStreaming();
-    }
+        List<DataFileMeta> files = split.dataFiles();
+        if (files.size() < 2) {
+            return false;
+        }
 
-    @Override
-    public boolean initialized() {
-        return splitRead.initialized();
+        Long firstRowId = null;
+        for (DataFileMeta file : files) {
+            Long current = file.firstRowId();
+            if (current == null) {
+                return false;
+            }
+
+            if (firstRowId == null) {
+                firstRowId = current;
+            } else if (!firstRowId.equals(current)) {
+                return false;
+            }
+        }
+
+        return true;
     }
 
     @Override
-    public SplitRead<InternalRow> getOrCreate() {
-        return splitRead.get();
+    public LazyField<DataEvolutionSplitRead> get() {
+        return splitRead;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
index eb41d02669..19d95001ba 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
@@ -30,7 +30,6 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.IOFunction;
 import org.apache.paimon.utils.LazyField;
 
-import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 import static org.apache.paimon.table.source.KeyValueTableRead.unwrap;
@@ -41,13 +40,12 @@ public class IncrementalChangelogReadProvider implements 
SplitReadProvider {
     private final LazyField<SplitRead<InternalRow>> splitRead;
 
     public IncrementalChangelogReadProvider(
-            Supplier<MergeFileSplitRead> supplier,
-            Consumer<SplitRead<InternalRow>> valuesAssigner) {
+            Supplier<MergeFileSplitRead> supplier, SplitReadConfig 
splitReadConfig) {
         this.splitRead =
                 new LazyField<>(
                         () -> {
                             SplitRead<InternalRow> read = create(supplier);
-                            valuesAssigner.accept(read);
+                            splitReadConfig.config(read);
                             return read;
                         });
     }
@@ -86,12 +84,7 @@ public class IncrementalChangelogReadProvider implements 
SplitReadProvider {
     }
 
     @Override
-    public boolean initialized() {
-        return splitRead.initialized();
-    }
-
-    @Override
-    public SplitRead<InternalRow> getOrCreate() {
-        return splitRead.get();
+    public LazyField<SplitRead<InternalRow>> get() {
+        return splitRead;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java
index a335a7c030..6d45aeed7e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java
@@ -24,7 +24,6 @@ import org.apache.paimon.operation.SplitRead;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.utils.LazyField;
 
-import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 /** A {@link SplitReadProvider} to batch incremental diff read. */
@@ -33,13 +32,12 @@ public class IncrementalDiffReadProvider implements 
SplitReadProvider {
     private final LazyField<SplitRead<InternalRow>> splitRead;
 
     public IncrementalDiffReadProvider(
-            Supplier<MergeFileSplitRead> supplier,
-            Consumer<SplitRead<InternalRow>> valuesAssigner) {
+            Supplier<MergeFileSplitRead> supplier, SplitReadConfig 
splitReadConfig) {
         this.splitRead =
                 new LazyField<>(
                         () -> {
                             SplitRead<InternalRow> read = create(supplier);
-                            valuesAssigner.accept(read);
+                            splitReadConfig.config(read);
                             return read;
                         });
     }
@@ -54,12 +52,7 @@ public class IncrementalDiffReadProvider implements 
SplitReadProvider {
     }
 
     @Override
-    public boolean initialized() {
-        return splitRead.initialized();
-    }
-
-    @Override
-    public SplitRead<InternalRow> getOrCreate() {
-        return splitRead.get();
+    public LazyField<SplitRead<InternalRow>> get() {
+        return splitRead;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFieldSplitReadProvider.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFieldSplitReadProvider.java
deleted file mode 100644
index 59a1f31b2a..0000000000
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFieldSplitReadProvider.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source.splitread;
-
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.manifest.FileSource;
-import org.apache.paimon.operation.FieldMergeSplitRead;
-import org.apache.paimon.operation.RawFileSplitRead;
-import org.apache.paimon.operation.SplitRead;
-import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.utils.LazyField;
-
-import java.util.List;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
-/** A {@link SplitReadProvider} to create {@link MergeFieldSplitReadProvider}. 
*/
-public class MergeFieldSplitReadProvider implements SplitReadProvider {
-
-    private final LazyField<FieldMergeSplitRead> splitRead;
-
-    public MergeFieldSplitReadProvider(
-            Supplier<FieldMergeSplitRead> supplier,
-            Consumer<SplitRead<InternalRow>> valuesAssigner) {
-        this.splitRead =
-                new LazyField<>(
-                        () -> {
-                            FieldMergeSplitRead read = supplier.get();
-                            valuesAssigner.accept(read);
-                            return read;
-                        });
-    }
-
-    @Override
-    public boolean match(DataSplit split, boolean forceKeepDelete) {
-        List<DataFileMeta> files = split.dataFiles();
-        boolean onlyAppendFiles =
-                files.stream()
-                        .allMatch(
-                                f ->
-                                        f.fileSource().isPresent()
-                                                && f.fileSource().get() == 
FileSource.APPEND
-                                                && f.firstRowId() != null);
-        if (onlyAppendFiles) {
-            // contains same first row id, need merge fields
-            return 
files.stream().mapToLong(DataFileMeta::firstRowId).distinct().count()
-                    != files.size();
-        }
-        return false;
-    }
-
-    @Override
-    public boolean initialized() {
-        return splitRead.initialized();
-    }
-
-    @Override
-    public RawFileSplitRead getOrCreate() {
-        return splitRead.get();
-    }
-}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
index a748828912..5aa0a8d62e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
@@ -25,7 +25,6 @@ import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.LazyField;
 
-import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 import static org.apache.paimon.table.source.KeyValueTableRead.unwrap;
@@ -36,13 +35,12 @@ public class MergeFileSplitReadProvider implements 
SplitReadProvider {
     private final LazyField<SplitRead<InternalRow>> splitRead;
 
     public MergeFileSplitReadProvider(
-            Supplier<MergeFileSplitRead> supplier,
-            Consumer<SplitRead<InternalRow>> valuesAssigner) {
+            Supplier<MergeFileSplitRead> supplier, SplitReadConfig 
splitReadConfig) {
         this.splitRead =
                 new LazyField<>(
                         () -> {
                             SplitRead<InternalRow> read = create(supplier);
-                            valuesAssigner.accept(read);
+                            splitReadConfig.config(read);
                             return read;
                         });
     }
@@ -58,12 +56,7 @@ public class MergeFileSplitReadProvider implements 
SplitReadProvider {
     }
 
     @Override
-    public boolean initialized() {
-        return splitRead.initialized();
-    }
-
-    @Override
-    public SplitRead<InternalRow> getOrCreate() {
-        return splitRead.get();
+    public LazyField<SplitRead<InternalRow>> get() {
+        return splitRead;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/PrimaryKeyTableRawFileSplitReadProvider.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/PrimaryKeyTableRawFileSplitReadProvider.java
index d52e2a902b..ab8671c966 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/PrimaryKeyTableRawFileSplitReadProvider.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/PrimaryKeyTableRawFileSplitReadProvider.java
@@ -18,21 +18,18 @@
 
 package org.apache.paimon.table.source.splitread;
 
-import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.operation.RawFileSplitRead;
-import org.apache.paimon.operation.SplitRead;
 import org.apache.paimon.table.source.DataSplit;
 
-import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 /** Raw file split read for all primary key table. */
 public class PrimaryKeyTableRawFileSplitReadProvider extends 
RawFileSplitReadProvider {
 
     public PrimaryKeyTableRawFileSplitReadProvider(
-            Supplier<RawFileSplitRead> supplier, 
Consumer<SplitRead<InternalRow>> valuesAssigner) {
-        super(supplier, valuesAssigner);
+            Supplier<RawFileSplitRead> supplier, SplitReadConfig 
splitReadConfig) {
+        super(supplier, splitReadConfig);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/RawFileSplitReadProvider.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/RawFileSplitReadProvider.java
index bba0b557ec..75ab25ca91 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/RawFileSplitReadProvider.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/RawFileSplitReadProvider.java
@@ -18,12 +18,9 @@
 
 package org.apache.paimon.table.source.splitread;
 
-import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.operation.RawFileSplitRead;
-import org.apache.paimon.operation.SplitRead;
 import org.apache.paimon.utils.LazyField;
 
-import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 /** A {@link SplitReadProvider} to create {@link RawFileSplitRead}. */
@@ -32,23 +29,18 @@ public abstract class RawFileSplitReadProvider implements 
SplitReadProvider {
     private final LazyField<RawFileSplitRead> splitRead;
 
     public RawFileSplitReadProvider(
-            Supplier<RawFileSplitRead> supplier, 
Consumer<SplitRead<InternalRow>> valuesAssigner) {
+            Supplier<RawFileSplitRead> supplier, SplitReadConfig 
splitReadConfig) {
         this.splitRead =
                 new LazyField<>(
                         () -> {
                             RawFileSplitRead read = supplier.get();
-                            valuesAssigner.accept(read);
+                            splitReadConfig.config(read);
                             return read;
                         });
     }
 
     @Override
-    public boolean initialized() {
-        return splitRead.initialized();
-    }
-
-    @Override
-    public SplitRead<InternalRow> getOrCreate() {
-        return splitRead.get();
+    public LazyField<RawFileSplitRead> get() {
+        return splitRead;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadConfig.java
similarity index 78%
copy from 
paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java
copy to 
paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadConfig.java
index 2aaefb322f..83ebea0686 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadConfig.java
@@ -20,14 +20,9 @@ package org.apache.paimon.table.source.splitread;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.operation.SplitRead;
-import org.apache.paimon.table.source.DataSplit;
 
-/** Provider to create {@link SplitRead}. */
-public interface SplitReadProvider {
+/** Configures a {@link SplitRead} with projection, filter and others. */
+public interface SplitReadConfig {
 
-    boolean match(DataSplit split, boolean forceKeepDelete);
-
-    boolean initialized();
-
-    SplitRead<InternalRow> getOrCreate();
+    void config(SplitRead<InternalRow> read);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java
index 2aaefb322f..dfc32501d6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java
@@ -21,13 +21,12 @@ package org.apache.paimon.table.source.splitread;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.operation.SplitRead;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.LazyField;
 
 /** Provider to create {@link SplitRead}. */
 public interface SplitReadProvider {
 
     boolean match(DataSplit split, boolean forceKeepDelete);
 
-    boolean initialized();
-
-    SplitRead<InternalRow> getOrCreate();
+    LazyField<? extends SplitRead<InternalRow>> get();
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index bf4952bcd7..ad91a3c70a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -622,7 +622,7 @@ public class AppendOnlyWriterTest {
                         fileFormat,
                         targetFileSize,
                         AppendOnlyWriterTest.SCHEMA,
-                        AppendOnlyWriterTest.SCHEMA,
+                        null,
                         getMaxSequenceNumber(toCompact),
                         compactManager,
                         null,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java 
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 55595d13d3..ca0b65b2ee 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -85,7 +85,7 @@ public class FileFormatSuffixTest extends 
KeyValueFileReadWriteTest {
                         fileFormat,
                         10,
                         SCHEMA,
-                        SCHEMA,
+                        null,
                         0,
                         new BucketedAppendCompactManager(
                                 null, toCompact, null, 4, 10, false, null, 
null), // not used
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendMergeFieldTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
similarity index 97%
rename from 
paimon-core/src/test/java/org/apache/paimon/table/AppendMergeFieldTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index cbf0fa31df..8cbc0413ff 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendMergeFieldTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -23,8 +23,7 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.operation.MergeFileSplitRead;
-import org.apache.paimon.reader.CompoundFileReader;
+import org.apache.paimon.reader.DataEvolutionFileReader;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.sink.BatchTableCommit;
@@ -47,8 +46,8 @@ import java.util.stream.Collectors;
 
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 
-/** Test for {@link MergeFileSplitRead}. */
-public class AppendMergeFieldTest extends TableTestBase {
+/** Test for table with data evolution. */
+public class DataEvolutionTableTest extends TableTestBase {
 
     @Test
     public void testBasic() throws Exception {
@@ -76,7 +75,7 @@ public class AppendMergeFieldTest extends TableTestBase {
         ReadBuilder readBuilder = getTableDefault().newReadBuilder();
         RecordReader<InternalRow> reader =
                 
readBuilder.newRead().createReader(readBuilder.newScan().plan());
-        assertThat(reader).isInstanceOf(CompoundFileReader.class);
+        assertThat(reader).isInstanceOf(DataEvolutionFileReader.class);
         reader.forEachRemaining(
                 r -> {
                     assertThat(r.getInt(0)).isEqualTo(1);
@@ -203,7 +202,7 @@ public class AppendMergeFieldTest extends TableTestBase {
         ReadBuilder readBuilder = getTableDefault().newReadBuilder();
         RecordReader<InternalRow> reader =
                 
readBuilder.newRead().createReader(readBuilder.newScan().plan());
-        assertThat(reader).isInstanceOf(CompoundFileReader.class);
+        assertThat(reader).isInstanceOf(DataEvolutionFileReader.class);
 
         reader.forEachRemaining(
                 r -> {
@@ -375,7 +374,7 @@ public class AppendMergeFieldTest extends TableTestBase {
         ReadBuilder readBuilder = getTableDefault().newReadBuilder();
         RecordReader<InternalRow> reader =
                 
readBuilder.newRead().createReader(readBuilder.newScan().plan());
-        assertThat(reader).isInstanceOf(CompoundFileReader.class);
+        assertThat(reader).isInstanceOf(DataEvolutionFileReader.class);
         AtomicInteger i = new AtomicInteger(0);
         reader.forEachRemaining(
                 r -> {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java
new file mode 100644
index 0000000000..b53a33481a
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source.splitread;
+
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.operation.DataEvolutionSplitRead;
+import org.apache.paimon.table.source.DataSplit;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.function.Supplier;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Tests for {@link DataEvolutionSplitReadProvider}. */
+public class DataEvolutionSplitReadProviderTest {
+
+    private Supplier<DataEvolutionSplitRead> mockSupplier;
+    private SplitReadConfig mockSplitReadConfig;
+    private DataEvolutionSplitRead mockSplitRead;
+    private DataEvolutionSplitReadProvider provider;
+
+    @SuppressWarnings("unchecked")
+    @BeforeEach
+    public void setUp() {
+        mockSupplier = (Supplier<DataEvolutionSplitRead>) mock(Supplier.class);
+        mockSplitReadConfig = mock(SplitReadConfig.class);
+        mockSplitRead = mock(DataEvolutionSplitRead.class);
+        when(mockSupplier.get()).thenReturn(mockSplitRead);
+
+        provider = new DataEvolutionSplitReadProvider(mockSupplier, 
mockSplitReadConfig);
+    }
+
+    @Test
+    public void testGetAndInitialization() {
+        // Supplier should not be called yet due to lazy initialization
+        verify(mockSupplier, times(0)).get();
+
+        // First access, should trigger initialization
+        DataEvolutionSplitRead read = provider.get().get();
+
+        // Verify supplier and config were called
+        verify(mockSupplier, times(1)).get();
+        verify(mockSplitReadConfig, times(1)).config(mockSplitRead);
+        assertThat(read).isSameAs(mockSplitRead);
+
+        // Second access, should return cached instance without re-initializing
+        DataEvolutionSplitRead read2 = provider.get().get();
+        verify(mockSupplier, times(1)).get();
+        verify(mockSplitReadConfig, times(1)).config(mockSplitRead);
+        assertThat(read2).isSameAs(mockSplitRead);
+    }
+
+    @Test
+    public void testMatchWithNoFiles() {
+        DataSplit split = mock(DataSplit.class);
+        when(split.dataFiles()).thenReturn(Collections.emptyList());
+        assertThat(provider.match(split, false)).isFalse();
+    }
+
+    @Test
+    public void testMatchWithOneFile() {
+        DataSplit split = mock(DataSplit.class);
+        DataFileMeta file1 = mock(DataFileMeta.class);
+        when(split.dataFiles()).thenReturn(Collections.singletonList(file1));
+        assertThat(provider.match(split, false)).isFalse();
+    }
+
+    @Test
+    public void testMatchWithNullFirstRowId() {
+        DataSplit split = mock(DataSplit.class);
+        DataFileMeta file1 = mock(DataFileMeta.class);
+        DataFileMeta file2 = mock(DataFileMeta.class);
+
+        when(file1.firstRowId()).thenReturn(1L);
+        when(file2.firstRowId()).thenReturn(null);
+        when(split.dataFiles()).thenReturn(Arrays.asList(file1, file2));
+
+        assertThat(provider.match(split, false)).isFalse();
+    }
+
+    @Test
+    public void testMatchWithDifferentFirstRowIds() {
+        DataSplit split = mock(DataSplit.class);
+        DataFileMeta file1 = mock(DataFileMeta.class);
+        DataFileMeta file2 = mock(DataFileMeta.class);
+
+        when(file1.firstRowId()).thenReturn(1L);
+        when(file2.firstRowId()).thenReturn(2L);
+        when(split.dataFiles()).thenReturn(Arrays.asList(file1, file2));
+
+        assertThat(provider.match(split, false)).isFalse();
+    }
+
+    @Test
+    public void testMatchSuccess() {
+        DataSplit split = mock(DataSplit.class);
+        DataFileMeta file1 = mock(DataFileMeta.class);
+        DataFileMeta file2 = mock(DataFileMeta.class);
+
+        when(file1.firstRowId()).thenReturn(100L);
+        when(file2.firstRowId()).thenReturn(100L);
+        when(split.dataFiles()).thenReturn(Arrays.asList(file1, file2));
+
+        // The forceKeepDelete parameter is not used in match, so test both 
values
+        assertThat(provider.match(split, true)).isTrue();
+        assertThat(provider.match(split, false)).isTrue();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactWorkerOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactWorkerOperator.java
index 8648bf734e..8d8a668f79 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactWorkerOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactWorkerOperator.java
@@ -70,7 +70,7 @@ public class AppendPreCommitCompactWorkerOperator extends 
AbstractStreamOperator
         this.write = (AppendFileStoreWrite) table.store().newWrite(null);
         if (coreOptions.rowTrackingEnabled()) {
             checkArgument(
-                    !coreOptions.dataElolutionEnabled(),
+                    !coreOptions.dataEvolutionEnabled(),
                     "Data evolution enabled table should not invoke compact 
yet.");
             
this.write.withWriteType(SpecialFields.rowTypeWithRowLineage(table.rowType()));
         }

Reply via email to