This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new aee8fbebe [core] Refactor DataSplit to contain before files
aee8fbebe is described below

commit aee8fbebe77dfd19db8fc747708089a5bb31a62d
Author: Jingsong <[email protected]>
AuthorDate: Tue Jul 11 10:16:02 2023 +0800

    [core] Refactor DataSplit to contain before files
    
    This closes #1527
---
 .../AppendOnlyTableCompactionCoordinator.java      |   2 +-
 .../paimon/operation/AppendOnlyFileStoreRead.java  |   2 +-
 .../paimon/operation/AppendOnlyFileStoreWrite.java |  11 +-
 .../paimon/operation/KeyValueFileStoreRead.java    |  47 +++---
 .../paimon/table/AppendOnlyFileStoreTable.java     |   3 -
 .../table/ChangelogValueCountFileStoreTable.java   |   3 -
 .../org/apache/paimon/table/source/DataSplit.java  | 184 +++++++++++++--------
 .../snapshot/IncrementalStartingScanner.java       |  10 +-
 .../table/source/snapshot/SnapshotReaderImpl.java  | 110 ++++++------
 .../apache/paimon/table/system/BucketsTable.java   |   2 +-
 .../org/apache/paimon/table/system/FilesTable.java |   4 +-
 .../test/java/org/apache/paimon/TestFileStore.java |  14 +-
 .../operation/KeyValueFileStoreReadTest.java       |  17 +-
 .../table/ChangelogWithKeyFileMetaFilterTest.java  |   6 +-
 .../table/ChangelogWithKeyFileStoreTableTest.java  |   8 +-
 ...hangelogWithKeyTableColumnTypeFileMetaTest.java |   2 +-
 .../paimon/table/ColumnTypeFileMetaTestBase.java   |  12 +-
 .../paimon/table/FileMetaFilterTestBase.java       |  18 +-
 .../paimon/table/FileStoreTableTestBase.java       |   2 +-
 .../paimon/table/SchemaEvolutionTableTestBase.java |   2 +-
 .../org/apache/paimon/table/source/SplitTest.java  |  12 +-
 .../paimon/flink/action/CompactActionITCase.java   |   4 +-
 .../paimon/flink/sink/CompactorSinkITCase.java     |   4 +-
 .../source/ContinuousFileSplitEnumeratorTest.java  |  16 +-
 .../source/FileStoreSourceSplitGeneratorTest.java  |   3 +-
 .../source/FileStoreSourceSplitSerializerTest.java |  11 +-
 .../paimon/hive/mapred/PaimonInputSplit.java       |   2 +-
 .../paimon/hive/mapred/PaimonInputSplitTest.java   |  18 +-
 28 files changed, 305 insertions(+), 224 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java
