This is an automated email from the ASF dual-hosted git repository.

XiaoHongbo-Hope pushed a commit to branch release-1.4
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/release-1.4 by this push:
     new 0fa010844e [core] Avoid deleting directories in orphan files cleanup (#7920)
0fa010844e is described below

commit 0fa010844e5c2524d0b33aade4e123dd233027ac
Author: XiaoHongbo <[email protected]>
AuthorDate: Thu May 21 16:02:28 2026 +0800

    [core] Avoid deleting directories in orphan files cleanup (#7920)
    
    This PR fixes a potential data loss risk in orphan files cleanup.
    
    The issue was introduced by #7295, which added support for cleaning
    empty partition directories that have no bucket subdirectories. In that
    change, `listFileDirs` may add a partition directory to the set of
    directories to be scanned when its child listing is empty.
    
    This is safe when the partition is truly empty. However, if a transient
    `listStatus` failure returns an empty result, a partition directory that
    still contains data may be treated as empty and scanned again later.
    When that later listing succeeds, bucket or partition directories could
    be collected as orphan file candidates.
    
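    Before this fix, `cleanFile` recursively deleted directory candidates
    via `deleteDirectoryQuietly`. As a result, a directory incorrectly
    collected as an orphan candidate could cause still-referenced data files
    under it to be deleted.
    
    This fix excludes directories from orphan file candidate listings (in the
    core, Flink, and Spark implementations). It also changes `cleanFile` to
    log an error and refuse to delete a directory instead of deleting it
    recursively.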
    
    (cherry picked from commit d7b08252d1be9210c3280bae9579b2b06739430b)
---
 .../paimon/operation/LocalOrphanFilesClean.java    |  1 +
 .../apache/paimon/operation/OrphanFilesClean.java  | 11 ++--
 .../operation/LocalOrphanFilesCleanTest.java       | 58 ++++++++++++++++++++--
 .../paimon/flink/orphan/FlinkOrphanFilesClean.java |  7 ++-
 .../spark/procedure/SparkOrphanFilesClean.scala    | 13 +++--
 5 files changed, 75 insertions(+), 15 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
index f8a2e080c3..fb9f9f7589 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
@@ -258,6 +258,7 @@ public class LocalOrphanFilesClean extends OrphanFilesClean 
{
             }
 
             return files.stream()
+                    .filter(status -> !status.isDir())
                     .filter(this::oldEnough)
                     .map(status -> Pair.of(status.getPath(), status.getLen()))
                     .collect(Collectors.toList());
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index 5b1b045a7d..b146ffbac7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -181,6 +181,7 @@ public abstract class OrphanFilesClean implements 
Serializable {
             Path directory, Predicate<FileStatus> fileStatusFilter, 
Predicate<Path> fileFilter) {
         List<FileStatus> statuses = tryBestListingDirs(directory);
         return statuses.stream()
+                .filter(status -> !status.isDir())
                 .filter(fileStatusFilter)
                 .filter(status -> fileFilter.test(status.getPath()))
                 .map(status -> Pair.of(status.getPath(), status.getLen()))
@@ -220,11 +221,15 @@ public abstract class OrphanFilesClean implements 
Serializable {
         if (!dryRun) {
             try {
                 if (fileIO.isDir(path)) {
-                    fileIO.deleteDirectoryQuietly(path);
+                    LOG.error(
+                            "Refusing to delete directory {} in orphan file 
cleanup. "
+                                    + "This indicates a bug in candidate 
collection.",
+                            path);
                 } else {
                     fileIO.deleteQuietly(path);
                 }
-            } catch (IOException ignored) {
+            } catch (IOException e) {
+                LOG.warn("Failed to check whether {} is directory, skip 
deleting it.", path, e);
             }
         }
     }
@@ -393,7 +398,7 @@ public abstract class OrphanFilesClean implements 
Serializable {
 
         for (FileStatus status : statuses) {
             Path path = status.getPath();
-            if (filter.test(path)) {
+            if (status.isDir() && filter.test(path)) {
                 filtered.add(path);
             }
             // ignore unknown dirs
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
index bb20c9904e..b690246101 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
@@ -628,6 +628,58 @@ public class LocalOrphanFilesCleanTest {
                 .isTrue();
     }
 
+    @Test
+    void testDirectoriesNotTreatedAsOrphanCandidates() throws Exception {
+        commit(Collections.singletonList(new TestPojo(1, 0, "a", "v1")));
+
+        Path partitionPath = new Path(tablePath, "part1=0/part2=a");
+        Path bucketPath =
+                listSubDirs(partitionPath, p -> 
p.getName().startsWith(BUCKET_PATH_PREFIX)).get(0);
+        assertThat(fileIO.listStatus(bucketPath)).isNotEmpty();
+
+        Path subdirInBucket = new Path(bucketPath, "orphan-subdir");
+        fileIO.mkdirs(subdirInBucket);
+        fileIO.tryToWriteAtomic(new Path(subdirInBucket, "stale-file.tmp"), 
"data");
+
+        String bucketName = bucketPath.getName();
+        long oldTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2);
+        Files.setLastModifiedTime(
+                tempDir.resolve("part1=0/part2=a/" + bucketName + 
"/orphan-subdir"),
+                FileTime.fromMillis(oldTime));
+
+        LocalOrphanFilesClean orphanFilesClean =
+                new LocalOrphanFilesClean(table, System.currentTimeMillis());
+        CleanOrphanFilesResult result = orphanFilesClean.clean();
+
+        assertThat(result.getDeletedFilesPath())
+                .noneMatch(p -> p.toString().contains("orphan-subdir"));
+        assertThat(fileIO.exists(bucketPath)).isTrue();
+        
assertThat(fileIO.listStatus(bucketPath).length).isGreaterThanOrEqualTo(1);
+    }
+
+    @Test
+    void testDirectoryInSnapshotDirNotTreatedAsCandidate() throws Exception {
+        commit(Collections.singletonList(new TestPojo(1, 0, "a", "v1")));
+
+        Path snapshotDir = new Path(tablePath, "snapshot");
+        assertThat(fileIO.exists(snapshotDir)).isTrue();
+
+        Path unknownDir = new Path(snapshotDir, "UNKNOWN-stale-dir");
+        fileIO.mkdirs(unknownDir);
+
+        long oldTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2);
+        Files.setLastModifiedTime(
+                tempDir.resolve("snapshot/UNKNOWN-stale-dir"), 
FileTime.fromMillis(oldTime));
+
+        LocalOrphanFilesClean orphanFilesClean =
+                new LocalOrphanFilesClean(table, System.currentTimeMillis());
+        CleanOrphanFilesResult result = orphanFilesClean.clean();
+
+        assertThat(result.getDeletedFilesPath())
+                .noneMatch(p -> p.toString().contains("UNKNOWN-stale-dir"));
+        assertThat(fileIO.exists(unknownDir)).isTrue();
+    }
+
     private void writeData(
             SnapshotManager snapshotManager,
             List<List<TestPojo>> committedData,
@@ -824,11 +876,7 @@ public class LocalOrphanFilesCleanTest {
             String fileName =
                     fileNamePrefix.get(RANDOM.nextInt(fileNamePrefix.size())) 
+ UUID.randomUUID();
             Path file = new Path(dir, fileName);
-            if (RANDOM.nextBoolean()) {
-                fileIO.tryToWriteAtomic(file, "");
-            } else {
-                fileIO.mkdirs(file);
-            }
+            fileIO.tryToWriteAtomic(file, "");
             manuallyAddedFiles.add(file);
         }
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
index d3376cb80c..a30cc3e0dd 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
@@ -282,7 +282,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean 
{
                                         Path dirPath = new Path(dir);
                                         List<FileStatus> files = 
tryBestListingDirs(dirPath);
                                         for (FileStatus file : files) {
-                                            if (oldEnough(file)) {
+                                            if (!file.isDir() && 
oldEnough(file)) {
                                                 out.collect(
                                                         Tuple2.of(
                                                                 
file.getPath().toString(),
@@ -324,9 +324,12 @@ public class FlinkOrphanFilesClean extends 
OrphanFilesClean {
                             @Override
                             public void endInput() throws IOException {
                                 // delete empty dir
-                                while (!emptyDirs.isEmpty()) {
+                                while (!dryRun && !emptyDirs.isEmpty()) {
                                     Set<Path> newEmptyDir = new HashSet<>();
                                     for (Path emptyDir : emptyDirs) {
+                                        if (table.location().equals(emptyDir)) 
{
+                                            continue;
+                                        }
                                         try {
                                             if (fileIO.delete(emptyDir, 
false)) {
                                                 LOG.info("Clean empty dir: 
{}", emptyDir);
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
index 11f1364c18..98d049d619 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
@@ -125,11 +125,14 @@ case class SparkOrphanFilesClean(
       .parallelize(fileDirs, maxFileDirsParallelism)
       .flatMap {
         dir =>
-          tryBestListingDirs(new Path(dir)).asScala.filter(oldEnough).map {
-            file =>
-              val path = file.getPath
-              (path.getName, path.toString, file.getLen, 
path.getParent.toString)
-          }
+          tryBestListingDirs(new Path(dir)).asScala
+            .filter(file => !file.isDir())
+            .filter(oldEnough)
+            .map {
+              file =>
+                val path = file.getPath
+                (path.getName, path.toString, file.getLen, 
path.getParent.toString)
+            }
       }
       .toDF("name", "path", "len", "dataDir")
       .repartition(parallelism)

Reply via email to