This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.8
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit de32dae41a165984d6fe121809e991ab75881ae5
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu May 16 17:03:12 2024 +0800

    [core] Incremental-between tags should deduplicate records (#3338)
---
 .../org/apache/paimon/mergetree/MergeSorter.java   |   4 +
 .../paimon/operation/MergeFileSplitRead.java       | 109 ++++++++-------------
 .../apache/paimon/operation/RawFileSplitRead.java  |  12 +++
 .../org/apache/paimon/operation/SplitRead.java     |  46 ++++++++-
 .../paimon/table/source/KeyValueTableRead.java     |  79 +++++++--------
 .../IncrementalChangelogReadProvider.java          |  96 ++++++++++++++++++
 .../splitread/IncrementalDiffReadProvider.java     |  65 ++++++++++++
 .../splitread/IncrementalDiffSplitRead.java}       |  95 ++++++++++++++++--
 .../splitread/MergeFileSplitReadProvider.java      |  68 +++++++++++++
 .../source/splitread/RawFileSplitReadProvider.java |  60 ++++++++++++
 .../source/splitread/SplitReadProvider.java}       |  21 ++--
 .../paimon/operation/MergeFileSplitReadTest.java   |   2 +-
 .../apache/paimon/table/IncrementalTableTest.java  |  18 +++-
 13 files changed, 541 insertions(+), 134 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
index 0f54b40b6..420613899 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
@@ -95,6 +95,10 @@ public class MergeSorter {
         return memoryPool;
     }
 
+    public RowType valueType() {
+        return valueType;
+    }
+
     public void setIOManager(IOManager ioManager) {
         this.ioManager = ioManager;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
index d457c9093..8002b62f0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
@@ -116,7 +116,16 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
         return this;
     }
 