index e89cff30e..7277d5e26 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java
@@ -116,7 +116,7 @@ public class AppendOnlyTableCompactionCoordinator {
             splits.forEach(
                     split -> {
                         DataSplit dataSplit = (DataSplit) split;
-                        notifyNewFiles(dataSplit.partition(), 
dataSplit.files());
+                        notifyNewFiles(dataSplit.partition(), 
dataSplit.dataFiles());
                     });
             // batch mode, we don't do continuous scanning
             if (!streamingMode) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
index 6bcfc27fd..2a90a0bac 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
@@ -98,7 +98,7 @@ public class AppendOnlyFileStoreRead implements 
FileStoreRead<InternalRow> {
         DataFilePathFactory dataFilePathFactory =
                 pathFactory.createDataFilePathFactory(split.partition(), 
split.bucket());
         List<ConcatRecordReader.ReaderSupplier<InternalRow>> suppliers = new 
ArrayList<>();
-        for (DataFileMeta file : split.files()) {
+        for (DataFileMeta file : split.dataFiles()) {
             String formatIdentifier = 
DataFilePathFactory.formatIdentifier(file.fileName());
             BulkFormatMapping bulkFormatMapping =
                     bulkFormatMappings.computeIfAbsent(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index 8019df5c5..abe350c8c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -158,12 +158,11 @@ public class AppendOnlyFileStoreWrite extends 
AbstractFileStoreWrite<InternalRow
             rewriter.write(
                     new RecordReaderIterator<>(
                             read.createReader(
-                                    new DataSplit(
-                                            0L /* unused */,
-                                            partition,
-                                            bucket,
-                                            toCompact,
-                                            false))));
+                                    DataSplit.builder()
+                                            .withPartition(partition)
+                                            .withBucket(bucket)
+                                            .withDataFiles(toCompact)
+                                            .build())));
             rewriter.close();
             return rewriter.result();
         };
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
index 9fce1cb85..b3aa93d53 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
@@ -32,6 +32,7 @@ import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.MergeTreeReaders;
 import org.apache.paimon.mergetree.SortedRun;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
+import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
 import org.apache.paimon.mergetree.compact.IntervalPartition;
 import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
 import 
org.apache.paimon.mergetree.compact.MergeFunctionFactory.AdjustedProjection;
@@ -51,6 +52,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
@@ -185,26 +187,17 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
 
     private RecordReader<KeyValue> 
createReaderWithoutOuterProjection(DataSplit split)
             throws IOException {
-        if (split.isIncremental()) {
+        if (split.isStreaming()) {
             KeyValueFileReaderFactory readerFactory =
                     readerFactoryBuilder.build(
                             split.partition(), split.bucket(), true, 
filtersForOverlappedSection);
-            // Return the raw file contents without merging
-            List<ConcatRecordReader.ReaderSupplier<KeyValue>> suppliers = new 
ArrayList<>();
-            for (DataFileMeta file : split.files()) {
-                suppliers.add(
-                        () -> {
-                            // We need to check extraFiles to be compatible 
with Paimon 0.2.
-                            // See comments on DataFileMeta#extraFiles.
-                            String fileName = 
changelogFile(file).orElse(file.fileName());
-                            return readerFactory.createRecordReader(
-                                    file.schemaId(), fileName, file.level());
-                        });
-            }
-            RecordReader<KeyValue> concatRecordReader = 
ConcatRecordReader.create(suppliers);
-            return split.reverseRowKind()
-                    ? new ReverseReader(concatRecordReader)
-                    : concatRecordReader;
+            ReaderSupplier<KeyValue> beforeSupplier =
+                    () -> new 
ReverseReader(streamingConcat(split.beforeFiles(), readerFactory));
+            ReaderSupplier<KeyValue> dataSupplier =
+                    () -> streamingConcat(split.dataFiles(), readerFactory);
+            return split.beforeFiles().isEmpty()
+                    ? dataSupplier.get()
+                    : ConcatRecordReader.create(Arrays.asList(beforeSupplier, 
dataSupplier));
         } else {
             // Sections are read by SortMergeReader, which sorts and merges 
records by keys.
             // So we cannot project keys or else the sorting will be incorrect.
@@ -218,11 +211,11 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
                             false,
                             filtersForNonOverlappedSection);
 
-            List<ConcatRecordReader.ReaderSupplier<KeyValue>> sectionReaders = 
new ArrayList<>();
+            List<ReaderSupplier<KeyValue>> sectionReaders = new ArrayList<>();
             MergeFunctionWrapper<KeyValue> mergeFuncWrapper =
                     new 
ReducerMergeFunctionWrapper(mfFactory.create(pushdownProjection));
             for (List<SortedRun> section :
-                    new IntervalPartition(split.files(), 
keyComparator).partition()) {
+                    new IntervalPartition(split.dataFiles(), 
keyComparator).partition()) {
                 sectionReaders.add(
                         () ->
                                 MergeTreeReaders.readerForSection(
@@ -245,6 +238,22 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
         }
     }
 
+    private RecordReader<KeyValue> streamingConcat(
+            List<DataFileMeta> files, KeyValueFileReaderFactory readerFactory) 
throws IOException {
+        List<ReaderSupplier<KeyValue>> suppliers = new ArrayList<>();
+        for (DataFileMeta file : files) {
+            suppliers.add(
+                    () -> {
+                        // We need to check extraFiles to be compatible with 
Paimon 0.2.
+                        // See comments on DataFileMeta#extraFiles.
+                        String fileName = 
changelogFile(file).orElse(file.fileName());
+                        return readerFactory.createRecordReader(
+                                file.schemaId(), fileName, file.level());
+                    });
+        }
+        return ConcatRecordReader.create(suppliers);
+    }
+
     private Optional<String> changelogFile(DataFileMeta fileMeta) {
         for (String file : fileMeta.extraFiles()) {
             if (file.startsWith(CHANGELOG_FILE_PREFIX)) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 68f274465..beb703b84 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -30,7 +30,6 @@ import org.apache.paimon.operation.AppendOnlyFileStoreScan;
 import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.Lock;
-import org.apache.paimon.operation.ReverseReader;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.TableSchema;
@@ -94,8 +93,6 @@ public class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
     /**
      * Currently, the streaming read of overwrite is implemented by reversing 
the {@link RowKind} of
      * overwrote records to {@link RowKind#DELETE}, so only tables that have 
primary key support it.
-     *
-     * @see ReverseReader
      */
     @Override
     public boolean supportStreamingReadOverwrite() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
index 5fa99529d..42b5fa18a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
@@ -32,7 +32,6 @@ import 
org.apache.paimon.mergetree.compact.ValueCountMergeFunction;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.KeyValueFileStoreScan;
 import org.apache.paimon.operation.Lock;
-import org.apache.paimon.operation.ReverseReader;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
@@ -107,8 +106,6 @@ public class ChangelogValueCountFileStoreTable extends 
AbstractFileStoreTable {
     /**
      * Currently, the streaming read of overwrite is implemented by reversing 
the {@link RowKind} of
      * overwrote records to {@link RowKind#DELETE}, so only tables that have 
primary key support it.
-     *
-     * @see ReverseReader
      */
     @Override
     public boolean supportStreamingReadOverwrite() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 8cd768b30..8fb76347c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -34,54 +34,22 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
 /** Input splits. Needed by most batch computation engines. */
 public class DataSplit implements Split {
 
-    private static final long serialVersionUID = 2L;
+    private static final long serialVersionUID = 3L;
+
+    private long snapshotId = 0;
+    private boolean isStreaming = false;
+    private List<DataFileMeta> beforeFiles = new ArrayList<>();
 
-    private long snapshotId;
     private BinaryRow partition;
-    private int bucket;
-    private List<DataFileMeta> files;
-    private boolean isIncremental;
-
-    // when reverseRowKind is true, the RowKind of records from this split 
should be reversed to
-    // DELETE
-    private boolean reverseRowKind;
-
-    public DataSplit(
-            long snapshotId,
-            BinaryRow partition,
-            int bucket,
-            List<DataFileMeta> files,
-            boolean isIncremental) {
-        init(snapshotId, partition, bucket, files, isIncremental, false);
-    }
-
-    public DataSplit(
-            long snapshotId,
-            BinaryRow partition,
-            int bucket,
-            List<DataFileMeta> files,
-            boolean isIncremental,
-            boolean reverseRowKind) {
-        init(snapshotId, partition, bucket, files, isIncremental, 
reverseRowKind);
-    }
-
-    private void init(
-            long snapshotId,
-            BinaryRow partition,
-            int bucket,
-            List<DataFileMeta> files,
-            boolean isIncremental,
-            boolean reverseRowKind) {
-        this.snapshotId = snapshotId;
-        this.partition = partition;
-        this.bucket = bucket;
-        this.files = files;
-        this.isIncremental = isIncremental;
-        this.reverseRowKind = reverseRowKind;
-    }
+    private int bucket = -1;
+    private List<DataFileMeta> dataFiles;
+
+    public DataSplit() {}
 
     public long snapshotId() {
         return snapshotId;
@@ -95,22 +63,22 @@ public class DataSplit implements Split {
         return bucket;
     }
 
-    public List<DataFileMeta> files() {
-        return files;
+    public List<DataFileMeta> beforeFiles() {
+        return beforeFiles;
     }
 
-    public boolean isIncremental() {
-        return isIncremental;
+    public List<DataFileMeta> dataFiles() {
+        return dataFiles;
     }
 
-    public boolean reverseRowKind() {
-        return reverseRowKind;
+    public boolean isStreaming() {
+        return isStreaming;
     }
 
     @Override
     public long rowCount() {
         long rowCount = 0;
-        for (DataFileMeta file : files) {
+        for (DataFileMeta file : dataFiles) {
             rowCount += file.rowCount();
         }
         return rowCount;
@@ -127,14 +95,14 @@ public class DataSplit implements Split {
         DataSplit split = (DataSplit) o;
         return bucket == split.bucket
                 && Objects.equals(partition, split.partition)
-                && Objects.equals(files, split.files)
-                && isIncremental == split.isIncremental
-                && reverseRowKind == split.reverseRowKind;
+                && Objects.equals(beforeFiles, split.beforeFiles)
+                && Objects.equals(dataFiles, split.dataFiles)
+                && isStreaming == split.isStreaming;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(partition, bucket, files, isIncremental, 
reverseRowKind);
+        return Objects.hash(partition, bucket, beforeFiles, dataFiles, 
isStreaming);
     }
 
     private void writeObject(ObjectOutputStream out) throws IOException {
@@ -142,40 +110,114 @@ public class DataSplit implements Split {
     }
 
     private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-        DataSplit split = DataSplit.deserialize(new 
DataInputViewStreamWrapper(in));
-        init(
-                split.snapshotId,
-                split.partition,
-                split.bucket,
-                split.files,
-                split.isIncremental,
-                split.reverseRowKind);
+        assign(deserialize(new DataInputViewStreamWrapper(in)));
+    }
+
+    private void assign(DataSplit other) {
+        this.snapshotId = other.snapshotId;
+        this.partition = other.partition;
+        this.bucket = other.bucket;
+        this.beforeFiles = other.beforeFiles;
+        this.dataFiles = other.dataFiles;
+        this.isStreaming = other.isStreaming;
     }
 
     public void serialize(DataOutputView out) throws IOException {
         out.writeLong(snapshotId);
         SerializationUtils.serializeBinaryRow(partition, out);
         out.writeInt(bucket);
-        out.writeInt(files.size());
+
         DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer();
-        for (DataFileMeta file : files) {
+        out.writeInt(beforeFiles.size());
+        for (DataFileMeta file : beforeFiles) {
+            dataFileSer.serialize(file, out);
+        }
+
+        out.writeInt(dataFiles.size());
+        for (DataFileMeta file : dataFiles) {
             dataFileSer.serialize(file, out);
         }
-        out.writeBoolean(isIncremental);
-        out.writeBoolean(reverseRowKind);
+
+        out.writeBoolean(isStreaming);
     }
 
     public static DataSplit deserialize(DataInputView in) throws IOException {
         long snapshotId = in.readLong();
         BinaryRow partition = SerializationUtils.deserializeBinaryRow(in);
         int bucket = in.readInt();
-        int fileNumber = in.readInt();
-        List<DataFileMeta> files = new ArrayList<>(fileNumber);
+
         DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer();
+        int beforeNumber = in.readInt();
+        List<DataFileMeta> beforeFiles = new ArrayList<>(beforeNumber);
+        for (int i = 0; i < beforeNumber; i++) {
+            beforeFiles.add(dataFileSer.deserialize(in));
+        }
+
+        int fileNumber = in.readInt();
+        List<DataFileMeta> dataFiles = new ArrayList<>(fileNumber);
         for (int i = 0; i < fileNumber; i++) {
-            files.add(dataFileSer.deserialize(in));
+            dataFiles.add(dataFileSer.deserialize(in));
+        }
+
+        boolean isStreaming = in.readBoolean();
+
+        return builder()
+                .withSnapshot(snapshotId)
+                .withPartition(partition)
+                .withBucket(bucket)
+                .withBeforeFiles(beforeFiles)
+                .withDataFiles(dataFiles)
+                .isStreaming(isStreaming)
+                .build();
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /** Builder for {@link DataSplit}. */
+    public static class Builder {
+
+        private final DataSplit split = new DataSplit();
+
+        public Builder withSnapshot(long snapshot) {
+            this.split.snapshotId = snapshot;
+            return this;
+        }
+
+        public Builder withPartition(BinaryRow partition) {
+            this.split.partition = partition;
+            return this;
+        }
+
+        public Builder withBucket(int bucket) {
+            this.split.bucket = bucket;
+            return this;
+        }
+
+        public Builder withBeforeFiles(List<DataFileMeta> beforeFiles) {
+            this.split.beforeFiles = beforeFiles;
+            return this;
+        }
+
+        public Builder withDataFiles(List<DataFileMeta> dataFiles) {
+            this.split.dataFiles = dataFiles;
+            return this;
+        }
+
+        public Builder isStreaming(boolean isStreaming) {
+            this.split.isStreaming = isStreaming;
+            return this;
+        }
+
+        public DataSplit build() {
+            checkArgument(split.partition != null);
+            checkArgument(split.bucket != -1);
+            checkArgument(split.dataFiles != null);
+
+            DataSplit split = new DataSplit();
+            split.assign(this.split);
+            return split;
         }
-        return new DataSplit(
-                snapshotId, partition, bucket, files, in.readBoolean(), 
in.readBoolean());
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
index 40d40c830..e3ba10691 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
@@ -57,7 +57,7 @@ public class IncrementalStartingScanner implements 
StartingScanner {
             for (DataSplit split : splits) {
                 grouped.computeIfAbsent(
                                 Pair.of(split.partition(), split.bucket()), k 
-> new ArrayList<>())
-                        .addAll(split.files());
+                        .addAll(split.dataFiles());
             }
         }
 
@@ -67,7 +67,13 @@ public class IncrementalStartingScanner implements 
StartingScanner {
             int bucket = entry.getKey().getRight();
             for (List<DataFileMeta> files :
                     reader.splitGenerator().splitForBatch(entry.getValue())) {
-                result.add(new DataSplit(end, partition, bucket, files, 
false));
+                result.add(
+                        DataSplit.builder()
+                                .withSnapshot(end)
+                                .withPartition(partition)
+                                .withBucket(bucket)
+                                .withDataFiles(files)
+                                .build());
             }
         }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 19b4097be..768eb3f0e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -42,13 +42,18 @@ import org.apache.paimon.utils.SnapshotManager;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.operation.FileStoreScan.Plan.groupByPartFiles;
 import static 
org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
 
 /** Implementation of {@link SnapshotReader}. */
@@ -164,7 +169,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
         Long snapshotId = plan.snapshotId();
 
         Map<BinaryRow, Map<Integer, List<DataFileMeta>>> files =
-                FileStoreScan.Plan.groupByPartFiles(plan.files(FileKind.ADD));
+                groupByPartFiles(plan.files(FileKind.ADD));
         if (options.scanPlanSortPartition()) {
             Map<BinaryRow, Map<Integer, List<DataFileMeta>>> newFiles = new 
LinkedHashMap<>();
             files.entrySet().stream()
@@ -176,7 +181,6 @@ public class SnapshotReaderImpl implements SnapshotReader {
                 generateSplits(
                         snapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID - 1 : 
snapshotId,
                         scanKind != ScanKind.ALL,
-                        false,
                         splitGenerator,
                         files);
         return new Plan() {
@@ -216,10 +220,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
                 .collect(Collectors.toList());
     }
 
-    /**
-     * Get splits from an overwrite snapshot files. The {@link 
FileKind#DELETE} part will be marked
-     * with reverseRowKind = true (see {@link DataSplit}).
-     */
+    /** Get splits from an overwritten snapshot files. */
     @Override
     public Plan readOverwrittenChanges() {
         withKind(ScanKind.DELTA);
@@ -234,21 +235,44 @@ public class SnapshotReaderImpl implements SnapshotReader 
{
 
         List<DataSplit> splits = new ArrayList<>();
 
-        splits.addAll(
-                generateSplits(
-                        snapshotId,
-                        true,
-                        true,
-                        splitGenerator,
-                        
FileStoreScan.Plan.groupByPartFiles(plan.files(FileKind.DELETE))));
-
-        splits.addAll(
-                generateSplits(
-                        snapshotId,
-                        true,
-                        false,
-                        splitGenerator,
-                        
FileStoreScan.Plan.groupByPartFiles(plan.files(FileKind.ADD))));
+        Map<BinaryRow, Map<Integer, List<DataFileMeta>>> beforeFiles =
+                groupByPartFiles(plan.files(FileKind.DELETE));
+        Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles =
+                groupByPartFiles(plan.files(FileKind.ADD));
+
+        Map<BinaryRow, Set<Integer>> buckets = new HashMap<>();
+        beforeFiles.forEach(
+                (part, bucketMap) ->
+                        buckets.computeIfAbsent(part, k -> new HashSet<>())
+                                .addAll(bucketMap.keySet()));
+        dataFiles.forEach(
+                (part, bucketMap) ->
+                        buckets.computeIfAbsent(part, k -> new HashSet<>())
+                                .addAll(bucketMap.keySet()));
+
+        for (Map.Entry<BinaryRow, Set<Integer>> entry : buckets.entrySet()) {
+            BinaryRow part = entry.getKey();
+            for (Integer bucket : entry.getValue()) {
+                List<DataFileMeta> before =
+                        beforeFiles
+                                .getOrDefault(part, Collections.emptyMap())
+                                .getOrDefault(bucket, Collections.emptyList());
+                List<DataFileMeta> data =
+                        dataFiles
+                                .getOrDefault(part, Collections.emptyMap())
+                                .getOrDefault(bucket, Collections.emptyList());
+                DataSplit split =
+                        DataSplit.builder()
+                                .withSnapshot(snapshotId)
+                                .withPartition(part)
+                                .withBucket(bucket)
+                                .withBeforeFiles(before)
+                                .withDataFiles(data)
+                                .isStreaming(true)
+                                .build();
+                splits.add(split);
+            }
+        }
 
         return new Plan() {
             @Nullable
@@ -283,8 +307,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
     @VisibleForTesting
     public static List<DataSplit> generateSplits(
             long snapshotId,
-            boolean isIncremental,
-            boolean reverseRowKind,
+            boolean isStreaming,
             SplitGenerator splitGenerator,
             Map<BinaryRow, Map<Integer, List<DataFileMeta>>> groupedDataFiles) 
{
         List<DataSplit> splits = new ArrayList<>();
@@ -294,32 +317,21 @@ public class SnapshotReaderImpl implements SnapshotReader 
{
             Map<Integer, List<DataFileMeta>> buckets = entry.getValue();
             for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry : 
buckets.entrySet()) {
                 int bucket = bucketEntry.getKey();
-                if (isIncremental) {
-                    // streaming splits incremental data files
-                    
splitGenerator.splitForStreaming(bucketEntry.getValue()).stream()
-                            .map(
-                                    files ->
-                                            new DataSplit(
-                                                    snapshotId,
-                                                    partition,
-                                                    bucket,
-                                                    files,
-                                                    true,
-                                                    reverseRowKind))
-                            .forEach(splits::add);
-                } else {
-                    
splitGenerator.splitForBatch(bucketEntry.getValue()).stream()
-                            .map(
-                                    files ->
-                                            new DataSplit(
-                                                    snapshotId,
-                                                    partition,
-                                                    bucket,
-                                                    files,
-                                                    false,
-                                                    reverseRowKind))
-                            .forEach(splits::add);
-                }
+                List<DataFileMeta> bucketFiles = bucketEntry.getValue();
+                DataSplit.Builder builder =
+                        DataSplit.builder()
+                                .withSnapshot(snapshotId)
+                                .withPartition(partition)
+                                .withBucket(bucket)
+                                .isStreaming(isStreaming);
+                List<List<DataFileMeta>> splitGroups =
+                        isStreaming
+                                ? splitGenerator.splitForStreaming(bucketFiles)
+                                : splitGenerator.splitForBatch(bucketFiles);
+                splitGroups.stream()
+                        .map(builder::withDataFiles)
+                        .map(DataSplit.Builder::build)
+                        .forEach(splits::add);
             }
         }
         return splits;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
index b19f64cd3..c8aea53f5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
@@ -167,7 +167,7 @@ public class BucketsTable implements DataTable, 
ReadonlyTable {
                 // Serialized files are only useful in streaming jobs.
                 // Batch compact jobs only run once, so they only need to know 
what buckets should
                 // be compacted and don't need to concern incremental new 
files.
-                files = dataSplit.files();
+                files = dataSplit.dataFiles();
             }
             InternalRow row =
                     GenericRow.of(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index daf491cc1..94858164f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -169,7 +169,7 @@ public class FilesTable implements ReadonlyTable {
             TableScan.Plan plan = plan();
             return plan.splits().stream()
                     .map(s -> (DataSplit) s)
-                    .mapToLong(s -> s.files().size())
+                    .mapToLong(s -> s.dataFiles().size())
                     .sum();
         }
 
@@ -263,7 +263,7 @@ public class FilesTable implements ReadonlyTable {
             for (Split dataSplit : plan.splits()) {
                 iteratorList.add(
                         Iterators.transform(
-                                ((DataSplit) dataSplit).files().iterator(),
+                                ((DataSplit) dataSplit).dataFiles().iterator(),
                                 file ->
                                         toRow(
                                                 (DataSplit) dataSplit,
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 6b88a838b..2d8b98a60 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -349,7 +349,7 @@ public class TestFileStore extends KeyValueFileStore {
     }
 
     public List<KeyValue> readKvsFromManifestEntries(
-            List<ManifestEntry> entries, boolean isIncremental) throws 
Exception {
+            List<ManifestEntry> entries, boolean isStreaming) throws Exception 
{
         if (LOG.isDebugEnabled()) {
             for (ManifestEntry entry : entries) {
                 LOG.debug("reading from " + entry.toString());
@@ -374,12 +374,12 @@ public class TestFileStore extends KeyValueFileStore {
                 RecordReaderIterator<KeyValue> iterator =
                         new RecordReaderIterator<>(
                                 read.createReader(
-                                        new DataSplit(
-                                                0L /* unused */,
-                                                entryWithPartition.getKey(),
-                                                entryWithBucket.getKey(),
-                                                entryWithBucket.getValue(),
-                                                isIncremental)));
+                                        DataSplit.builder()
+                                                
.withPartition(entryWithPartition.getKey())
+                                                
.withBucket(entryWithBucket.getKey())
+                                                
.withDataFiles(entryWithBucket.getValue())
+                                                .isStreaming(isStreaming)
+                                                .build()));
                 while (iterator.hasNext()) {
                     kvs.add(iterator.next().copy(keySerializer, 
valueSerializer));
                 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java
index 6225b7ef8..8a2d118e0 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java
@@ -234,14 +234,15 @@ public class KeyValueFileStoreReadTest {
         for (Map.Entry<BinaryRow, List<ManifestEntry>> entry : 
filesGroupedByPartition.entrySet()) {
             RecordReader<KeyValue> reader =
                     read.createReader(
-                            new DataSplit(
-                                    snapshotId,
-                                    entry.getKey(),
-                                    0,
-                                    entry.getValue().stream()
-                                            .map(ManifestEntry::file)
-                                            .collect(Collectors.toList()),
-                                    false));
+                            DataSplit.builder()
+                                    .withSnapshot(snapshotId)
+                                    .withPartition(entry.getKey())
+                                    .withBucket(0)
+                                    .withDataFiles(
+                                            entry.getValue().stream()
+                                                    .map(ManifestEntry::file)
+                                                    
.collect(Collectors.toList()))
+                                    .build());
             RecordReaderIterator<KeyValue> actualIterator = new 
RecordReaderIterator<>(reader);
             while (actualIterator.hasNext()) {
                 result.add(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileMetaFilterTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileMetaFilterTest.java
index 86362cb60..3c7b168f1 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileMetaFilterTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileMetaFilterTest.java
@@ -53,7 +53,7 @@ public class ChangelogWithKeyFileMetaFilterTest extends 
FileMetaFilterTestBase {
                     List<DataSplit> splits = 
table.newSnapshotReader().read().dataSplits();
                     checkFilterRowCount(toDataFileMetas(splits), 6L);
                     return splits.stream()
-                            .flatMap(s -> s.files().stream())
+                            .flatMap(s -> s.dataFiles().stream())
                             .collect(Collectors.toList());
                 },
                 (files, schemas) -> {
@@ -87,7 +87,7 @@ public class ChangelogWithKeyFileMetaFilterTest extends 
FileMetaFilterTestBase {
                             
table.newSnapshotReader().withFilter(predicate).read().dataSplits();
                     checkFilterRowCount(toDataFileMetas(splits), 6L);
                     return splits.stream()
-                            .flatMap(s -> s.files().stream())
+                            .flatMap(s -> s.dataFiles().stream())
                             .collect(Collectors.toList());
                 },
                 (files, schemas) -> {
@@ -120,7 +120,7 @@ public class ChangelogWithKeyFileMetaFilterTest extends 
FileMetaFilterTestBase {
                     List<DataSplit> splits = 
table.newSnapshotReader().read().dataSplits();
                     checkFilterRowCount(toDataFileMetas(splits), 6L);
                     return splits.stream()
-                            .flatMap(s -> s.files().stream())
+                            .flatMap(s -> s.dataFiles().stream())
                             .collect(Collectors.toList());
                 },
                 (files, schemas) -> {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
index c76a6e15d..703f82167 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
@@ -710,7 +710,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         SnapshotReader snapshotReader = 
table.newSnapshotReader().withKind(ScanKind.DELTA);
         List<DataSplit> splits0 = snapshotReader.read().dataSplits();
         assertThat(splits0).hasSize(1);
-        assertThat(splits0.get(0).files()).hasSize(1);
+        assertThat(splits0.get(0).dataFiles()).hasSize(1);
 
         write.write(rowData(1, 10, 1000L));
         write.write(rowData(1, 20, 2000L));
@@ -721,9 +721,9 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
 
         List<DataSplit> splits1 = snapshotReader.read().dataSplits();
         assertThat(splits1).hasSize(1);
-        assertThat(splits1.get(0).files()).hasSize(1);
-        assertThat(splits1.get(0).files().get(0).fileName())
-                .isNotEqualTo(splits0.get(0).files().get(0).fileName());
+        assertThat(splits1.get(0).dataFiles()).hasSize(1);
+        assertThat(splits1.get(0).dataFiles().get(0).fileName())
+                .isNotEqualTo(splits0.get(0).dataFiles().get(0).fileName());
     }
 
     @Test
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyTableColumnTypeFileMetaTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyTableColumnTypeFileMetaTest.java
index 869dc8bca..5d6875ed1 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyTableColumnTypeFileMetaTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyTableColumnTypeFileMetaTest.java
@@ -94,7 +94,7 @@ public class ChangelogWithKeyTableColumnTypeFileMetaTest 
extends ColumnTypeFileM
                             
table.newSnapshotReader().withFilter(predicate).read().dataSplits();
                     checkFilterRowCount(toDataFileMetas(splits), 3L);
                     return splits.stream()
-                            .flatMap(s -> s.files().stream())
+                            .flatMap(s -> s.dataFiles().stream())
                             .collect(Collectors.toList());
                 },
                 (files, schemas) -> {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java
index 8b576ecb9..a95bd63b6 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java
@@ -52,7 +52,7 @@ public abstract class ColumnTypeFileMetaTestBase extends 
SchemaEvolutionTableTes
                     List<DataSplit> splits = 
table.newSnapshotReader().read().dataSplits();
                     checkFilterRowCount(toDataFileMetas(splits), 3L);
                     return splits.stream()
-                            .flatMap(s -> s.files().stream())
+                            .flatMap(s -> s.dataFiles().stream())
                             .collect(Collectors.toList());
                 },
                 (files, schemas) -> {
@@ -74,7 +74,7 @@ public abstract class ColumnTypeFileMetaTestBase extends 
SchemaEvolutionTableTes
 
                     List<DataFileMeta> fileMetaList =
                             splits.stream()
-                                    .flatMap(s -> s.files().stream())
+                                    .flatMap(s -> s.dataFiles().stream())
                                     .collect(Collectors.toList());
                     assertThat(
                                     fileMetaList.stream()
@@ -126,7 +126,7 @@ public abstract class ColumnTypeFileMetaTestBase extends 
SchemaEvolutionTableTes
                             
table.newSnapshotReader().withFilter(predicate).read().dataSplits();
                     checkFilterRowCount(toDataFileMetas(splits), 2L);
                     return splits.stream()
-                            .flatMap(s -> s.files().stream())
+                            .flatMap(s -> s.dataFiles().stream())
                             .collect(Collectors.toList());
                 },
                 (files, schemas) -> {
@@ -157,7 +157,7 @@ public abstract class ColumnTypeFileMetaTestBase extends 
SchemaEvolutionTableTes
 
                     List<DataFileMeta> fileMetaList =
                             splits.stream()
-                                    .flatMap(s -> s.files().stream())
+                                    .flatMap(s -> s.dataFiles().stream())
                                     .collect(Collectors.toList());
                     assertThat(
                                     fileMetaList.stream()
@@ -185,7 +185,7 @@ public abstract class ColumnTypeFileMetaTestBase extends 
SchemaEvolutionTableTes
                             
table.newSnapshotReader().withFilter(predicate).read().dataSplits();
                     checkFilterRowCount(toDataFileMetas(splits), 2L);
                     return splits.stream()
-                            .flatMap(s -> s.files().stream())
+                            .flatMap(s -> s.dataFiles().stream())
                             .collect(Collectors.toList());
                 },
                 (files, schemas) -> {
@@ -207,7 +207,7 @@ public abstract class ColumnTypeFileMetaTestBase extends 
SchemaEvolutionTableTes
 
                     List<DataFileMeta> fileMetaList =
                             splits.stream()
-                                    .flatMap(s -> s.files().stream())
+                                    .flatMap(s -> s.dataFiles().stream())
                                     .collect(Collectors.toList());
                     assertThat(
                                     fileMetaList.stream()
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/FileMetaFilterTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/FileMetaFilterTestBase.java
index 14ccd403a..f2ab5b9a9 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/FileMetaFilterTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/FileMetaFilterTestBase.java
@@ -45,7 +45,7 @@ public abstract class FileMetaFilterTestBase extends 
SchemaEvolutionTableTestBas
                     List<DataSplit> splits = 
table.newSnapshotReader().read().dataSplits();
                     checkFilterRowCount(toDataFileMetas(splits), 6L);
                     return splits.stream()
-                            .flatMap(s -> s.files().stream())
+                            .flatMap(s -> s.dataFiles().stream())
                             .collect(Collectors.toList());
                 },
                 (files, schemas) -> {
@@ -65,7 +65,7 @@ public abstract class FileMetaFilterTestBase extends 
SchemaEvolutionTableTestBas
 
                     List<DataFileMeta> fileMetaList =
                             splits.stream()
-                                    .flatMap(s -> s.files().stream())
+                                    .flatMap(s -> s.dataFiles().stream())
                                     .collect(Collectors.toList());
                     assertThat(
                                     fileMetaList.stream()
@@ -124,7 +124,7 @@ public abstract class FileMetaFilterTestBase extends 
SchemaEvolutionTableTestBas
                     List<DataFileMeta> files =
                             
table.newSnapshotReader().withFilter(predicate).read().dataSplits()
                                     .stream()
-                                    .flatMap(s -> s.files().stream())
+                                    .flatMap(s -> s.dataFiles().stream())
                                     .collect(Collectors.toList());
                     assertThat(files.size()).isGreaterThan(0);
                     checkFilterRowCount(files, 3L);
@@ -140,7 +140,7 @@ public abstract class FileMetaFilterTestBase extends 
SchemaEvolutionTableTestBas
                             
table.newSnapshotReader().withFilter(predicate).read().dataSplits();
                     List<DataFileMeta> filterFileMetas =
                             filterSplits.stream()
-                                    .flatMap(s -> s.files().stream())
+                                    .flatMap(s -> s.dataFiles().stream())
                                     .collect(Collectors.toList());
                     checkFilterRowCount(filterFileMetas, 6L);
 
@@ -165,7 +165,7 @@ public abstract class FileMetaFilterTestBase extends 
SchemaEvolutionTableTestBas
                                     filterAllSplits.stream()
                                             .flatMap(
                                                     s ->
-                                                            s.files().stream()
+                                                            
s.dataFiles().stream()
                                                                     
.map(DataFileMeta::fileName))
                                             .collect(Collectors.toList()))
                             .containsAll(
@@ -179,7 +179,7 @@ public abstract class FileMetaFilterTestBase extends 
SchemaEvolutionTableTestBas
 
                     Set<String> filterFileNames = new HashSet<>();
                     for (DataSplit dataSplit : filterAllSplits) {
-                        for (DataFileMeta dataFileMeta : dataSplit.files()) {
+                        for (DataFileMeta dataFileMeta : 
dataSplit.dataFiles()) {
                             FieldStats[] fieldStats = 
getTableValueStats(dataFileMeta).fields(null);
                             int minValue = (Integer) fieldStats[1].minValue();
                             int maxValue = (Integer) fieldStats[1].maxValue();
@@ -205,7 +205,7 @@ public abstract class FileMetaFilterTestBase extends 
SchemaEvolutionTableTestBas
                     FileStoreTable table = createFileStoreTable(schemas);
                     List<DataFileMeta> files =
                             
table.newSnapshotReader().read().dataSplits().stream()
-                                    .flatMap(s -> s.files().stream())
+                                    .flatMap(s -> s.dataFiles().stream())
                                     .collect(Collectors.toList());
                     assertThat(files.size()).isGreaterThan(0);
                     checkFilterRowCount(files, 6L);
@@ -225,7 +225,7 @@ public abstract class FileMetaFilterTestBase extends 
SchemaEvolutionTableTestBas
 
                     List<DataFileMeta> filterFileMetas =
                             filterSplits.stream()
-                                    .flatMap(s -> s.files().stream())
+                                    .flatMap(s -> s.dataFiles().stream())
                                     .collect(Collectors.toList());
                     List<String> fileNameList =
                             filterFileMetas.stream()
@@ -250,7 +250,7 @@ public abstract class FileMetaFilterTestBase extends 
SchemaEvolutionTableTestBas
 
                     Set<String> filterFileNames = new HashSet<>();
                     for (DataSplit dataSplit : allSplits) {
-                        for (DataFileMeta dataFileMeta : dataSplit.files()) {
+                        for (DataFileMeta dataFileMeta : 
dataSplit.dataFiles()) {
                             FieldStats[] fieldStats = 
getTableValueStats(dataFileMeta).fields(null);
                             Integer minValue = (Integer) 
fieldStats[3].minValue();
                             Integer maxValue = (Integer) 
fieldStats[3].maxValue();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index e3966f850..8d7f94140 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -522,7 +522,7 @@ public abstract class FileStoreTableTestBase {
 
         List<DataFileMeta> files =
                 table.newSnapshotReader().read().dataSplits().stream()
-                        .flatMap(split -> split.files().stream())
+                        .flatMap(split -> split.dataFiles().stream())
                         .collect(Collectors.toList());
         for (DataFileMeta file : files) {
             assertThat(file.level()).isEqualTo(0);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
index ad76e557a..66f55eb38 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
@@ -429,7 +429,7 @@ public abstract class SchemaEvolutionTableTestBase {
     }
 
     protected static List<DataFileMeta> toDataFileMetas(List<DataSplit> 
splits) {
-        return splits.stream().flatMap(s -> 
s.files().stream()).collect(Collectors.toList());
+        return splits.stream().flatMap(s -> 
s.dataFiles().stream()).collect(Collectors.toList());
     }
 
     protected static void checkFilterRowCount(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
index 000ba6b2d..7fbd8f96d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
@@ -45,12 +45,12 @@ public class SplitTest {
             files.add(gen.next().meta);
         }
         DataSplit split =
-                new DataSplit(
-                        ThreadLocalRandom.current().nextLong(100),
-                        data.partition,
-                        data.bucket,
-                        files,
-                        false);
+                DataSplit.builder()
+                        
.withSnapshot(ThreadLocalRandom.current().nextLong(100))
+                        .withPartition(data.partition)
+                        .withBucket(data.bucket)
+                        .withDataFiles(files)
+                        .build();
 
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         split.serialize(new DataOutputViewStreamWrapper(out));
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index f420a264b..e5201a1e8 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -113,10 +113,10 @@ public class CompactActionITCase extends ActionITCaseBase 
{
         for (DataSplit split : splits) {
             if (split.partition().getInt(1) == 15) {
                 // compacted
-                Assertions.assertEquals(1, split.files().size());
+                Assertions.assertEquals(1, split.dataFiles().size());
             } else {
                 // not compacted
-                Assertions.assertEquals(2, split.files().size());
+                Assertions.assertEquals(2, split.dataFiles().size());
             }
         }
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index 75d6e9b52..257f844c7 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -126,10 +126,10 @@ public class CompactorSinkITCase extends AbstractTestBase 
{
             DataSplit dataSplit = (DataSplit) split;
             if (dataSplit.partition().getInt(1) == 15) {
                 // compacted
-                assertEquals(1, dataSplit.files().size());
+                assertEquals(1, dataSplit.dataFiles().size());
             } else {
                 // not compacted
-                assertEquals(2, dataSplit.files().size());
+                assertEquals(2, dataSplit.dataFiles().size());
             }
         }
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index bce90c3a9..f511ee2af 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -506,13 +506,25 @@ public class ContinuousFileSplitEnumeratorTest {
             int snapshotId, int bucket, List<DataFileMeta> files) {
         return new FileStoreSourceSplit(
                 UUID.randomUUID().toString(),
-                new DataSplit(snapshotId, row(1), bucket, files, true),
+                DataSplit.builder()
+                        .withSnapshot(snapshotId)
+                        .withPartition(row(1))
+                        .withBucket(bucket)
+                        .withDataFiles(files)
+                        .isStreaming(true)
+                        .build(),
                 0);
     }
 
     private static DataSplit createDataSplit(
             long snapshotId, int bucket, List<DataFileMeta> files) {
-        return new DataSplit(snapshotId, row(1), bucket, files, true);
+        return DataSplit.builder()
+                .withSnapshot(snapshotId)
+                .withPartition(row(1))
+                .withBucket(bucket)
+                .withDataFiles(files)
+                .isStreaming(true)
+                .build();
     }
 
     private static class Builder {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
index 87b082011..2fb1f090a 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
@@ -83,7 +83,6 @@ public class FileStoreSourceSplitGeneratorTest {
                 SnapshotReaderImpl.generateSplits(
                         1L,
                         false,
-                        false,
                         new SplitGenerator() {
                             @Override
                             public List<List<DataFileMeta>> splitForBatch(
@@ -134,7 +133,7 @@ public class FileStoreSourceSplitGeneratorTest {
         assertThat(((DataSplit) split.split()).bucket()).isEqualTo(bucket);
         assertThat(
                         ((DataSplit) split.split())
-                                .files().stream()
+                                .dataFiles().stream()
                                         .map(DataFileMeta::fileName)
                                         .collect(Collectors.toList()))
                 .isEqualTo(files);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
index f27bb281d..d0cafbb95 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
@@ -117,8 +117,15 @@ public class FileStoreSourceSplitSerializerTest {
             List<DataFileMeta> files,
             boolean isIncremental,
             long recordsToSkip) {
-        return new FileStoreSourceSplit(
-                id, new DataSplit(1L, partition, bucket, files, 
isIncremental), recordsToSkip);
+        DataSplit split =
+                DataSplit.builder()
+                        .withSnapshot(1)
+                        .withPartition(partition)
+                        .withBucket(bucket)
+                        .withDataFiles(files)
+                        .isStreaming(isIncremental)
+                        .build();
+        return new FileStoreSourceSplit(id, split, recordsToSkip);
     }
 
     private static FileStoreSourceSplit 
serializeAndDeserialize(FileStoreSourceSplit split)
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputSplit.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputSplit.java
index ac99902c9..c1c0efdc9 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputSplit.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputSplit.java
@@ -65,7 +65,7 @@ public class PaimonInputSplit extends FileSplit {
 
     @Override
     public long getLength() {
-        return split.files().stream().mapToLong(DataFileMeta::fileSize).sum();
+        return 
split.dataFiles().stream().mapToLong(DataFileMeta::fileSize).sum();
     }
 
     @Override
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
index ecf683e46..908fce7c1 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
@@ -51,18 +51,18 @@ public class PaimonInputSplitTest {
         }
 
         BinaryRow wantedPartition = generated.get(0).partition;
-        PaimonInputSplit split =
-                new PaimonInputSplit(
-                        tempDir.toString(),
-                        new DataSplit(
-                                ThreadLocalRandom.current().nextLong(100),
-                                wantedPartition,
-                                0,
+        DataSplit dataSplit =
+                DataSplit.builder()
+                        
.withSnapshot(ThreadLocalRandom.current().nextLong(100))
+                        .withPartition(wantedPartition)
+                        .withBucket(0)
+                        .withDataFiles(
                                 generated.stream()
                                         .filter(d -> 
d.partition.equals(wantedPartition))
                                         .map(d -> d.meta)
-                                        .collect(Collectors.toList()),
-                                false));
+                                        .collect(Collectors.toList()))
+                        .build();
+        PaimonInputSplit split = new PaimonInputSplit(tempDir.toString(), 
dataSplit);
 
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream output = new DataOutputStream(baos);

Reply via email to