This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d7b08252d1 [core] Avoid deleting directories in orphan files cleanup
(#7920)
d7b08252d1 is described below
commit d7b08252d1be9210c3280bae9579b2b06739430b
Author: XiaoHongbo <[email protected]>
AuthorDate: Thu May 21 16:02:28 2026 +0800
[core] Avoid deleting directories in orphan files cleanup (#7920)
This PR fixes a potential data loss risk in orphan files cleanup.
The issue was introduced by #7295, which added support for cleaning
empty partition directories without
bucket subdirectories. In that change, `listFileDirs` may add a
partition directory to the directories to
be scanned when its child listing is empty.
This is safe when the partition is truly empty. However, if a transient
`listStatus` failure causes the listing to come back empty, a partition
directory that still contains data may be treated as empty and scanned
again later. When that later listing succeeds, the bucket or partition
directories it contains could be collected as orphan file candidates.
Before this fix, `cleanFile` recursively deleted directory candidates
via `deleteDirectoryQuietly`, so if a directory was incorrectly
collected as an orphan candidate, the still-referenced data files under
that directory could be deleted along with it.
---
.../paimon/operation/LocalOrphanFilesClean.java | 1 +
.../apache/paimon/operation/OrphanFilesClean.java | 11 ++--
.../operation/LocalOrphanFilesCleanTest.java | 58 ++++++++++++++++++++--
.../paimon/flink/orphan/FlinkOrphanFilesClean.java | 7 ++-
.../spark/procedure/SparkOrphanFilesClean.scala | 13 +++--
5 files changed, 75 insertions(+), 15 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
index f8a2e080c3..fb9f9f7589 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
@@ -258,6 +258,7 @@ public class LocalOrphanFilesClean extends OrphanFilesClean
{
}
return files.stream()
+ .filter(status -> !status.isDir())
.filter(this::oldEnough)
.map(status -> Pair.of(status.getPath(), status.getLen()))
.collect(Collectors.toList());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index 5b1b045a7d..b146ffbac7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -181,6 +181,7 @@ public abstract class OrphanFilesClean implements
Serializable {
Path directory, Predicate<FileStatus> fileStatusFilter,
Predicate<Path> fileFilter) {
List<FileStatus> statuses = tryBestListingDirs(directory);
return statuses.stream()
+ .filter(status -> !status.isDir())
.filter(fileStatusFilter)
.filter(status -> fileFilter.test(status.getPath()))
.map(status -> Pair.of(status.getPath(), status.getLen()))
@@ -220,11 +221,15 @@ public abstract class OrphanFilesClean implements
Serializable {
if (!dryRun) {
try {
if (fileIO.isDir(path)) {
- fileIO.deleteDirectoryQuietly(path);
+ LOG.error(
+ "Refusing to delete directory {} in orphan file
cleanup. "
+ + "This indicates a bug in candidate
collection.",
+ path);
} else {
fileIO.deleteQuietly(path);
}
- } catch (IOException ignored) {
+ } catch (IOException e) {
+ LOG.warn("Failed to check whether {} is directory, skip
deleting it.", path, e);
}
}
}
@@ -393,7 +398,7 @@ public abstract class OrphanFilesClean implements
Serializable {
for (FileStatus status : statuses) {
Path path = status.getPath();
- if (filter.test(path)) {
+ if (status.isDir() && filter.test(path)) {
filtered.add(path);
}
// ignore unknown dirs
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
index bb20c9904e..b690246101 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
@@ -628,6 +628,58 @@ public class LocalOrphanFilesCleanTest {
.isTrue();
}
+ @Test
+ void testDirectoriesNotTreatedAsOrphanCandidates() throws Exception {
+ commit(Collections.singletonList(new TestPojo(1, 0, "a", "v1")));
+
+ Path partitionPath = new Path(tablePath, "part1=0/part2=a");
+ Path bucketPath =
+ listSubDirs(partitionPath, p ->
p.getName().startsWith(BUCKET_PATH_PREFIX)).get(0);
+ assertThat(fileIO.listStatus(bucketPath)).isNotEmpty();
+
+ Path subdirInBucket = new Path(bucketPath, "orphan-subdir");
+ fileIO.mkdirs(subdirInBucket);
+ fileIO.tryToWriteAtomic(new Path(subdirInBucket, "stale-file.tmp"),
"data");
+
+ String bucketName = bucketPath.getName();
+ long oldTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2);
+ Files.setLastModifiedTime(
+ tempDir.resolve("part1=0/part2=a/" + bucketName +
"/orphan-subdir"),
+ FileTime.fromMillis(oldTime));
+
+ LocalOrphanFilesClean orphanFilesClean =
+ new LocalOrphanFilesClean(table, System.currentTimeMillis());
+ CleanOrphanFilesResult result = orphanFilesClean.clean();
+
+ assertThat(result.getDeletedFilesPath())
+ .noneMatch(p -> p.toString().contains("orphan-subdir"));
+ assertThat(fileIO.exists(bucketPath)).isTrue();
+
assertThat(fileIO.listStatus(bucketPath).length).isGreaterThanOrEqualTo(1);
+ }
+
+ @Test
+ void testDirectoryInSnapshotDirNotTreatedAsCandidate() throws Exception {
+ commit(Collections.singletonList(new TestPojo(1, 0, "a", "v1")));
+
+ Path snapshotDir = new Path(tablePath, "snapshot");
+ assertThat(fileIO.exists(snapshotDir)).isTrue();
+
+ Path unknownDir = new Path(snapshotDir, "UNKNOWN-stale-dir");
+ fileIO.mkdirs(unknownDir);
+
+ long oldTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2);
+ Files.setLastModifiedTime(
+ tempDir.resolve("snapshot/UNKNOWN-stale-dir"),
FileTime.fromMillis(oldTime));
+
+ LocalOrphanFilesClean orphanFilesClean =
+ new LocalOrphanFilesClean(table, System.currentTimeMillis());
+ CleanOrphanFilesResult result = orphanFilesClean.clean();
+
+ assertThat(result.getDeletedFilesPath())
+ .noneMatch(p -> p.toString().contains("UNKNOWN-stale-dir"));
+ assertThat(fileIO.exists(unknownDir)).isTrue();
+ }
+
private void writeData(
SnapshotManager snapshotManager,
List<List<TestPojo>> committedData,
@@ -824,11 +876,7 @@ public class LocalOrphanFilesCleanTest {
String fileName =
fileNamePrefix.get(RANDOM.nextInt(fileNamePrefix.size()))
+ UUID.randomUUID();
Path file = new Path(dir, fileName);
- if (RANDOM.nextBoolean()) {
- fileIO.tryToWriteAtomic(file, "");
- } else {
- fileIO.mkdirs(file);
- }
+ fileIO.tryToWriteAtomic(file, "");
manuallyAddedFiles.add(file);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
index d3376cb80c..a30cc3e0dd 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
@@ -282,7 +282,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean
{
Path dirPath = new Path(dir);
List<FileStatus> files =
tryBestListingDirs(dirPath);
for (FileStatus file : files) {
- if (oldEnough(file)) {
+ if (!file.isDir() &&
oldEnough(file)) {
out.collect(
Tuple2.of(
file.getPath().toString(),
@@ -324,9 +324,12 @@ public class FlinkOrphanFilesClean extends
OrphanFilesClean {
@Override
public void endInput() throws IOException {
// delete empty dir
- while (!emptyDirs.isEmpty()) {
+ while (!dryRun && !emptyDirs.isEmpty()) {
Set<Path> newEmptyDir = new HashSet<>();
for (Path emptyDir : emptyDirs) {
+ if (table.location().equals(emptyDir))
{
+ continue;
+ }
try {
if (fileIO.delete(emptyDir,
false)) {
LOG.info("Clean empty dir:
{}", emptyDir);
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
index 11f1364c18..98d049d619 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
@@ -125,11 +125,14 @@ case class SparkOrphanFilesClean(
.parallelize(fileDirs, maxFileDirsParallelism)
.flatMap {
dir =>
- tryBestListingDirs(new Path(dir)).asScala.filter(oldEnough).map {
- file =>
- val path = file.getPath
- (path.getName, path.toString, file.getLen,
path.getParent.toString)
- }
+ tryBestListingDirs(new Path(dir)).asScala
+ .filter(file => !file.isDir())
+ .filter(oldEnough)
+ .map {
+ file =>
+ val path = file.getPath
+ (path.getName, path.toString, file.getLen,
path.getParent.toString)
+ }
}
.toDF("name", "path", "len", "dataDir")
.repartition(parallelism)