This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ac05e4b6e [core] External Path in DataFileMeta should be the file path (#4766)
4ac05e4b6e is described below

commit 4ac05e4b6e730d976a51e92a518cfdbdeb10b51b
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Dec 24 16:07:44 2024 +0800

    [core] External Path in DataFileMeta should be the file path (#4766)
---
 .../java/org/apache/paimon/io/DataFileMeta.java    | 16 +++++--
 .../apache/paimon/io/DataFileMetaSerializer.java   |  2 +-
 .../org/apache/paimon/io/DataFilePathFactory.java  | 54 ++++++++++------------
 .../org/apache/paimon/io/FileIndexEvaluator.java   |  2 +-
 .../paimon/io/KeyValueFileReaderFactory.java       | 40 ++++------------
 .../paimon/io/KeyValueFileWriterFactory.java       | 15 +++---
 .../java/org/apache/paimon/manifest/FileEntry.java |  5 +-
 .../org/apache/paimon/manifest/ManifestEntry.java  |  7 ++-
 .../apache/paimon/manifest/SimpleFileEntry.java    |  1 +
 .../apache/paimon/mergetree/MergeTreeWriter.java   | 16 +++----
 .../org/apache/paimon/table/source/DataSplit.java  |  4 +-
 .../org/apache/paimon/table/system/FilesTable.java | 53 +++++++++------------
 .../apache/paimon/append/AppendOnlyWriterTest.java |  7 ++-
 .../apache/paimon/io/DataFilePathFactoryTest.java  | 12 +++--
 .../paimon/io/KeyValueFileReadWriteTest.java       | 14 +++---
 .../paimon/mergetree/ContainsLevelsTest.java       |  9 +---
 .../apache/paimon/mergetree/LookupLevelsTest.java  |  9 +---
 .../apache/paimon/mergetree/MergeTreeTestBase.java |  8 ++--
 .../paimon/utils/FileStorePathFactoryTest.java     | 12 +++--
 .../apache/paimon/flink/clone/CloneFileInfo.java   | 15 ++++--
 .../paimon/flink/clone/CopyFileOperator.java       |  2 +-
 .../flink/clone/PickFilesForCloneOperator.java     |  6 ++-
 .../apache/paimon/flink/clone/PickFilesUtil.java   |  2 +-
 .../compact/changelog/ChangelogCompactTask.java    | 10 ++--
 .../paimon/flink/sink/RewriteFileIndexSink.java    |  5 +-
 .../flink/action/RewriteFileIndexActionITCase.java |  2 +-
 .../procedure/RewriteFileIndexProcedureITCase.java |  2 +-
 ...nlySingleTableCompactionWorkerOperatorTest.java |  7 +--
 .../apache/paimon/spark/SparkFileIndexITCase.java  |  3 +-
 29 files changed, 157 insertions(+), 183 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index 459cd788de..b164b60fe5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -368,9 +368,14 @@ public class DataFileMeta {
         return split[split.length - 1];
     }
 
-    @Nullable
-    public String externalPath() {
-        return externalPath;
+    public Optional<String> externalPath() {
+        return Optional.ofNullable(externalPath);
+    }
+
+    public Optional<String> externalPathDir() {
+        return Optional.ofNullable(externalPath)
+                .map(Path::new)
+                .map(p -> p.getParent().toUri().toString());
     }
 
     public Optional<FileSource> fileSource() {
@@ -405,7 +410,8 @@ public class DataFileMeta {
                 externalPath);
     }
 
-    public DataFileMeta rename(String newExternalPath, String newFileName) {
+    public DataFileMeta rename(String newFileName) {
+        String newExternalPath = externalPathDir().map(dir -> dir + "/" + 
newFileName).orElse(null);
         return new DataFileMeta(
                 newFileName,
                 fileSize,
@@ -452,7 +458,7 @@ public class DataFileMeta {
     public List<Path> collectFiles(DataFilePathFactory pathFactory) {
         List<Path> paths = new ArrayList<>();
         paths.add(pathFactory.toPath(this));
-        extraFiles.forEach(f -> paths.add(pathFactory.toExtraFilePath(this, 
f)));
+        extraFiles.forEach(f -> paths.add(pathFactory.toAlignedPath(f, this)));
         return paths;
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
index c8a5e326b0..a316f897ff 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
@@ -59,7 +59,7 @@ public class DataFileMetaSerializer extends 
ObjectSerializer<DataFileMeta> {
                 meta.embeddedIndex(),
                 meta.fileSource().map(FileSource::toByteValue).orElse(null),
                 toStringArrayData(meta.valueStatsCols()),
-                BinaryString.fromString(meta.externalPath()));
+                
meta.externalPath().map(BinaryString::fromString).orElse(null));
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
index daeb9f52ea..19525ab6cd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
@@ -20,9 +20,11 @@ package org.apache.paimon.io;
 
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.FileEntry;
 
 import javax.annotation.concurrent.ThreadSafe;
 
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -67,47 +69,36 @@ public class DataFilePathFactory {
         return newPath(changelogFilePrefix);
     }
 
-    private Path newPath(String prefix) {
+    public String newChangelogFileName() {
+        return newFileName(changelogFilePrefix);
+    }
+
+    public Path newPath(String prefix) {
+        return new Path(parent, newFileName(prefix));
+    }
+
+    private String newFileName(String prefix) {
         String extension;
         if (fileSuffixIncludeCompression) {
             extension = "." + fileCompression + "." + formatIdentifier;
         } else {
             extension = "." + formatIdentifier;
         }
-        String name = prefix + uuid + "-" + pathCount.getAndIncrement() + 
extension;
-        return new Path(parent, name);
+        return prefix + uuid + "-" + pathCount.getAndIncrement() + extension;
     }
 
-    @VisibleForTesting
-    public Path toPath(String fileName) {
-        return new Path(parent + "/" + fileName);
-    }
-
-    /**
-     * for read purpose.
-     *
-     * @param fileName the file name
-     * @param externalPath the external path, if null, it will use the parent 
path
-     * @return the file's path
-     */
-    public Path toPath(String fileName, String externalPath) {
-        return new Path((externalPath == null ? parent : externalPath) + "/" + 
fileName);
+    public Path toPath(DataFileMeta file) {
+        return file.externalPath().map(Path::new).orElse(new Path(parent, 
file.fileName()));
     }
 
-    public Path toPath(DataFileMeta dataFileMeta) {
-        String externalPath = dataFileMeta.externalPath();
-        String fileName = dataFileMeta.fileName();
-        return new Path((externalPath == null ? parent : externalPath) + "/" + 
fileName);
+    public Path toPath(FileEntry file) {
+        return Optional.ofNullable(file.externalPath())
+                .map(Path::new)
+                .orElse(new Path(parent, file.fileName()));
     }
 
-    public Path toExtraFilePath(DataFileMeta dataFileMeta, String extraFile) {
-        String externalPath = dataFileMeta.externalPath();
-        return new Path((externalPath == null ? parent : externalPath) + "/" + 
extraFile);
-    }
-
-    @VisibleForTesting
-    public String uuid() {
-        return uuid;
+    public Path toAlignedPath(String fileName, DataFileMeta aligned) {
+        return new 
Path(aligned.externalPathDir().map(Path::new).orElse(parent), fileName);
     }
 
     public static Path dataFileToFileIndexPath(Path dataFilePath) {
@@ -141,4 +132,9 @@ public class DataFilePathFactory {
 
         return fileName.substring(index + 1);
     }
+
+    @VisibleForTesting
+    String uuid() {
+        return uuid;
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java 
b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
index 9055097d37..3ed4c278d9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
@@ -62,7 +62,7 @@ public class FileIndexEvaluator {
                 // go to file index check
                 try (FileIndexPredicate predicate =
                         new FileIndexPredicate(
-                                dataFilePathFactory.toExtraFilePath(file, 
indexFiles.get(0)),
+                                
dataFilePathFactory.toAlignedPath(indexFiles.get(0), file),
                                 fileIO,
                                 dataSchema.logicalRowType())) {
                     return predicate.evaluate(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 9d65a54113..14221d50be 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -20,7 +20,6 @@ package org.apache.paimon.io;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
-import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
@@ -98,37 +97,17 @@ public class KeyValueFileReaderFactory implements 
FileReaderFactory<KeyValue> {
 
     @Override
     public RecordReader<KeyValue> createRecordReader(DataFileMeta file) throws 
IOException {
-        return createRecordReader(
-                file.schemaId(),
-                file.fileName(),
-                file.fileSize(),
-                file.level(),
-                file.externalPath());
-    }
-
-    @VisibleForTesting
-    public RecordReader<KeyValue> createRecordReader(
-            long schemaId, String fileName, long fileSize, int level, String 
externalPath)
-            throws IOException {
-        if (fileSize >= asyncThreshold && fileName.endsWith(".orc")) {
-            return new AsyncRecordReader<>(
-                    () ->
-                            createRecordReader(
-                                    schemaId, fileName, level, false, 2, 
fileSize, externalPath));
+        if (file.fileSize() >= asyncThreshold && 
file.fileName().endsWith(".orc")) {
+            return new AsyncRecordReader<>(() -> createRecordReader(file, 
false, 2));
         }
-        return createRecordReader(schemaId, fileName, level, true, null, 
fileSize, externalPath);
+        return createRecordReader(file, true, null);
     }
 
     private FileRecordReader<KeyValue> createRecordReader(
-            long schemaId,
-            String fileName,
-            int level,
-            boolean reuseFormat,
-            @Nullable Integer orcPoolSize,
-            long fileSize,
-            String externalPath)
+            DataFileMeta file, boolean reuseFormat, @Nullable Integer 
orcPoolSize)
             throws IOException {
-        String formatIdentifier = 
DataFilePathFactory.formatIdentifier(fileName);
+        String formatIdentifier = 
DataFilePathFactory.formatIdentifier(file.fileName());
+        long schemaId = file.schemaId();
 
         Supplier<FormatReaderMapping> formatSupplier =
                 () ->
@@ -143,8 +122,9 @@ public class KeyValueFileReaderFactory implements 
FileReaderFactory<KeyValue> {
                                 new FormatKey(schemaId, formatIdentifier),
                                 key -> formatSupplier.get())
                         : formatSupplier.get();
-        Path filePath = pathFactory.toPath(fileName, externalPath);
+        Path filePath = pathFactory.toPath(file);
 
+        long fileSize = file.fileSize();
         FileRecordReader<InternalRow> fileRecordReader =
                 new DataFileRecordReader(
                         formatReaderMapping.getReaderFactory(),
@@ -156,13 +136,13 @@ public class KeyValueFileReaderFactory implements 
FileReaderFactory<KeyValue> {
                         formatReaderMapping.getCastMapping(),
                         
PartitionUtils.create(formatReaderMapping.getPartitionPair(), partition));
 
-        Optional<DeletionVector> deletionVector = dvFactory.create(fileName);
+        Optional<DeletionVector> deletionVector = 
dvFactory.create(file.fileName());
         if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
             fileRecordReader =
                     new ApplyDeletionVectorReader(fileRecordReader, 
deletionVector.get());
         }
 
-        return new KeyValueDataFileRecordReader(fileRecordReader, keyType, 
valueType, level);
+        return new KeyValueDataFileRecordReader(fileRecordReader, keyType, 
valueType, file.level());
     }
 
     public static Builder builder(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index 500320c249..7b6f4f0e3c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -142,14 +142,13 @@ public class KeyValueFileWriterFactory {
                         fileIndexOptions);
     }
 
-    public void deleteFile(DataFileMeta meta, int level) {
-        fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(meta));
+    public void deleteFile(DataFileMeta file) {
+        
fileIO.deleteQuietly(formatContext.pathFactory(file.level()).toPath(file));
     }
 
-    public void copyFile(DataFileMeta sourceMeta, DataFileMeta targetMeta, int 
level)
-            throws IOException {
-        Path sourcePath = formatContext.pathFactory(level).toPath(sourceMeta);
-        Path targetPath = formatContext.pathFactory(level).toPath(targetMeta);
+    public void copyFile(DataFileMeta sourceFile, DataFileMeta targetFile) 
throws IOException {
+        Path sourcePath = 
formatContext.pathFactory(sourceFile.level()).toPath(sourceFile);
+        Path targetPath = 
formatContext.pathFactory(targetFile.level()).toPath(targetFile);
         fileIO.copyFile(sourcePath, targetPath, true);
     }
 
@@ -157,8 +156,8 @@ public class KeyValueFileWriterFactory {
         return fileIO;
     }
 
-    public Path newChangelogPath(int level) {
-        return formatContext.pathFactory(level).newChangelogPath();
+    public String newChangelogFileName(int level) {
+        return formatContext.pathFactory(level).newChangelogFileName();
     }
 
     public static Builder builder(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index 738776438b..dd77759de1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -54,6 +54,7 @@ public interface FileEntry {
 
     String fileName();
 
+    @Nullable
     String externalPath();
 
     Identifier identifier();
@@ -161,7 +162,9 @@ public interface FileEntry {
                     + ", extraFiles "
                     + extraFiles
                     + ", embeddedIndex "
-                    + Arrays.toString(embeddedIndex);
+                    + Arrays.toString(embeddedIndex)
+                    + ", externalPath "
+                    + externalPath;
         }
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index d4748451d8..3cb5733a38 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -26,6 +26,8 @@ import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TinyIntType;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
@@ -92,9 +94,10 @@ public class ManifestEntry implements FileEntry {
         return file.fileName();
     }
 
+    @Nullable
     @Override
     public String externalPath() {
-        return file.externalPath();
+        return file.externalPath().orElse(null);
     }
 
     @Override
@@ -129,7 +132,7 @@ public class ManifestEntry implements FileEntry {
                 file.fileName(),
                 file.extraFiles(),
                 file.embeddedIndex(),
-                file.externalPath());
+                externalPath());
     }
 
     public ManifestEntry copyWithoutStats() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
index f86bded52d..c8708db0b8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
@@ -106,6 +106,7 @@ public class SimpleFileEntry implements FileEntry {
         return fileName;
     }
 
+    @Nullable
     @Override
     public String externalPath() {
         return externalPath;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index df48559223..1c805e764a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -27,7 +27,6 @@ import org.apache.paimon.compact.CompactResult;
 import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataIncrement;
@@ -242,10 +241,9 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
             } else if (changelogProducer == ChangelogProducer.INPUT && 
isInsertOnly) {
                 List<DataFileMeta> changelogMetas = new ArrayList<>();
                 for (DataFileMeta dataMeta : dataMetas) {
-                    Path newPath = writerFactory.newChangelogPath(0);
-                    DataFileMeta changelogMeta =
-                            dataMeta.rename(newPath.getParent().getName(), 
newPath.getName());
-                    writerFactory.copyFile(dataMeta, changelogMeta, 0);
+                    String newFileName = writerFactory.newChangelogFileName(0);
+                    DataFileMeta changelogMeta = dataMeta.rename(newFileName);
+                    writerFactory.copyFile(dataMeta, changelogMeta);
                     changelogMetas.add(changelogMeta);
                 }
                 newFilesChangelog.addAll(changelogMetas);
@@ -343,7 +341,7 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
                 // 2. This file is not the input of upgraded.
                 if (!compactBefore.containsKey(file.fileName())
                         && !afterFiles.contains(file.fileName())) {
-                    writerFactory.deleteFile(file, file.level());
+                    writerFactory.deleteFile(file);
                 }
             } else {
                 compactBefore.put(file.fileName(), file);
@@ -377,7 +375,7 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
         deletedFiles.clear();
 
         for (DataFileMeta file : newFilesChangelog) {
-            writerFactory.deleteFile(file, file.level());
+            writerFactory.deleteFile(file);
         }
         newFilesChangelog.clear();
 
@@ -392,12 +390,12 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
         compactAfter.clear();
 
         for (DataFileMeta file : compactChangelog) {
-            writerFactory.deleteFile(file, file.level());
+            writerFactory.deleteFile(file);
         }
         compactChangelog.clear();
 
         for (DataFileMeta file : delete) {
-            writerFactory.deleteFile(file, file.level());
+            writerFactory.deleteFile(file);
         }
 
         if (compactDeletionFile != null) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 9178d25a91..39f9269f41 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -180,10 +180,8 @@ public class DataSplit implements Split {
     }
 
     private RawFile makeRawTableFile(String bucketPath, DataFileMeta file) {
-        String path = file.externalPath() != null ? file.externalPath() : 
bucketPath;
-        path += "/" + file.fileName();
         return new RawFile(
-                path,
+                file.externalPath().orElse(bucketPath + "/" + file.fileName()),
                 file.fileSize(),
                 0,
                 file.fileSize(),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index 3107ebe150..5c7ccd4809 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -370,9 +370,9 @@ public class FilesTable implements ReadonlyTable {
                 DataSplit dataSplit,
                 RowDataToObjectArrayConverter partitionConverter,
                 Function<Long, RowDataToObjectArrayConverter> keyConverters,
-                DataFileMeta dataFileMeta,
+                DataFileMeta file,
                 SimpleStatsEvolutions simpleStatsEvolutions) {
-            StatsLazyGetter statsGetter = new StatsLazyGetter(dataFileMeta, 
simpleStatsEvolutions);
+            StatsLazyGetter statsGetter = new StatsLazyGetter(file, 
simpleStatsEvolutions);
             @SuppressWarnings("unchecked")
             Supplier<Object>[] fields =
                     new Supplier[] {
@@ -385,51 +385,44 @@ public class FilesTable implements ReadonlyTable {
                                                                 
dataSplit.partition()))),
                         dataSplit::bucket,
                         () ->
-                                dataFileMeta.externalPath() == null
-                                        ? BinaryString.fromString(
-                                                dataSplit.bucketPath()
-                                                        + "/"
-                                                        + 
dataFileMeta.fileName())
-                                        : BinaryString.fromString(
-                                                dataFileMeta.externalPath()
-                                                        + "/"
-                                                        + 
dataFileMeta.fileName()),
+                                BinaryString.fromString(
+                                        file.externalPath()
+                                                .orElse(
+                                                        dataSplit.bucketPath()
+                                                                + "/"
+                                                                + 
file.fileName())),
                         () ->
                                 BinaryString.fromString(
-                                        DataFilePathFactory.formatIdentifier(
-                                                dataFileMeta.fileName())),
-                        dataFileMeta::schemaId,
-                        dataFileMeta::level,
-                        dataFileMeta::rowCount,
-                        dataFileMeta::fileSize,
+                                        
DataFilePathFactory.formatIdentifier(file.fileName())),
+                        file::schemaId,
+                        file::level,
+                        file::rowCount,
+                        file::fileSize,
                         () ->
-                                dataFileMeta.minKey().getFieldCount() <= 0
+                                file.minKey().getFieldCount() <= 0
                                         ? null
                                         : BinaryString.fromString(
                                                 Arrays.toString(
                                                         keyConverters
-                                                                
.apply(dataFileMeta.schemaId())
-                                                                
.convert(dataFileMeta.minKey()))),
+                                                                
.apply(file.schemaId())
+                                                                
.convert(file.minKey()))),
                         () ->
-                                dataFileMeta.maxKey().getFieldCount() <= 0
+                                file.maxKey().getFieldCount() <= 0
                                         ? null
                                         : BinaryString.fromString(
                                                 Arrays.toString(
                                                         keyConverters
-                                                                
.apply(dataFileMeta.schemaId())
-                                                                
.convert(dataFileMeta.maxKey()))),
+                                                                
.apply(file.schemaId())
+                                                                
.convert(file.maxKey()))),
                         () -> 
BinaryString.fromString(statsGetter.nullValueCounts().toString()),
                         () -> 
BinaryString.fromString(statsGetter.lowerValueBounds().toString()),
                         () -> 
BinaryString.fromString(statsGetter.upperValueBounds().toString()),
-                        dataFileMeta::minSequenceNumber,
-                        dataFileMeta::maxSequenceNumber,
-                        dataFileMeta::creationTime,
+                        file::minSequenceNumber,
+                        file::maxSequenceNumber,
+                        file::creationTime,
                         () ->
                                 BinaryString.fromString(
-                                        dataFileMeta
-                                                .fileSource()
-                                                .map(FileSource::toString)
-                                                .orElse(null))
+                                        
file.fileSource().map(FileSource::toString).orElse(null))
                     };
 
             return new LazyGenericRow(fields);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 3f752be13e..7757020532 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -66,7 +66,6 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
@@ -646,10 +645,10 @@ public class AppendOnlyWriterTest {
         int size = toCompact.size();
         long minSeq = toCompact.get(0).minSequenceNumber();
         long maxSeq = toCompact.get(size - 1).maxSequenceNumber();
-        String fileName = "compact-" + UUID.randomUUID();
-        LocalFileIO.create().newOutputStream(pathFactory.toPath(fileName), 
false).close();
+        Path path = pathFactory.newPath("compact-");
+        LocalFileIO.create().newOutputStream(path, false).close();
         return DataFileMeta.forAppend(
-                fileName,
+                path.getName(),
                 toCompact.stream().mapToLong(DataFileMeta::fileSize).sum(),
                 toCompact.stream().mapToLong(DataFileMeta::rowCount).sum(),
                 STATS_SERIALIZER.toBinaryAllMode(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java 
b/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java
index d36966c55a..109f33c3dc 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java
@@ -55,8 +55,9 @@ public class DataFilePathFactoryTest {
                                             + "."
                                             + 
CoreOptions.FILE_FORMAT.defaultValue()));
         }
-        assertThat(pathFactory.toPath("my-data-file-name"))
-                .isEqualTo(new Path(tempDir.toString() + 
"/bucket-123/my-data-file-name"));
+        assertThat(pathFactory.newPath("my-data-file-name").toString())
+                .startsWith(
+                        new Path(tempDir.toString() + 
"/bucket-123/my-data-file-name").toString());
     }
 
     @Test
@@ -83,8 +84,9 @@ public class DataFilePathFactoryTest {
                                             + "."
                                             + 
CoreOptions.FILE_FORMAT.defaultValue()));
         }
-        assertThat(pathFactory.toPath("my-data-file-name"))
-                .isEqualTo(
-                        new Path(tempDir.toString() + 
"/dt=20211224/bucket-123/my-data-file-name"));
+        assertThat(pathFactory.newPath("my-data-file-name").toString())
+                .startsWith(
+                        new Path(tempDir.toString() + 
"/dt=20211224/bucket-123/my-data-file-name")
+                                .toString());
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java 
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index e817562689..8f2c815404 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -61,6 +61,7 @@ import java.util.function.Function;
 import static org.apache.paimon.TestKeyValueGenerator.DEFAULT_ROW_TYPE;
 import static org.apache.paimon.TestKeyValueGenerator.KEY_TYPE;
 import static org.apache.paimon.TestKeyValueGenerator.createTestSchemaManager;
+import static org.apache.paimon.io.DataFileTestUtils.newFile;
 import static 
org.apache.paimon.stats.StatsTestUtils.convertWithoutSchemaEvolution;
 import static 
org.apache.paimon.utils.FileStorePathFactoryTest.createNonPartFactory;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -78,7 +79,10 @@ public class KeyValueFileReadWriteTest {
     public void testReadNonExistentFile() {
         KeyValueFileReaderFactory readerFactory =
                 createReaderFactory(tempDir.toString(), "avro", null, null);
-        assertThatThrownBy(() -> readerFactory.createRecordReader(0, 
"dummy_file.avro", 1, 0, null))
+        assertThatThrownBy(
+                        () ->
+                                readerFactory.createRecordReader(
+                                        newFile("non_avro_file.avro", 0, 0, 1, 
0)))
                 .hasMessageContaining(
                         "you can configure 'snapshot.time-retained' option with a larger value.");
     }
@@ -307,13 +311,7 @@ public class KeyValueFileReadWriteTest {
         for (DataFileMeta meta : actualMetas) {
             // check the contents of data file
             CloseableIterator<KeyValue> actualKvsIterator =
-                    new RecordReaderIterator<>(
-                            readerFactory.createRecordReader(
-                                    meta.schemaId(),
-                                    meta.fileName(),
-                                    meta.fileSize(),
-                                    meta.level(),
-                                    meta.externalPath()));
+                    new 
RecordReaderIterator<>(readerFactory.createRecordReader(meta));
             while (actualKvsIterator.hasNext()) {
                 assertThat(expectedIterator.hasNext()).isTrue();
                 KeyValue actualKv = actualKvsIterator.next();
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index fa96765a42..fa9628b4c1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -195,14 +195,7 @@ public class ContainsLevelsTest {
                 comparator,
                 keyType,
                 new LookupLevels.ContainsValueProcessor(),
-                file ->
-                        createReaderFactory()
-                                .createRecordReader(
-                                        0,
-                                        file.fileName(),
-                                        file.fileSize(),
-                                        file.level(),
-                                        file.externalPath()),
+                file -> createReaderFactory().createRecordReader(file),
                 file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + 
UUID.randomUUID()),
                 new HashLookupStoreFactory(
                         new CacheManager(MemorySize.ofMebiBytes(1)),
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index 56c45cfdc4..b68a82935b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -272,14 +272,7 @@ public class LookupLevelsTest {
                 comparator,
                 keyType,
                 new LookupLevels.KeyValueProcessor(rowType),
-                file ->
-                        createReaderFactory()
-                                .createRecordReader(
-                                        0,
-                                        file.fileName(),
-                                        file.fileSize(),
-                                        file.level(),
-                                        file.externalPath()),
+                file -> createReaderFactory().createRecordReader(file),
                 file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + 
UUID.randomUUID()),
                 new HashLookupStoreFactory(
                         new CacheManager(MemorySize.ofMebiBytes(1)),
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 47d12ce47c..e987e2ee99 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -114,7 +114,7 @@ public abstract class MergeTreeTestBase {
         pathFactory = createNonPartFactory(path);
         comparator = Comparator.comparingInt(o -> o.getInt(0));
         recreateMergeTree(1024 * 1024);
-        Path bucketDir = 
writerFactory.pathFactory(0).toPath("ignore").getParent();
+        Path bucketDir = writerFactory.pathFactory(0).newPath().getParent();
         LocalFileIO.create().mkdirs(bucketDir);
     }
 
@@ -418,7 +418,7 @@ public abstract class MergeTreeTestBase {
 
         writer.close();
 
-        Path bucketDir = 
writerFactory.pathFactory(0).toPath("ignore").getParent();
+        Path bucketDir = writerFactory.pathFactory(0).newPath().getParent();
         Set<String> files =
                 Arrays.stream(LocalFileIO.create().listStatus(bucketDir))
                         .map(FileStatus::getPath)
@@ -475,7 +475,7 @@ public abstract class MergeTreeTestBase {
 
         writer.close();
 
-        Path bucketDir = 
writerFactory.pathFactory(0).toPath("ignore").getParent();
+        Path bucketDir = writerFactory.pathFactory(0).newPath().getParent();
         Set<String> files =
                 Arrays.stream(LocalFileIO.create().listStatus(bucketDir))
                         .map(FileStatus::getPath)
@@ -592,7 +592,7 @@ public abstract class MergeTreeTestBase {
             assertThat(remove).isTrue();
             // See MergeTreeWriter.updateCompactResult
             if (!newFileNames.contains(file.fileName()) && 
!afterFiles.contains(file.fileName())) {
-                compactWriterFactory.deleteFile(file, file.level());
+                compactWriterFactory.deleteFile(file);
             }
         }
         compactedFiles.addAll(increment.compactIncrement().compactAfter());
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java
index 6ca15cf150..c5cda2286d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java
@@ -72,8 +72,9 @@ public class FileStorePathFactoryTest {
         FileStorePathFactory pathFactory = createNonPartFactory(new 
Path(tempDir.toString()));
         DataFilePathFactory dataFilePathFactory =
                 pathFactory.createDataFilePathFactory(new BinaryRow(0), 123);
-        assertThat(dataFilePathFactory.toPath("my-data-file-name"))
-                .isEqualTo(new Path(tempDir.toString() + 
"/bucket-123/my-data-file-name"));
+        assertThat(dataFilePathFactory.newPath("my-data-file-name").toString())
+                .startsWith(
+                        new Path(tempDir.toString() + 
"/bucket-123/my-data-file-name").toString());
     }
 
     @Test
@@ -116,9 +117,10 @@ public class FileStorePathFactoryTest {
         writer.complete();
         DataFilePathFactory dataFilePathFactory =
                 pathFactory.createDataFilePathFactory(partition, 123);
-        assertThat(dataFilePathFactory.toPath("my-data-file-name"))
-                .isEqualTo(
-                        new Path(tempDir.toString() + expected + 
"/bucket-123/my-data-file-name"));
+        assertThat(dataFilePathFactory.newPath("my-data-file-name").toString())
+                .startsWith(
+                        new Path(tempDir.toString() + expected + 
"/bucket-123/my-data-file-name")
+                                .toString());
     }
 
     public static FileStorePathFactory createNonPartFactory(Path root) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java
index d916958412..5c0ac75e16 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java
@@ -21,17 +21,26 @@ package org.apache.paimon.flink.clone;
 /** The information of copy file. */
 public class CloneFileInfo {
 
+    private final String sourceFilePath;
     private final String filePathExcludeTableRoot;
     private final String sourceIdentifier;
     private final String targetIdentifier;
 
     public CloneFileInfo(
-            String filePathExcludeTableRoot, String sourceIdentifier, String 
targetIdentifier) {
+            String sourceFilePath,
+            String filePathExcludeTableRoot,
+            String sourceIdentifier,
+            String targetIdentifier) {
+        this.sourceFilePath = sourceFilePath;
         this.filePathExcludeTableRoot = filePathExcludeTableRoot;
         this.sourceIdentifier = sourceIdentifier;
         this.targetIdentifier = targetIdentifier;
     }
 
+    public String getSourceFilePath() {
+        return sourceFilePath;
+    }
+
     public String getFilePathExcludeTableRoot() {
         return filePathExcludeTableRoot;
     }
@@ -47,7 +56,7 @@ public class CloneFileInfo {
     @Override
     public String toString() {
         return String.format(
-                "{ filePath: %s, sourceIdentifier: %s, targetIdentifier: %s }",
-                filePathExcludeTableRoot, sourceIdentifier, targetIdentifier);
+                "{ sourceFilePath: %s, filePathExcludeTableRoot: %s, sourceIdentifier: %s, targetIdentifier: %s }",
+                sourceFilePath, filePathExcludeTableRoot, sourceIdentifier, targetIdentifier);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
index 8bcaa2a207..e7002cce1e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
@@ -99,7 +99,7 @@ public class CopyFileOperator extends AbstractStreamOperator<CloneFileInfo>
                         });
 
         String filePathExcludeTableRoot = 
cloneFileInfo.getFilePathExcludeTableRoot();
-        Path sourcePath = new Path(sourceTableRootPath + 
filePathExcludeTableRoot);
+        Path sourcePath = new Path(cloneFileInfo.getSourceFilePath());
         Path targetPath = new Path(targetTableRootPath + 
filePathExcludeTableRoot);
 
         if (targetTableFileIO.exists(targetPath)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java
index 67eecbc6f2..f58d3acafd 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java
@@ -123,7 +123,11 @@ public class PickFilesForCloneOperator extends AbstractStreamOperator<CloneFileI
         for (Path file : files) {
             Path relativePath = getPathExcludeTableRoot(file, sourceTableRoot);
             result.add(
-                    new CloneFileInfo(relativePath.toString(), 
sourceIdentifier, targetIdentifier));
+                    new CloneFileInfo(
+                            file.toUri().toString(),
+                            relativePath.toString(),
+                            sourceIdentifier,
+                            targetIdentifier));
         }
         return result;
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java
index b7e1e60878..9de974d047 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java
@@ -108,7 +108,7 @@ public class PickFilesUtil {
                         pathFactory
                                 .createDataFilePathFactory(
                                         simpleFileEntry.partition(), 
simpleFileEntry.bucket())
-                                .toPath(simpleFileEntry.fileName(), 
simpleFileEntry.externalPath());
+                                .toPath(simpleFileEntry);
                 dataFiles.add(dataFilePath);
             }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
index 39860e418c..7f3f730280 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
@@ -167,12 +167,12 @@ public class ChangelogCompactTask implements Serializable {
         table.fileIO()
                 .rename(
                         changelogTempPath,
-                        dataFilePathFactory.toExtraFilePath(
-                                baseResult.meta,
+                        dataFilePathFactory.toAlignedPath(
                                 realName
                                         + "."
                                         + 
CompactedChangelogReadOnlyFormat.getIdentifier(
-                                                
baseResult.meta.fileFormat())));
+                                                baseResult.meta.fileFormat()),
+                                baseResult.meta));
 
         List<Committable> newCommittables = new ArrayList<>();
 
@@ -194,9 +194,9 @@ public class ChangelogCompactTask implements Serializable {
                                 + 
CompactedChangelogReadOnlyFormat.getIdentifier(
                                         result.meta.fileFormat());
                 if (result.isCompactResult) {
-                    
compactChangelog.add(result.meta.rename(baseResult.meta.externalPath(), name));
+                    compactChangelog.add(result.meta.rename(name));
                 } else {
-                    
newFilesChangelog.add(result.meta.rename(baseResult.meta.externalPath(), name));
+                    newFilesChangelog.add(result.meta.rename(name));
                 }
             }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
index d35cb09cb7..99061d4b82 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
@@ -199,14 +199,13 @@ public class RewriteFileIndexSink extends FlinkWriteSink<ManifestEntry> {
                 try (FileIndexFormat.Reader indexReader =
                         FileIndexFormat.createReader(
                                 fileIO.newInputStream(
-                                        dataFilePathFactory.toExtraFilePath(
-                                                dataFileMeta, indexFile)),
+                                        
dataFilePathFactory.toAlignedPath(indexFile, dataFileMeta)),
                                 schemaInfo.fileSchema)) {
                     maintainers = indexReader.readAll();
                 }
                 newIndexPath =
                         createNewFileIndexFilePath(
-                                
dataFilePathFactory.toExtraFilePath(dataFileMeta, indexFile));
+                                dataFilePathFactory.toAlignedPath(indexFile, 
dataFileMeta));
             } else {
                 maintainers = new HashMap<>();
                 newIndexPath = 
dataFileToFileIndexPath(dataFilePathFactory.toPath(dataFileMeta));
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java
index 242a751416..dc6e5523b0 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java
@@ -102,7 +102,7 @@ public class RewriteFileIndexActionITCase extends ActionITCaseBase {
                     table.store()
                             .pathFactory()
                             .createDataFilePathFactory(entry.partition(), 
entry.bucket())
-                            .toPath(file);
+                            .toAlignedPath(file, entry.file());
             try (FileIndexFormat.Reader reader =
                     FileIndexFormat.createReader(
                             table.fileIO().newInputStream(indexFilePath), 
table.rowType())) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java
index 1abfe355a5..23102f2a3d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedureITCase.java
@@ -199,7 +199,7 @@ public class RewriteFileIndexProcedureITCase extends CatalogITCaseBase {
                     table.store()
                             .pathFactory()
                             .createDataFilePathFactory(entry.partition(), 
entry.bucket())
-                            .toPath(file);
+                            .toAlignedPath(file, entry.file());
             try (FileIndexFormat.Reader reader =
                     FileIndexFormat.createReader(
                             table.fileIO().newInputStream(indexFilePath), 
table.rowType())) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java
index 6238a9cbf3..aa33e4fe75 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java
@@ -160,8 +160,7 @@ public class AppendOnlySingleTableCompactionWorkerOperatorTest extends TableTest
             List<DataFileMeta> fileMetas =
                     ((CommitMessageImpl) 
commitMessage).compactIncrement().compactAfter();
             for (DataFileMeta fileMeta : fileMetas) {
-                Assertions.assertThat(
-                                
localFileIO.exists(dataFilePathFactory.toPath(fileMeta.fileName())))
+                
Assertions.assertThat(localFileIO.exists(dataFilePathFactory.toPath(fileMeta)))
                         .isTrue();
             }
             if (i++ > 2) {
@@ -188,9 +187,7 @@ public class AppendOnlySingleTableCompactionWorkerOperatorTest extends TableTest
                 List<DataFileMeta> fileMetas =
                         ((CommitMessageImpl) 
commitMessage).compactIncrement().compactAfter();
                 for (DataFileMeta fileMeta : fileMetas) {
-                    Assertions.assertThat(
-                                    localFileIO.exists(
-                                            
dataFilePathFactory.toPath(fileMeta.fileName())))
+                    
Assertions.assertThat(localFileIO.exists(dataFilePathFactory.toPath(fileMeta)))
                             .isFalse();
                 }
             } catch (Exception e) {
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
index 0360def685..99e95cf40e 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
@@ -185,7 +185,8 @@ public class SparkFileIndexITCase extends SparkWriteITCase {
                 try (FileIndexFormat.Reader reader =
                         FileIndexFormat.createReader(
                                 fileIO.newInputStream(
-                                        
dataFilePathFactory.toPath(indexFiles.get(0))),
+                                        dataFilePathFactory.toAlignedPath(
+                                                indexFiles.get(0), 
dataFileMeta)),
                                 tableSchema.logicalRowType())) {
                     Optional<FileIndexReader> fileIndexReader =
                             reader.readColumnIndex("a").stream().findFirst();

Reply via email to