This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b0b63757b3 [core] Introduce IncrementalSplit to simplify DataSplit (#7093)
b0b63757b3 is described below

commit b0b63757b3ed67ef48b0496035c146b5a22d4744
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jan 21 20:25:11 2026 +0800

    [core] Introduce IncrementalSplit to simplify DataSplit (#7093)
---
 .../org/apache/paimon/manifest/PartitionEntry.java |  25 --
 .../paimon/operation/MergeFileSplitRead.java       |   4 -
 .../apache/paimon/operation/RawFileSplitRead.java  |  35 ++-
 .../org/apache/paimon/table/source/DataSplit.java  |  77 +------
 .../paimon/table/source/IncrementalSplit.java      | 254 +++++++++++++++++++++
 .../source/snapshot/AbstractStartingScanner.java   |  39 +++-
 .../snapshot/IncrementalDiffStartingScanner.java   |   7 +
 .../table/source/snapshot/SnapshotReaderImpl.java  |  91 +++++---
 .../table/source/snapshot/StartingScanner.java     |   6 +-
 .../AppendTableRawFileSplitReadProvider.java       |   3 +-
 .../IncrementalChangelogReadProvider.java          |  27 +--
 .../splitread/IncrementalDiffReadProvider.java     |   8 +-
 .../source/splitread/IncrementalDiffSplitRead.java |  10 +-
 .../splitread/MergeFileSplitReadProvider.java      |   9 +-
 .../paimon/table/system/FileMonitorTable.java      |  35 ++-
 .../cluster/BucketedAppendClusterManagerTest.java  |   3 +-
 .../apache/paimon/table/SimpleTableTestBase.java   |   5 +-
 .../snapshot/CompactedStartingScannerTest.java     |   2 +-
 .../source/snapshot/FullStartingScannerTest.java   |   2 +-
 .../snapshot/StaticFromTagStartingScannerTest.java |   2 +-
 .../lookup/IncrementalCompactDiffSplitRead.java    |  40 ++--
 .../paimon/flink/lookup/LookupCompactDiffRead.java |   5 +-
 .../paimon/flink/lookup/LookupStreamingReader.java |   9 +-
 .../flink/lookup/PrimaryKeyPartialLookupTable.java |  36 ++-
 .../source/ContinuousFileSplitEnumerator.java      |  30 ++-
 .../flink/source/align/PlaceholderSplit.java       |   6 -
 .../paimon/hive/utils/HiveSplitGenerator.java      |   3 +-
 .../paimon/spark/scan/BinPackingSplits.scala       |   2 +-
 28 files changed, 526 insertions(+), 249 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
index aaee93602a..d162234cd5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
@@ -23,16 +23,13 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.partition.PartitionStatistics;
-import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
 
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 
-import static org.apache.paimon.manifest.FileKind.ADD;
 import static org.apache.paimon.manifest.FileKind.DELETE;
 
 /** Entry representing a partition. */
@@ -150,28 +147,6 @@ public class PartitionEntry {
         return partitions.values();
     }
 
-    public static Collection<PartitionEntry> mergeSplits(Collection<DataSplit> splits) {
-        Map<BinaryRow, PartitionEntry> partitions = new HashMap<>();
-        for (DataSplit split : splits) {
-            BinaryRow partition = split.partition();
-            for (DataFileMeta file : split.dataFiles()) {
-                PartitionEntry partitionEntry =
-                        fromDataFile(
-                                partition,
-                                ADD,
-                                file,
-                                Optional.ofNullable(split.totalBuckets()).orElse(0));
-                partitions.compute(
-                        partition,
-                        (part, old) -> old == null ? partitionEntry : old.merge(partitionEntry));
-            }
-
-            // Ignore before files, because we don't know how to merge them
-            // Ignore deletion files, because it is costly to read from it
-        }
-        return partitions.values();
-    }
-
     public static void merge(Collection<PartitionEntry> from, Map<BinaryRow, PartitionEntry> to) {
         for (PartitionEntry entry : from) {
             to.compute(entry.partition(), (part, old) -> old == null ? entry : old.merge(entry));
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
index c41c9c3777..689117f1d2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
@@ -229,10 +229,6 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
     }
 
     public RecordReader<KeyValue> createReader(DataSplit split) throws IOException {
-        if (!split.beforeFiles().isEmpty()) {
-            throw new IllegalArgumentException("This read cannot accept split with before files.");
-        }
-
         if (split.isStreaming() || split.bucket() == BucketMode.POSTPONE_BUCKET) {
             return createNoMergeReader(
                     split.partition(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index deda6e682d..4b4beeaf31 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -45,6 +45,8 @@ import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.DeletionFile;
+import org.apache.paimon.table.source.IncrementalSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
@@ -144,19 +146,38 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
 
     @Override
     public RecordReader<InternalRow> createReader(Split s) throws IOException {
-        DataSplit split = (DataSplit) s;
-        if (!split.beforeFiles().isEmpty()) {
-            LOG.info("Ignore split before files: {}", split.beforeFiles());
+        if (s instanceof DataSplit) {
+            DataSplit split = (DataSplit) s;
+            return createReader(
+                    split.partition(),
+                    split.bucket(),
+                    split.dataFiles(),
+                    split.deletionFiles().orElse(null));
+        } else {
+            IncrementalSplit split = (IncrementalSplit) s;
+            if (!split.beforeFiles().isEmpty()) {
+                LOG.info("Ignore split before files: {}", split.beforeFiles());
+            }
+            return createReader(
+                    split.partition(),
+                    split.bucket(),
+                    split.afterFiles(),
+                    split.afterDeletionFiles());
         }
+    }
 
-        List<DataFileMeta> files = split.dataFiles();
-        DeletionVector.Factory dvFactory =
-                DeletionVector.factory(fileIO, files, split.deletionFiles().orElse(null));
+    public RecordReader<InternalRow> createReader(
+            BinaryRow partition,
+            int bucket,
+            List<DataFileMeta> files,
+            List<DeletionFile> deletionFiles)
+            throws IOException {
+        DeletionVector.Factory dvFactory = DeletionVector.factory(fileIO, files, deletionFiles);
         Map<String, IOExceptionSupplier<DeletionVector>> dvFactories = new HashMap<>();
         for (DataFileMeta file : files) {
             dvFactories.put(file.fileName(), () -> dvFactory.create(file.fileName()).orElse(null));
         }
-        return createReader(split.partition(), split.bucket(), split.dataFiles(), dvFactories);
+        return createReader(partition, bucket, files, dvFactories);
     }
 
     public RecordReader<InternalRow> createReader(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 956b67c157..cbd2e3086f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -70,9 +70,6 @@ public class DataSplit implements Split {
     private String bucketPath;
     @Nullable private Integer totalBuckets;
 
-    private List<DataFileMeta> beforeFiles = new ArrayList<>();
-    @Nullable private List<DeletionFile> beforeDeletionFiles;
-
     private List<DataFileMeta> dataFiles;
     @Nullable private List<DeletionFile> dataDeletionFiles;
 
@@ -101,14 +98,6 @@ public class DataSplit implements Split {
         return totalBuckets;
     }
 
-    public List<DataFileMeta> beforeFiles() {
-        return beforeFiles;
-    }
-
-    public Optional<List<DeletionFile>> beforeDeletionFiles() {
-        return Optional.ofNullable(beforeDeletionFiles);
-    }
-
     public List<DataFileMeta> dataFiles() {
         return dataFiles;
     }
@@ -126,10 +115,6 @@ public class DataSplit implements Split {
         return rawConvertible;
     }
 
-    public OptionalLong latestFileCreationEpochMillis() {
-        return this.dataFiles.stream().mapToLong(DataFileMeta::creationTimeEpochMillis).max();
-    }
-
     public OptionalLong earliestFileCreationEpochMillis() {
         return this.dataFiles.stream().mapToLong(DataFileMeta::creationTimeEpochMillis).min();
     }
@@ -260,32 +245,6 @@ public class DataSplit implements Split {
         return sum;
     }
 
-    /**
-     * Obtain merged row count as much as possible. There are two scenarios where accurate row count
-     * can be calculated:
-     *
-     * <p>1. raw file and no deletion file.
-     *
-     * <p>2. raw file + deletion file with cardinality.
-     */
-    public long partialMergedRowCount() {
-        long sum = 0L;
-        if (rawConvertible) {
-            List<RawFile> rawFiles = convertToRawFiles().orElse(null);
-            if (rawFiles != null) {
-                for (int i = 0; i < rawFiles.size(); i++) {
-                    RawFile rawFile = rawFiles.get(i);
-                    if (dataDeletionFiles == null || dataDeletionFiles.get(i) == null) {
-                        sum += rawFile.rowCount();
-                    } else if (dataDeletionFiles.get(i).cardinality() != null) {
-                        sum += rawFile.rowCount() - dataDeletionFiles.get(i).cardinality();
-                    }
-                }
-            }
-        }
-        return sum;
-    }
-
     @Override
     public Optional<List<RawFile>> convertToRawFiles() {
         if (rawConvertible) {
@@ -352,8 +311,6 @@ public class DataSplit implements Split {
                 && Objects.equals(partition, dataSplit.partition)
                 && Objects.equals(bucketPath, dataSplit.bucketPath)
                 && Objects.equals(totalBuckets, dataSplit.totalBuckets)
-                && Objects.equals(beforeFiles, dataSplit.beforeFiles)
-                && Objects.equals(beforeDeletionFiles, dataSplit.beforeDeletionFiles)
                 && Objects.equals(dataFiles, dataSplit.dataFiles)
                 && Objects.equals(dataDeletionFiles, dataSplit.dataDeletionFiles);
     }
@@ -366,8 +323,6 @@ public class DataSplit implements Split {
                 bucket,
                 bucketPath,
                 totalBuckets,
-                beforeFiles,
-                beforeDeletionFiles,
                 dataFiles,
                 dataDeletionFiles,
                 isStreaming,
@@ -404,8 +359,6 @@ public class DataSplit implements Split {
         this.bucket = other.bucket;
         this.bucketPath = other.bucketPath;
         this.totalBuckets = other.totalBuckets;
-        this.beforeFiles = other.beforeFiles;
-        this.beforeDeletionFiles = other.beforeDeletionFiles;
         this.dataFiles = other.dataFiles;
         this.dataDeletionFiles = other.dataDeletionFiles;
         this.isStreaming = other.isStreaming;
@@ -427,12 +380,10 @@ public class DataSplit implements Split {
         }
 
         DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer();
-        out.writeInt(beforeFiles.size());
-        for (DataFileMeta file : beforeFiles) {
-            dataFileSer.serialize(file, out);
-        }
 
-        DeletionFile.serializeList(out, beforeDeletionFiles);
+        // compatible with old beforeFiles
+        out.writeInt(0);
+        DeletionFile.serializeList(out, null);
 
         out.writeInt(dataFiles.size());
         for (DataFileMeta file : dataFiles) {
@@ -461,13 +412,15 @@ public class DataSplit implements Split {
         FunctionWithIOException<DataInputView, DeletionFile> deletionFileSerde =
                 getDeletionFileSerde(version);
         int beforeNumber = in.readInt();
-        List<DataFileMeta> beforeFiles = new ArrayList<>(beforeNumber);
-        for (int i = 0; i < beforeNumber; i++) {
-            beforeFiles.add(dataFileSer.apply(in));
+        if (beforeNumber > 0) {
+            throw new RuntimeException("Cannot deserialize data split with before files.");
         }
 
         List<DeletionFile> beforeDeletionFiles =
                 DeletionFile.deserializeList(in, deletionFileSerde);
+        if (beforeDeletionFiles != null) {
+            throw new RuntimeException("Cannot deserialize data split with before deletion files.");
+        }
 
         int fileNumber = in.readInt();
         List<DataFileMeta> dataFiles = new ArrayList<>(fileNumber);
@@ -487,14 +440,10 @@ public class DataSplit implements Split {
                         .withBucket(bucket)
                         .withBucketPath(bucketPath)
                         .withTotalBuckets(totalBuckets)
-                        .withBeforeFiles(beforeFiles)
                         .withDataFiles(dataFiles)
                         .isStreaming(isStreaming)
                         .rawConvertible(rawConvertible);
 
-        if (beforeDeletionFiles != null) {
-            builder.withBeforeDeletionFiles(beforeDeletionFiles);
-        }
         if (dataDeletionFiles != null) {
             builder.withDataDeletionFiles(dataDeletionFiles);
         }
@@ -572,16 +521,6 @@ public class DataSplit implements Split {
             return this;
         }
 
-        public Builder withBeforeFiles(List<DataFileMeta> beforeFiles) {
-            this.split.beforeFiles = new ArrayList<>(beforeFiles);
-            return this;
-        }
-
-        public Builder withBeforeDeletionFiles(List<DeletionFile> beforeDeletionFiles) {
-            this.split.beforeDeletionFiles = new ArrayList<>(beforeDeletionFiles);
-            return this;
-        }
-
         public Builder withDataFiles(List<DataFileMeta> dataFiles) {
             this.split.dataFiles = new ArrayList<>(dataFiles);
             return this;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/IncrementalSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/IncrementalSplit.java
new file mode 100644
index 0000000000..5b672a4fef
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/IncrementalSplit.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFileMetaSerializer;
+import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.io.DataInputViewStreamWrapper;
+import org.apache.paimon.io.DataOutputViewStreamWrapper;
+import org.apache.paimon.utils.FunctionWithIOException;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
+
+/** Incremental split for batch and streaming. */
+public class IncrementalSplit implements Split {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final int VERSION = 1;
+
+    private long snapshotId;
+    private BinaryRow partition;
+    private int bucket;
+    private int totalBuckets;
+
+    private List<DataFileMeta> beforeFiles;
+    private @Nullable List<DeletionFile> beforeDeletionFiles;
+
+    private List<DataFileMeta> afterFiles;
+    private @Nullable List<DeletionFile> afterDeletionFiles;
+
+    private boolean isStreaming;
+
+    public IncrementalSplit(
+            long snapshotId,
+            BinaryRow partition,
+            int bucket,
+            int totalBuckets,
+            List<DataFileMeta> beforeFiles,
+            @Nullable List<DeletionFile> beforeDeletionFiles,
+            List<DataFileMeta> afterFiles,
+            @Nullable List<DeletionFile> afterDeletionFiles,
+            boolean isStreaming) {
+        this.snapshotId = snapshotId;
+        this.partition = partition;
+        this.bucket = bucket;
+        this.totalBuckets = totalBuckets;
+        this.beforeFiles = beforeFiles;
+        this.beforeDeletionFiles = beforeDeletionFiles;
+        this.afterFiles = afterFiles;
+        this.afterDeletionFiles = afterDeletionFiles;
+        this.isStreaming = isStreaming;
+    }
+
+    public long snapshotId() {
+        return snapshotId;
+    }
+
+    public BinaryRow partition() {
+        return partition;
+    }
+
+    public int bucket() {
+        return bucket;
+    }
+
+    public int totalBuckets() {
+        return totalBuckets;
+    }
+
+    public List<DataFileMeta> beforeFiles() {
+        return beforeFiles;
+    }
+
+    @Nullable
+    public List<DeletionFile> beforeDeletionFiles() {
+        return beforeDeletionFiles;
+    }
+
+    public List<DataFileMeta> afterFiles() {
+        return afterFiles;
+    }
+
+    @Nullable
+    public List<DeletionFile> afterDeletionFiles() {
+        return afterDeletionFiles;
+    }
+
+    public boolean isStreaming() {
+        return isStreaming;
+    }
+
+    @Override
+    public long rowCount() {
+        long rowCount = 0;
+        for (DataFileMeta file : beforeFiles) {
+            rowCount += file.rowCount();
+        }
+        for (DataFileMeta file : afterFiles) {
+            rowCount += file.rowCount();
+        }
+        return rowCount;
+    }
+
+    @Override
+    public OptionalLong mergedRowCount() {
+        return OptionalLong.empty();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        IncrementalSplit that = (IncrementalSplit) o;
+        return snapshotId == that.snapshotId
+                && bucket == that.bucket
+                && totalBuckets == that.totalBuckets
+                && isStreaming == that.isStreaming
+                && Objects.equals(partition, that.partition)
+                && Objects.equals(beforeFiles, that.beforeFiles)
+                && Objects.equals(beforeDeletionFiles, that.beforeDeletionFiles)
+                && Objects.equals(afterFiles, that.afterFiles)
+                && Objects.equals(afterDeletionFiles, that.afterDeletionFiles);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                snapshotId,
+                partition,
+                bucket,
+                totalBuckets,
+                beforeFiles,
+                beforeDeletionFiles,
+                afterFiles,
+                afterDeletionFiles,
+                isStreaming);
+    }
+
+    @Override
+    public String toString() {
+        return "IncrementalSplit{"
+                + "snapshotId="
+                + snapshotId
+                + ", partition="
+                + partition
+                + ", bucket="
+                + bucket
+                + ", totalBuckets="
+                + totalBuckets
+                + ", beforeFiles="
+                + beforeFiles
+                + ", beforeDeletionFiles="
+                + beforeDeletionFiles
+                + ", afterFiles="
+                + afterFiles
+                + ", afterDeletionFiles="
+                + afterDeletionFiles
+                + ", isStreaming="
+                + isStreaming
+                + '}';
+    }
+
+    private void writeObject(ObjectOutputStream objectOutputStream) throws IOException {
+        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(objectOutputStream);
+        out.writeInt(VERSION);
+        out.writeLong(snapshotId);
+        serializeBinaryRow(partition, out);
+        out.writeInt(bucket);
+        out.writeInt(totalBuckets);
+
+        DataFileMetaSerializer dataFileSerializer = new DataFileMetaSerializer();
+        out.writeInt(beforeFiles.size());
+        for (DataFileMeta file : beforeFiles) {
+            dataFileSerializer.serialize(file, out);
+        }
+
+        DeletionFile.serializeList(out, beforeDeletionFiles);
+
+        out.writeInt(afterFiles.size());
+        for (DataFileMeta file : afterFiles) {
+            dataFileSerializer.serialize(file, out);
+        }
+
+        DeletionFile.serializeList(out, afterDeletionFiles);
+
+        out.writeBoolean(isStreaming);
+    }
+
+    private void readObject(ObjectInputStream objectInputStream)
+            throws IOException, ClassNotFoundException {
+        DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(objectInputStream);
+        int version = in.readInt();
+        if (version != VERSION) {
+            throw new UnsupportedOperationException("Unsupported version: " + version);
+        }
+
+        snapshotId = in.readLong();
+        partition = deserializeBinaryRow(in);
+        bucket = in.readInt();
+        totalBuckets = in.readInt();
+
+        DataFileMetaSerializer dataFileMetaSerializer = new DataFileMetaSerializer();
+        FunctionWithIOException<DataInputView, DeletionFile> deletionFileSerializer =
+                DeletionFile::deserialize;
+
+        int beforeNumber = in.readInt();
+        beforeFiles = new ArrayList<>(beforeNumber);
+        for (int i = 0; i < beforeNumber; i++) {
+            beforeFiles.add(dataFileMetaSerializer.deserialize(in));
+        }
+
+        beforeDeletionFiles = DeletionFile.deserializeList(in, deletionFileSerializer);
+
+        int fileNumber = in.readInt();
+        afterFiles = new ArrayList<>(fileNumber);
+        for (int i = 0; i < fileNumber; i++) {
+            afterFiles.add(dataFileMetaSerializer.deserialize(in));
+        }
+
+        afterDeletionFiles = DeletionFile.deserializeList(in, deletionFileSerializer);
+
+        isStreaming = in.readBoolean();
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/AbstractStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/AbstractStartingScanner.java
index c1b2f985a4..84212a3cc4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/AbstractStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/AbstractStartingScanner.java
@@ -18,13 +18,23 @@
 
 package org.apache.paimon.table.source.snapshot;
 
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.utils.SnapshotManager;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.paimon.manifest.FileKind.ADD;
 
 /** The abstract class for StartingScanner. */
 public abstract class AbstractStartingScanner implements StartingScanner {
@@ -54,8 +64,35 @@ public abstract class AbstractStartingScanner implements StartingScanner {
     public List<PartitionEntry> scanPartitions(SnapshotReader snapshotReader) {
         Result result = scan(snapshotReader);
         if (result instanceof ScannedResult) {
-            return new ArrayList<>(PartitionEntry.mergeSplits(((ScannedResult) result).splits()));
+            return mergeDataSplitsToPartitionEntries(((ScannedResult) result).splits());
         }
         return Collections.emptyList();
     }
+
+    private static List<PartitionEntry> mergeDataSplitsToPartitionEntries(
+            Collection<Split> splits) {
+        Map<BinaryRow, PartitionEntry> partitions = new HashMap<>();
+        for (Split s : splits) {
+            if (!(s instanceof DataSplit)) {
+                throw new UnsupportedOperationException();
+            }
+            DataSplit split = (DataSplit) s;
+            BinaryRow partition = split.partition();
+            for (DataFileMeta file : split.dataFiles()) {
+                PartitionEntry partitionEntry =
+                        PartitionEntry.fromDataFile(
+                                partition,
+                                ADD,
+                                file,
+                                Optional.ofNullable(split.totalBuckets()).orElse(0));
+                partitions.compute(
+                        partition,
+                        (part, old) -> old == null ? partitionEntry : old.merge(partitionEntry));
+            }
+
+            // Ignore before files, because we don't know how to merge them
+            // Ignore deletion files, because it is costly to read from it
+        }
+        return new ArrayList<>(partitions.values());
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDiffStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDiffStartingScanner.java
index 37c6498e95..3a42f16e2c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDiffStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDiffStartingScanner.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.tag.Tag;
 import org.apache.paimon.tag.TagPeriodHandler;
@@ -67,6 +68,12 @@ public class IncrementalDiffStartingScanner extends AbstractStartingScanner {
         return StartingScanner.fromPlan(reader.withSnapshot(end).readIncrementalDiff(start));
     }
 
+    @Override
+    public List<PartitionEntry> scanPartitions(SnapshotReader reader) {
+        // ignore start, just use end to read partition entries
+        return reader.withSnapshot(end).partitionEntries();
+    }
+
     public static StartingScanner betweenTags(
             Tag startTag,
             Tag endTag,
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 3021eeda3e..c839a5dbd1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -46,8 +46,10 @@ import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.DeletionFile;
+import org.apache.paimon.table.source.IncrementalSplit;
 import org.apache.paimon.table.source.PlanImpl;
 import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BiFilter;
@@ -459,41 +461,42 @@ public class SnapshotReaderImpl implements SnapshotReader {
 
         Map<BinaryRow, Map<Integer, List<ManifestEntry>>> beforeFiles =
                 groupByPartFiles(plan.files(FileKind.DELETE));
-        Map<BinaryRow, Map<Integer, List<ManifestEntry>>> dataFiles =
+        Map<BinaryRow, Map<Integer, List<ManifestEntry>>> afterFiles =
                 groupByPartFiles(plan.files(FileKind.ADD));
         LazyField<Snapshot> beforeSnapshot =
                 new LazyField<>(() -> snapshotManager.snapshot(plan.snapshot().id() - 1));
-        return toChangesPlan(true, plan, beforeSnapshot, beforeFiles, dataFiles);
+        return toIncrementalPlan(
+                true, beforeSnapshot, beforeFiles, plan.snapshot(), plan.watermark(), afterFiles);
     }
 
-    private Plan toChangesPlan(
+    private Plan toIncrementalPlan(
             boolean isStreaming,
-            FileStoreScan.Plan plan,
             LazyField<Snapshot> beforeSnapshot,
             Map<BinaryRow, Map<Integer, List<ManifestEntry>>> beforeFiles,
-            Map<BinaryRow, Map<Integer, List<ManifestEntry>>> dataFiles) {
-        Snapshot snapshot = plan.snapshot();
-        List<DataSplit> splits = new ArrayList<>();
+            @Nullable Snapshot afterSnapshot,
+            @Nullable Long afterWatermark,
+            Map<BinaryRow, Map<Integer, List<ManifestEntry>>> afterFiles) {
+        List<Split> splits = new ArrayList<>();
         Map<BinaryRow, Set<Integer>> buckets = new HashMap<>();
         beforeFiles.forEach(
                 (part, bucketMap) ->
                         buckets.computeIfAbsent(part, k -> new HashSet<>())
                                 .addAll(bucketMap.keySet()));
-        dataFiles.forEach(
+        afterFiles.forEach(
                 (part, bucketMap) ->
                         buckets.computeIfAbsent(part, k -> new HashSet<>())
                                 .addAll(bucketMap.keySet()));
         // Read deletion indexes at once to reduce file IO
         Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> beforeDeletionFilesMap = null;
-        Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> deletionFilesMap = null;
+        Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> afterDeletionFilesMap = null;
         if (!isStreaming && deletionVectors) {
             beforeDeletionFilesMap =
                     beforeSnapshot.get() != null
                             ? scanDvIndex(beforeSnapshot.get(), toPartBuckets(beforeFiles))
                             : Collections.emptyMap();
-            deletionFilesMap =
-                    snapshot != null
-                            ? scanDvIndex(snapshot, toPartBuckets(dataFiles))
+            afterDeletionFilesMap =
+                    afterSnapshot != null
+                            ? scanDvIndex(afterSnapshot, toPartBuckets(afterFiles))
                             : Collections.emptyMap();
         }
 
@@ -505,13 +508,10 @@ public class SnapshotReaderImpl implements SnapshotReader {
                                 .getOrDefault(part, Collections.emptyMap())
                                 .getOrDefault(bucket, Collections.emptyList());
                 List<ManifestEntry> dataEntries =
-                        dataFiles
+                        afterFiles
                                 .getOrDefault(part, Collections.emptyMap())
                                 .getOrDefault(bucket, Collections.emptyList());
 
-                // deduplicate
-                beforeEntries.removeIf(dataEntries::remove);
-
                 Integer totalBuckets = null;
                 if (!dataEntries.isEmpty()) {
                     totalBuckets = dataEntries.get(0).totalBuckets();
@@ -519,54 +519,69 @@ public class SnapshotReaderImpl implements SnapshotReader 
{
                     totalBuckets = beforeEntries.get(0).totalBuckets();
                 }
 
+                // deduplicate
+                beforeEntries.removeIf(dataEntries::remove);
+
                 List<DataFileMeta> before =
                         beforeEntries.stream()
                                 .map(ManifestEntry::file)
                                 .collect(Collectors.toList());
-                List<DataFileMeta> data =
+                List<DataFileMeta> after =
                         dataEntries.stream().map(ManifestEntry::file).collect(Collectors.toList());
 
-                DataSplit.Builder builder =
-                        DataSplit.builder()
-                                .withSnapshot(snapshot.id())
-                                .withPartition(part)
-                                .withBucket(bucket)
-                                .withTotalBuckets(totalBuckets)
-                                .withBeforeFiles(before)
-                                .withDataFiles(data)
-                                .isStreaming(isStreaming)
-                                .withBucketPath(pathFactory.bucketPath(part, bucket).toString());
+                List<DeletionFile> beforeDeletionFiles = null;
                 if (deletionVectors && beforeDeletionFilesMap != null) {
-                    builder.withBeforeDeletionFiles(
+                    beforeDeletionFiles =
                             getDeletionFiles(
                                     before,
                                     beforeDeletionFilesMap.getOrDefault(
-                                            Pair.of(part, bucket), Collections.emptyMap())));
+                                            Pair.of(part, bucket), Collections.emptyMap()));
                 }
-                if (deletionVectors && deletionFilesMap != null) {
-                    builder.withDataDeletionFiles(
+
+                List<DeletionFile> afterDeletionFiles = null;
+                if (deletionVectors && afterDeletionFilesMap != null) {
+                    afterDeletionFiles =
                             getDeletionFiles(
-                                    data,
-                                    deletionFilesMap.getOrDefault(
-                                            Pair.of(part, bucket), Collections.emptyMap())));
+                                    after,
+                                    afterDeletionFilesMap.getOrDefault(
+                                            Pair.of(part, bucket), Collections.emptyMap()));
                 }
-                splits.add(builder.build());
+
+                IncrementalSplit split =
+                        new IncrementalSplit(
+                                afterSnapshot.id(),
+                                part,
+                                bucket,
+                                totalBuckets,
+                                before,
+                                beforeDeletionFiles,
+                                after,
+                                afterDeletionFiles,
+                                isStreaming);
+
+                splits.add(split);
             }
         }
 
         return new PlanImpl(
-                plan.watermark(), snapshot == null ? null : snapshot.id(), (List) splits);
+                afterWatermark, afterSnapshot == null ? null : afterSnapshot.id(), splits);
     }
 
     @Override
     public Plan readIncrementalDiff(Snapshot before) {
         withMode(ScanMode.ALL);
         FileStoreScan.Plan plan = scan.plan();
-        Map<BinaryRow, Map<Integer, List<ManifestEntry>>> dataFiles =
+        Map<BinaryRow, Map<Integer, List<ManifestEntry>>> afterFiles =
                 groupByPartFiles(plan.files(FileKind.ADD));
         Map<BinaryRow, Map<Integer, List<ManifestEntry>>> beforeFiles =
                 groupByPartFiles(scan.withSnapshot(before).plan().files(FileKind.ADD));
-        return toChangesPlan(false, plan, new LazyField<>(() -> before), beforeFiles, dataFiles);
+        return toIncrementalPlan(
+                false,
+                new LazyField<>(() -> before),
+                beforeFiles,
+                plan.snapshot(),
+                plan.watermark(),
+                afterFiles);
     }
 
     private RecordComparator partitionComparator() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
index 6d9daa70a9..07b317515a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.manifest.PartitionEntry;
-import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableScan;
 
 import javax.annotation.Nullable;
@@ -63,8 +63,8 @@ public interface StartingScanner {
             return plan.watermark();
         }
 
-        public List<DataSplit> splits() {
-            return (List) plan.splits();
+        public List<Split> splits() {
+            return plan.splits();
         }
 
         public SnapshotReader.Plan plan() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/AppendTableRawFileSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/AppendTableRawFileSplitReadProvider.java
index 5a2d4635bb..5bb72578ab 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/AppendTableRawFileSplitReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/AppendTableRawFileSplitReadProvider.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.source.splitread;
 
 import org.apache.paimon.operation.RawFileSplitRead;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.IncrementalSplit;
 import org.apache.paimon.table.source.Split;
 
 import java.util.function.Supplier;
@@ -34,6 +35,6 @@ public class AppendTableRawFileSplitReadProvider extends RawFileSplitReadProvide
 
     @Override
     public boolean match(Split split, Context context) {
-        return split instanceof DataSplit;
+        return split instanceof DataSplit || split instanceof IncrementalSplit;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
index ee3d84b954..d7d9866a9c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
@@ -25,7 +25,7 @@ import org.apache.paimon.operation.MergeFileSplitRead;
 import org.apache.paimon.operation.ReverseReader;
 import org.apache.paimon.operation.SplitRead;
 import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.IncrementalSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.IOFunction;
@@ -55,25 +55,23 @@ public class IncrementalChangelogReadProvider implements SplitReadProvider {
         final MergeFileSplitRead read = supplier.get().withReadKeyType(RowType.of());
         IOFunction<Split, RecordReader<InternalRow>> convertedFactory =
                 split -> {
-                    DataSplit dataSplit = (DataSplit) split;
+                    IncrementalSplit incrementalSplit = (IncrementalSplit) split;
                     RecordReader<KeyValue> reader =
                             ConcatRecordReader.create(
                                     () ->
                                             new ReverseReader(
                                                     read.createMergeReader(
-                                                            dataSplit.partition(),
-                                                            dataSplit.bucket(),
-                                                            dataSplit.beforeFiles(),
-                                                            dataSplit
-                                                                    .beforeDeletionFiles()
-                                                                    .orElse(null),
+                                                            incrementalSplit.partition(),
+                                                            incrementalSplit.bucket(),
+                                                            incrementalSplit.beforeFiles(),
+                                                            incrementalSplit.beforeDeletionFiles(),
                                                             false)),
                                     () ->
                                             read.createMergeReader(
-                                                    dataSplit.partition(),
-                                                    dataSplit.bucket(),
-                                                    dataSplit.dataFiles(),
-                                                    dataSplit.deletionFiles().orElse(null),
+                                                    incrementalSplit.partition(),
+                                                    incrementalSplit.bucket(),
+                                                    incrementalSplit.afterFiles(),
+                                                    incrementalSplit.afterDeletionFiles(),
                                                     false));
                     return unwrap(reader);
                 };
@@ -83,11 +81,10 @@ public class IncrementalChangelogReadProvider implements SplitReadProvider {
 
     @Override
     public boolean match(Split split, Context context) {
-        if (!(split instanceof DataSplit)) {
+        if (!(split instanceof IncrementalSplit)) {
             return false;
         }
-        DataSplit dataSplit = (DataSplit) split;
-        return !dataSplit.beforeFiles().isEmpty() && dataSplit.isStreaming();
+        return ((IncrementalSplit) split).isStreaming();
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java
index 6a381c333c..858e07e5d0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java
@@ -21,7 +21,7 @@ package org.apache.paimon.table.source.splitread;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.operation.MergeFileSplitRead;
 import org.apache.paimon.operation.SplitRead;
-import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.IncrementalSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.utils.LazyField;
 
@@ -49,11 +49,11 @@ public class IncrementalDiffReadProvider implements SplitReadProvider {
 
     @Override
     public boolean match(Split split, Context context) {
-        if (!(split instanceof DataSplit)) {
+        if (!(split instanceof IncrementalSplit)) {
             return false;
         }
-        DataSplit dataSplit = (DataSplit) split;
-        return !dataSplit.beforeFiles().isEmpty() && !dataSplit.isStreaming();
+        IncrementalSplit incrementalSplit = (IncrementalSplit) split;
+        return !incrementalSplit.isStreaming();
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
index c22416a26a..d00a8a4fd9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
@@ -29,7 +29,7 @@ import org.apache.paimon.operation.MergeFileSplitRead;
 import org.apache.paimon.operation.SplitRead;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.IncrementalSplit;
 import org.apache.paimon.table.source.KeyValueTableRead;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.RowKind;
@@ -86,20 +86,20 @@ public class IncrementalDiffSplitRead implements SplitRead<InternalRow> {
 
     @Override
     public RecordReader<InternalRow> createReader(Split s) throws IOException {
-        DataSplit split = (DataSplit) s;
+        IncrementalSplit split = (IncrementalSplit) s;
         RecordReader<KeyValue> reader =
                 readDiff(
                         mergeRead.createMergeReader(
                                 split.partition(),
                                 split.bucket(),
                                 split.beforeFiles(),
-                                split.beforeDeletionFiles().orElse(null),
+                                split.beforeDeletionFiles(),
                                 forceKeepDelete),
                         mergeRead.createMergeReader(
                                 split.partition(),
                                 split.bucket(),
-                                split.dataFiles(),
-                                split.deletionFiles().orElse(null),
+                                split.afterFiles(),
+                                split.afterDeletionFiles(),
                                 forceKeepDelete),
                         mergeRead.keyComparator(),
                         mergeRead.createUdsComparator(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
index d69209ef4e..fb3b850812 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
@@ -54,14 +54,7 @@ public class MergeFileSplitReadProvider implements SplitReadProvider {
 
     @Override
     public boolean match(Split split, Context context) {
-        if (split instanceof DataSplit) {
-            DataSplit dataSplit = (DataSplit) split;
-            return dataSplit.beforeFiles().isEmpty();
-        } else if (split instanceof ChainSplit) {
-            return true;
-        } else {
-            return false;
-        }
+        return split instanceof DataSplit || split instanceof ChainSplit;
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
index 060570406b..97903d881e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
@@ -41,6 +41,7 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.ReadonlyTable;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.IncrementalSplit;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.StreamDataTableScan;
@@ -64,6 +65,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static java.util.Collections.emptyList;
 import static org.apache.paimon.CoreOptions.SCAN_BOUNDED_WATERMARK;
 import static org.apache.paimon.CoreOptions.STREAM_SCAN_MODE;
 import static org.apache.paimon.CoreOptions.StreamScanMode.FILE_MONITOR;
@@ -181,7 +183,7 @@ public class FileMonitorTable implements DataTable, ReadonlyTable {
 
     @Override
     public List<String> primaryKeys() {
-        return Collections.emptyList();
+        return emptyList();
     }
 
     @Override
@@ -238,20 +240,29 @@ public class FileMonitorTable implements DataTable, ReadonlyTable {
 
         @Override
         public RecordReader<InternalRow> createReader(Split split) throws IOException {
-            if (!(split instanceof DataSplit)) {
+            FileChange change;
+            if (split instanceof DataSplit) {
+                DataSplit dataSplit = (DataSplit) split;
+                change =
+                        new FileChange(
+                                dataSplit.snapshotId(),
+                                dataSplit.partition(),
+                                dataSplit.bucket(),
+                                emptyList(),
+                                dataSplit.dataFiles());
+            } else if (split instanceof IncrementalSplit) {
+                IncrementalSplit incrementalSplit = (IncrementalSplit) split;
+                change =
+                        new FileChange(
+                                incrementalSplit.snapshotId(),
+                                incrementalSplit.partition(),
+                                incrementalSplit.bucket(),
+                                incrementalSplit.beforeFiles(),
+                                incrementalSplit.afterFiles());
+            } else {
                 throw new IllegalArgumentException("Unsupported split: " + split.getClass());
             }
 
-            DataSplit dataSplit = (DataSplit) split;
-
-            FileChange change =
-                    new FileChange(
-                            dataSplit.snapshotId(),
-                            dataSplit.partition(),
-                            dataSplit.bucket(),
-                            dataSplit.beforeFiles(),
-                            dataSplit.dataFiles());
-
             return new IteratorRecordReader<>(Collections.singletonList(toRow(change)).iterator());
         }
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/BucketedAppendClusterManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/BucketedAppendClusterManagerTest.java
index 0d2d60e699..d7a3d5d59e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/cluster/BucketedAppendClusterManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/BucketedAppendClusterManagerTest.java
@@ -94,7 +94,8 @@ public class BucketedAppendClusterManagerTest {
                         ((AppendOnlyFileStoreTable) table)
                                 .store()
                                 .newRead()
-                                .createReader(BinaryRow.EMPTY_ROW, 0, result.after(), null))) {
+                                .createReader(
+                                        BinaryRow.EMPTY_ROW, 0, result.after(), (List) null))) {
             while (clusterRows.hasNext()) {
                 InternalRow row = clusterRows.next();
                 rows.add(String.format("%d,%d", row.getInt(1), row.getInt(2)));
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
index 69cf5441f5..a310404f53 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
@@ -57,6 +57,7 @@ import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.IncrementalSplit;
 import org.apache.paimon.table.source.OutOfRangeException;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.ScanMode;
@@ -1641,8 +1642,8 @@ public abstract class SimpleTableTestBase {
                                 .dataSplits());
 
         for (Split split : splits) {
-            DataSplit dataSplit = (DataSplit) split;
-            Assertions.assertThat(dataSplit.deletionFiles().isPresent()).isFalse();
+            IncrementalSplit incrementalSplit = (IncrementalSplit) split;
+            Assertions.assertThat(incrementalSplit.afterDeletionFiles()).isNull();
         }
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
index 60c9573004..f6049674bb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
@@ -59,7 +59,7 @@ public class CompactedStartingScannerTest extends ScannerTestBase {
         StartingScanner.ScannedResult result =
                 (StartingScanner.ScannedResult) scanner.scan(snapshotReader);
         assertThat(result.currentSnapshotId()).isEqualTo(3);
-        assertThat(getResult(table.newRead(), toSplits(result.splits())))
+        assertThat(getResult(table.newRead(), result.splits()))
                 .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
 
         write.close();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
index 20a65a04b7..d6b30adb11 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
@@ -54,7 +54,7 @@ public class FullStartingScannerTest extends ScannerTestBase {
         StartingScanner.ScannedResult result =
                 (StartingScanner.ScannedResult) scanner.scan(snapshotReader);
         assertThat(result.currentSnapshotId()).isEqualTo(2);
-        assertThat(getResult(table.newRead(), toSplits(result.splits())))
+        assertThat(getResult(table.newRead(), result.splits()))
                 .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
 
         write.close();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScannerTest.java
index 357bb33863..c595359ab9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScannerTest.java
@@ -58,7 +58,7 @@ public class StaticFromTagStartingScannerTest extends ScannerTestBase {
         StartingScanner.ScannedResult result =
                 (StartingScanner.ScannedResult) scanner.scan(snapshotReader);
         assertThat(result.currentSnapshotId()).isEqualTo(2);
-        assertThat(getResult(table.newRead(), toSplits(result.splits())))
+        assertThat(getResult(table.newRead(), result.splits()))
                 .hasSameElementsAs(
                         Arrays.asList("+I 1|10|100", "+I 1|20|200", "+I 2|30|101", "+I 2|40|201"));
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/IncrementalCompactDiffSplitRead.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/IncrementalCompactDiffSplitRead.java
index 6583e615d9..8c9b5a6cdc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/IncrementalCompactDiffSplitRead.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/IncrementalCompactDiffSplitRead.java
@@ -24,7 +24,7 @@ import org.apache.paimon.operation.MergeFileSplitRead;
 import org.apache.paimon.operation.SplitRead;
 import org.apache.paimon.reader.EmptyRecordReader;
 import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.IncrementalSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.splitread.IncrementalDiffSplitRead;
 
@@ -41,39 +41,31 @@ public class IncrementalCompactDiffSplitRead extends IncrementalDiffSplitRead {
 
     @Override
     public RecordReader<InternalRow> createReader(Split split) throws IOException {
-        DataSplit dataSplit = (DataSplit) split;
-        if (dataSplit.beforeFiles().stream().noneMatch(file -> file.level() == 0)) {
+        IncrementalSplit incrementalSplit = (IncrementalSplit) split;
+        if (incrementalSplit.beforeFiles().stream().noneMatch(file -> file.level() == 0)) {
             return new EmptyRecordReader<>();
         }
-        return super.createReader(filterLevel0Files(dataSplit));
+        return super.createReader(filterLevel0Files(incrementalSplit));
     }
 
-    private DataSplit filterLevel0Files(DataSplit split) {
+    private IncrementalSplit filterLevel0Files(IncrementalSplit split) {
         List<DataFileMeta> beforeFiles =
                 split.beforeFiles().stream()
                         .filter(file -> file.level() > 0)
                         .collect(Collectors.toList());
         List<DataFileMeta> afterFiles =
-                split.dataFiles().stream()
+                split.afterFiles().stream()
                         .filter(file -> file.level() > 0)
                         .collect(Collectors.toList());
-        DataSplit.Builder builder =
-                new DataSplit.Builder()
-                        .withSnapshot(split.snapshotId())
-                        .withPartition(split.partition())
-                        .withBucket(split.bucket())
-                        .withBucketPath(split.bucketPath())
-                        .withBeforeFiles(beforeFiles)
-                        .withDataFiles(afterFiles)
-                        .isStreaming(split.isStreaming())
-                        .rawConvertible(split.rawConvertible());
-
-        if (split.beforeDeletionFiles().isPresent()) {
-            builder.withBeforeDeletionFiles(split.beforeDeletionFiles().get());
-        }
-        if (split.deletionFiles().isPresent()) {
-            builder.withDataDeletionFiles(split.deletionFiles().get());
-        }
-        return builder.build();
+        return new IncrementalSplit(
+                split.snapshotId(),
+                split.partition(),
+                split.bucket(),
+                split.totalBuckets(),
+                beforeFiles,
+                null,
+                afterFiles,
+                null,
+                split.isStreaming());
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
index c4be923472..dfdd9dd207 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
@@ -57,9 +57,8 @@ public class LookupCompactDiffRead extends AbstractDataTableRead {
 
     @Override
     public RecordReader<InternalRow> reader(Split split) throws IOException {
-        DataSplit dataSplit = (DataSplit) split;
-        if (dataSplit.beforeFiles().isEmpty()) {
-            return fullPhaseMergeRead.createReader(dataSplit); // full reading phase
+        if (split instanceof DataSplit) {
+            return fullPhaseMergeRead.createReader(split); // full reading phase
         } else {
             return incrementalDiffRead.createReader(split);
         }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
index ac0f00d895..df326054b1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
@@ -28,6 +28,7 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.ReaderSupplier;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.IncrementalSplit;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.RowType;
@@ -155,11 +156,15 @@ public class LookupStreamingReader {
             return;
         }
 
-        DataSplit dataSplit = (DataSplit) splits.get(0);
+        Split split = splits.get(0);
+        long snapshotId =
+                split instanceof DataSplit
+                        ? ((DataSplit) split).snapshotId()
+                        : ((IncrementalSplit) split).snapshotId();
         LOG.info(
                 "LookupStreamingReader get splits from {} with snapshotId {}.",
                 table.name(),
-                dataSplit.snapshotId());
+                snapshotId);
     }
 
     @Nullable
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
index 03016bbccf..b00ac7e083 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
@@ -34,6 +34,7 @@ import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.query.LocalTableQuery;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.IncrementalSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.utils.Filter;
@@ -53,6 +54,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static java.util.Collections.emptyList;
 import static org.apache.paimon.table.BucketMode.POSTPONE_BUCKET;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -165,7 +167,7 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
         Integer numBuckets = queryExecutor.numBuckets(partition);
         if (numBuckets == null) {
             // no data, just return none
-            return Collections.emptyList();
+            return emptyList();
         }
         int bucket = bucket(numBuckets, adjustedKey);
 
@@ -176,7 +178,7 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
 
         InternalRow kv = queryExecutor.lookup(partition, bucket, trimmedKey);
         if (kv == null) {
-            return Collections.emptyList();
+            return emptyList();
         } else {
             return Collections.singletonList(kv);
         }
@@ -319,19 +321,26 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
                 }
 
                 for (Split split : splits) {
-                    refreshSplit((DataSplit) split);
+                    refreshSplit(split);
                 }
             }
         }
 
         @VisibleForTesting
-        void refreshSplit(DataSplit split) {
+        void refreshSplit(Split split) {
+            if (split instanceof DataSplit) {
+                refreshSplit((DataSplit) split);
+            } else {
+                refreshSplit((IncrementalSplit) split);
+            }
+        }
+
+        private void refreshSplit(DataSplit split) {
             BinaryRow partition = split.partition();
             int bucket = split.bucket();
-            List<DataFileMeta> before = split.beforeFiles();
             List<DataFileMeta> after = split.dataFiles();
 
-            tableQuery.refreshFiles(partition, bucket, before, after);
+            tableQuery.refreshFiles(partition, bucket, emptyList(), after);
             Integer totalBuckets = split.totalBuckets();
             if (totalBuckets == null) {
                 // Just for compatibility with older versions
@@ -343,6 +352,13 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
             numBuckets.put(partition, totalBuckets);
         }
 
+        private void refreshSplit(IncrementalSplit split) {
+            BinaryRow partition = split.partition();
+            tableQuery.refreshFiles(
+                    partition, split.bucket(), split.beforeFiles(), split.afterFiles());
+            numBuckets.put(partition, split.totalBuckets());
+        }
+
         @Override
         public Long nextSnapshotId() {
             return this.scan.checkpoint();
@@ -359,11 +375,15 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
                 return;
             }
 
-            DataSplit dataSplit = (DataSplit) splits.get(0);
+            Split split = splits.get(0);
+            long snapshotId =
+                    split instanceof DataSplit
+                            ? ((DataSplit) split).snapshotId()
+                            : ((IncrementalSplit) split).snapshotId();
             LOG.info(
                     "LocalQueryExecutor get splits from {} with snapshotId 
{}.",
                     tableName,
-                    dataSplit.snapshotId());
+                    snapshotId);
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 668bd7d847..87b3f77d5a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -27,6 +27,7 @@ import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.sink.ChannelComputer;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.IncrementalSplit;
 import org.apache.paimon.table.source.SnapshotNotExistPlan;
 import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableScan;
@@ -305,20 +306,39 @@ public class ContinuousFileSplitEnumerator
     }
 
     protected int assignSuggestedTask(FileStoreSourceSplit split) {
-        DataSplit dataSplit = ((DataSplit) split.split());
+        if (split.split() instanceof DataSplit) {
+            return assignSuggestedTask((DataSplit) split.split());
+        } else {
+            return assignSuggestedTask((IncrementalSplit) split.split());
+        }
+    }
+
+    protected int assignSuggestedTask(DataSplit split) {
         int parallelism = context.currentParallelism();
 
         int bucketId;
-        if (dataSplit.bucket() == BucketMode.POSTPONE_BUCKET) {
+        if (split.bucket() == BucketMode.POSTPONE_BUCKET) {
             bucketId =
-                    PostponeBucketFileStoreWrite.getWriteId(dataSplit.dataFiles().get(0).fileName())
+                    PostponeBucketFileStoreWrite.getWriteId(split.dataFiles().get(0).fileName())
                             % parallelism;
         } else {
-            bucketId = dataSplit.bucket();
+            bucketId = split.bucket();
+        }
+
+        if (shuffleBucketWithPartition) {
+            return ChannelComputer.select(split.partition(), bucketId, parallelism);
+        } else {
+            return ChannelComputer.select(bucketId, parallelism);
         }
+    }
+
+    protected int assignSuggestedTask(IncrementalSplit split) {
+        int parallelism = context.currentParallelism();
 
+        // TODO how to deal with postpone bucket?
+        int bucketId = split.bucket();
         if (shuffleBucketWithPartition) {
-            return ChannelComputer.select(dataSplit.partition(), bucketId, parallelism);
+            return ChannelComputer.select(split.partition(), bucketId, parallelism);
         } else {
             return ChannelComputer.select(bucketId, parallelism);
         }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/PlaceholderSplit.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/PlaceholderSplit.java
index 070b6f6b8f..42044860c6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/PlaceholderSplit.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/PlaceholderSplit.java
@@ -44,7 +44,6 @@ public class PlaceholderSplit extends DataSplit {
         dataSplit =
                 DataSplit.builder()
                         .withSnapshot(snapshotId)
-                        .withBeforeFiles(Collections.emptyList())
                         .withBucket(0)
                         .withDataFiles(Collections.emptyList())
                         .withPartition(BinaryRow.EMPTY_ROW)
@@ -73,11 +72,6 @@ public class PlaceholderSplit extends DataSplit {
         return dataSplit.bucket();
     }
 
-    @Override
-    public List<DataFileMeta> beforeFiles() {
-        return dataSplit.beforeFiles();
-    }
-
     @Override
     public List<DataFileMeta> dataFiles() {
         return dataSplit.dataFiles();
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
index fb811841ac..57186709e5 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
@@ -55,7 +55,6 @@ import static java.util.Collections.singletonMap;
 import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME;
 import static org.apache.paimon.hive.utils.HiveUtils.createPredicate;
 import static org.apache.paimon.hive.utils.HiveUtils.extractTagName;
-import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 
 /** Generator to generate hive input splits. */
 public class HiveSplitGenerator {
@@ -174,7 +173,7 @@ public class HiveSplitGenerator {
         for (DataSplit split : splits) {
             if (split instanceof FallbackReadFileStoreTable.FallbackSplit) {
                 dataSplits.add(split);
-            } else if (split.beforeFiles().isEmpty() && split.rawConvertible()) {
+            } else if (split.rawConvertible()) {
                 numFiles += split.dataFiles().size();
                 toPack.add(split);
             } else {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
index 679234bc74..8ed80644a8 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
@@ -78,7 +78,7 @@ case class BinPackingSplits(coreOptions: CoreOptions, readRowSizeRatio: Double =
   def pack(splits: Array[Split]): Seq[PaimonInputPartition] = {
     val (toReshuffle, reserved) = splits.partition {
       case _: FallbackSplit => false
-      case split: DataSplit => split.beforeFiles().isEmpty && split.rawConvertible()
+      case split: DataSplit => split.rawConvertible()
       // Currently, format table reader only supports reading one file.
       case _: FormatDataSplit => false
       case _ => false
