This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 19e9b1366 [core] Move convertToRawFiles method from TableScan to Split (#2361)
19e9b1366 is described below

commit 19e9b136682b285c40c2efc04decc2ccf8391878
Author: tsreaper <[email protected]>
AuthorDate: Wed Nov 22 11:47:53 2023 +0800

    [core] Move convertToRawFiles method from TableScan to Split (#2361)
---
 .../table/source/AbstractInnerTableScan.java       |   9 --
 .../org/apache/paimon/table/source/DataSplit.java  |  38 +++++-
 .../org/apache/paimon/table/source/RawFile.java    |  21 +++
 .../java/org/apache/paimon/table/source/Split.java |  11 ++
 .../org/apache/paimon/table/source/TableScan.java  |   9 --
 .../table/source/snapshot/SnapshotReader.java      |   8 --
 .../table/source/snapshot/SnapshotReaderImpl.java  | 143 ++++++++++-----------
 .../apache/paimon/table/system/AuditLogTable.java  |   9 --
 .../table/source/snapshot/SnapshotReaderTest.java  |  16 +--
 .../source/FileStoreSourceSplitGeneratorTest.java  | 121 ++++++-----------
 10 files changed, 185 insertions(+), 200 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
index 277e01a2e..501630d80 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
@@ -197,13 +197,4 @@ public abstract class AbstractInnerTableScan implements InnerTableScan {
     public List<BinaryRow> listPartitions() {
         return snapshotReader.partitions();
     }
-
-    @Override
-    public Optional<List<RawFile>> convertToRawFiles(Split split) {
-        if (split instanceof DataSplit) {
-            return snapshotReader.convertToRawFiles((DataSplit) split);
-        } else {
-            return Optional.empty();
-        }
-    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 87898154c..5226739df 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -31,8 +31,10 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.OptionalLong;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -40,7 +42,7 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
 /** Input splits. Needed by most batch computation engines. */
 public class DataSplit implements Split {
 
-    private static final long serialVersionUID = 3L;
+    private static final long serialVersionUID = 4L;
 
     private long snapshotId = 0;
     private boolean isStreaming = false;
@@ -50,6 +52,8 @@ public class DataSplit implements Split {
     private int bucket = -1;
     private List<DataFileMeta> dataFiles;
 
+    private List<RawFile> rawFiles = Collections.emptyList();
+
     public DataSplit() {}
 
     public long snapshotId() {
@@ -89,6 +93,15 @@ public class DataSplit implements Split {
         return rowCount;
     }
 
+    @Override
+    public Optional<List<RawFile>> convertToRawFiles() {
+        if (rawFiles.isEmpty()) {
+            return Optional.empty();
+        } else {
+            return Optional.of(rawFiles);
+        }
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -102,12 +115,13 @@ public class DataSplit implements Split {
                 && Objects.equals(partition, split.partition)
                 && Objects.equals(beforeFiles, split.beforeFiles)
                 && Objects.equals(dataFiles, split.dataFiles)
-                && isStreaming == split.isStreaming;
+                && isStreaming == split.isStreaming
+                && Objects.equals(rawFiles, split.rawFiles);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(partition, bucket, beforeFiles, dataFiles, isStreaming);
+        return Objects.hash(partition, bucket, beforeFiles, dataFiles, isStreaming, rawFiles);
     }
 
     private void writeObject(ObjectOutputStream out) throws IOException {
@@ -125,6 +139,7 @@ public class DataSplit implements Split {
         this.beforeFiles = other.beforeFiles;
         this.dataFiles = other.dataFiles;
         this.isStreaming = other.isStreaming;
+        this.rawFiles = other.rawFiles;
     }
 
     public void serialize(DataOutputView out) throws IOException {
@@ -144,6 +159,11 @@ public class DataSplit implements Split {
         }
 
         out.writeBoolean(isStreaming);
+
+        out.writeInt(rawFiles.size());
+        for (RawFile rawFile : rawFiles) {
+            rawFile.serialize(out);
+        }
     }
 
     public static DataSplit deserialize(DataInputView in) throws IOException {
@@ -166,6 +186,12 @@ public class DataSplit implements Split {
 
         boolean isStreaming = in.readBoolean();
 
+        int rawFileNum = in.readInt();
+        List<RawFile> rawFiles = new ArrayList<>();
+        for (int i = 0; i < rawFileNum; i++) {
+            rawFiles.add(RawFile.deserialize(in));
+        }
+
         return builder()
                 .withSnapshot(snapshotId)
                 .withPartition(partition)
@@ -173,6 +199,7 @@ public class DataSplit implements Split {
                 .withBeforeFiles(beforeFiles)
                 .withDataFiles(dataFiles)
                 .isStreaming(isStreaming)
+                .rawFiles(rawFiles)
                 .build();
     }
 
@@ -215,6 +242,11 @@ public class DataSplit implements Split {
             return this;
         }
 
+        public Builder rawFiles(List<RawFile> rawFiles) {
+            this.split.rawFiles = rawFiles;
+            return this;
+        }
+
         public DataSplit build() {
             checkArgument(split.partition != null);
             checkArgument(split.bucket != -1);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/RawFile.java b/paimon-core/src/main/java/org/apache/paimon/table/source/RawFile.java
index a1c1f07d2..2923304e5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/RawFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/RawFile.java
@@ -20,7 +20,10 @@ package org.apache.paimon.table.source;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.Public;
+import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.io.DataOutputView;
 
+import java.io.IOException;
 import java.util.Objects;
 
 /**
@@ -73,6 +76,24 @@ public class RawFile {
         return schemaId;
     }
 
+    public void serialize(DataOutputView out) throws IOException {
+        out.writeUTF(path);
+        out.writeLong(offset);
+        out.writeLong(length);
+        out.writeUTF(format);
+        out.writeLong(schemaId);
+    }
+
+    public static RawFile deserialize(DataInputView in) throws IOException {
+        String path = in.readUTF();
+        long offset = in.readLong();
+        long length = in.readLong();
+        String format = in.readUTF();
+        long schemaId = in.readLong();
+
+        return new RawFile(path, offset, length, format, schemaId);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (!(o instanceof RawFile)) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java b/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java
index b4cbbbd4e..ee96943a4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java
@@ -21,6 +21,8 @@ package org.apache.paimon.table.source;
 import org.apache.paimon.annotation.Public;
 
 import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
 
 /**
  * An input split for reading.
@@ -31,4 +33,13 @@ import java.io.Serializable;
 public interface Split extends Serializable {
 
     long rowCount();
+
+    /**
+     * If all files in this split can be read without merging, returns an {@link Optional} wrapping
+     * a list of {@link RawFile}s to be read without merging. Otherwise, returns {@link
+     * Optional#empty()}.
+     */
+    default Optional<List<RawFile>> convertToRawFiles() {
+        return Optional.empty();
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java
index e9e876dd1..ba8b0b5e1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java
@@ -23,7 +23,6 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.table.Table;
 
 import java.util.List;
-import java.util.Optional;
 
 /**
  * A scan of {@link Table} to generate {@link Split} splits.
@@ -39,14 +38,6 @@ public interface TableScan {
     /** Get partitions from simple manifest entries. */
     List<BinaryRow> listPartitions();
 
-    /**
-     * If all files in this split can be read without merging, returns an {@link Optional} wrapping
-     * a list of {@link RawFile}s, otherwise returns {@link Optional#empty()}.
-     */
-    default Optional<List<RawFile>> convertToRawFiles(Split split) {
-        return Optional.empty();
-    }
-
     /**
      * Plan of scan.
      *
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 10d51684b..4af797c0a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -24,7 +24,6 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.RawFile;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.SplitGenerator;
@@ -36,7 +35,6 @@ import javax.annotation.Nullable;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 /** Read splits from specified {@link Snapshot} with given configuration. */
 public interface SnapshotReader {
@@ -76,12 +74,6 @@ public interface SnapshotReader {
     /** Get partitions from a snapshot. */
     List<BinaryRow> partitions();
 
-    /**
-     * If all files in this split can be read without merging, returns an {@link Optional} wrapping
-     * a list of {@link RawFile}s, otherwise returns {@link Optional#empty()}.
-     */
-    Optional<List<RawFile>> convertToRawFiles(DataSplit split);
-
     /** Result plan of this scan. */
     interface Plan extends TableScan.Plan {
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index a5f5cc36e..1c6bdc64d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.codegen.CodeGenUtils;
 import org.apache.paimon.codegen.RecordComparator;
 import org.apache.paimon.consumer.ConsumerManager;
@@ -258,6 +257,40 @@ public class SnapshotReaderImpl implements SnapshotReader {
         };
     }
 
+    private List<DataSplit> generateSplits(
+            long snapshotId,
+            boolean isStreaming,
+            SplitGenerator splitGenerator,
+            Map<BinaryRow, Map<Integer, List<DataFileMeta>>> groupedDataFiles) {
+        List<DataSplit> splits = new ArrayList<>();
+        for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entry :
+                groupedDataFiles.entrySet()) {
+            BinaryRow partition = entry.getKey();
+            Map<Integer, List<DataFileMeta>> buckets = entry.getValue();
+            for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry : buckets.entrySet()) {
+                int bucket = bucketEntry.getKey();
+                List<DataFileMeta> bucketFiles = bucketEntry.getValue();
+                DataSplit.Builder builder =
+                        DataSplit.builder()
+                                .withSnapshot(snapshotId)
+                                .withPartition(partition)
+                                .withBucket(bucket)
+                                .isStreaming(isStreaming);
+                List<List<DataFileMeta>> splitGroups =
+                        isStreaming
+                                ? splitGenerator.splitForStreaming(bucketFiles)
+                                : splitGenerator.splitForBatch(bucketFiles);
+                for (List<DataFileMeta> dataFiles : splitGroups) {
+                    splits.add(
+                            builder.withDataFiles(dataFiles)
+                                    .rawFiles(convertToRawFiles(partition, bucket, dataFiles))
+                                    .build());
+                }
+            }
+        }
+        return splits;
+    }
+
     @Override
     public List<BinaryRow> partitions() {
         List<ManifestEntry> entryList = scan.plan().files();
@@ -275,47 +308,6 @@ public class SnapshotReaderImpl implements SnapshotReader {
                 .collect(Collectors.toList());
     }
 
-    @Override
-    public Optional<List<RawFile>> convertToRawFiles(DataSplit split) {
-        String bucketPath = pathFactory.bucketPath(split.partition(), split.bucket()).toString();
-        List<DataFileMeta> dataFiles = split.dataFiles();
-
-        // bucket with only one file can be returned
-        if (dataFiles.size() == 1) {
-            return Optional.of(
-                    Collections.singletonList(makeRawTableFile(bucketPath, dataFiles.get(0))));
-        }
-
-        // append only files can be returned
-        if (tableSchema.primaryKeys().isEmpty()) {
-            return Optional.of(makeRawTableFiles(bucketPath, dataFiles));
-        }
-
-        // bucket containing only one level (except level 0) can be returned
-        Set<Integer> levels =
-                dataFiles.stream().map(DataFileMeta::level).collect(Collectors.toSet());
-        if (levels.size() == 1 && !levels.contains(0)) {
-            return Optional.of(makeRawTableFiles(bucketPath, dataFiles));
-        }
-
-        return Optional.empty();
-    }
-
-    private List<RawFile> makeRawTableFiles(String bucketPath, List<DataFileMeta> dataFiles) {
-        return dataFiles.stream()
-                .map(f -> makeRawTableFile(bucketPath, f))
-                .collect(Collectors.toList());
-    }
-
-    private RawFile makeRawTableFile(String bucketPath, DataFileMeta meta) {
-        return new RawFile(
-                bucketPath + "/" + meta.fileName(),
-                0,
-                meta.fileSize(),
-                new CoreOptions(tableSchema.options()).formatType().toString().toLowerCase(),
-                meta.schemaId());
-    }
-
     @Override
     public Plan readChanges() {
         withMode(ScanMode.DELTA);
@@ -368,6 +360,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
                                 .withBeforeFiles(before)
                                 .withDataFiles(data)
                                 .isStreaming(isStreaming)
+                                .rawFiles(convertToRawFiles(part, bucket, data))
                                 .build();
                 splits.add(split);
             }
@@ -414,36 +407,42 @@ public class SnapshotReaderImpl implements SnapshotReader {
         return lazyPartitionComparator;
     }
 
-    @VisibleForTesting
-    public static List<DataSplit> generateSplits(
-            long snapshotId,
-            boolean isStreaming,
-            SplitGenerator splitGenerator,
-            Map<BinaryRow, Map<Integer, List<DataFileMeta>>> groupedDataFiles) {
-        List<DataSplit> splits = new ArrayList<>();
-        for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entry :
-                groupedDataFiles.entrySet()) {
-            BinaryRow partition = entry.getKey();
-            Map<Integer, List<DataFileMeta>> buckets = entry.getValue();
-            for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry : buckets.entrySet()) {
-                int bucket = bucketEntry.getKey();
-                List<DataFileMeta> bucketFiles = bucketEntry.getValue();
-                DataSplit.Builder builder =
-                        DataSplit.builder()
-                                .withSnapshot(snapshotId)
-                                .withPartition(partition)
-                                .withBucket(bucket)
-                                .isStreaming(isStreaming);
-                List<List<DataFileMeta>> splitGroups =
-                        isStreaming
-                                ? splitGenerator.splitForStreaming(bucketFiles)
-                                : splitGenerator.splitForBatch(bucketFiles);
-                splitGroups.stream()
-                        .map(builder::withDataFiles)
-                        .map(DataSplit.Builder::build)
-                        .forEach(splits::add);
-            }
+    private List<RawFile> convertToRawFiles(
+            BinaryRow partition, int bucket, List<DataFileMeta> dataFiles) {
+        String bucketPath = pathFactory.bucketPath(partition, bucket).toString();
+
+        // bucket with only one file can be returned
+        if (dataFiles.size() == 1) {
+            return Collections.singletonList(makeRawTableFile(bucketPath, dataFiles.get(0)));
         }
-        return splits;
+
+        // append only files can be returned
+        if (tableSchema.primaryKeys().isEmpty()) {
+            return makeRawTableFiles(bucketPath, dataFiles);
+        }
+
+        // bucket containing only one level (except level 0) can be returned
+        Set<Integer> levels =
+                dataFiles.stream().map(DataFileMeta::level).collect(Collectors.toSet());
+        if (levels.size() == 1 && !levels.contains(0)) {
+            return makeRawTableFiles(bucketPath, dataFiles);
+        }
+
+        return Collections.emptyList();
+    }
+
+    private List<RawFile> makeRawTableFiles(String bucketPath, List<DataFileMeta> dataFiles) {
+        return dataFiles.stream()
+                .map(f -> makeRawTableFile(bucketPath, f))
+                .collect(Collectors.toList());
+    }
+
+    private RawFile makeRawTableFile(String bucketPath, DataFileMeta meta) {
+        return new RawFile(
+                bucketPath + "/" + meta.fileName(),
+                0,
+                meta.fileSize(),
+                new CoreOptions(tableSchema.options()).formatType().toString().toLowerCase(),
+                meta.schemaId());
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 990c71257..f8e84fa72 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -37,11 +37,9 @@ import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.ReadonlyTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.InnerStreamTableScan;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
-import org.apache.paimon.table.source.RawFile;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.SplitGenerator;
@@ -281,13 +279,6 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
         public List<BinaryRow> partitions() {
             return snapshotReader.partitions();
         }
-
-        @Override
-        public Optional<List<RawFile>> convertToRawFiles(DataSplit split) {
-            // we can't return snapshotReader.convertToRawFiles(split),
-            // because AuditLogTable must use its special reader
-            return Optional.empty();
-        }
     }
 
     private class AuditLogBatchScan implements InnerTableScan {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java
index 9de27a222..edb3f0a86 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java
@@ -70,7 +70,7 @@ public class SnapshotReaderTest {
     }
 
     @Test
-    public void testGetPrimaryKeyRawTableFiles() throws Exception {
+    public void testGetPrimaryKeyRawFiles() throws Exception {
         RowType rowType =
                 RowType.of(
                         new DataType[] {DataTypes.STRING(), DataTypes.INT(), DataTypes.BIGINT()},
@@ -98,7 +98,7 @@ public class SnapshotReaderTest {
             assertThat(dataSplit.dataFiles()).hasSize(1);
             DataFileMeta meta = dataSplit.dataFiles().get(0);
             String partition = dataSplit.partition().getString(0).toString();
-            assertThat(reader.convertToRawFiles(dataSplit))
+            assertThat(dataSplit.convertToRawFiles())
                     .hasValue(
                             Collections.singletonList(
                                     new RawFile(
@@ -123,7 +123,7 @@ public class SnapshotReaderTest {
         assertThat(dataSplits).hasSize(2);
         for (DataSplit dataSplit : dataSplits) {
             assertThat(dataSplit.dataFiles()).hasSize(2);
-            assertThat(reader.convertToRawFiles(dataSplit)).isNotPresent();
+            assertThat(dataSplit.convertToRawFiles()).isNotPresent();
         }
 
         // compact all files
@@ -146,7 +146,7 @@ public class SnapshotReaderTest {
             assertThat(dataSplit.dataFiles()).hasSize(1);
             DataFileMeta meta = dataSplit.dataFiles().get(0);
             String partition = dataSplit.partition().getString(0).toString();
-            assertThat(reader.convertToRawFiles(dataSplit))
+            assertThat(dataSplit.convertToRawFiles())
                     .hasValue(
                             Collections.singletonList(
                                     new RawFile(
@@ -171,7 +171,7 @@ public class SnapshotReaderTest {
         assertThat(dataSplits).hasSize(2);
         for (DataSplit dataSplit : dataSplits) {
             assertThat(dataSplit.dataFiles()).hasSize(2);
-            assertThat(reader.convertToRawFiles(dataSplit)).isNotPresent();
+            assertThat(dataSplit.convertToRawFiles()).isNotPresent();
         }
 
         write.close();
@@ -179,7 +179,7 @@ public class SnapshotReaderTest {
     }
 
     @Test
-    public void testGetAppendOnlyRawTableFiles() throws Exception {
+    public void testGetAppendOnlyRawFiles() throws Exception {
         RowType rowType =
                 RowType.of(
                         new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
@@ -205,7 +205,7 @@ public class SnapshotReaderTest {
         DataSplit dataSplit = dataSplits.get(0);
         assertThat(dataSplit.dataFiles()).hasSize(1);
         DataFileMeta meta = dataSplit.dataFiles().get(0);
-        assertThat(reader.convertToRawFiles(dataSplit))
+        assertThat(dataSplit.convertToRawFiles())
                 .hasValue(
                         Collections.singletonList(
                                 new RawFile(
@@ -237,7 +237,7 @@ public class SnapshotReaderTest {
         assertThat(dataSplit.dataFiles()).hasSize(2);
         DataFileMeta meta0 = dataSplit.dataFiles().get(0);
         DataFileMeta meta1 = dataSplit.dataFiles().get(1);
-        assertThat(reader.convertToRawFiles(dataSplit))
+        assertThat(dataSplit.convertToRawFiles())
                 .hasValue(
                         Arrays.asList(
                                 new RawFile(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
index 880f3288e..5015e9219 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
@@ -19,20 +19,13 @@
 package org.apache.paimon.flink.source;
 
 import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.manifest.FileKind;
-import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.stats.StatsTestUtils;
 import org.apache.paimon.table.source.DataFilePlan;
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.ScanMode;
-import org.apache.paimon.table.source.SplitGenerator;
-import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
 
 import org.junit.jupiter.api.Test;
 
-import javax.annotation.Nullable;
-
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
@@ -47,62 +40,20 @@ public class FileStoreSourceSplitGeneratorTest {
 
     @Test
     public void test() {
-        FileStoreScan.Plan plan =
-                new FileStoreScan.Plan() {
-                    @Override
-                    public Long watermark() {
-                        return null;
-                    }
-
-                    @Nullable
-                    @Override
-                    public Long snapshotId() {
-                        return 1L;
-                    }
-
-                    @Override
-                    public ScanMode scanMode() {
-                        return ScanMode.ALL;
-                    }
-
-                    @Override
-                    public List<ManifestEntry> files() {
-                        return Arrays.asList(
-                                makeEntry(1, 0, "f0"),
-                                makeEntry(1, 0, "f1"),
-                                makeEntry(1, 1, "f2"),
-                                makeEntry(2, 0, "f3"),
-                                makeEntry(2, 0, "f4"),
-                                makeEntry(2, 0, "f5"),
-                                makeEntry(2, 1, "f6"),
-                                makeEntry(3, 0, "f7"),
-                                makeEntry(3, 1, "f8"),
-                                makeEntry(4, 0, "f9"),
-                                makeEntry(4, 1, "f10"),
-                                makeEntry(5, 0, "f11"),
-                                makeEntry(5, 1, "f12"),
-                                makeEntry(6, 0, "f13"),
-                                makeEntry(6, 1, "f14"));
-                    }
-                };
         List<DataSplit> scanSplits =
-                SnapshotReaderImpl.generateSplits(
-                        1L,
-                        false,
-                        new SplitGenerator() {
-                            @Override
-                            public List<List<DataFileMeta>> splitForBatch(
-                                    List<DataFileMeta> files) {
-                                return Collections.singletonList(files);
-                            }
-
-                            @Override
-                            public List<List<DataFileMeta>> splitForStreaming(
-                                    List<DataFileMeta> files) {
-                                return null;
-                            }
-                        },
-                        FileStoreScan.Plan.groupByPartFiles(plan.files(FileKind.ADD)));
+                Arrays.asList(
+                        dataSplit(1, 0, "f0", "f1"),
+                        dataSplit(1, 1, "f2"),
+                        dataSplit(2, 0, "f3", "f4", "f5"),
+                        dataSplit(2, 1, "f6"),
+                        dataSplit(3, 0, "f7"),
+                        dataSplit(3, 1, "f8"),
+                        dataSplit(4, 0, "f9"),
+                        dataSplit(4, 1, "f10"),
+                        dataSplit(5, 0, "f11"),
+                        dataSplit(5, 1, "f12"),
+                        dataSplit(6, 0, "f13"),
+                        dataSplit(6, 1, "f14"));
         DataFilePlan tableScanPlan = new DataFilePlan(scanSplits);
 
         List<FileStoreSourceSplit> splits =
@@ -145,24 +96,30 @@ public class FileStoreSourceSplitGeneratorTest {
                 .isEqualTo(files);
     }
 
-    private ManifestEntry makeEntry(int partition, int bucket, String fileName) {
-        return new ManifestEntry(
-                FileKind.ADD,
-                row(partition), // not used
-                bucket, // not used
-                0, // not used
-                new DataFileMeta(
-                        fileName,
-                        0, // not used
-                        0, // not used
-                        null, // not used
-                        null, // not used
-                        StatsTestUtils.newEmptyTableStats(), // not used
-                        StatsTestUtils.newEmptyTableStats(), // not used
-                        0, // not used
-                        0, // not used
-                        0, // not used
-                        0 // not used
-                        ));
+    private DataSplit dataSplit(int partition, int bucket, String... fileNames) {
+        List<DataFileMeta> metas = new ArrayList<>();
+        for (String fileName : fileNames) {
+            metas.add(
+                    new DataFileMeta(
+                            fileName,
+                            0, // not used
+                            0, // not used
+                            null, // not used
+                            null, // not used
+                            StatsTestUtils.newEmptyTableStats(), // not used
+                            StatsTestUtils.newEmptyTableStats(), // not used
+                            0, // not used
+                            0, // not used
+                            0, // not used
+                            0 // not used
+                            ));
+        }
+        return DataSplit.builder()
+                .withSnapshot(1)
+                .withPartition(row(partition))
+                .withBucket(bucket)
+                .isStreaming(false)
+                .withDataFiles(metas)
+                .build();
     }
 }

Reply via email to