-    public MergeFileSplitRead withValueProjection(@Nullable int[][] projectedFields) {
+    public Comparator<InternalRow> keyComparator() {
+        return keyComparator;
+    }
+
+    public MergeSorter mergeSorter() {
+        return mergeSorter;
+    }
+
+    @Override
+    public MergeFileSplitRead withProjection(@Nullable int[][] projectedFields) {
         if (projectedFields == null) {
             return this;
         }
@@ -160,11 +169,13 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
         return this;
     }
 
+    @Override
     public MergeFileSplitRead withIOManager(IOManager ioManager) {
         this.mergeSorter.setIOManager(ioManager);
         return this;
     }
 
+    @Override
     public MergeFileSplitRead forceKeepDelete() {
         this.forceKeepDelete = true;
         return this;
@@ -211,75 +222,28 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
 
     @Override
     public RecordReader<KeyValue> createReader(DataSplit split) throws IOException {
-        RecordReader<KeyValue> reader = createReaderWithoutOuterProjection(split);
-        if (outerProjection != null) {
-            ProjectedRow projectedRow = ProjectedRow.from(outerProjection);
-            reader = reader.transform(kv -> kv.replaceValue(projectedRow.replaceRow(kv.value())));
+        if (!split.beforeFiles().isEmpty()) {
+            throw new IllegalArgumentException("This read cannot accept split with before files.");
         }
-        return reader;
-    }
 
-    private RecordReader<KeyValue> createReaderWithoutOuterProjection(DataSplit split)
-            throws IOException {
-        if (split.beforeFiles().isEmpty()) {
-            if (split.isStreaming() || split.convertToRawFiles().isPresent()) {
-                return noMergeRead(
-                        split.partition(),
-                        split.bucket(),
-                        split.dataFiles(),
-                        split.deletionFiles().orElse(null),
-                        split.isStreaming());
-            } else {
-                return projectKey(
-                        mergeRead(
-                                split.partition(),
-                                split.bucket(),
-                                split.dataFiles(),
-                                null,
-                                forceKeepDelete));
-            }
-        } else if (split.isStreaming()) {
-            // streaming concat read
-            return ConcatRecordReader.create(
-                    () ->
-                            new ReverseReader(
-                                    noMergeRead(
-                                            split.partition(),
-                                            split.bucket(),
-                                            split.beforeFiles(),
-                                            split.beforeDeletionFiles().orElse(null),
-                                            true)),
-                    () ->
-                            noMergeRead(
-                                    split.partition(),
-                                    split.bucket(),
-                                    split.dataFiles(),
-                                    split.deletionFiles().orElse(null),
-                                    true));
+        if (split.isStreaming()) {
+            return createNoMergeReader(
+                    split.partition(),
+                    split.bucket(),
+                    split.dataFiles(),
+                    split.deletionFiles().orElse(null),
+                    split.isStreaming());
         } else {
-            // batch diff read
-            return projectKey(
-                    DiffReader.readDiff(
-                            mergeRead(
-                                    split.partition(),
-                                    split.bucket(),
-                                    split.beforeFiles(),
-                                    split.beforeDeletionFiles().orElse(null),
-                                    false),
-                            mergeRead(
-                                    split.partition(),
-                                    split.bucket(),
-                                    split.dataFiles(),
-                                    split.deletionFiles().orElse(null),
-                                    false),
-                            keyComparator,
-                            createUdsComparator(),
-                            mergeSorter,
-                            forceKeepDelete));
+            return createMergeReader(
+                    split.partition(),
+                    split.bucket(),
+                    split.dataFiles(),
+                    split.deletionFiles().orElse(null),
+                    forceKeepDelete);
         }
     }
 
-    private RecordReader<KeyValue> mergeRead(
+    public RecordReader<KeyValue> createMergeReader(
             BinaryRow partition,
             int bucket,
             List<DataFileMeta> files,
@@ -316,10 +280,10 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
             reader = new DropDeleteReader(reader);
         }
 
-        return reader;
+        return projectOuter(projectKey(reader));
     }
 
-    private RecordReader<KeyValue> noMergeRead(
+    public RecordReader<KeyValue> createNoMergeReader(
             BinaryRow partition,
             int bucket,
             List<DataFileMeta> files,
@@ -344,7 +308,8 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
                                 file.schemaId(), fileName, file.fileSize(), file.level());
                     });
         }
-        return ConcatRecordReader.create(suppliers);
+
+        return projectOuter(ConcatRecordReader.create(suppliers));
     }
 
     private Optional<String> changelogFile(DataFileMeta fileMeta) {
@@ -365,8 +330,16 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
         return reader.transform(kv -> kv.replaceKey(projectedRow.replaceRow(kv.key())));
     }
 
+    private RecordReader<KeyValue> projectOuter(RecordReader<KeyValue> reader) {
+        if (outerProjection != null) {
+            ProjectedRow projectedRow = ProjectedRow.from(outerProjection);
+            reader = reader.transform(kv -> kv.replaceValue(projectedRow.replaceRow(kv.value())));
+        }
+        return reader;
+    }
+
     @Nullable
-    private UserDefinedSeqComparator createUdsComparator() {
+    public UserDefinedSeqComparator createUdsComparator() {
         return UserDefinedSeqComparator.create(
                 readerFactoryBuilder.projectedValueType(), sequenceFields);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index fb68823ba..083db39cb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
 import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.format.FormatKey;
 import org.apache.paimon.format.FormatReaderContext;
@@ -98,6 +99,17 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
         this.projection = Projection.range(0, rowType.getFieldCount()).toNestedIndexes();
     }
 
+    @Override
+    public SplitRead<InternalRow> forceKeepDelete() {
+        return this;
+    }
+
+    @Override
+    public SplitRead<InternalRow> withIOManager(@Nullable IOManager ioManager) {
+        return this;
+    }
+
+    @Override
     public RawFileSplitRead withProjection(int[][] projectedFields) {
         if (projectedFields != null) {
             projection = projectedFields;
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
index a5b05e8f4..c17c0f6b3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
@@ -18,9 +18,13 @@
 
 package org.apache.paimon.operation;
 
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.IOFunction;
+
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 
@@ -31,8 +35,48 @@ import java.io.IOException;
  */
 public interface SplitRead<T> {
 
-    SplitRead<T> withFilter(Predicate predicate);
+    SplitRead<T> forceKeepDelete();
+
+    SplitRead<T> withIOManager(@Nullable IOManager ioManager);
+
+    SplitRead<T> withProjection(@Nullable int[][] projectedFields);
+
+    SplitRead<T> withFilter(@Nullable Predicate predicate);
 
     /** Create a {@link RecordReader} from split. */
     RecordReader<T> createReader(DataSplit split) throws IOException;
+
+    static <L, R> SplitRead<R> convert(
+            SplitRead<L> read, IOFunction<DataSplit, RecordReader<R>> convertedFactory) {
+        return new SplitRead<R>() {
+            @Override
+            public SplitRead<R> forceKeepDelete() {
+                read.forceKeepDelete();
+                return this;
+            }
+
+            @Override
+            public SplitRead<R> withIOManager(@Nullable IOManager ioManager) {
+                read.withIOManager(ioManager);
+                return this;
+            }
+
+            @Override
+            public SplitRead<R> withProjection(@Nullable int[][] projectedFields) {
+                read.withProjection(projectedFields);
+                return this;
+            }
+
+            @Override
+            public SplitRead<R> withFilter(@Nullable Predicate predicate) {
+                read.withFilter(predicate);
+                return this;
+            }
+
+            @Override
+            public RecordReader<R> createReader(DataSplit split) throws IOException {
+                return convertedFactory.apply(split);
+            }
+        };
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index f3f66c4aa..c674e4792 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -23,14 +23,22 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.operation.MergeFileSplitRead;
 import org.apache.paimon.operation.RawFileSplitRead;
+import org.apache.paimon.operation.SplitRead;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.utils.LazyField;
+import org.apache.paimon.table.source.splitread.IncrementalChangelogReadProvider;
+import org.apache.paimon.table.source.splitread.IncrementalDiffReadProvider;
+import org.apache.paimon.table.source.splitread.MergeFileSplitReadProvider;
+import org.apache.paimon.table.source.splitread.RawFileSplitReadProvider;
+import org.apache.paimon.table.source.splitread.SplitReadProvider;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.function.Supplier;
 
 /**
@@ -38,8 +46,7 @@ import java.util.function.Supplier;
  */
 public final class KeyValueTableRead extends AbstractDataTableRead<KeyValue> {
 
-    private final LazyField<MergeFileSplitRead> mergeRead;
-    private final LazyField<RawFileSplitRead> batchRawRead;
+    private final List<SplitReadProvider> readProviders;
 
     private int[][] projection = null;
     private boolean forceKeepDelete = false;
@@ -51,65 +58,54 @@ public final class KeyValueTableRead extends AbstractDataTableRead<KeyValue> {
             Supplier<RawFileSplitRead> batchRawReadSupplier,
             TableSchema schema) {
         super(schema);
-        this.mergeRead = new LazyField<>(() -> createMergeRead(mergeReadSupplier));
-        this.batchRawRead = new LazyField<>(() -> createBatchRawRead(batchRawReadSupplier));
+        this.readProviders =
+                Arrays.asList(
+                        new RawFileSplitReadProvider(batchRawReadSupplier, this::assignValues),
+                        new MergeFileSplitReadProvider(mergeReadSupplier, this::assignValues),
+                        new IncrementalChangelogReadProvider(mergeReadSupplier, this::assignValues),
+                        new IncrementalDiffReadProvider(mergeReadSupplier, this::assignValues));
     }
 
-    private MergeFileSplitRead createMergeRead(Supplier<MergeFileSplitRead> readSupplier) {
-        MergeFileSplitRead read =
-                readSupplier
-                        .get()
-                        .withKeyProjection(new int[0][])
-                        .withValueProjection(projection)
-                        .withFilter(predicate)
-                        .withIOManager(ioManager);
-        if (forceKeepDelete) {
-            read = read.forceKeepDelete();
+    private List<SplitRead<InternalRow>> initialized() {
+        List<SplitRead<InternalRow>> readers = new ArrayList<>();
+        for (SplitReadProvider readProvider : readProviders) {
+            if (readProvider.initialized()) {
+                readers.add(readProvider.getOrCreate());
+            }
         }
-        return read;
+        return readers;
     }
 
-    private RawFileSplitRead createBatchRawRead(Supplier<RawFileSplitRead> readSupplier) {
-        return readSupplier.get().withProjection(projection).withFilter(predicate);
+    private void assignValues(SplitRead<InternalRow> read) {
+        if (forceKeepDelete) {
+            read = read.forceKeepDelete();
+        }
+        read.withProjection(projection).withFilter(predicate).withIOManager(ioManager);
     }
 
     @Override
     public void projection(int[][] projection) {
-        if (mergeRead.initialized()) {
-            mergeRead.get().withValueProjection(projection);
-        }
-        if (batchRawRead.initialized()) {
-            batchRawRead.get().withProjection(projection);
-        }
+        initialized().forEach(r -> r.withProjection(projection));
         this.projection = projection;
     }
 
     @Override
     public InnerTableRead forceKeepDelete() {
-        if (mergeRead.initialized()) {
-            mergeRead.get().forceKeepDelete();
-        }
+        initialized().forEach(SplitRead::forceKeepDelete);
         this.forceKeepDelete = true;
         return this;
     }
 
     @Override
     protected InnerTableRead innerWithFilter(Predicate predicate) {
-        if (mergeRead.initialized()) {
-            mergeRead.get().withFilter(predicate);
-        }
-        if (batchRawRead.initialized()) {
-            batchRawRead.get().withFilter(predicate);
-        }
+        initialized().forEach(r -> r.withFilter(predicate));
         this.predicate = predicate;
         return this;
     }
 
     @Override
     public TableRead withIOManager(IOManager ioManager) {
-        if (mergeRead.initialized()) {
-            mergeRead.get().withIOManager(ioManager);
-        }
+        initialized().forEach(r -> r.withIOManager(ioManager));
         this.ioManager = ioManager;
         return this;
     }
@@ -117,11 +113,16 @@ public final class KeyValueTableRead extends AbstractDataTableRead<KeyValue> {
     @Override
     public RecordReader<InternalRow> reader(Split split) throws IOException {
         DataSplit dataSplit = (DataSplit) split;
-        if (!forceKeepDelete && !dataSplit.isStreaming() && split.convertToRawFiles().isPresent()) {
-            return batchRawRead.get().createReader(dataSplit);
+        for (SplitReadProvider readProvider : readProviders) {
+            if (readProvider.match(dataSplit, forceKeepDelete)) {
+                return readProvider.getOrCreate().createReader(dataSplit);
+            }
         }
 
-        RecordReader<KeyValue> reader = mergeRead.get().createReader(dataSplit);
+        throw new RuntimeException("Should not happen.");
+    }
+
+    public static RecordReader<InternalRow> unwrap(RecordReader<KeyValue> reader) {
         return new RecordReader<InternalRow>() {
 
             @Nullable
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
new file mode 100644
index 000000000..bec95979d
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source.splitread;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.mergetree.compact.ConcatRecordReader;
+import org.apache.paimon.operation.MergeFileSplitRead;
+import org.apache.paimon.operation.ReverseReader;
+import org.apache.paimon.operation.SplitRead;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.IOFunction;
+import org.apache.paimon.utils.LazyField;
+
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.apache.paimon.table.source.KeyValueTableRead.unwrap;
+
+/** A {@link SplitReadProvider} to incremental changelog read. */
+public class IncrementalChangelogReadProvider implements SplitReadProvider {
+
+    private final LazyField<SplitRead<InternalRow>> splitRead;
+
+    public IncrementalChangelogReadProvider(
+            Supplier<MergeFileSplitRead> supplier,
+            Consumer<SplitRead<InternalRow>> valuesAssigner) {
+        this.splitRead =
+                new LazyField<>(
+                        () -> {
+                            SplitRead<InternalRow> read = create(supplier);
+                            valuesAssigner.accept(read);
+                            return read;
+                        });
+    }
+
+    private SplitRead<InternalRow> create(Supplier<MergeFileSplitRead> supplier) {
+        final MergeFileSplitRead read = supplier.get().withKeyProjection(new int[0][]);
+        IOFunction<DataSplit, RecordReader<InternalRow>> convertedFactory =
+                split -> {
+                    RecordReader<KeyValue> reader =
+                            ConcatRecordReader.create(
+                                    () ->
+                                            new ReverseReader(
+                                                    read.createNoMergeReader(
+                                                            split.partition(),
+                                                            split.bucket(),
+                                                            split.beforeFiles(),
+                                                            split.beforeDeletionFiles()
+                                                                    .orElse(null),
+                                                            true)),
+                                    () ->
+                                            read.createNoMergeReader(
+                                                    split.partition(),
+                                                    split.bucket(),
+                                                    split.dataFiles(),
+                                                    split.deletionFiles().orElse(null),
+                                                    true));
+                    return unwrap(reader);
+                };
+
+        return SplitRead.convert(read, convertedFactory);
+    }
+
+    @Override
+    public boolean match(DataSplit split, boolean forceKeepDelete) {
+        return !split.beforeFiles().isEmpty() && split.isStreaming();
+    }
+
+    @Override
+    public boolean initialized() {
+        return splitRead.initialized();
+    }
+
+    @Override
+    public SplitRead<InternalRow> getOrCreate() {
+        return splitRead.get();
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java
new file mode 100644
index 000000000..a335a7c03
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source.splitread;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.operation.MergeFileSplitRead;
+import org.apache.paimon.operation.SplitRead;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.LazyField;
+
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/** A {@link SplitReadProvider} to batch incremental diff read. */
+public class IncrementalDiffReadProvider implements SplitReadProvider {
+
+    private final LazyField<SplitRead<InternalRow>> splitRead;
+
+    public IncrementalDiffReadProvider(
+            Supplier<MergeFileSplitRead> supplier,
+            Consumer<SplitRead<InternalRow>> valuesAssigner) {
+        this.splitRead =
+                new LazyField<>(
+                        () -> {
+                            SplitRead<InternalRow> read = create(supplier);
+                            valuesAssigner.accept(read);
+                            return read;
+                        });
+    }
+
+    private SplitRead<InternalRow> create(Supplier<MergeFileSplitRead> supplier) {
+        return new IncrementalDiffSplitRead(supplier.get());
+    }
+
+    @Override
+    public boolean match(DataSplit split, boolean forceKeepDelete) {
+        return !split.beforeFiles().isEmpty() && !split.isStreaming();
+    }
+
+    @Override
+    public boolean initialized() {
+        return splitRead.initialized();
+    }
+
+    @Override
+    public SplitRead<InternalRow> getOrCreate() {
+        return splitRead.get();
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DiffReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
similarity index 55%
rename from paimon-core/src/main/java/org/apache/paimon/operation/DiffReader.java
rename to paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
index bc5153600..0519da9fd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DiffReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
@@ -16,15 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.operation;
+package org.apache.paimon.table.source.splitread;
 
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
+import org.apache.paimon.operation.MergeFileSplitRead;
+import org.apache.paimon.operation.SplitRead;
+import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.KeyValueTableRead;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.FieldsComparator;
+import org.apache.paimon.utils.ProjectedRow;
 
 import javax.annotation.Nullable;
 
@@ -34,13 +43,73 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 
-/** A {@link RecordReader} util to read diff between before reader and after reader. */
-public class DiffReader {
+/** A {@link SplitRead} for batch incremental diff. */
+public class IncrementalDiffSplitRead implements SplitRead<InternalRow> {
 
     private static final int BEFORE_LEVEL = Integer.MIN_VALUE;
     private static final int AFTER_LEVEL = Integer.MAX_VALUE;
 
-    public static RecordReader<KeyValue> readDiff(
+    private final MergeFileSplitRead mergeRead;
+
+    private boolean forceKeepDelete = false;
+    @Nullable private int[][] projectedFields;
+
+    public IncrementalDiffSplitRead(MergeFileSplitRead mergeRead) {
+        this.mergeRead = mergeRead;
+    }
+
+    @Override
+    public SplitRead<InternalRow> forceKeepDelete() {
+        this.forceKeepDelete = true;
+        return this;
+    }
+
+    @Override
+    public SplitRead<InternalRow> withIOManager(@Nullable IOManager ioManager) {
+        mergeRead.withIOManager(ioManager);
+        return this;
+    }
+
+    @Override
+    public SplitRead<InternalRow> withProjection(@Nullable int[][] projectedFields) {
+        this.projectedFields = projectedFields;
+        return this;
+    }
+
+    @Override
+    public SplitRead<InternalRow> withFilter(@Nullable Predicate predicate) {
+        mergeRead.withFilter(predicate);
+        return this;
+    }
+
+    @Override
+    public RecordReader<InternalRow> createReader(DataSplit split) throws IOException {
+        RecordReader<KeyValue> reader =
+                readDiff(
+                        mergeRead.createMergeReader(
+                                split.partition(),
+                                split.bucket(),
+                                split.beforeFiles(),
+                                split.beforeDeletionFiles().orElse(null),
+                                false),
+                        mergeRead.createMergeReader(
+                                split.partition(),
+                                split.bucket(),
+                                split.dataFiles(),
+                                split.deletionFiles().orElse(null),
+                                false),
+                        mergeRead.keyComparator(),
+                        mergeRead.createUdsComparator(),
+                        mergeRead.mergeSorter(),
+                        forceKeepDelete);
+        if (projectedFields != null) {
+            ProjectedRow projectedRow = ProjectedRow.from(projectedFields);
+            reader = reader.transform(kv -> kv.replaceValue(projectedRow.replaceRow(kv.value())));
+        }
+        return KeyValueTableRead.unwrap(reader);
+    }
+
+    private static RecordReader<KeyValue> readDiff(
             RecordReader<KeyValue> beforeReader,
             RecordReader<KeyValue> afterReader,
             Comparator<InternalRow> keyComparator,
@@ -54,7 +123,7 @@ public class DiffReader {
                         () -> wrapLevelToReader(afterReader, AFTER_LEVEL)),
                 keyComparator,
                 userDefinedSeqComparator,
-                new DiffMerger(keepDelete));
+                new DiffMerger(keepDelete, InternalSerializers.create(sorter.valueType())));
     }
 
     private static RecordReader<KeyValue> wrapLevelToReader(
@@ -96,11 +165,15 @@ public class DiffReader {
     private static class DiffMerger implements MergeFunctionWrapper<KeyValue> {
 
         private final boolean keepDelete;
+        private final InternalRowSerializer serializer1;
+        private final InternalRowSerializer serializer2;
 
         private final List<KeyValue> kvs = new ArrayList<>();
 
-        public DiffMerger(boolean keepDelete) {
+        public DiffMerger(boolean keepDelete, InternalRowSerializer serializer) {
             this.keepDelete = keepDelete;
+            this.serializer1 = serializer;
+            this.serializer2 = serializer.duplicate();
         }
 
         @Override
@@ -128,7 +201,9 @@ public class DiffReader {
             } else if (kvs.size() == 2) {
                 KeyValue latest = kvs.get(1);
                 if (latest.level() == AFTER_LEVEL) {
-                    return latest;
+                    if (!valueEquals()) {
+                        return latest;
+                    }
                 }
             } else {
                 throw new IllegalArgumentException("Illegal kv number: " + kvs.size());
@@ -136,5 +211,11 @@ public class DiffReader {
 
             return null;
         }
+
+        private boolean valueEquals() {
+            return serializer1
+                    .toBinaryRow(kvs.get(0).value())
+                    .equals(serializer2.toBinaryRow(kvs.get(1).value()));
+        }
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
new file mode 100644
index 000000000..abed0f33c
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source.splitread;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.operation.MergeFileSplitRead;
+import org.apache.paimon.operation.SplitRead;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.LazyField;
+
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.apache.paimon.table.source.KeyValueTableRead.unwrap;
+
+/**
+ * A {@link SplitReadProvider} to merge files.
+ *
+ * <p>The read is created through a {@link LazyField}: the supplied {@link MergeFileSplitRead}
+ * gets an empty key projection, its reader output is unwrapped to {@link InternalRow}, and the
+ * resulting {@link SplitRead} is handed to {@code valuesAssigner} before it is returned.
+ */
+public class MergeFileSplitReadProvider implements SplitReadProvider {
+
+    /** Holds the converted read; creation is deferred to {@link LazyField}. */
+    private final LazyField<SplitRead<InternalRow>> splitRead;
+
+    public MergeFileSplitReadProvider(
+            Supplier<MergeFileSplitRead> supplier,
+            Consumer<SplitRead<InternalRow>> valuesAssigner) {
+        this.splitRead = new LazyField<>(() -> createAndAssign(supplier, valuesAssigner));
+    }
+
+    /** Builds the converted merge read and passes it to {@code valuesAssigner}. */
+    private static SplitRead<InternalRow> createAndAssign(
+            Supplier<MergeFileSplitRead> supplier,
+            Consumer<SplitRead<InternalRow>> valuesAssigner) {
+        // Empty key projection (presumably keys are not needed once records are unwrapped).
+        MergeFileSplitRead mergeRead = supplier.get().withKeyProjection(new int[0][]);
+        SplitRead<InternalRow> converted =
+                SplitRead.convert(mergeRead, split -> unwrap(mergeRead.createReader(split)));
+        valuesAssigner.accept(converted);
+        return converted;
+    }
+
+    /** Applies only to splits without "before" files; {@code forceKeepDelete} is not consulted. */
+    @Override
+    public boolean match(DataSplit split, boolean forceKeepDelete) {
+        return split.beforeFiles().isEmpty();
+    }
+
+    @Override
+    public boolean initialized() {
+        return splitRead.initialized();
+    }
+
+    @Override
+    public SplitRead<InternalRow> getOrCreate() {
+        return splitRead.get();
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/RawFileSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/RawFileSplitReadProvider.java
new file mode 100644
index 000000000..9959ee555
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/RawFileSplitReadProvider.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source.splitread;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.operation.RawFileSplitRead;
+import org.apache.paimon.operation.SplitRead;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.LazyField;
+
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * A {@link SplitReadProvider} to create {@link RawFileSplitRead}.
+ *
+ * <p>The read is obtained from the supplier through a {@link LazyField} and passed to {@code
+ * valuesAssigner} before it is returned.
+ */
+public class RawFileSplitReadProvider implements SplitReadProvider {
+
+    private final LazyField<RawFileSplitRead> splitRead;
+
+    public RawFileSplitReadProvider(
+            Supplier<RawFileSplitRead> supplier, Consumer<SplitRead<InternalRow>> valuesAssigner) {
+        this.splitRead = new LazyField<>(() -> assigned(supplier.get(), valuesAssigner));
+    }
+
+    /** Passes {@code read} to {@code valuesAssigner} and returns it unchanged. */
+    private static RawFileSplitRead assigned(
+            RawFileSplitRead read, Consumer<SplitRead<InternalRow>> valuesAssigner) {
+        valuesAssigner.accept(read);
+        return read;
+    }
+
+    /**
+     * Applies to non-streaming, raw-convertible splits, and never when {@code forceKeepDelete}
+     * is set.
+     */
+    @Override
+    public boolean match(DataSplit split, boolean forceKeepDelete) {
+        if (forceKeepDelete || split.isStreaming()) {
+            return false;
+        }
+        return split.rawConvertible();
+    }
+
+    @Override
+    public boolean initialized() {
+        return splitRead.initialized();
+    }
+
+    @Override
+    public SplitRead<InternalRow> getOrCreate() {
+        return splitRead.get();
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java
similarity index 64%
copy from paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
copy to paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java
index a5b05e8f4..2aaefb322 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java
@@ -16,23 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.operation;
+package org.apache.paimon.table.source.splitread;
 
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.operation.SplitRead;
 import org.apache.paimon.table.source.DataSplit;
 
-import java.io.IOException;
+/**
+ * Provider to create {@link SplitRead}.
+ *
+ * <p>{@link #match} reports whether the provider applies to a given {@link DataSplit}. The read
+ * itself is exposed through {@link #getOrCreate()}, and {@link #initialized()} tells whether it
+ * has been created yet.
+ */
+public interface SplitReadProvider {
 
-/**
- * Read operation which provides {@link RecordReader} creation.
- *
- * @param <T> type of record to read.
- */
-public interface SplitRead<T> {
+    /**
+     * Returns whether this provider should be used to read {@code split}.
+     *
+     * @param split the split to be read
+     * @param forceKeepDelete caller-supplied flag, presumably requesting that delete records be
+     *     kept (TODO confirm); implementations may ignore it
+     */
+    boolean match(DataSplit split, boolean forceKeepDelete);
 
-    SplitRead<T> withFilter(Predicate predicate);
+    /** Returns whether the underlying {@link SplitRead} has already been created. */
+    boolean initialized();
 
-    /** Create a {@link RecordReader} from split. */
-    RecordReader<T> createReader(DataSplit split) throws IOException;
+    /** Returns the {@link SplitRead}, creating it first if it does not exist yet. */
+    SplitRead<InternalRow> getOrCreate();
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
index 5652fcd43..1794e8aea 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
@@ -231,7 +231,7 @@ public class MergeFileSplitReadTest {
             read.withKeyProjection(keyProjection);
         }
         if (valueProjection != null) {
-            read.withValueProjection(valueProjection);
+            read.withProjection(valueProjection);
         }
 
         List<KeyValue> result = new ArrayList<>();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
index f4dac9d11..b4b905d36 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
@@ -225,6 +225,8 @@ public class IncrementalTableTest extends TableTestBase {
                 GenericRow.of(1, 1, 1),
                 GenericRow.of(1, 2, 1),
                 GenericRow.of(1, 3, 1),
+                GenericRow.of(1, 4, 1),
+                GenericRow.of(1, 5, 1),
                 GenericRow.of(2, 1, 1));
 
         // snapshot 2: append
@@ -235,7 +237,7 @@ public class IncrementalTableTest extends TableTestBase {
                 // UPDATE
                 GenericRow.of(1, 2, 2),
                 // NEW
-                GenericRow.of(1, 4, 1));
+                GenericRow.of(1, 6, 1));
 
         // snapshot 3: compact
         compact(table, row(1), 0);
@@ -247,24 +249,30 @@ public class IncrementalTableTest extends TableTestBase {
         // read tag1 tag2
         List<InternalRow> result = read(table, Pair.of(INCREMENTAL_BETWEEN, 
"TAG1,TAG2"));
         assertThat(result)
-                .containsExactlyInAnyOrder(GenericRow.of(1, 2, 2), 
GenericRow.of(1, 4, 1));
+                .containsExactlyInAnyOrder(GenericRow.of(1, 2, 2), 
GenericRow.of(1, 6, 1));
         result = read(auditLog, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG2"));
         assertThat(result)
                 .containsExactlyInAnyOrder(
                         GenericRow.of(fromString("-D"), 1, 1, 1),
                         GenericRow.of(fromString("+I"), 1, 2, 2),
-                        GenericRow.of(fromString("+I"), 1, 4, 1));
+                        GenericRow.of(fromString("+I"), 1, 6, 1));
 
         // read tag1 tag3
         result = read(table, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG3"));
         assertThat(result)
-                .containsExactlyInAnyOrder(GenericRow.of(1, 2, 2), 
GenericRow.of(1, 4, 1));
+                .containsExactlyInAnyOrder(GenericRow.of(1, 2, 2), 
GenericRow.of(1, 6, 1));
+
+        // read tag1 tag3 auditLog
         result = read(auditLog, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG3"));
         assertThat(result)
                 .containsExactlyInAnyOrder(
                         GenericRow.of(fromString("-D"), 1, 1, 1),
                         GenericRow.of(fromString("+I"), 1, 2, 2),
-                        GenericRow.of(fromString("+I"), 1, 4, 1));
+                        GenericRow.of(fromString("+I"), 1, 6, 1));
+
+        // read tag1 tag3 projection
+        result = read(table, new int[][] {{1}}, Pair.of(INCREMENTAL_BETWEEN, 
"TAG1,TAG3"));
+        assertThat(result).containsExactlyInAnyOrder(GenericRow.of(2), 
GenericRow.of(6));
 
         assertThatThrownBy(() -> read(table, Pair.of(INCREMENTAL_BETWEEN, 
"TAG2,TAG1")))
                 .hasMessageContaining(


Reply via email to