This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 493a99fa7c [core] Support empty dirs cleaning without bucket sub dir in orphan files cleanup (#7295)
493a99fa7c is described below

commit 493a99fa7c13bda78fbe6f606756497caee0aecc
Author: XiaoHongbo <[email protected]>
AuthorDate: Thu Feb 26 16:06:23 2026 +0800

    [core] Support empty dirs cleaning without bucket sub dir in orphan files cleanup (#7295)
    
    Currently, OrphanFilesClean does not clean up empty partition directories
    that contain no bucket subdirectories, which leaves many empty directories behind.
    
    **Example Scenario:**
     **Note: All directories below are empty.**
    
    
    **Before:**
    ```
    table_root/
    ├── part1=0/
    │   ├── part2=a/
    │   │   ├── bucket-0/      ← cleaned
    │   │   └── bucket-1/      ← cleaned
    │   └── part2=b/           ← NOT cleaned
    └── part1=1/               ← NOT cleaned
    ```
    
    **After:**
    ```
    table_root/
    ├── part1=0/
    │   ├── part2=a/
    │   │   ├── bucket-0/      ← cleaned
    │   │   └── bucket-1/      ← cleaned
    │   └── part2=b/           ← cleaned
    └── part1=1/               ← cleaned
    ```
---
 .../paimon/operation/LocalOrphanFilesClean.java    |   9 +-
 .../apache/paimon/operation/OrphanFilesClean.java  |   9 +-
 .../operation/LocalOrphanFilesCleanTest.java       |  62 +++++++++++
 .../paimon/operation/OrphanFilesCleanTest.java     | 106 ++++++++++++++++++
 .../paimon/flink/orphan/FlinkOrphanFilesClean.java |  13 ++-
 .../action/RemoveOrphanFilesActionITCaseBase.java  | 120 ++++++++++++++++++++-
 6 files changed, 315 insertions(+), 4 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
index 76c3fe7c15..f8a2e080c3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
@@ -246,7 +246,14 @@ public class LocalOrphanFilesClean extends 
OrphanFilesClean {
             List<FileStatus> files = tryBestListingDirs(path);
 
             if (files.isEmpty()) {
-                emptyDirs.add(path);
+                try {
+                    FileStatus dirStatus = fileIO.getFileStatus(path);
+                    if (oldEnough(dirStatus)) {
+                        emptyDirs.add(path);
+                    }
+                } catch (IOException e) {
+                    LOG.warn("IOException during check dirStatus for {}, 
ignore it", path, e);
+                }
                 return Collections.emptyList();
             }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index bb08d90a42..5b1b045a7d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -376,7 +376,14 @@ public abstract class OrphanFilesClean implements 
Serializable {
 
         List<Path> result = new ArrayList<>();
         for (Path partitionPath : partitionPaths) {
-            result.addAll(listFileDirs(partitionPath, level - 1));
+            List<Path> sub = listFileDirs(partitionPath, level - 1);
+            if (sub.isEmpty()) {
+                // Empty partition (no bucket subdirs), include for empty-dir 
cleanup
+                LOG.info("Found empty partition directory for cleanup: {}", 
partitionPath);
+                result.add(partitionPath);
+            } else {
+                result.addAll(sub);
+            }
         }
         return result;
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
index 9d3a0e9476..5c582e22a6 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
@@ -64,6 +64,8 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.attribute.FileTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -558,6 +560,13 @@ public class LocalOrphanFilesCleanTest {
         assertThat(fileIO.exists(emptyDirectory1)).isTrue();
         assertThat(fileIO.exists(emptyDirectory2)).isTrue();
 
+        Files.setLastModifiedTime(
+                tempDir.resolve("part1=1/part2=2/bucket-0"),
+                FileTime.fromMillis(System.currentTimeMillis() - 
TimeUnit.DAYS.toMillis(2)));
+        Files.setLastModifiedTime(
+                tempDir.resolve("part1=1/part2=2/bucket-1"),
+                FileTime.fromMillis(System.currentTimeMillis() - 
TimeUnit.DAYS.toMillis(2)));
+
         LocalOrphanFilesClean orphanFilesClean = new 
LocalOrphanFilesClean(table);
         List<Path> deleted = orphanFilesClean.clean().getDeletedFilesPath();
         assertThat(fileIO.exists(emptyDirectory1)).isFalse();
@@ -566,6 +575,59 @@ public class LocalOrphanFilesCleanTest {
         validate(deleted, snapshotData, new HashMap<>());
     }
 
+    @Test
+    void testEmptyPartitionDirectories() throws Exception {
+        commit(Collections.singletonList(new TestPojo(1, 0, "a", "v1")));
+        commit(Collections.singletonList(new TestPojo(2, 0, "b", "v2")));
+
+        Path partitionPath1 = new Path(tablePath, "part1=0/part2=a");
+        Path partitionPath2 = new Path(tablePath, "part1=0/part2=b");
+        assertThat(fileIO.exists(partitionPath1)).isTrue();
+        assertThat(fileIO.exists(partitionPath2)).isTrue();
+
+        FileStatus[] partition2Files = fileIO.listStatus(partitionPath2);
+        assertThat(partition2Files).isNotEmpty();
+        for (FileStatus file : partition2Files) {
+            if (file.isDir() && 
file.getPath().getName().startsWith("bucket-")) {
+                FileStatus[] bucketFiles = fileIO.listStatus(file.getPath());
+                for (FileStatus bucketFile : bucketFiles) {
+                    fileIO.deleteQuietly(bucketFile.getPath());
+                }
+                fileIO.deleteQuietly(file.getPath());
+            }
+        }
+        assertThat(fileIO.listStatus(partitionPath2)).isEmpty();
+        assertThat(fileIO.exists(partitionPath2)).isTrue();
+
+        Path emptyNonLeafPartitionPath = new Path(tablePath, "part1=1");
+        fileIO.mkdirs(emptyNonLeafPartitionPath);
+
+        long oldTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2);
+        Files.setLastModifiedTime(tempDir.resolve("part1=0/part2=b"), 
FileTime.fromMillis(oldTime));
+        Files.setLastModifiedTime(tempDir.resolve("part1=1"), 
FileTime.fromMillis(oldTime));
+
+        LocalOrphanFilesClean orphanFilesClean = new 
LocalOrphanFilesClean(table);
+        orphanFilesClean.clean();
+
+        assertThat(fileIO.exists(partitionPath2))
+                .as("Empty partition (no bucket subdirs) is cleaned by orphan 
files clean.")
+                .isFalse();
+        assertThat(fileIO.exists(emptyNonLeafPartitionPath))
+                .as(
+                        "Empty non-leaf partition dir (e.g. part1=1 with no 
part2) is cleaned by orphan files clean.")
+                .isFalse();
+        assertThat(fileIO.exists(partitionPath1)).isTrue();
+
+        Path recentEmptyPath = new Path(tablePath, "part1=2");
+        fileIO.mkdirs(recentEmptyPath);
+        LocalOrphanFilesClean cleanRecent =
+                new LocalOrphanFilesClean(table, System.currentTimeMillis() - 
1, false);
+        cleanRecent.clean();
+        assertThat(fileIO.exists(recentEmptyPath))
+                .as("Recent empty partition dir must not be deleted (age 
safeguard).")
+                .isTrue();
+    }
+
     private void writeData(
             SnapshotManager snapshotManager,
             List<List<TestPojo>> committedData,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
index 97ba35b2c0..beead97cc7 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
@@ -18,13 +18,43 @@
 
 package org.apache.paimon.operation;
 
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Utils for {@link OrphanFilesClean}. */
 public class OrphanFilesCleanTest {
 
+    @Rule public TemporaryFolder tempDir = new TemporaryFolder();
+
     @Test
     public void testOlderThanMillis() {
         // normal olderThan
@@ -37,4 +67,80 @@ public class OrphanFilesCleanTest {
                 .hasMessage(
                         "The arg olderThan must be less than now, because 
dataFiles that are currently being written and not referenced by snapshots will 
be mistakenly cleaned up.");
     }
+
+    @Test
+    public void testListPaimonFileDirsWithEmptyPartition() throws Exception {
+        Path tablePath = new Path(tempDir.newFolder().toURI());
+        FileIO fileIO = LocalFileIO.create();
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(), DataTypes.INT(), 
DataTypes.STRING(), DataTypes.STRING()
+                        },
+                        new String[] {"pk", "part1", "part2", "value"});
+        FileStoreTable table = createFileStoreTable(fileIO, tablePath, 
rowType, new Options());
+        String commitUser = UUID.randomUUID().toString();
+        try (TableWriteImpl<?> write = table.newWrite(commitUser);
+                TableCommitImpl commit = table.newCommit(commitUser)) {
+            write.write(
+                    GenericRow.ofKind(
+                            RowKind.INSERT,
+                            1,
+                            0,
+                            BinaryString.fromString("a"),
+                            BinaryString.fromString("v1")));
+            commit.commit(0, write.prepareCommit(true, 0));
+        }
+
+        Path emptyPartitionPath = new Path(tablePath, "part1=0/part2=b");
+        fileIO.mkdirs(emptyPartitionPath);
+        Path emptyNonLeafPartitionPath = new Path(tablePath, "part1=1");
+        fileIO.mkdirs(emptyNonLeafPartitionPath);
+
+        java.lang.reflect.Method method =
+                
LocalOrphanFilesClean.class.getSuperclass().getDeclaredMethod("listPaimonFileDirs");
+        method.setAccessible(true);
+        @SuppressWarnings("unchecked")
+        List<Path> dirs = (List<Path>) method.invoke(new 
LocalOrphanFilesClean(table));
+
+        assertThat(dirs)
+                .as(
+                        "Empty partition (no bucket subdirs) is listed by 
listPaimonFileDirs for empty-dir cleanup")
+                .contains(emptyPartitionPath);
+        assertThat(dirs)
+                .as(
+                        "Empty non-leaf partition dir (e.g. part1=1 with no 
part2) is listed by listPaimonFileDirs")
+                .contains(emptyNonLeafPartitionPath);
+    }
+
+    @Test
+    public void testDeleteNonEmptyDir() throws Exception {
+        Path dir = new Path(tempDir.newFolder().toURI().toString(), "part1=0");
+        FileIO fileIO = LocalFileIO.create();
+        fileIO.mkdirs(dir);
+        Path file = new Path(dir, "data.dat");
+        fileIO.writeFile(file, "x", true);
+
+        assertThat(fileIO.exists(dir)).isTrue();
+        assertThatThrownBy(() -> fileIO.delete(dir, false))
+                .isInstanceOf(IOException.class)
+                .hasMessageContaining("not empty");
+        assertThat(fileIO.exists(dir)).isTrue();
+    }
+
+    private FileStoreTable createFileStoreTable(
+            FileIO fileIO, Path tablePath, RowType rowType, Options conf) 
throws Exception {
+        conf.set(CoreOptions.PATH, tablePath.toString());
+        conf.set(CoreOptions.BUCKET, 2);
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(fileIO, tablePath),
+                        new Schema(
+                                rowType.getFields(),
+                                Arrays.asList("part1", "part2"),
+                                Arrays.asList("pk", "part1", "part2"),
+                                conf.toMap(),
+                                ""));
+        return FileStoreTableFactory.create(fileIO, tablePath, tableSchema);
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
index 1a91d937d7..d3376cb80c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
@@ -290,7 +290,18 @@ public class FlinkOrphanFilesClean extends 
OrphanFilesClean {
                                             }
                                         }
                                         if (files.isEmpty()) {
-                                            ctx.output(emptyDirOutputTag, 
dirPath);
+                                            try {
+                                                FileStatus dirStatus =
+                                                        
fileIO.getFileStatus(dirPath);
+                                                if (oldEnough(dirStatus)) {
+                                                    
ctx.output(emptyDirOutputTag, dirPath);
+                                                }
+                                            } catch (IOException e) {
+                                                LOG.warn(
+                                                        "IOException during 
check dirStatus for {}, ignore it",
+                                                        dirPath,
+                                                        e);
+                                            }
                                         }
                                     }
                                 })
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
index 50fbd7dac1..e54fd5c662 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.SchemaChange;
@@ -41,15 +42,21 @@ import 
org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.attribute.FileTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.paimon.CoreOptions.SCAN_FALLBACK_BRANCH;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -171,8 +178,11 @@ public abstract class RemoveOrphanFilesActionITCaseBase 
extends ActionITCaseBase
         assertThat(fileIO.listDirectories(bucketDir)).isEmpty();
 
         // clean empty directories
+        setLastModifiedToPastIfLocal(bucketDir, 2);
         ImmutableList.copyOf(executeSQL(withOlderThan));
-        assertThat(fileIO.exists(bucketDir)).isFalse();
+        if ("file".equals(bucketDir.toUri().getScheme())) {
+            assertThat(fileIO.exists(bucketDir)).isFalse();
+        }
         // table should not be deleted
         assertThat(fileIO.exists(location)).isTrue();
     }
@@ -392,6 +402,114 @@ public abstract class RemoveOrphanFilesActionITCaseBase 
extends ActionITCaseBase
                 .hasMessageContaining("Unknown mode");
     }
 
+    @Test
+    public void testEmptyPartitionDirectories() throws Exception {
+        FileStoreTable table = createPartitionedTableWithData();
+        table = getFileStoreTable(tableName);
+        FileIO fileIO = table.fileIO();
+        Path location = table.location();
+        Path partitionPath1 = new Path(location, "part1=0/part2=a");
+        Path partitionPath2 = new Path(location, "part1=0/part2=b");
+        Path emptyNonLeaf = new Path(location, "part1=1");
+
+        emptyPartitionDir(fileIO, partitionPath2);
+        fileIO.mkdirs(emptyNonLeaf);
+
+        boolean localFs = "file".equals(partitionPath2.toUri().getScheme());
+        if (localFs) {
+            setLastModifiedToPastIfLocal(partitionPath2, emptyNonLeaf, 2);
+            executeSQL(String.format("CALL sys.remove_orphan_files('%s.%s')", 
database, tableName));
+            assertThat(fileIO.exists(partitionPath2)).isFalse();
+            assertThat(fileIO.exists(emptyNonLeaf)).isFalse();
+            assertThat(fileIO.exists(partitionPath1)).isTrue();
+        }
+
+        Path recentEmpty = new Path(location, "part1=2");
+        fileIO.mkdirs(recentEmpty);
+        executeSQL(
+                String.format(
+                        "CALL sys.remove_orphan_files('%s.%s', '%s')",
+                        database, tableName, olderThanNowMinusMillis(1000)));
+        assertThat(fileIO.exists(recentEmpty)).isTrue();
+    }
+
+    private FileStoreTable createPartitionedTableWithData() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(), DataTypes.INT(), 
DataTypes.STRING(), DataTypes.STRING()
+                        },
+                        new String[] {"pk", "part1", "part2", "value"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        tableName,
+                        rowType,
+                        Arrays.asList("part1", "part2"),
+                        Arrays.asList("pk", "part1", "part2", "value"),
+                        Collections.emptyList(),
+                        Collections.singletonMap("bucket", "2"));
+        StreamWriteBuilder writeBuilder = 
table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+        writeData(
+                rowData(1, 0, BinaryString.fromString("a"), 
BinaryString.fromString("v1")),
+                rowData(2, 0, BinaryString.fromString("a"), 
BinaryString.fromString("v2")));
+        writeData(rowData(3, 0, BinaryString.fromString("b"), 
BinaryString.fromString("v3")));
+        write.close();
+        commit.close();
+        write = null;
+        commit = null;
+        return table;
+    }
+
+    private void emptyPartitionDir(FileIO fileIO, Path partitionPath) throws 
IOException {
+        for (FileStatus file : fileIO.listStatus(partitionPath)) {
+            if (file.isDir() && 
file.getPath().getName().startsWith("bucket-")) {
+                for (FileStatus bucketFile : 
fileIO.listStatus(file.getPath())) {
+                    fileIO.deleteQuietly(bucketFile.getPath());
+                }
+                fileIO.deleteQuietly(file.getPath());
+            }
+        }
+    }
+
+    private void setLastModifiedToPastIfLocal(Path path1, Path path2, int 
daysAgo) {
+        setLastModifiedToPastIfLocal(path1, daysAgo);
+        setLastModifiedToPastIfLocal(path2, daysAgo);
+    }
+
+    private void setLastModifiedToPastIfLocal(Path path, int daysAgo) {
+        try {
+            if (path.toUri().getScheme() != null && 
"file".equals(path.toUri().getScheme())) {
+                Files.setLastModifiedTime(
+                        Paths.get(path.toUri()),
+                        FileTime.fromMillis(
+                                System.currentTimeMillis() - 
TimeUnit.DAYS.toMillis(daysAgo)));
+            }
+        } catch (Exception ignored) {
+        }
+    }
+
+    private static String olderThanNowMinusMillis(long millis) {
+        return DateTimeUtils.formatLocalDateTime(
+                DateTimeUtils.toLocalDateTime(System.currentTimeMillis() - 
millis), 3);
+    }
+
+    @Test
+    public void testNonEmptyPartitionDir() throws Exception {
+        createPartitionedTableWithData();
+        FileStoreTable table = getFileStoreTable(tableName);
+        FileIO fileIO = table.fileIO();
+        Path nonEmptyPath = new Path(table.location(), "part1=0/part2=c");
+        fileIO.mkdirs(nonEmptyPath);
+        fileIO.writeFile(new Path(nonEmptyPath, "guard.txt"), "must not 
delete", true);
+
+        executeSQL(String.format("CALL sys.remove_orphan_files('%s.%s')", 
database, tableName));
+
+        assertThat(fileIO.exists(nonEmptyPath)).isTrue();
+        assertThat(fileIO.exists(new Path(nonEmptyPath, 
"guard.txt"))).isTrue();
+    }
+
     protected boolean supportNamedArgument() {
         return true;
     }

Reply via email to