This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ff2537b2 [core] Introduce RawFileSplitRead to accelerate batch read 
for primary key table (#3209)
8ff2537b2 is described below

commit 8ff2537b2a0ac43365746193c59bdd6977e9394d
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 15 16:48:16 2024 +0800

    [core] Introduce RawFileSplitRead to accelerate batch read for primary key 
table (#3209)
---
 .../java/org/apache/paimon/utils/LazyField.java    |  38 ++--
 .../org/apache/paimon/AppendOnlyFileStore.java     |   6 +-
 .../src/main/java/org/apache/paimon/FileStore.java |   4 +-
 .../java/org/apache/paimon/KeyValueFileStore.java  |  17 +-
 .../deletionvectors/ApplyDeletionVectorReader.java |  13 --
 .../paimon/operation/AppendOnlyFileStoreRead.java  | 192 ------------------
 .../paimon/operation/AppendOnlyFileStoreWrite.java |   4 +-
 ...eFileStoreRead.java => MergeFileSplitRead.java} |  25 ++-
 .../apache/paimon/operation/RawFileSplitRead.java  | 216 +++++++++++++++++++++
 .../{FileStoreRead.java => SplitRead.java}         |   4 +-
 .../paimon/table/AppendOnlyFileStoreTable.java     |  12 +-
 .../paimon/table/PrimaryKeyFileStoreTable.java     |  23 +--
 .../paimon/table/source/AbstractDataTableRead.java |  10 +-
 .../paimon/table/source/KeyValueTableRead.java     | 123 +++++++++---
 .../org/apache/paimon/table/source/TableRead.java  |   4 +-
 .../test/java/org/apache/paimon/TestFileStore.java |   4 +-
 ...reReadTest.java => MergeFileSplitReadTest.java} |   6 +-
 .../paimon/flink/action/MergeIntoActionITCase.java |   6 +-
 .../flink/source/TestChangelogDataReadWrite.java   |  46 ++---
 .../paimon/flink/util/ReadWriteTableTestUtil.java  |  22 ++-
 20 files changed, 431 insertions(+), 344 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreRead.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/LazyField.java
similarity index 58%
copy from 
paimon-core/src/main/java/org/apache/paimon/operation/FileStoreRead.java
copy to paimon-common/src/main/java/org/apache/paimon/utils/LazyField.java
index 2d3e121b1..2bb701362 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreRead.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/LazyField.java
@@ -16,23 +16,33 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.operation;
+package org.apache.paimon.utils;
 
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.table.source.DataSplit;
+import java.util.function.Supplier;
 
