This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 493a99fa7c [core] Support empty dirs cleaning without bucket sub dir
in orphan files cleanup (#7295)
493a99fa7c is described below
commit 493a99fa7c13bda78fbe6f606756497caee0aecc
Author: XiaoHongbo <[email protected]>
AuthorDate: Thu Feb 26 16:06:23 2026 +0800
[core] Support empty dirs cleaning without bucket sub dir in orphan files
cleanup (#7295)
Currently, OrphanFilesClean does not clean up empty partition directories
that contain no bucket subdirectories, so many empty directories are left behind.
**Example Scenario:**
**Note: All directories shown below are empty.**
**Before:**
```
table_root/
├── part1=0/
│ ├── part2=a/
│ │ ├── bucket-0/ ← cleaned
│ │ └── bucket-1/ ← cleaned
│ └── part2=b/ ← NOT cleaned
└── part1=1/ ← NOT cleaned
```
**After:**
```
table_root/
├── part1=0/
│ ├── part2=a/
│ │ ├── bucket-0/ ← cleaned
│ │ └── bucket-1/ ← cleaned
│ └── part2=b/ ← cleaned
└── part1=1/ ← cleaned
```
---
.../paimon/operation/LocalOrphanFilesClean.java | 9 +-
.../apache/paimon/operation/OrphanFilesClean.java | 9 +-
.../operation/LocalOrphanFilesCleanTest.java | 62 +++++++++++
.../paimon/operation/OrphanFilesCleanTest.java | 106 ++++++++++++++++++
.../paimon/flink/orphan/FlinkOrphanFilesClean.java | 13 ++-
.../action/RemoveOrphanFilesActionITCaseBase.java | 120 ++++++++++++++++++++-
6 files changed, 315 insertions(+), 4 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
index 76c3fe7c15..f8a2e080c3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
@@ -246,7 +246,14 @@ public class LocalOrphanFilesClean extends
OrphanFilesClean {
List<FileStatus> files = tryBestListingDirs(path);
if (files.isEmpty()) {
- emptyDirs.add(path);
+ try {
+ FileStatus dirStatus = fileIO.getFileStatus(path);
+ if (oldEnough(dirStatus)) {
+ emptyDirs.add(path);
+ }
+ } catch (IOException e) {
+ LOG.warn("IOException during check dirStatus for {},
ignore it", path, e);
+ }
return Collections.emptyList();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index bb08d90a42..5b1b045a7d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -376,7 +376,14 @@ public abstract class OrphanFilesClean implements
Serializable {
List<Path> result = new ArrayList<>();
for (Path partitionPath : partitionPaths) {
- result.addAll(listFileDirs(partitionPath, level - 1));
+ List<Path> sub = listFileDirs(partitionPath, level - 1);
+ if (sub.isEmpty()) {
+ // Empty partition (no bucket subdirs), include for empty-dir
cleanup
+ LOG.info("Found empty partition directory for cleanup: {}",
partitionPath);
+ result.add(partitionPath);
+ } else {
+ result.addAll(sub);
+ }
}
return result;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
index 9d3a0e9476..5c582e22a6 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
@@ -64,6 +64,8 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.attribute.FileTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -558,6 +560,13 @@ public class LocalOrphanFilesCleanTest {
assertThat(fileIO.exists(emptyDirectory1)).isTrue();
assertThat(fileIO.exists(emptyDirectory2)).isTrue();
+ Files.setLastModifiedTime(
+ tempDir.resolve("part1=1/part2=2/bucket-0"),
+ FileTime.fromMillis(System.currentTimeMillis() -
TimeUnit.DAYS.toMillis(2)));
+ Files.setLastModifiedTime(
+ tempDir.resolve("part1=1/part2=2/bucket-1"),
+ FileTime.fromMillis(System.currentTimeMillis() -
TimeUnit.DAYS.toMillis(2)));
+
LocalOrphanFilesClean orphanFilesClean = new
LocalOrphanFilesClean(table);
List<Path> deleted = orphanFilesClean.clean().getDeletedFilesPath();
assertThat(fileIO.exists(emptyDirectory1)).isFalse();
@@ -566,6 +575,59 @@ public class LocalOrphanFilesCleanTest {
validate(deleted, snapshotData, new HashMap<>());
}
+ @Test
+ void testEmptyPartitionDirectories() throws Exception {
+ commit(Collections.singletonList(new TestPojo(1, 0, "a", "v1")));
+ commit(Collections.singletonList(new TestPojo(2, 0, "b", "v2")));
+
+ Path partitionPath1 = new Path(tablePath, "part1=0/part2=a");
+ Path partitionPath2 = new Path(tablePath, "part1=0/part2=b");
+ assertThat(fileIO.exists(partitionPath1)).isTrue();
+ assertThat(fileIO.exists(partitionPath2)).isTrue();
+
+ FileStatus[] partition2Files = fileIO.listStatus(partitionPath2);
+ assertThat(partition2Files).isNotEmpty();
+ for (FileStatus file : partition2Files) {
+ if (file.isDir() &&
file.getPath().getName().startsWith("bucket-")) {
+ FileStatus[] bucketFiles = fileIO.listStatus(file.getPath());
+ for (FileStatus bucketFile : bucketFiles) {
+ fileIO.deleteQuietly(bucketFile.getPath());
+ }
+ fileIO.deleteQuietly(file.getPath());
+ }
+ }
+ assertThat(fileIO.listStatus(partitionPath2)).isEmpty();
+ assertThat(fileIO.exists(partitionPath2)).isTrue();
+
+ Path emptyNonLeafPartitionPath = new Path(tablePath, "part1=1");
+ fileIO.mkdirs(emptyNonLeafPartitionPath);
+
+ long oldTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2);
+ Files.setLastModifiedTime(tempDir.resolve("part1=0/part2=b"),
FileTime.fromMillis(oldTime));
+ Files.setLastModifiedTime(tempDir.resolve("part1=1"),
FileTime.fromMillis(oldTime));
+
+ LocalOrphanFilesClean orphanFilesClean = new
LocalOrphanFilesClean(table);
+ orphanFilesClean.clean();
+
+ assertThat(fileIO.exists(partitionPath2))
+ .as("Empty partition (no bucket subdirs) is cleaned by orphan
files clean.")
+ .isFalse();
+ assertThat(fileIO.exists(emptyNonLeafPartitionPath))
+ .as(
+ "Empty non-leaf partition dir (e.g. part1=1 with no
part2) is cleaned by orphan files clean.")
+ .isFalse();
+ assertThat(fileIO.exists(partitionPath1)).isTrue();
+
+ Path recentEmptyPath = new Path(tablePath, "part1=2");
+ fileIO.mkdirs(recentEmptyPath);
+ LocalOrphanFilesClean cleanRecent =
+ new LocalOrphanFilesClean(table, System.currentTimeMillis() -
1, false);
+ cleanRecent.clean();
+ assertThat(fileIO.exists(recentEmptyPath))
+ .as("Recent empty partition dir must not be deleted (age
safeguard).")
+ .isTrue();
+ }
+
private void writeData(
SnapshotManager snapshotManager,
List<List<TestPojo>> committedData,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
index 97ba35b2c0..beead97cc7 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
@@ -18,13 +18,43 @@
package org.apache.paimon.operation;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Utils for {@link OrphanFilesClean}. */
public class OrphanFilesCleanTest {
+ @Rule public TemporaryFolder tempDir = new TemporaryFolder();
+
@Test
public void testOlderThanMillis() {
// normal olderThan
@@ -37,4 +67,80 @@ public class OrphanFilesCleanTest {
.hasMessage(
"The arg olderThan must be less than now, because
dataFiles that are currently being written and not referenced by snapshots will
be mistakenly cleaned up.");
}
+
+ @Test
+ public void testListPaimonFileDirsWithEmptyPartition() throws Exception {
+ Path tablePath = new Path(tempDir.newFolder().toURI());
+ FileIO fileIO = LocalFileIO.create();
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT(), DataTypes.INT(),
DataTypes.STRING(), DataTypes.STRING()
+ },
+ new String[] {"pk", "part1", "part2", "value"});
+ FileStoreTable table = createFileStoreTable(fileIO, tablePath,
rowType, new Options());
+ String commitUser = UUID.randomUUID().toString();
+ try (TableWriteImpl<?> write = table.newWrite(commitUser);
+ TableCommitImpl commit = table.newCommit(commitUser)) {
+ write.write(
+ GenericRow.ofKind(
+ RowKind.INSERT,
+ 1,
+ 0,
+ BinaryString.fromString("a"),
+ BinaryString.fromString("v1")));
+ commit.commit(0, write.prepareCommit(true, 0));
+ }
+
+ Path emptyPartitionPath = new Path(tablePath, "part1=0/part2=b");
+ fileIO.mkdirs(emptyPartitionPath);
+ Path emptyNonLeafPartitionPath = new Path(tablePath, "part1=1");
+ fileIO.mkdirs(emptyNonLeafPartitionPath);
+
+ java.lang.reflect.Method method =
+
LocalOrphanFilesClean.class.getSuperclass().getDeclaredMethod("listPaimonFileDirs");
+ method.setAccessible(true);
+ @SuppressWarnings("unchecked")
+ List<Path> dirs = (List<Path>) method.invoke(new
LocalOrphanFilesClean(table));
+
+ assertThat(dirs)
+ .as(
+ "Empty partition (no bucket subdirs) is listed by
listPaimonFileDirs for empty-dir cleanup")
+ .contains(emptyPartitionPath);
+ assertThat(dirs)
+ .as(
+ "Empty non-leaf partition dir (e.g. part1=1 with no
part2) is listed by listPaimonFileDirs")
+ .contains(emptyNonLeafPartitionPath);
+ }
+
+ @Test
+ public void testDeleteNonEmptyDir() throws Exception {
+ Path dir = new Path(tempDir.newFolder().toURI().toString(), "part1=0");
+ FileIO fileIO = LocalFileIO.create();
+ fileIO.mkdirs(dir);
+ Path file = new Path(dir, "data.dat");
+ fileIO.writeFile(file, "x", true);
+
+ assertThat(fileIO.exists(dir)).isTrue();
+ assertThatThrownBy(() -> fileIO.delete(dir, false))
+ .isInstanceOf(IOException.class)
+ .hasMessageContaining("not empty");
+ assertThat(fileIO.exists(dir)).isTrue();
+ }
+
+ private FileStoreTable createFileStoreTable(
+ FileIO fileIO, Path tablePath, RowType rowType, Options conf)
throws Exception {
+ conf.set(CoreOptions.PATH, tablePath.toString());
+ conf.set(CoreOptions.BUCKET, 2);
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(fileIO, tablePath),
+ new Schema(
+ rowType.getFields(),
+ Arrays.asList("part1", "part2"),
+ Arrays.asList("pk", "part1", "part2"),
+ conf.toMap(),
+ ""));
+ return FileStoreTableFactory.create(fileIO, tablePath, tableSchema);
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
index 1a91d937d7..d3376cb80c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
@@ -290,7 +290,18 @@ public class FlinkOrphanFilesClean extends
OrphanFilesClean {
}
}
if (files.isEmpty()) {
- ctx.output(emptyDirOutputTag,
dirPath);
+ try {
+ FileStatus dirStatus =
+
fileIO.getFileStatus(dirPath);
+ if (oldEnough(dirStatus)) {
+
ctx.output(emptyDirOutputTag, dirPath);
+ }
+ } catch (IOException e) {
+ LOG.warn(
+ "IOException during
check dirStatus for {}, ignore it",
+ dirPath,
+ e);
+ }
}
}
})
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
index 50fbd7dac1..e54fd5c662 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaChange;
@@ -41,15 +42,21 @@ import
org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.attribute.FileTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
import static org.apache.paimon.CoreOptions.SCAN_FALLBACK_BRANCH;
import static org.assertj.core.api.Assertions.assertThat;
@@ -171,8 +178,11 @@ public abstract class RemoveOrphanFilesActionITCaseBase
extends ActionITCaseBase
assertThat(fileIO.listDirectories(bucketDir)).isEmpty();
// clean empty directories
+ setLastModifiedToPastIfLocal(bucketDir, 2);
ImmutableList.copyOf(executeSQL(withOlderThan));
- assertThat(fileIO.exists(bucketDir)).isFalse();
+ if ("file".equals(bucketDir.toUri().getScheme())) {
+ assertThat(fileIO.exists(bucketDir)).isFalse();
+ }
// table should not be deleted
assertThat(fileIO.exists(location)).isTrue();
}
@@ -392,6 +402,114 @@ public abstract class RemoveOrphanFilesActionITCaseBase
extends ActionITCaseBase
.hasMessageContaining("Unknown mode");
}
+ @Test
+ public void testEmptyPartitionDirectories() throws Exception {
+ FileStoreTable table = createPartitionedTableWithData();
+ table = getFileStoreTable(tableName);
+ FileIO fileIO = table.fileIO();
+ Path location = table.location();
+ Path partitionPath1 = new Path(location, "part1=0/part2=a");
+ Path partitionPath2 = new Path(location, "part1=0/part2=b");
+ Path emptyNonLeaf = new Path(location, "part1=1");
+
+ emptyPartitionDir(fileIO, partitionPath2);
+ fileIO.mkdirs(emptyNonLeaf);
+
+ boolean localFs = "file".equals(partitionPath2.toUri().getScheme());
+ if (localFs) {
+ setLastModifiedToPastIfLocal(partitionPath2, emptyNonLeaf, 2);
+ executeSQL(String.format("CALL sys.remove_orphan_files('%s.%s')",
database, tableName));
+ assertThat(fileIO.exists(partitionPath2)).isFalse();
+ assertThat(fileIO.exists(emptyNonLeaf)).isFalse();
+ assertThat(fileIO.exists(partitionPath1)).isTrue();
+ }
+
+ Path recentEmpty = new Path(location, "part1=2");
+ fileIO.mkdirs(recentEmpty);
+ executeSQL(
+ String.format(
+ "CALL sys.remove_orphan_files('%s.%s', '%s')",
+ database, tableName, olderThanNowMinusMillis(1000)));
+ assertThat(fileIO.exists(recentEmpty)).isTrue();
+ }
+
+ private FileStoreTable createPartitionedTableWithData() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT(), DataTypes.INT(),
DataTypes.STRING(), DataTypes.STRING()
+ },
+ new String[] {"pk", "part1", "part2", "value"});
+ FileStoreTable table =
+ createFileStoreTable(
+ tableName,
+ rowType,
+ Arrays.asList("part1", "part2"),
+ Arrays.asList("pk", "part1", "part2", "value"),
+ Collections.emptyList(),
+ Collections.singletonMap("bucket", "2"));
+ StreamWriteBuilder writeBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = writeBuilder.newWrite();
+ commit = writeBuilder.newCommit();
+ writeData(
+ rowData(1, 0, BinaryString.fromString("a"),
BinaryString.fromString("v1")),
+ rowData(2, 0, BinaryString.fromString("a"),
BinaryString.fromString("v2")));
+ writeData(rowData(3, 0, BinaryString.fromString("b"),
BinaryString.fromString("v3")));
+ write.close();
+ commit.close();
+ write = null;
+ commit = null;
+ return table;
+ }
+
+ private void emptyPartitionDir(FileIO fileIO, Path partitionPath) throws
IOException {
+ for (FileStatus file : fileIO.listStatus(partitionPath)) {
+ if (file.isDir() &&
file.getPath().getName().startsWith("bucket-")) {
+ for (FileStatus bucketFile :
fileIO.listStatus(file.getPath())) {
+ fileIO.deleteQuietly(bucketFile.getPath());
+ }
+ fileIO.deleteQuietly(file.getPath());
+ }
+ }
+ }
+
+ private void setLastModifiedToPastIfLocal(Path path1, Path path2, int
daysAgo) {
+ setLastModifiedToPastIfLocal(path1, daysAgo);
+ setLastModifiedToPastIfLocal(path2, daysAgo);
+ }
+
+ private void setLastModifiedToPastIfLocal(Path path, int daysAgo) {
+ try {
+ if (path.toUri().getScheme() != null &&
"file".equals(path.toUri().getScheme())) {
+ Files.setLastModifiedTime(
+ Paths.get(path.toUri()),
+ FileTime.fromMillis(
+ System.currentTimeMillis() -
TimeUnit.DAYS.toMillis(daysAgo)));
+ }
+ } catch (Exception ignored) {
+ }
+ }
+
+ private static String olderThanNowMinusMillis(long millis) {
+ return DateTimeUtils.formatLocalDateTime(
+ DateTimeUtils.toLocalDateTime(System.currentTimeMillis() -
millis), 3);
+ }
+
+ @Test
+ public void testNonEmptyPartitionDir() throws Exception {
+ createPartitionedTableWithData();
+ FileStoreTable table = getFileStoreTable(tableName);
+ FileIO fileIO = table.fileIO();
+ Path nonEmptyPath = new Path(table.location(), "part1=0/part2=c");
+ fileIO.mkdirs(nonEmptyPath);
+ fileIO.writeFile(new Path(nonEmptyPath, "guard.txt"), "must not
delete", true);
+
+ executeSQL(String.format("CALL sys.remove_orphan_files('%s.%s')",
database, tableName));
+
+ assertThat(fileIO.exists(nonEmptyPath)).isTrue();
+ assertThat(fileIO.exists(new Path(nonEmptyPath,
"guard.txt"))).isTrue();
+ }
+
protected boolean supportNamedArgument() {
return true;
}