This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.0
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit dbc0a6609587dc515171f64f67cdb868e6e94969
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Jan 9 19:47:24 2025 +0800

    [core] Fix remove orphan files with data file path directory (#4871)
---
 .../apache/paimon/operation/OrphanFilesClean.java  | 11 ++--
 .../apache/paimon/utils/FileStorePathFactory.java  | 70 +++++++++++++---------
 .../procedure/RemoveOrphanFilesProcedureTest.scala | 21 +++++++
 3 files changed, 71 insertions(+), 31 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index c2b9be4c27..54e0820918 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -33,6 +33,7 @@ import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.BranchManager;
 import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SerializableConsumer;
@@ -252,12 +253,14 @@ public abstract class OrphanFilesClean implements Serializable {
 
     /** List directories that contains data files and manifest files. */
     protected List<Path> listPaimonFileDirs() {
+        FileStorePathFactory pathFactory = table.store().pathFactory();
+
         List<Path> paimonFileDirs = new ArrayList<>();
 
-        paimonFileDirs.add(new Path(location, "manifest"));
-        paimonFileDirs.add(new Path(location, "index"));
-        paimonFileDirs.add(new Path(location, "statistics"));
-        paimonFileDirs.addAll(listFileDirs(location, partitionKeysNum));
+        paimonFileDirs.add(pathFactory.manifestPath());
+        paimonFileDirs.add(pathFactory.indexPath());
+        paimonFileDirs.add(pathFactory.statisticsPath());
+        paimonFileDirs.addAll(listFileDirs(pathFactory.dataFilePath(), 
partitionKeysNum));
 
         return paimonFileDirs;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index f255762cfd..5eaa6fd89d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -36,6 +36,17 @@ import java.util.stream.Collectors;
 @ThreadSafe
 public class FileStorePathFactory {
 
+    public static final String MANIFEST_PATH = "manifest";
+    public static final String MANIFEST_PREFIX = "manifest-";
+    public static final String MANIFEST_LIST_PREFIX = "manifest-list-";
+    public static final String INDEX_MANIFEST_PREFIX = "index-manifest-";
+
+    public static final String INDEX_PATH = "index";
+    public static final String INDEX_PREFIX = "index-";
+
+    public static final String STATISTICS_PATH = "statistics";
+    public static final String STATISTICS_PREFIX = "stat-";
+
     public static final String BUCKET_PATH_PREFIX = "bucket-";
 
     private final Path root;
@@ -89,6 +100,25 @@ public class FileStorePathFactory {
         return root;
     }
 
+    public Path manifestPath() {
+        return new Path(root, MANIFEST_PATH);
+    }
+
+    public Path indexPath() {
+        return new Path(root, INDEX_PATH);
+    }
+
+    public Path statisticsPath() {
+        return new Path(root, STATISTICS_PATH);
+    }
+
+    public Path dataFilePath() {
+        if (dataFilePathDirectory != null) {
+            return new Path(root, dataFilePathDirectory);
+        }
+        return root;
+    }
+
     @VisibleForTesting
     public static InternalRowPartitionComputer getPartitionComputer(
             RowType partitionType, String defaultPartValue, boolean 
legacyPartitionName) {
@@ -98,25 +128,21 @@ public class FileStorePathFactory {
     }
 
     public Path newManifestFile() {
-        return new Path(
-                root + "/manifest/manifest-" + uuid + "-" + 
manifestFileCount.getAndIncrement());
+        return toManifestFilePath(
+                MANIFEST_PREFIX + uuid + "-" + 
manifestFileCount.getAndIncrement());
     }
 
     public Path newManifestList() {
-        return new Path(
-                root
-                        + "/manifest/manifest-list-"
-                        + uuid
-                        + "-"
-                        + manifestListCount.getAndIncrement());
+        return toManifestListPath(
+                MANIFEST_LIST_PREFIX + uuid + "-" + 
manifestListCount.getAndIncrement());
     }
 
     public Path toManifestFilePath(String manifestFileName) {
-        return new Path(root + "/manifest/" + manifestFileName);
+        return new Path(manifestPath(), manifestFileName);
     }
 
     public Path toManifestListPath(String manifestListName) {
-        return new Path(root + "/manifest/" + manifestListName);
+        return new Path(manifestPath(), manifestListName);
     }
 
     public DataFilePathFactory createDataFilePathFactory(BinaryRow partition, 
int bucket) {
@@ -201,17 +227,13 @@ public class FileStorePathFactory {
         return new PathFactory() {
             @Override
             public Path newPath() {
-                return new Path(
-                        root
-                                + "/manifest/index-manifest-"
-                                + uuid
-                                + "-"
-                                + indexManifestCount.getAndIncrement());
+                return toPath(
+                        INDEX_MANIFEST_PREFIX + uuid + "-" + 
indexManifestCount.getAndIncrement());
             }
 
             @Override
             public Path toPath(String fileName) {
-                return new Path(root + "/manifest/" + fileName);
+                return new Path(manifestPath(), fileName);
             }
         };
     }
@@ -220,13 +242,12 @@ public class FileStorePathFactory {
         return new PathFactory() {
             @Override
             public Path newPath() {
-                return new Path(
-                        root + "/index/index-" + uuid + "-" + 
indexFileCount.getAndIncrement());
+                return toPath(INDEX_PREFIX + uuid + "-" + 
indexFileCount.getAndIncrement());
             }
 
             @Override
             public Path toPath(String fileName) {
-                return new Path(root + "/index/" + fileName);
+                return new Path(indexPath(), fileName);
             }
         };
     }
@@ -235,17 +256,12 @@ public class FileStorePathFactory {
         return new PathFactory() {
             @Override
             public Path newPath() {
-                return new Path(
-                        root
-                                + "/statistics/stats-"
-                                + uuid
-                                + "-"
-                                + statsFileCount.getAndIncrement());
+                return toPath(STATISTICS_PREFIX + uuid + "-" + 
statsFileCount.getAndIncrement());
             }
 
             @Override
             public Path toPath(String fileName) {
-                return new Path(root + "/statistics/" + fileName);
+                return new Path(statisticsPath(), fileName);
             }
         };
     }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
index b1bb3124e3..b680e5b702 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
@@ -224,4 +224,25 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), 
Row(0, 0) :: Nil)
   }
 
+  test("Paimon procedure: remove orphan files with data file path directory") {
+    sql(s"""
+           |CREATE TABLE T (id STRING, name STRING)
+           |USING PAIMON
+           |TBLPROPERTIES ('primary-key'='id', 
'data-file.path-directory'='data')
+           |""".stripMargin)
+
+    sql(s"INSERT INTO T VALUES ('1', 'a'), ('2', 'b')")
+
+    val table = loadTable("T")
+    val orphanFile = new Path(table.store().pathFactory().dataFilePath(), 
ORPHAN_FILE_1)
+    table.fileIO().tryToWriteAtomic(orphanFile, "b")
+
+    Thread.sleep(1000)
+    val older_than = DateTimeUtils.formatLocalDateTime(
+      DateTimeUtils.toLocalDateTime(System.currentTimeMillis()),
+      3)
+    checkAnswer(
+      sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => 
'$older_than')"),
+      Row(1, 1) :: Nil)
+  }
 }

Reply via email to