-import java.io.IOException;
+/** A holder for a field that is lazily initialized on first access. */
+public class LazyField<T> {
 
-/**
- * Read operation which provides {@link RecordReader} creation.
- *
- * @param <T> type of record to read.
- */
-public interface FileStoreRead<T> {
+    private final Supplier<T> supplier;
+
+    private boolean initialized;
+    private T value;
+
+    public LazyField(Supplier<T> supplier) {
+        this.supplier = supplier;
+    }
 
-    FileStoreRead<T> withFilter(Predicate predicate);
+    public T get() {
+        if (!initialized) {
+            T t = supplier.get();
+            value = t;
+            initialized = true;
+            return t;
+        }
+        return value;
+    }
 
-    /** Create a {@link RecordReader} from split. */
-    RecordReader<T> createReader(DataSplit split) throws IOException;
+    public boolean initialized() {
+        return initialized;
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 0d546e215..186fd4799 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -22,9 +22,9 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.manifest.ManifestCacheFilter;
-import org.apache.paimon.operation.AppendOnlyFileStoreRead;
 import org.apache.paimon.operation.AppendOnlyFileStoreScan;
 import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
+import org.apache.paimon.operation.RawFileSplitRead;
 import org.apache.paimon.operation.ScanBucketFilter;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.SchemaManager;
@@ -79,8 +79,8 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
     }
 
     @Override
-    public AppendOnlyFileStoreRead newRead() {
-        return new AppendOnlyFileStoreRead(
+    public RawFileSplitRead newRead() {
+        return new RawFileSplitRead(
                 fileIO,
                 schemaManager,
                 schema,
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index cd38d2061..6731121c5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -23,11 +23,11 @@ import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.operation.FileStoreCommit;
-import org.apache.paimon.operation.FileStoreRead;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.FileStoreWrite;
 import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.operation.SnapshotDeletion;
+import org.apache.paimon.operation.SplitRead;
 import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.service.ServiceManager;
 import org.apache.paimon.stats.StatsFileHandler;
@@ -73,7 +73,7 @@ public interface FileStore<T> extends Serializable {
 
     StatsFileHandler newStatsFileHandler();
 
-    FileStoreRead<T> newRead();
+    SplitRead<T> newRead();
 
     FileStoreWrite<T> newWrite(String commitUser);
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 956b615d7..354cc6dda 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -28,9 +28,10 @@ import org.apache.paimon.index.IndexMaintainer;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
-import org.apache.paimon.operation.KeyValueFileStoreRead;
 import org.apache.paimon.operation.KeyValueFileStoreScan;
 import org.apache.paimon.operation.KeyValueFileStoreWrite;
+import org.apache.paimon.operation.MergeFileSplitRead;
+import org.apache.paimon.operation.RawFileSplitRead;
 import org.apache.paimon.operation.ScanBucketFilter;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
@@ -119,8 +120,8 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
     }
 
     @Override
-    public KeyValueFileStoreRead newRead() {
-        return new KeyValueFileStoreRead(
+    public MergeFileSplitRead newRead() {
+        return new MergeFileSplitRead(
                 options,
                 schema,
                 keyType,
@@ -130,6 +131,16 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                 newReaderFactoryBuilder());
     }
 
+    public RawFileSplitRead newBatchRawFileRead() {
+        return new RawFileSplitRead(
+                fileIO,
+                schemaManager,
+                schema,
+                valueType,
+                FileFormatDiscover.of(options),
+                pathFactory());
+    }
+
     public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
         return KeyValueFileReaderFactory.builder(
                 fileIO,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
index dadde99ea..6cc8b396f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
@@ -24,7 +24,6 @@ import org.apache.paimon.reader.RecordReader;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.util.Optional;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -40,18 +39,6 @@ public class ApplyDeletionVectorReader<T> implements 
RecordReader<T> {
         this.deletionVector = deletionVector;
     }
 
-    public static <T> RecordReader<T> create(RecordReader<T> reader, 
Optional<DeletionVector> dv) {
-        return create(reader, dv.orElse(null));
-    }
-
-    public static <T> RecordReader<T> create(RecordReader<T> reader, @Nullable 
DeletionVector dv) {
-        if (dv == null) {
-            return reader;
-        }
-
-        return new ApplyDeletionVectorReader<>(reader, dv);
-    }
-
     @Nullable
     @Override
     public RecordIterator<T> readBatch() throws IOException {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
deleted file mode 100644
index 49eea905c..000000000
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.operation;
-
-import org.apache.paimon.AppendOnlyFileStore;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.format.FileFormatDiscover;
-import org.apache.paimon.format.FormatKey;
-import org.apache.paimon.format.FormatReaderContext;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.DataFilePathFactory;
-import org.apache.paimon.io.FileRecordReader;
-import org.apache.paimon.mergetree.compact.ConcatRecordReader;
-import org.apache.paimon.partition.PartitionUtils;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.schema.IndexCastMapping;
-import org.apache.paimon.schema.SchemaEvolutionUtil;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.BulkFormatMapping;
-import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.Projection;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
-
-/** {@link FileStoreRead} for {@link AppendOnlyFileStore}. */
-public class AppendOnlyFileStoreRead implements FileStoreRead<InternalRow> {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(AppendOnlyFileStoreRead.class);
-
-    private final FileIO fileIO;
-    private final SchemaManager schemaManager;
-    private final TableSchema schema;
-    private final FileFormatDiscover formatDiscover;
-    private final FileStorePathFactory pathFactory;
-    private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
-
-    private int[][] projection;
-
-    @Nullable private List<Predicate> filters;
-
-    public AppendOnlyFileStoreRead(
-            FileIO fileIO,
-            SchemaManager schemaManager,
-            TableSchema schema,
-            RowType rowType,
-            FileFormatDiscover formatDiscover,
-            FileStorePathFactory pathFactory) {
-        this.fileIO = fileIO;
-        this.schemaManager = schemaManager;
-        this.schema = schema;
-        this.formatDiscover = formatDiscover;
-        this.pathFactory = pathFactory;
-        this.bulkFormatMappings = new HashMap<>();
-
-        this.projection = Projection.range(0, 
rowType.getFieldCount()).toNestedIndexes();
-    }
-
-    public FileStoreRead<InternalRow> withProjection(int[][] projectedFields) {
-        projection = projectedFields;
-        return this;
-    }
-
-    @Override
-    public FileStoreRead<InternalRow> withFilter(Predicate predicate) {
-        this.filters = splitAnd(predicate);
-        return this;
-    }
-
-    @Override
-    public RecordReader<InternalRow> createReader(DataSplit split) throws 
IOException {
-        DataFilePathFactory dataFilePathFactory =
-                pathFactory.createDataFilePathFactory(split.partition(), 
split.bucket());
-        List<ConcatRecordReader.ReaderSupplier<InternalRow>> suppliers = new 
ArrayList<>();
-        if (split.beforeFiles().size() > 0) {
-            LOG.info("Ignore split before files: " + split.beforeFiles());
-        }
-        for (DataFileMeta file : split.dataFiles()) {
-            String formatIdentifier = 
DataFilePathFactory.formatIdentifier(file.fileName());
-            BulkFormatMapping bulkFormatMapping =
-                    bulkFormatMappings.computeIfAbsent(
-                            new FormatKey(file.schemaId(), formatIdentifier),
-                            key -> {
-                                TableSchema tableSchema = schema;
-                                TableSchema dataSchema =
-                                        key.schemaId == schema.id()
-                                                ? schema
-                                                : 
schemaManager.schema(key.schemaId);
-
-                                // projection to data schema
-                                int[][] dataProjection =
-                                        
SchemaEvolutionUtil.createDataProjection(
-                                                tableSchema.fields(),
-                                                dataSchema.fields(),
-                                                projection);
-
-                                IndexCastMapping indexCastMapping =
-                                        
SchemaEvolutionUtil.createIndexCastMapping(
-                                                
Projection.of(projection).toTopLevelIndexes(),
-                                                tableSchema.fields(),
-                                                
Projection.of(dataProjection).toTopLevelIndexes(),
-                                                dataSchema.fields());
-
-                                List<Predicate> dataFilters =
-                                        this.schema.id() == key.schemaId
-                                                ? filters
-                                                : 
SchemaEvolutionUtil.createDataFilters(
-                                                        tableSchema.fields(),
-                                                        dataSchema.fields(),
-                                                        filters);
-
-                                Pair<int[], RowType> partitionPair = null;
-                                if (!dataSchema.partitionKeys().isEmpty()) {
-                                    Pair<int[], int[][]> partitionMapping =
-                                            
PartitionUtils.constructPartitionMapping(
-                                                    dataSchema, 
dataProjection);
-                                    // if partition fields are not selected, 
we just do nothing
-                                    if (partitionMapping != null) {
-                                        dataProjection = 
partitionMapping.getRight();
-                                        partitionPair =
-                                                Pair.of(
-                                                        
partitionMapping.getLeft(),
-                                                        
dataSchema.projectedLogicalRowType(
-                                                                
dataSchema.partitionKeys()));
-                                    }
-                                }
-
-                                RowType projectedRowType =
-                                        Projection.of(dataProjection)
-                                                
.project(dataSchema.logicalRowType());
-
-                                return new BulkFormatMapping(
-                                        indexCastMapping.getIndexMapping(),
-                                        indexCastMapping.getCastMapping(),
-                                        partitionPair,
-                                        formatDiscover
-                                                .discover(formatIdentifier)
-                                                .createReaderFactory(
-                                                        projectedRowType, 
dataFilters));
-                            });
-
-            final BinaryRow partition = split.partition();
-            suppliers.add(
-                    () ->
-                            new FileRecordReader(
-                                    bulkFormatMapping.getReaderFactory(),
-                                    new FormatReaderContext(
-                                            fileIO,
-                                            
dataFilePathFactory.toPath(file.fileName()),
-                                            file.fileSize()),
-                                    bulkFormatMapping.getIndexMapping(),
-                                    bulkFormatMapping.getCastMapping(),
-                                    PartitionUtils.create(
-                                            
bulkFormatMapping.getPartitionPair(), partition)));
-        }
-
-        return ConcatRecordReader.create(suppliers);
-    }
-}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index a0d863371..b3361b0df 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -59,7 +59,7 @@ import static 
org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber;
 public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow> {
 
     private final FileIO fileIO;
-    private final AppendOnlyFileStoreRead read;
+    private final RawFileSplitRead read;
     private final long schemaId;
     private final RowType rowType;
     private final FileFormat fileFormat;
@@ -81,7 +81,7 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
 
     public AppendOnlyFileStoreWrite(
             FileIO fileIO,
-            AppendOnlyFileStoreRead read,
+            RawFileSplitRead read,
             long schemaId,
             String commitUser,
             RowType rowType,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
 b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
similarity index 94%
rename from 
paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
rename to 
paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
index b0ab4338f..d457c9093 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
@@ -64,8 +64,13 @@ import static 
org.apache.paimon.io.DataFilePathFactory.CHANGELOG_FILE_PREFIX;
 import static org.apache.paimon.predicate.PredicateBuilder.containsFields;
 import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
 
-/** {@link FileStoreRead} implementation for {@link KeyValueFileStore}. */
-public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
+/**
+ * An implementation for {@link KeyValueFileStore}. This class handles LSM merging and changelog
+ * row kinds, and it forces reading fields such as sequence and row_kind.
+ *
+ * @see RawFileSplitRead Prefer that read when in batch mode and reading raw files.
+ */
+public class MergeFileSplitRead implements SplitRead<KeyValue> {
 
     private final TableSchema tableSchema;
     private final FileIO fileIO;
@@ -86,7 +91,7 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
 
     private boolean forceKeepDelete = false;
 
-    public KeyValueFileStoreRead(
+    public MergeFileSplitRead(
             CoreOptions options,
             TableSchema schema,
             RowType keyType,
@@ -105,13 +110,13 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
         this.sequenceFields = options.sequenceField();
     }
 
-    public KeyValueFileStoreRead withKeyProjection(@Nullable int[][] 
projectedFields) {
+    public MergeFileSplitRead withKeyProjection(@Nullable int[][] 
projectedFields) {
         readerFactoryBuilder.withKeyProjection(projectedFields);
         this.keyProjectedFields = projectedFields;
         return this;
     }
 
-    public KeyValueFileStoreRead withValueProjection(@Nullable int[][] 
projectedFields) {
+    public MergeFileSplitRead withValueProjection(@Nullable int[][] 
projectedFields) {
         if (projectedFields == null) {
             return this;
         }
@@ -155,18 +160,22 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
         return this;
     }
 
-    public KeyValueFileStoreRead withIOManager(IOManager ioManager) {
+    public MergeFileSplitRead withIOManager(IOManager ioManager) {
         this.mergeSorter.setIOManager(ioManager);
         return this;
     }
 
-    public KeyValueFileStoreRead forceKeepDelete() {
+    public MergeFileSplitRead forceKeepDelete() {
         this.forceKeepDelete = true;
         return this;
     }
 
     @Override
-    public FileStoreRead<KeyValue> withFilter(Predicate predicate) {
+    public MergeFileSplitRead withFilter(Predicate predicate) {
+        if (predicate == null) {
+            return this;
+        }
+
         List<Predicate> allFilters = new ArrayList<>();
         List<Predicate> pkFilters = null;
         List<String> primaryKeys = tableSchema.trimmedPrimaryKeys();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
new file mode 100644
index 000000000..c801dcaa8
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
+import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.format.FileFormatDiscover;
+import org.apache.paimon.format.FormatKey;
+import org.apache.paimon.format.FormatReaderContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.FileRecordReader;
+import org.apache.paimon.mergetree.compact.ConcatRecordReader;
+import org.apache.paimon.partition.PartitionUtils;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.IndexCastMapping;
+import org.apache.paimon.schema.SchemaEvolutionUtil;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BulkFormatMapping;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Projection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
+
+/** A {@link SplitRead} that reads raw files directly from a {@link DataSplit}. */
+public class RawFileSplitRead implements SplitRead<InternalRow> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RawFileSplitRead.class);
+
+    private final FileIO fileIO;
+    private final SchemaManager schemaManager;
+    private final TableSchema schema;
+    private final FileFormatDiscover formatDiscover;
+    private final FileStorePathFactory pathFactory;
+    private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
+
+    private int[][] projection;
+
+    @Nullable private List<Predicate> filters;
+
+    public RawFileSplitRead(
+            FileIO fileIO,
+            SchemaManager schemaManager,
+            TableSchema schema,
+            RowType rowType,
+            FileFormatDiscover formatDiscover,
+            FileStorePathFactory pathFactory) {
+        this.fileIO = fileIO;
+        this.schemaManager = schemaManager;
+        this.schema = schema;
+        this.formatDiscover = formatDiscover;
+        this.pathFactory = pathFactory;
+        this.bulkFormatMappings = new HashMap<>();
+
+        this.projection = Projection.range(0, 
rowType.getFieldCount()).toNestedIndexes();
+    }
+
+    public RawFileSplitRead withProjection(int[][] projectedFields) {
+        if (projectedFields != null) {
+            projection = projectedFields;
+        }
+        return this;
+    }
+
+    @Override
+    public RawFileSplitRead withFilter(Predicate predicate) {
+        if (predicate != null) {
+            this.filters = splitAnd(predicate);
+        }
+        return this;
+    }
+
+    @Override
+    public RecordReader<InternalRow> createReader(DataSplit split) throws 
IOException {
+        DataFilePathFactory dataFilePathFactory =
+                pathFactory.createDataFilePathFactory(split.partition(), 
split.bucket());
+        List<ConcatRecordReader.ReaderSupplier<InternalRow>> suppliers = new 
ArrayList<>();
+        if (split.beforeFiles().size() > 0) {
+            LOG.info("Ignore split before files: " + split.beforeFiles());
+        }
+
+        DeletionVector.Factory dvFactory =
+                DeletionVector.factory(
+                        fileIO, split.dataFiles(), 
split.deletionFiles().orElse(null));
+
+        for (DataFileMeta file : split.dataFiles()) {
+            String formatIdentifier = 
DataFilePathFactory.formatIdentifier(file.fileName());
+            BulkFormatMapping bulkFormatMapping =
+                    bulkFormatMappings.computeIfAbsent(
+                            new FormatKey(file.schemaId(), formatIdentifier),
+                            this::createBulkFormatMapping);
+
+            BinaryRow partition = split.partition();
+            suppliers.add(
+                    () ->
+                            createFileReader(
+                                    partition,
+                                    file,
+                                    dataFilePathFactory,
+                                    bulkFormatMapping,
+                                    dvFactory));
+        }
+
+        return ConcatRecordReader.create(suppliers);
+    }
+
+    private BulkFormatMapping createBulkFormatMapping(FormatKey key) {
+        TableSchema tableSchema = schema;
+        TableSchema dataSchema =
+                key.schemaId == schema.id() ? schema : 
schemaManager.schema(key.schemaId);
+
+        // projection to data schema
+        int[][] dataProjection =
+                SchemaEvolutionUtil.createDataProjection(
+                        tableSchema.fields(), dataSchema.fields(), projection);
+
+        IndexCastMapping indexCastMapping =
+                SchemaEvolutionUtil.createIndexCastMapping(
+                        Projection.of(projection).toTopLevelIndexes(),
+                        tableSchema.fields(),
+                        Projection.of(dataProjection).toTopLevelIndexes(),
+                        dataSchema.fields());
+
+        List<Predicate> dataFilters =
+                this.schema.id() == key.schemaId
+                        ? filters
+                        : SchemaEvolutionUtil.createDataFilters(
+                                tableSchema.fields(), dataSchema.fields(), 
filters);
+
+        Pair<int[], RowType> partitionPair = null;
+        if (!dataSchema.partitionKeys().isEmpty()) {
+            Pair<int[], int[][]> partitionMapping =
+                    PartitionUtils.constructPartitionMapping(dataSchema, 
dataProjection);
+            // if partition fields are not selected, we just do nothing
+            if (partitionMapping != null) {
+                dataProjection = partitionMapping.getRight();
+                partitionPair =
+                        Pair.of(
+                                partitionMapping.getLeft(),
+                                
dataSchema.projectedLogicalRowType(dataSchema.partitionKeys()));
+            }
+        }
+
+        RowType projectedRowType =
+                
Projection.of(dataProjection).project(dataSchema.logicalRowType());
+
+        return new BulkFormatMapping(
+                indexCastMapping.getIndexMapping(),
+                indexCastMapping.getCastMapping(),
+                partitionPair,
+                formatDiscover
+                        .discover(key.format)
+                        .createReaderFactory(projectedRowType, dataFilters));
+    }
+
+    private RecordReader<InternalRow> createFileReader(
+            BinaryRow partition,
+            DataFileMeta file,
+            DataFilePathFactory dataFilePathFactory,
+            BulkFormatMapping bulkFormatMapping,
+            DeletionVector.Factory dvFactory)
+            throws IOException {
+        FileRecordReader fileRecordReader =
+                new FileRecordReader(
+                        bulkFormatMapping.getReaderFactory(),
+                        new FormatReaderContext(
+                                fileIO,
+                                dataFilePathFactory.toPath(file.fileName()),
+                                file.fileSize()),
+                        bulkFormatMapping.getIndexMapping(),
+                        bulkFormatMapping.getCastMapping(),
+                        
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
+
+        Optional<DeletionVector> deletionVector = 
dvFactory.create(file.fileName());
+        if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
+            return new ApplyDeletionVectorReader<>(fileRecordReader, 
deletionVector.get());
+        }
+        return fileRecordReader;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreRead.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
similarity index 93%
rename from 
paimon-core/src/main/java/org/apache/paimon/operation/FileStoreRead.java
rename to paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
index 2d3e121b1..a5b05e8f4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
@@ -29,9 +29,9 @@ import java.io.IOException;
  *
  * @param <T> type of record to read.
  */
-public interface FileStoreRead<T> {
+public interface SplitRead<T> {
 
-    FileStoreRead<T> withFilter(Predicate predicate);
+    SplitRead<T> withFilter(Predicate predicate);
 
     /** Create a {@link RecordReader} from split. */
     RecordReader<T> createReader(DataSplit split) throws IOException;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 2eb41fdd5..810669f2e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -24,11 +24,11 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.ManifestCacheFilter;
-import org.apache.paimon.operation.AppendOnlyFileStoreRead;
 import org.apache.paimon.operation.AppendOnlyFileStoreScan;
 import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.Lock;
+import org.apache.paimon.operation.RawFileSplitRead;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.TableSchema;
@@ -112,8 +112,14 @@ class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
 
     @Override
     public InnerTableRead newRead() {
-        AppendOnlyFileStoreRead read = store().newRead();
-        return new AbstractDataTableRead<InternalRow>(read, schema()) {
+        RawFileSplitRead read = store().newRead();
+        return new AbstractDataTableRead<InternalRow>(schema()) {
+
+            @Override
+            protected InnerTableRead innerWithFilter(Predicate predicate) {
+                read.withFilter(predicate);
+                return this;
+            }
 
             @Override
             public void projection(int[][] projection) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index f2ab323d2..bf26ec31c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -32,7 +32,6 @@ import org.apache.paimon.operation.KeyValueFileStoreScan;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.query.LocalTableQuery;
@@ -42,7 +41,6 @@ import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.KeyValueTableRead;
 import org.apache.paimon.table.source.MergeTreeSplitGenerator;
 import org.apache.paimon.table.source.SplitGenerator;
-import org.apache.paimon.table.source.ValueContentRowDataRecordIterator;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 
@@ -157,25 +155,8 @@ class PrimaryKeyFileStoreTable extends AbstractFileStoreTable {
 
     @Override
     public InnerTableRead newRead() {
-        return new KeyValueTableRead(store().newRead(), schema()) {
-
-            @Override
-            public void projection(int[][] projection) {
-                read.withValueProjection(projection);
-            }
-
-            @Override
-            protected RecordReader.RecordIterator<InternalRow> rowDataRecordIteratorFromKv(
-                    RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
-                return new ValueContentRowDataRecordIterator(kvRecordIterator);
-            }
-
-            @Override
-            public InnerTableRead forceKeepDelete() {
-                read.forceKeepDelete();
-                return this;
-            }
-        };
+        return new KeyValueTableRead(
+                () -> store().newRead(), () -> store().newBatchRawFileRead(), schema());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
index 930cddcd5..bdb548d61 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
@@ -21,7 +21,6 @@ package org.apache.paimon.table.source;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.operation.DefaultValueAssigner;
-import org.apache.paimon.operation.FileStoreRead;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateProjectionConverter;
 import org.apache.paimon.reader.RecordReader;
@@ -34,15 +33,13 @@ import java.util.Optional;
 /** A {@link InnerTableRead} for data table. */
 public abstract class AbstractDataTableRead<T> implements InnerTableRead {
 
-    private final FileStoreRead<T> fileStoreRead;
     private final DefaultValueAssigner defaultValueAssigner;
 
     private int[][] projection;
     private boolean executeFilter = false;
     private Predicate predicate;
 
-    public AbstractDataTableRead(FileStoreRead<T> fileStoreRead, TableSchema schema) {
-        this.fileStoreRead = fileStoreRead;
+    public AbstractDataTableRead(TableSchema schema) {
         this.defaultValueAssigner = schema == null ? null : DefaultValueAssigner.create(schema);
     }
 
@@ -61,10 +58,11 @@ public abstract class AbstractDataTableRead<T> implements InnerTableRead {
         if (defaultValueAssigner != null) {
             predicate = defaultValueAssigner.handlePredicate(predicate);
         }
-        fileStoreRead.withFilter(predicate);
-        return this;
+        return innerWithFilter(predicate);
     }
 
+    protected abstract InnerTableRead innerWithFilter(Predicate predicate);
+
     @Override
     public TableRead executeFilter() {
         this.executeFilter = true;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index bb0354eee..f3f66c4aa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -21,61 +21,120 @@ package org.apache.paimon.table.source;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.operation.KeyValueFileStoreRead;
+import org.apache.paimon.operation.MergeFileSplitRead;
+import org.apache.paimon.operation.RawFileSplitRead;
+import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.utils.LazyField;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.function.Supplier;
 
 /**
- * An abstraction layer above {@link KeyValueFileStoreRead} to provide reading of {@link
- * InternalRow}.
+ * An abstraction layer above {@link MergeFileSplitRead} to provide reading of {@link InternalRow}.
  */
-public abstract class KeyValueTableRead extends AbstractDataTableRead<KeyValue> {
+public final class KeyValueTableRead extends AbstractDataTableRead<KeyValue> {
 
-    protected final KeyValueFileStoreRead read;
+    private final LazyField<MergeFileSplitRead> mergeRead;
+    private final LazyField<RawFileSplitRead> batchRawRead;
 
-    protected KeyValueTableRead(KeyValueFileStoreRead read, TableSchema schema) {
-        super(read, schema);
-        // We don't need any key fields, the columns that need to be read are already included in
-        // the value
-        this.read = read.withKeyProjection(new int[0][]);
-    }
+    private int[][] projection = null;
+    private boolean forceKeepDelete = false;
+    private Predicate predicate = null;
+    private IOManager ioManager = null;
 
-    @Override
-    public TableRead withIOManager(IOManager ioManager) {
-        read.withIOManager(ioManager);
-        return this;
+    public KeyValueTableRead(
+            Supplier<MergeFileSplitRead> mergeReadSupplier,
+            Supplier<RawFileSplitRead> batchRawReadSupplier,
+            TableSchema schema) {
+        super(schema);
+        this.mergeRead = new LazyField<>(() -> createMergeRead(mergeReadSupplier));
+        this.batchRawRead = new LazyField<>(() -> createBatchRawRead(batchRawReadSupplier));
     }
 
-    @Override
-    public final RecordReader<InternalRow> reader(Split split) throws IOException {
-        return new RowDataRecordReader(read.createReader((DataSplit) split));
+    private MergeFileSplitRead createMergeRead(Supplier<MergeFileSplitRead> readSupplier) {
+        MergeFileSplitRead read =
+                readSupplier
+                        .get()
+                        .withKeyProjection(new int[0][])
+                        .withValueProjection(projection)
+                        .withFilter(predicate)
+                        .withIOManager(ioManager);
+        if (forceKeepDelete) {
+            read = read.forceKeepDelete();
+        }
+        return read;
     }
 
-    protected abstract RecordReader.RecordIterator<InternalRow> rowDataRecordIteratorFromKv(
-            RecordReader.RecordIterator<KeyValue> kvRecordIterator);
+    private RawFileSplitRead createBatchRawRead(Supplier<RawFileSplitRead> readSupplier) {
+        return readSupplier.get().withProjection(projection).withFilter(predicate);
+    }
 
-    private class RowDataRecordReader implements RecordReader<InternalRow> {
+    @Override
+    public void projection(int[][] projection) {
+        if (mergeRead.initialized()) {
+            mergeRead.get().withValueProjection(projection);
+        }
+        if (batchRawRead.initialized()) {
+            batchRawRead.get().withProjection(projection);
+        }
+        this.projection = projection;
+    }
 
-        private final RecordReader<KeyValue> wrapped;
+    @Override
+    public InnerTableRead forceKeepDelete() {
+        if (mergeRead.initialized()) {
+            mergeRead.get().forceKeepDelete();
+        }
+        this.forceKeepDelete = true;
+        return this;
+    }
 
-        private RowDataRecordReader(RecordReader<KeyValue> wrapped) {
-            this.wrapped = wrapped;
+    @Override
+    protected InnerTableRead innerWithFilter(Predicate predicate) {
+        if (mergeRead.initialized()) {
+            mergeRead.get().withFilter(predicate);
         }
+        if (batchRawRead.initialized()) {
+            batchRawRead.get().withFilter(predicate);
+        }
+        this.predicate = predicate;
+        return this;
+    }
 
-        @Nullable
-        @Override
-        public RecordIterator<InternalRow> readBatch() throws IOException {
-            RecordIterator<KeyValue> batch = wrapped.readBatch();
-            return batch == null ? null : rowDataRecordIteratorFromKv(batch);
+    @Override
+    public TableRead withIOManager(IOManager ioManager) {
+        if (mergeRead.initialized()) {
+            mergeRead.get().withIOManager(ioManager);
         }
+        this.ioManager = ioManager;
+        return this;
+    }
 
-        @Override
-        public void close() throws IOException {
-            wrapped.close();
+    @Override
+    public RecordReader<InternalRow> reader(Split split) throws IOException {
+        DataSplit dataSplit = (DataSplit) split;
+        if (!forceKeepDelete && !dataSplit.isStreaming() && split.convertToRawFiles().isPresent()) {
+            return batchRawRead.get().createReader(dataSplit);
         }
+
+        RecordReader<KeyValue> reader = mergeRead.get().createReader(dataSplit);
+        return new RecordReader<InternalRow>() {
+
+            @Nullable
+            @Override
+            public RecordIterator<InternalRow> readBatch() throws IOException {
+                RecordIterator<KeyValue> batch = reader.readBatch();
+                return batch == null ? null : new ValueContentRowDataRecordIterator(batch);
+            }
+
+            @Override
+            public void close() throws IOException {
+                reader.close();
+            }
+        };
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
index 72b54ae6f..1b2c6299b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
@@ -22,7 +22,7 @@ import org.apache.paimon.annotation.Public;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
-import org.apache.paimon.operation.FileStoreRead;
+import org.apache.paimon.operation.SplitRead;
 import org.apache.paimon.reader.RecordReader;
 
 import java.io.IOException;
@@ -30,7 +30,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * An abstraction layer above {@link FileStoreRead} to provide reading of {@link InternalRow}.
+ * An abstraction layer above {@link SplitRead} to provide reading of {@link InternalRow}.
  *
  * @since 0.4.0
  */
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index b20f89b94..6adc3aff0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -36,9 +36,9 @@ import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
 import org.apache.paimon.operation.AbstractFileStoreWrite;
 import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.operation.FileStoreCommitImpl;
-import org.apache.paimon.operation.FileStoreRead;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.Lock;
+import org.apache.paimon.operation.SplitRead;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.reader.RecordReaderIterator;
@@ -426,7 +426,7 @@ public class TestFileStore extends KeyValueFileStore {
         }
 
         List<KeyValue> kvs = new ArrayList<>();
-        FileStoreRead<KeyValue> read = newRead();
+        SplitRead<KeyValue> read = newRead();
         for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entryWithPartition :
                 filesPerPartitionAndBucket.entrySet()) {
             for (Map.Entry<Integer, List<DataFileMeta>> entryWithBucket :
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
similarity index 99%
rename from paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java
rename to paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
index c46482556..806c869f9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
@@ -65,8 +65,8 @@ import java.util.stream.Stream;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for {@link KeyValueFileStoreRead}. */
-public class KeyValueFileStoreReadTest {
+/** Tests for {@link MergeFileSplitRead}. */
+public class MergeFileSplitReadTest {
 
     @TempDir java.nio.file.Path tempDir;
 
@@ -226,7 +226,7 @@ public class KeyValueFileStoreReadTest {
         Map<BinaryRow, List<ManifestEntry>> filesGroupedByPartition =
                 scan.withSnapshot(snapshotId).plan().files().stream()
                         
.collect(Collectors.groupingBy(ManifestEntry::partition));
-        KeyValueFileStoreRead read = store.newRead();
+        MergeFileSplitRead read = store.newRead();
         if (keyProjection != null) {
             read.withKeyProjection(keyProjection);
         }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
index 523033579..99af52ce6 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
@@ -124,9 +124,9 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
                 expected,
                 Arrays.asList(
                         changelogRow("+I", 1, "v_1", "creation", "02-27"),
-                        changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
-                        changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
-                        changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"),
+                        changelogRow("+I", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
+                        changelogRow("+I", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
+                        changelogRow("+I", 7, "Seven", "matched_upsert", "02-28"),
                         changelogRow("+I", 8, "v_8", "insert", "02-29"),
                         changelogRow("+I", 11, "v_11", "insert", "02-29"),
                         changelogRow("+I", 12, "v_12", "insert", "02-29")));
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index d51aefb96..7fd14b1ad 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -25,6 +25,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMeta;
@@ -32,16 +33,15 @@ import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.memory.MemoryOwner;
 import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
-import org.apache.paimon.operation.KeyValueFileStoreRead;
 import org.apache.paimon.operation.KeyValueFileStoreWrite;
+import org.apache.paimon.operation.MergeFileSplitRead;
+import org.apache.paimon.operation.RawFileSplitRead;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.source.KeyValueTableRead;
 import org.apache.paimon.table.source.TableRead;
-import org.apache.paimon.table.source.ValueContentRowDataRecordIterator;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.IntType;
@@ -60,7 +60,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.function.Function;
 
 import static java.util.Collections.singletonList;
 
@@ -110,20 +109,13 @@ public class TestChangelogDataReadWrite {
     }
 
     public TableRead createReadWithKey() {
-        return createRead(ValueContentRowDataRecordIterator::new);
-    }
-
-    private TableRead createRead(
-            Function<
-                            RecordReader.RecordIterator<KeyValue>,
-                            RecordReader.RecordIterator<InternalRow>>
-                    rowDataIteratorCreator) {
         SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), tablePath);
         CoreOptions options = new CoreOptions(new HashMap<>());
-        KeyValueFileStoreRead read =
-                new KeyValueFileStoreRead(
+        TableSchema schema = schemaManager.schema(0);
+        MergeFileSplitRead read =
+                new MergeFileSplitRead(
                         options,
-                        schemaManager.schema(0),
+                        schema,
                         KEY_TYPE,
                         VALUE_TYPE,
                         COMPARATOR,
@@ -131,26 +123,22 @@ public class TestChangelogDataReadWrite {
                         KeyValueFileReaderFactory.builder(
                                 LocalFileIO.create(),
                                 schemaManager,
-                                schemaManager.schema(0),
+                                schema,
                                 KEY_TYPE,
                                 VALUE_TYPE,
                                 ignore -> avro,
                                 pathFactory,
                                 EXTRACTOR,
                                 options));
-        return new KeyValueTableRead(read, null) {
-
-            @Override
-            public void projection(int[][] projection) {
-                throw new UnsupportedOperationException();
-            }
-
-            @Override
-            protected RecordReader.RecordIterator<InternalRow> rowDataRecordIteratorFromKv(
-                    RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
-                return rowDataIteratorCreator.apply(kvRecordIterator);
-            }
-        };
+        RawFileSplitRead rawFileRead =
+                new RawFileSplitRead(
+                        LocalFileIO.create(),
+                        schemaManager,
+                        schema,
+                        VALUE_TYPE,
+                        FileFormatDiscover.of(options),
+                        pathFactory);
+        return new KeyValueTableRead(() -> read, () -> rawFileRead, null);
     }
 
     public List<DataFileMeta> writeFiles(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
index 2303ba1bc..fc7e05d75 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
 
 import javax.annotation.Nullable;
@@ -261,15 +262,28 @@ public class ReadWriteTableTestUtil {
         CloseableIterator<Row> resultItr = bEnv.executeSql(query).collect();
         try (BlockingIterator<Row, Row> iterator = BlockingIterator.of(resultItr)) {
             if (!expected.isEmpty()) {
-                assertThat(
-                                iterator.collect(
-                                        expected.size(), TIME_OUT.getSize(), TIME_OUT.getUnit()))
-                        .containsExactlyInAnyOrderElementsOf(expected);
+                List<Row> result =
+                        iterator.collect(expected.size(), TIME_OUT.getSize(), TIME_OUT.getUnit());
+                assertThat(toInsertOnlyRows(result))
+                        .containsExactlyInAnyOrderElementsOf(toInsertOnlyRows(expected));
             }
             assertThat(resultItr.hasNext()).isFalse();
         }
     }
 
+    private static List<Row> toInsertOnlyRows(List<Row> rows) {
+        List<Row> result = new ArrayList<>();
+        for (Row row : rows) {
+            assertThat(row.getKind()).isIn(RowKind.INSERT, RowKind.UPDATE_AFTER);
+            Row newRow = new Row(row.getArity());
+            for (int i = 0; i < row.getArity(); i++) {
+                newRow.setField(i, row.getField(i));
+            }
+            result.add(newRow);
+        }
+        return result;
+    }
+
     public static BlockingIterator<Row, Row> testStreamingRead(String query, List<Row> expected)
             throws Exception {
         BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());

Reply via email to