This is an automated email from the ASF dual-hosted git repository.

liguojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new cf230b888 [Core]Clean orphan files for branch (#2863)
cf230b888 is described below

commit cf230b888fefc3e49929105eef05ba72fe0887c5
Author: TaoZex <[email protected]>
AuthorDate: Tue Jul 2 19:07:56 2024 +0800

    [Core]Clean orphan files for branch (#2863)
    
    * [Core]Clean orphan files for branch
---
 .../org/apache/paimon/utils/BranchManager.java     | 11 +++++++
 .../org/apache/paimon/utils/SnapshotManager.java   | 38 ++++++++++++++++++++--
 .../paimon/operation/OrphanFilesCleanTest.java     | 12 +++++++
 3 files changed, 59 insertions(+), 2 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index 8cda5a4ed..6ff8d4c2a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -41,6 +41,7 @@ import java.util.SortedMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.paimon.utils.FileUtils.listOriginalVersionedFiles;
 import static org.apache.paimon.utils.FileUtils.listVersionedDirectories;
 import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -77,6 +78,16 @@ public class BranchManager {
         return new Path(tablePath + "/branch");
     }
 
+    /** Return the root Directory of branch by given tablePath. */
+    public static Path branchDirectory(Path tablePath) {
+        return new Path(tablePath + "/branch");
+    }
+
+    public static List<String> branchNames(FileIO fileIO, Path tablePath) 
throws IOException {
+        return listOriginalVersionedFiles(fileIO, branchDirectory(tablePath), 
BRANCH_PREFIX)
+                .collect(Collectors.toList());
+    }
+
     public static boolean isMainBranch(String branch) {
         return branch.equals(DEFAULT_MAIN_BRANCH);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index c72734753..ca88259de 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -47,6 +47,7 @@ import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
 import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
+import static org.apache.paimon.utils.BranchManager.branchNames;
 import static org.apache.paimon.utils.BranchManager.branchPath;
 import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
 
@@ -107,6 +108,10 @@ public class SnapshotManager implements Serializable {
         return new Path(branchPath(tablePath, branch) + "/snapshot");
     }
 
+    public static Path snapshotDirectory(Path tablePath, String branch) {
+        return new Path(branchPath(tablePath, branch) + "/snapshot");
+    }
+
     public Snapshot snapshot(long snapshotId) {
         Path snapshotPath = snapshotPath(snapshotId);
         return Snapshot.fromPath(fileIO, snapshotPath);
@@ -390,11 +395,25 @@ public class SnapshotManager implements Serializable {
      * be deleted by other processes, so just skip this snapshot.
      */
     public List<Snapshot> safelyGetAllSnapshots() throws IOException {
+        // For main branch
         List<Path> paths =
                 listVersionedFiles(fileIO, snapshotDirectory(), 
SNAPSHOT_PREFIX)
                         .map(id -> snapshotPath(id))
                         .collect(Collectors.toList());
 
+        // For other branch
+        List<String> allBranchNames = branchNames(fileIO, tablePath);
+        for (String branchName : allBranchNames) {
+            List<Path> branchPaths =
+                    listVersionedFiles(
+                                    fileIO,
+                                    snapshotDirectory(tablePath, branchName),
+                                    SNAPSHOT_PREFIX)
+                            .map(this::snapshotPath)
+                            .collect(Collectors.toList());
+            paths.addAll(branchPaths);
+        }
+
         List<Snapshot> snapshots = new ArrayList<>();
         for (Path path : paths) {
             Snapshot snapshot = Snapshot.safelyFromPath(fileIO, path);
@@ -428,8 +447,23 @@ public class SnapshotManager implements Serializable {
      * Try to get non snapshot files. If any error occurred, just ignore it 
and return an empty
      * result.
      */
-    public List<Path> tryGetNonSnapshotFiles(Predicate<FileStatus> 
fileStatusFilter) {
-        return listPathWithFilter(snapshotDirectory(), fileStatusFilter, 
nonSnapshotFileFilter());
+    public List<Path> tryGetNonSnapshotFiles(Predicate<FileStatus> 
fileStatusFilter)
+            throws IOException {
+        // For main branch
+        List<Path> nonSnapshotFiles =
+                listPathWithFilter(snapshotDirectory(), fileStatusFilter, 
nonSnapshotFileFilter());
+
+        // For other branch
+        List<String> allBranchNames = branchNames(fileIO, tablePath);
+        allBranchNames.stream()
+                .map(
+                        branchName ->
+                                listPathWithFilter(
+                                        snapshotDirectory(tablePath, 
branchName),
+                                        fileStatusFilter,
+                                        nonSnapshotFileFilter()))
+                .forEach(nonSnapshotFiles::addAll);
+        return nonSnapshotFiles;
     }
 
     public List<Path> tryGetNonChangelogFiles(Predicate<FileStatus> 
fileStatusFilter) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
index 6dbd33c76..ec9197b8b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
@@ -81,6 +81,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.paimon.io.DataFilePathFactory.CHANGELOG_FILE_PREFIX;
 import static org.apache.paimon.io.DataFilePathFactory.DATA_FILE_PREFIX;
+import static org.apache.paimon.utils.BranchManager.branchPath;
 import static org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -148,6 +149,9 @@ public class OrphanFilesCleanTest {
             }
         }
 
+        // create branch1 by tag
+        table.createBranch("branch1", allTags.get(0));
+
         // generate non used files
         int shouldBeDeleted = generateUnUsedFile();
         assertThat(manuallyAddedFiles.size()).isEqualTo(shouldBeDeleted);
@@ -465,6 +469,14 @@ public class OrphanFilesCleanTest {
                 fileNum,
                 Arrays.asList("manifest-list-", "manifest-", 
"index-manifest-", "UNKNOWN-"));
         shouldBeDeleted += fileNum;
+
+        // branch snapshot
+        addNonUsedFiles(
+                new Path(branchPath(tablePath, "branch1") + "/snapshot"),
+                fileNum,
+                Collections.singletonList("UNKNOWN"));
+        shouldBeDeleted += fileNum;
+
         return shouldBeDeleted;
     }
 

Reply via email to