This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.0
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit ea176b9dcdaeef67cc25705835edbe3043171dad
Author: Houliang Qi <[email protected]>
AuthorDate: Tue Dec 24 13:45:06 2024 +0800

    [core] Support read external path in DataFileMeta (#4761)
---
 .../org/apache/paimon/append/AppendOnlyWriter.java |  4 +-
 .../java/org/apache/paimon/io/DataFileMeta.java    | 18 +++---
 .../paimon/io/DataFileMeta10LegacySerializer.java  |  2 +-
 .../org/apache/paimon/io/DataFilePathFactory.java  | 23 ++++++++
 .../org/apache/paimon/io/FileIndexEvaluator.java   |  2 +-
 .../apache/paimon/io/KeyValueDataFileWriter.java   |  3 +-
 .../paimon/io/KeyValueFileReaderFactory.java       | 23 ++++++--
 .../paimon/io/KeyValueFileWriterFactory.java       | 10 ++--
 .../org/apache/paimon/io/RowDataFileWriter.java    |  3 +-
 .../apache/paimon/manifest/ExpireFileEntry.java    | 18 +++++-
 .../java/org/apache/paimon/manifest/FileEntry.java | 15 ++++-
 .../org/apache/paimon/manifest/ManifestEntry.java  |  8 ++-
 .../apache/paimon/manifest/SimpleFileEntry.java    | 24 ++++++--
 .../paimon/manifest/SimpleFileEntrySerializer.java | 68 ----------------------
 .../apache/paimon/mergetree/MergeTreeWriter.java   | 14 +++--
 .../org/apache/paimon/migrate/FileMetaUtils.java   |  1 +
 .../paimon/operation/FileStoreCommitImpl.java      |  2 +-
 .../apache/paimon/operation/RawFileSplitRead.java  |  5 +-
 .../apache/paimon/table/query/LocalTableQuery.java |  7 +--
 .../org/apache/paimon/table/source/DataSplit.java  |  4 +-
 .../org/apache/paimon/table/system/FilesTable.java | 11 +++-
 .../apache/paimon/append/AppendOnlyWriterTest.java |  7 ++-
 .../paimon/io/KeyValueFileReadWriteTest.java       |  5 +-
 .../paimon/mergetree/ContainsLevelsTest.java       |  6 +-
 .../apache/paimon/mergetree/LookupLevelsTest.java  |  6 +-
 .../apache/paimon/mergetree/MergeTreeTestBase.java |  2 +-
 .../paimon/table/AppendOnlyFileStoreTableTest.java |  2 +-
 .../org/apache/paimon/table/source/SplitTest.java  |  1 +
 .../apache/paimon/flink/clone/PickFilesUtil.java   |  2 +-
 .../compact/changelog/ChangelogCompactTask.java    | 11 ++--
 .../paimon/flink/sink/RewriteFileIndexSink.java    | 12 ++--
 .../org/apache/paimon/spark/ScanHelperTest.scala   |  2 +
 32 files changed, 176 insertions(+), 145 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index a3087e3628..4c313dd655 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -244,7 +244,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
         for (DataFileMeta file : compactAfter) {
             // appendOnlyCompactManager will rewrite the file and no file upgrade will occur, so we
             // can directly delete the file in compactAfter.
-            fileIO.deleteQuietly(pathFactory.toPath(file.fileName()));
+            fileIO.deleteQuietly(pathFactory.toPath(file));
         }
 
         sinkWriter.close();
@@ -271,7 +271,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
             } finally {
                 // remove small files
                 for (DataFileMeta file : files) {
-                    fileIO.deleteQuietly(pathFactory.toPath(file.fileName()));
+                    fileIO.deleteQuietly(pathFactory.toPath(file));
                 }
             }
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index 3be09ea6c2..459cd788de 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -135,7 +135,8 @@ public class DataFileMeta {
             List<String> extraFiles,
             @Nullable byte[] embeddedIndex,
             @Nullable FileSource fileSource,
-            @Nullable List<String> valueStatsCols) {
+            @Nullable List<String> valueStatsCols,
+            @Nullable String externalPath) {
         return new DataFileMeta(
                 fileName,
                 fileSize,
@@ -154,7 +155,7 @@ public class DataFileMeta {
                 embeddedIndex,
                 fileSource,
                 valueStatsCols,
-                null);
+                externalPath);
     }
 
     public DataFileMeta(
@@ -173,7 +174,8 @@ public class DataFileMeta {
             @Nullable Long deleteRowCount,
             @Nullable byte[] embeddedIndex,
             @Nullable FileSource fileSource,
-            @Nullable List<String> valueStatsCols) {
+            @Nullable List<String> valueStatsCols,
+            @Nullable String externalPath) {
         this(
                 fileName,
                 fileSize,
@@ -192,7 +194,7 @@ public class DataFileMeta {
                 embeddedIndex,
                 fileSource,
                 valueStatsCols,
-                null);
+                externalPath);
     }
 
     public DataFileMeta(
@@ -403,7 +405,7 @@ public class DataFileMeta {
                 externalPath);
     }
 
-    public DataFileMeta rename(String newFileName) {
+    public DataFileMeta rename(String newExternalPath, String newFileName) {
         return new DataFileMeta(
                 newFileName,
                 fileSize,
@@ -422,7 +424,7 @@ public class DataFileMeta {
                 embeddedIndex,
                 fileSource,
                 valueStatsCols,
-                externalPath);
+                newExternalPath);
     }
 
     public DataFileMeta copyWithoutStats() {
@@ -449,8 +451,8 @@ public class DataFileMeta {
 
     public List<Path> collectFiles(DataFilePathFactory pathFactory) {
         List<Path> paths = new ArrayList<>();
-        paths.add(pathFactory.toPath(fileName));
-        extraFiles.forEach(f -> paths.add(pathFactory.toPath(f)));
+        paths.add(pathFactory.toPath(this));
+        extraFiles.forEach(f -> paths.add(pathFactory.toExtraFilePath(this, f)));
         return paths;
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
index 68ccba6ea3..518db7c658 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
@@ -46,7 +46,7 @@ import static org.apache.paimon.utils.SerializationUtils.newBytesType;
 import static org.apache.paimon.utils.SerializationUtils.newStringType;
 import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
 
-/** Serializer for {@link DataFileMeta} with 0.9 version. */
+/** Serializer for {@link DataFileMeta} with 1.0 snapshot version. */
 public class DataFileMeta10LegacySerializer implements Serializable {
 
     private static final long serialVersionUID = 1L;
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
index b632d44c94..daeb9f52ea 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
@@ -78,10 +78,33 @@ public class DataFilePathFactory {
         return new Path(parent, name);
     }
 
+    @VisibleForTesting
     public Path toPath(String fileName) {
         return new Path(parent + "/" + fileName);
     }
 
+    /**
+     * for read purpose.
+     *
+     * @param fileName the file name
+     * @param externalPath the external path, if null, it will use the parent path
+     * @return the file's path
+     */
+    public Path toPath(String fileName, String externalPath) {
+        return new Path((externalPath == null ? parent : externalPath) + "/" + fileName);
+    }
+
+    public Path toPath(DataFileMeta dataFileMeta) {
+        String externalPath = dataFileMeta.externalPath();
+        String fileName = dataFileMeta.fileName();
+        return new Path((externalPath == null ? parent : externalPath) + "/" + fileName);
+    }
+
+    public Path toExtraFilePath(DataFileMeta dataFileMeta, String extraFile) {
+        String externalPath = dataFileMeta.externalPath();
+        return new Path((externalPath == null ? parent : externalPath) + "/" + extraFile);
+    }
+
     @VisibleForTesting
     public String uuid() {
         return uuid;
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
index 530b871653..9055097d37 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
@@ -62,7 +62,7 @@ public class FileIndexEvaluator {
                 // go to file index check
                 try (FileIndexPredicate predicate =
                         new FileIndexPredicate(
-                                dataFilePathFactory.toPath(indexFiles.get(0)),
+                                dataFilePathFactory.toExtraFilePath(file, indexFiles.get(0)),
                                 fileIO,
                                 dataSchema.logicalRowType())) {
                     return predicate.evaluate(
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
index 651c6a6f7b..f78d755648 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
@@ -195,7 +195,8 @@ public abstract class KeyValueDataFileWriter
                 deleteRecordCount,
                 indexResult.embeddedIndexBytes(),
                 fileSource,
-                valueStatsPair.getKey());
+                valueStatsPair.getKey(),
+                null);
     }
 
     abstract Pair<SimpleColStats[], SimpleColStats[]> fetchKeyValueStats(SimpleColStats[] rowStats);
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 7e272fc97c..9d65a54113 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -20,6 +20,7 @@ package org.apache.paimon.io;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
@@ -97,16 +98,25 @@ public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> {
 
     @Override
     public RecordReader<KeyValue> createRecordReader(DataFileMeta file) throws IOException {
-        return createRecordReader(file.schemaId(), file.fileName(), file.fileSize(), file.level());
+        return createRecordReader(
+                file.schemaId(),
+                file.fileName(),
+                file.fileSize(),
+                file.level(),
+                file.externalPath());
     }
 
+    @VisibleForTesting
     public RecordReader<KeyValue> createRecordReader(
-            long schemaId, String fileName, long fileSize, int level) throws IOException {
+            long schemaId, String fileName, long fileSize, int level, String externalPath)
+            throws IOException {
         if (fileSize >= asyncThreshold && fileName.endsWith(".orc")) {
             return new AsyncRecordReader<>(
-                    () -> createRecordReader(schemaId, fileName, level, false, 2, fileSize));
+                    () ->
+                            createRecordReader(
+                                    schemaId, fileName, level, false, 2, fileSize, externalPath));
         }
-        return createRecordReader(schemaId, fileName, level, true, null, fileSize);
+        return createRecordReader(schemaId, fileName, level, true, null, fileSize, externalPath);
     }
 
     private FileRecordReader<KeyValue> createRecordReader(
@@ -115,7 +125,8 @@ public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> {
             int level,
             boolean reuseFormat,
             @Nullable Integer orcPoolSize,
-            long fileSize)
+            long fileSize,
+            String externalPath)
             throws IOException {
         String formatIdentifier = DataFilePathFactory.formatIdentifier(fileName);
 
@@ -132,7 +143,7 @@ public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> {
                                 new FormatKey(schemaId, formatIdentifier),
                                 key -> formatSupplier.get())
                         : formatSupplier.get();
-        Path filePath = pathFactory.toPath(fileName);
+        Path filePath = pathFactory.toPath(fileName, externalPath);
 
         FileRecordReader<InternalRow> fileRecordReader =
                 new DataFileRecordReader(
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index a6aae3985b..500320c249 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -142,14 +142,14 @@ public class KeyValueFileWriterFactory {
                         fileIndexOptions);
     }
 
-    public void deleteFile(String filename, int level) {
-        fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(filename));
+    public void deleteFile(DataFileMeta meta, int level) {
+        fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(meta));
     }
 
-    public void copyFile(String sourceFileName, String targetFileName, int level)
+    public void copyFile(DataFileMeta sourceMeta, DataFileMeta targetMeta, int level)
             throws IOException {
-        Path sourcePath = formatContext.pathFactory(level).toPath(sourceFileName);
-        Path targetPath = formatContext.pathFactory(level).toPath(targetFileName);
+        Path sourcePath = formatContext.pathFactory(level).toPath(sourceMeta);
+        Path targetPath = formatContext.pathFactory(level).toPath(targetMeta);
         fileIO.copyFile(sourcePath, targetPath, true);
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index 8c2e8ec949..cd46d67e3b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -124,6 +124,7 @@ public class RowDataFileWriter extends StatsCollectingSingleFileWriter<InternalR
                         : Collections.singletonList(indexResult.independentIndexFile()),
                 indexResult.embeddedIndexBytes(),
                 fileSource,
-                statsPair.getKey());
+                statsPair.getKey(),
+                null);
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java
index 060360623c..5d6d68144e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java
@@ -41,8 +41,19 @@ public class ExpireFileEntry extends SimpleFileEntry {
             @Nullable byte[] embeddedIndex,
             BinaryRow minKey,
             BinaryRow maxKey,
-            @Nullable FileSource fileSource) {
-        super(kind, partition, bucket, level, fileName, extraFiles, embeddedIndex, minKey, maxKey);
+            @Nullable FileSource fileSource,
+            @Nullable String externalPath) {
+        super(
+                kind,
+                partition,
+                bucket,
+                level,
+                fileName,
+                extraFiles,
+                embeddedIndex,
+                minKey,
+                maxKey,
+                externalPath);
         this.fileSource = fileSource;
     }
 
@@ -61,7 +72,8 @@ public class ExpireFileEntry extends SimpleFileEntry {
                 entry.file().embeddedIndex(),
                 entry.minKey(),
                 entry.maxKey(),
-                entry.file().fileSource().orElse(null));
+                entry.file().fileSource().orElse(null),
+                entry.externalPath());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index a2569beac6..738776438b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -54,6 +54,8 @@ public interface FileEntry {
 
     String fileName();
 
+    String externalPath();
+
     Identifier identifier();
 
     BinaryRow minKey();
@@ -73,6 +75,7 @@ public interface FileEntry {
         public final String fileName;
         public final List<String> extraFiles;
         @Nullable private final byte[] embeddedIndex;
+        @Nullable public final String externalPath;
 
         /* Cache the hash code for the string */
         private Integer hash;
@@ -83,13 +86,15 @@ public interface FileEntry {
                 int level,
                 String fileName,
                 List<String> extraFiles,
-                @Nullable byte[] embeddedIndex) {
+                @Nullable byte[] embeddedIndex,
+                @Nullable String externalPath) {
             this.partition = partition;
             this.bucket = bucket;
             this.level = level;
             this.fileName = fileName;
             this.extraFiles = extraFiles;
             this.embeddedIndex = embeddedIndex;
+            this.externalPath = externalPath;
         }
 
         @Override
@@ -106,7 +111,8 @@ public interface FileEntry {
                     && Objects.equals(partition, that.partition)
                     && Objects.equals(fileName, that.fileName)
                     && Objects.equals(extraFiles, that.extraFiles)
-                    && Objects.deepEquals(embeddedIndex, that.embeddedIndex);
+                    && Objects.deepEquals(embeddedIndex, that.embeddedIndex)
+                    && Objects.deepEquals(externalPath, that.externalPath);
         }
 
         @Override
@@ -119,7 +125,8 @@ public interface FileEntry {
                                 level,
                                 fileName,
                                 extraFiles,
-                                Arrays.hashCode(embeddedIndex));
+                                Arrays.hashCode(embeddedIndex),
+                                externalPath);
             }
             return hash;
         }
@@ -138,6 +145,8 @@ public interface FileEntry {
                     + extraFiles
                     + ", embeddedIndex="
                     + Arrays.toString(embeddedIndex)
+                    + ", externalPath="
+                    + externalPath
                     + '}';
         }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index 626e0a5d46..d4748451d8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -92,6 +92,11 @@ public class ManifestEntry implements FileEntry {
         return file.fileName();
     }
 
+    @Override
+    public String externalPath() {
+        return file.externalPath();
+    }
+
     @Override
     public BinaryRow minKey() {
         return file.minKey();
@@ -123,7 +128,8 @@ public class ManifestEntry implements FileEntry {
                 file.level(),
                 file.fileName(),
                 file.extraFiles(),
-                file.embeddedIndex());
+                file.embeddedIndex(),
+                file.externalPath());
     }
 
     public ManifestEntry copyWithoutStats() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
index fdaed2b85a..f86bded52d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
@@ -38,6 +38,7 @@ public class SimpleFileEntry implements FileEntry {
     @Nullable private final byte[] embeddedIndex;
     private final BinaryRow minKey;
     private final BinaryRow maxKey;
+    @Nullable private final String externalPath;
 
     public SimpleFileEntry(
             FileKind kind,
@@ -48,7 +49,8 @@ public class SimpleFileEntry implements FileEntry {
             List<String> extraFiles,
             @Nullable byte[] embeddedIndex,
             BinaryRow minKey,
-            BinaryRow maxKey) {
+            BinaryRow maxKey,
+            @Nullable String externalPath) {
         this.kind = kind;
         this.partition = partition;
         this.bucket = bucket;
@@ -58,6 +60,7 @@ public class SimpleFileEntry implements FileEntry {
         this.embeddedIndex = embeddedIndex;
         this.minKey = minKey;
         this.maxKey = maxKey;
+        this.externalPath = externalPath;
     }
 
     public static SimpleFileEntry from(ManifestEntry entry) {
@@ -70,7 +73,8 @@ public class SimpleFileEntry implements FileEntry {
                 entry.file().extraFiles(),
                 entry.file().embeddedIndex(),
                 entry.minKey(),
-                entry.maxKey());
+                entry.maxKey(),
+                entry.externalPath());
     }
 
     public static List<SimpleFileEntry> from(List<ManifestEntry> entries) {
@@ -102,9 +106,15 @@ public class SimpleFileEntry implements FileEntry {
         return fileName;
     }
 
+    @Override
+    public String externalPath() {
+        return externalPath;
+    }
+
     @Override
     public Identifier identifier() {
-        return new Identifier(partition, bucket, level, fileName, extraFiles, embeddedIndex);
+        return new Identifier(
+                partition, bucket, level, fileName, extraFiles, embeddedIndex, externalPath);
     }
 
     @Override
@@ -138,12 +148,14 @@ public class SimpleFileEntry implements FileEntry {
                 && Objects.equals(fileName, that.fileName)
                 && Objects.equals(extraFiles, that.extraFiles)
                 && Objects.equals(minKey, that.minKey)
-                && Objects.equals(maxKey, that.maxKey);
+                && Objects.equals(maxKey, that.maxKey)
+                && Objects.equals(externalPath, that.externalPath);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(kind, partition, bucket, level, fileName, extraFiles, minKey, maxKey);
+        return Objects.hash(
+                kind, partition, bucket, level, fileName, extraFiles, minKey, maxKey, externalPath);
     }
 
     @Override
@@ -165,6 +177,8 @@ public class SimpleFileEntry implements FileEntry {
                 + minKey
                 + ", maxKey="
                 + maxKey
+                + ", externalPath="
+                + externalPath
                 + '}';
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java
deleted file mode 100644
index bdc89b8d4c..0000000000
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.manifest;
-
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.utils.VersionedObjectSerializer;
-
-import static org.apache.paimon.utils.InternalRowUtils.fromStringArrayData;
-import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
-
-/** A {@link VersionedObjectSerializer} for {@link SimpleFileEntry}, only supports reading. */
-public class SimpleFileEntrySerializer extends VersionedObjectSerializer<SimpleFileEntry> {
-
-    private static final long serialVersionUID = 1L;
-
-    private final int version;
-
-    public SimpleFileEntrySerializer() {
-        super(ManifestEntry.SCHEMA);
-        this.version = new ManifestEntrySerializer().getVersion();
-    }
-
-    @Override
-    public int getVersion() {
-        return version;
-    }
-
-    @Override
-    public InternalRow convertTo(SimpleFileEntry meta) {
-        throw new UnsupportedOperationException("Only supports convert from row.");
-    }
-
-    @Override
-    public SimpleFileEntry convertFrom(int version, InternalRow row) {
-        if (this.version != version) {
-            throw new IllegalArgumentException("Unsupported version: " + version);
-        }
-
-        InternalRow file = row.getRow(4, DataFileMeta.SCHEMA.getFieldCount());
-        return new SimpleFileEntry(
-                FileKind.fromByteValue(row.getByte(0)),
-                deserializeBinaryRow(row.getBinary(1)),
-                row.getInt(2),
-                file.getInt(10),
-                file.getString(0).toString(),
-                fromStringArrayData(file.getArray(11)),
-                file.isNullAt(14) ? null : file.getBinary(14),
-                deserializeBinaryRow(file.getBinary(3)),
-                deserializeBinaryRow(file.getBinary(4)));
-    }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index f2a964bae1..df48559223 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -27,6 +27,7 @@ import org.apache.paimon.compact.CompactResult;
 import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataIncrement;
@@ -241,9 +242,10 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
             } else if (changelogProducer == ChangelogProducer.INPUT && isInsertOnly) {
                 List<DataFileMeta> changelogMetas = new ArrayList<>();
                 for (DataFileMeta dataMeta : dataMetas) {
+                    Path newPath = writerFactory.newChangelogPath(0);
                     DataFileMeta changelogMeta =
-                            dataMeta.rename(writerFactory.newChangelogPath(0).getName());
-                    writerFactory.copyFile(dataMeta.fileName(), changelogMeta.fileName(), 0);
+                            dataMeta.rename(newPath.getParent().getName(), 
newPath.getName());
+                    writerFactory.copyFile(dataMeta, changelogMeta, 0);
                     changelogMetas.add(changelogMeta);
                 }
                 newFilesChangelog.addAll(changelogMetas);
@@ -341,7 +343,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
                 // 2. This file is not the input of upgraded.
                 if (!compactBefore.containsKey(file.fileName())
                         && !afterFiles.contains(file.fileName())) {
-                    writerFactory.deleteFile(file.fileName(), file.level());
+                    writerFactory.deleteFile(file, file.level());
                 }
             } else {
                 compactBefore.put(file.fileName(), file);
@@ -375,7 +377,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
         deletedFiles.clear();
 
         for (DataFileMeta file : newFilesChangelog) {
-            writerFactory.deleteFile(file.fileName(), file.level());
+            writerFactory.deleteFile(file, file.level());
         }
         newFilesChangelog.clear();
 
@@ -390,12 +392,12 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
         compactAfter.clear();
 
         for (DataFileMeta file : compactChangelog) {
-            writerFactory.deleteFile(file.fileName(), file.level());
+            writerFactory.deleteFile(file, file.level());
         }
         compactChangelog.clear();
 
         for (DataFileMeta file : delete) {
-            writerFactory.deleteFile(file.fileName(), file.level());
+            writerFactory.deleteFile(file, file.level());
         }
 
         if (compactDeletionFile != null) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
index 391c5f9bb6..51a0b5e2a9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -169,6 +169,7 @@ public class FileMetaUtils {
                 Collections.emptyList(),
                 null,
                 FileSource.APPEND,
+                null,
                 null);
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 001132e167..0b4783b165 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -593,7 +593,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             toDelete.addAll(commitMessage.compactIncrement().changelogFiles());
 
             for (DataFileMeta file : toDelete) {
-                fileIO.deleteQuietly(pathFactory.toPath(file.fileName()));
+                fileIO.deleteQuietly(pathFactory.toPath(file));
             }
         }
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index 4fda82f4e8..d0f3275b5a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -210,10 +210,7 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
 
         FormatReaderContext formatReaderContext =
                 new FormatReaderContext(
-                        fileIO,
-                        dataFilePathFactory.toPath(file.fileName()),
-                        file.fileSize(),
-                        fileIndexResult);
+                        fileIO, dataFilePathFactory.toPath(file), file.fileSize(), fileIndexResult);
         FileRecordReader<InternalRow> fileRecordReader =
                 new DataFileRecordReader(
                         formatReaderMapping.getReaderFactory(),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java 
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index 8ff5ce7a65..d474d4e2d0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -161,12 +161,7 @@ public class LocalTableQuery implements TableQuery {
                         readerFactoryBuilder.keyType(),
                         new 
LookupLevels.KeyValueProcessor(readerFactoryBuilder.readValueType()),
                         file -> {
-                            RecordReader<KeyValue> reader =
-                                    factory.createRecordReader(
-                                            file.schemaId(),
-                                            file.fileName(),
-                                            file.fileSize(),
-                                            file.level());
+                            RecordReader<KeyValue> reader = factory.createRecordReader(file);
                             if (cacheRowFilter != null) {
                                 reader =
                                         reader.filter(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index bf60234214..9178d25a91 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -180,8 +180,10 @@ public class DataSplit implements Split {
     }
 
     private RawFile makeRawTableFile(String bucketPath, DataFileMeta file) {
+        String path = file.externalPath() != null ? file.externalPath() : bucketPath;
+        path += "/" + file.fileName();
         return new RawFile(
-                bucketPath + "/" + file.fileName(),
+                path,
                 file.fileSize(),
                 0,
                 file.fileSize(),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index 6dcbb322d6..3107ebe150 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -385,8 +385,15 @@ public class FilesTable implements ReadonlyTable {
                                                                 
dataSplit.partition()))),
                         dataSplit::bucket,
                         () ->
-                                BinaryString.fromString(
-                                        dataSplit.bucketPath() + "/" + 
dataFileMeta.fileName()),
+                                dataFileMeta.externalPath() == null
+                                        ? BinaryString.fromString(
+                                                dataSplit.bucketPath()
+                                                        + "/"
+                                                        + 
dataFileMeta.fileName())
+                                        : BinaryString.fromString(
+                                                dataFileMeta.externalPath()
+                                                        + "/"
+                                                        + 
dataFileMeta.fileName()),
                         () ->
                                 BinaryString.fromString(
                                         DataFilePathFactory.formatIdentifier(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index a9012ed89b..3f752be13e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -125,7 +125,7 @@ public class AppendOnlyWriterTest {
         DataFileMeta meta = increment.newFilesIncrement().newFiles().get(0);
         assertThat(meta).isNotNull();
 
-        Path path = pathFactory.toPath(meta.fileName());
+        Path path = pathFactory.toPath(meta);
         assertThat(LocalFileIO.create().exists(path)).isTrue();
 
         assertThat(meta.rowCount()).isEqualTo(1L);
@@ -186,7 +186,7 @@ public class AppendOnlyWriterTest {
             assertThat(inc.newFilesIncrement().newFiles().size()).isEqualTo(1);
             DataFileMeta meta = inc.newFilesIncrement().newFiles().get(0);
 
-            Path path = pathFactory.toPath(meta.fileName());
+            Path path = pathFactory.toPath(meta);
             assertThat(LocalFileIO.create().exists(path)).isTrue();
 
             assertThat(meta.rowCount()).isEqualTo(100L);
@@ -227,7 +227,7 @@ public class AppendOnlyWriterTest {
 
         int id = 0;
         for (DataFileMeta meta : firstInc.newFilesIncrement().newFiles()) {
-            Path path = pathFactory.toPath(meta.fileName());
+            Path path = pathFactory.toPath(meta);
             assertThat(LocalFileIO.create().exists(path)).isTrue();
 
             assertThat(meta.rowCount()).isEqualTo(1000L);
@@ -680,6 +680,7 @@ public class AppendOnlyWriterTest {
                 Collections.emptyList(),
                 null,
                 FileSource.APPEND,
+                null,
                 null);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java 
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index e43cd898db..e817562689 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -78,7 +78,7 @@ public class KeyValueFileReadWriteTest {
     public void testReadNonExistentFile() {
         KeyValueFileReaderFactory readerFactory =
                 createReaderFactory(tempDir.toString(), "avro", null, null);
-        assertThatThrownBy(() -> readerFactory.createRecordReader(0, "dummy_file.avro", 1, 0))
+        assertThatThrownBy(() -> readerFactory.createRecordReader(0, "dummy_file.avro", 1, 0, null))
                 .hasMessageContaining(
                         "you can configure 'snapshot.time-retained' option 
with a larger value.");
     }
@@ -312,7 +312,8 @@ public class KeyValueFileReadWriteTest {
                                     meta.schemaId(),
                                     meta.fileName(),
                                     meta.fileSize(),
-                                    meta.level()));
+                                    meta.level(),
+                                    meta.externalPath()));
             while (actualKvsIterator.hasNext()) {
                 assertThat(expectedIterator.hasNext()).isTrue();
                 KeyValue actualKv = actualKvsIterator.next();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index be49311427..fa96765a42 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -198,7 +198,11 @@ public class ContainsLevelsTest {
                 file ->
                         createReaderFactory()
                                 .createRecordReader(
-                                        0, file.fileName(), file.fileSize(), 
file.level()),
+                                        0,
+                                        file.fileName(),
+                                        file.fileSize(),
+                                        file.level(),
+                                        file.externalPath()),
                 file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + 
UUID.randomUUID()),
                 new HashLookupStoreFactory(
                         new CacheManager(MemorySize.ofMebiBytes(1)),
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index a678534042..56c45cfdc4 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -275,7 +275,11 @@ public class LookupLevelsTest {
                 file ->
                         createReaderFactory()
                                 .createRecordReader(
-                                        0, file.fileName(), file.fileSize(), 
file.level()),
+                                        0,
+                                        file.fileName(),
+                                        file.fileSize(),
+                                        file.level(),
+                                        file.externalPath()),
                 file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + 
UUID.randomUUID()),
                 new HashLookupStoreFactory(
                         new CacheManager(MemorySize.ofMebiBytes(1)),
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index f2a9c44dd7..47d12ce47c 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -592,7 +592,7 @@ public abstract class MergeTreeTestBase {
             assertThat(remove).isTrue();
             // See MergeTreeWriter.updateCompactResult
             if (!newFileNames.contains(file.fileName()) && 
!afterFiles.contains(file.fileName())) {
-                compactWriterFactory.deleteFile(file.fileName(), file.level());
+                compactWriterFactory.deleteFile(file, file.level());
             }
         }
         compactedFiles.addAll(increment.compactIncrement().compactAfter());
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index 01d4e89af9..471b60d3cf 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -112,7 +112,7 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
                 table.store()
                         .pathFactory()
                         .createDataFilePathFactory(split.partition(), 
split.bucket())
-                        .toPath(split.dataFiles().get(0).fileName());
+                        .toPath(split.dataFiles().get(0));
         table.fileIO().deleteQuietly(path);
 
         // read
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
index a4e581b701..a088f40dab 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
@@ -447,6 +447,7 @@ public class SplitTest {
                 Collections.emptyList(),
                 null,
                 null,
+                null,
                 null);
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java
index f83b5cf8f9..b7e1e60878 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java
@@ -108,7 +108,7 @@ public class PickFilesUtil {
                         pathFactory
                                 .createDataFilePathFactory(
                                         simpleFileEntry.partition(), 
simpleFileEntry.bucket())
-                                .toPath(simpleFileEntry.fileName());
+                                .toPath(simpleFileEntry.fileName(), 
simpleFileEntry.externalPath());
                 dataFiles.add(dataFilePath);
             }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
index 6b95e36907..39860e418c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
@@ -96,7 +96,7 @@ public class ChangelogCompactTask implements Serializable {
                         outputStream,
                         results,
                         table,
-                        dataFilePathFactory.toPath(meta.fileName()),
+                        dataFilePathFactory.toPath(meta),
                         bucket,
                         false,
                         meta);
@@ -111,7 +111,7 @@ public class ChangelogCompactTask implements Serializable {
                         outputStream,
                         results,
                         table,
-                        dataFilePathFactory.toPath(meta.fileName()),
+                        dataFilePathFactory.toPath(meta),
                         bucket,
                         true,
                         meta);
@@ -167,7 +167,8 @@ public class ChangelogCompactTask implements Serializable {
         table.fileIO()
                 .rename(
                         changelogTempPath,
-                        dataFilePathFactory.toPath(
+                        dataFilePathFactory.toExtraFilePath(
+                                baseResult.meta,
                                 realName
                                         + "."
                                         + 
CompactedChangelogReadOnlyFormat.getIdentifier(
@@ -193,9 +194,9 @@ public class ChangelogCompactTask implements Serializable {
                                 + 
CompactedChangelogReadOnlyFormat.getIdentifier(
                                         result.meta.fileFormat());
                 if (result.isCompactResult) {
-                    compactChangelog.add(result.meta.rename(name));
+                    compactChangelog.add(result.meta.rename(baseResult.meta.externalPath(), name));
                 } else {
-                    newFilesChangelog.add(result.meta.rename(name));
+                    newFilesChangelog.add(result.meta.rename(baseResult.meta.externalPath(), name));
                 }
             }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
index d9f863c6b9..d35cb09cb7 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
@@ -198,16 +198,18 @@ public class RewriteFileIndexSink extends 
FlinkWriteSink<ManifestEntry> {
                 String indexFile = indexFiles.get(0);
                 try (FileIndexFormat.Reader indexReader =
                         FileIndexFormat.createReader(
-                                
fileIO.newInputStream(dataFilePathFactory.toPath(indexFile)),
+                                fileIO.newInputStream(
+                                        dataFilePathFactory.toExtraFilePath(
+                                                dataFileMeta, indexFile)),
                                 schemaInfo.fileSchema)) {
                     maintainers = indexReader.readAll();
                 }
-                newIndexPath = 
createNewFileIndexFilePath(dataFilePathFactory.toPath(indexFile));
+                newIndexPath =
+                        createNewFileIndexFilePath(
+                                
dataFilePathFactory.toExtraFilePath(dataFileMeta, indexFile));
             } else {
                 maintainers = new HashMap<>();
-                newIndexPath =
-                        dataFileToFileIndexPath(
-                                
dataFilePathFactory.toPath(dataFileMeta.fileName()));
+                newIndexPath = 
dataFileToFileIndexPath(dataFilePathFactory.toPath(dataFileMeta));
             }
 
             // remove unnecessary
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
index a3223446f6..4997f65eaf 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
@@ -53,6 +53,7 @@ class ScanHelperTest extends PaimonSparkTestBase {
             new java.util.ArrayList[String](),
             null,
             FileSource.APPEND,
+            null,
             null)
       }
 
@@ -89,6 +90,7 @@ class ScanHelperTest extends PaimonSparkTestBase {
         new java.util.ArrayList[String](),
         null,
         FileSource.APPEND,
+        null,
         null)
     ).asJava
 

Reply via email to