This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9791e0974 [core] OrphanFilesClean now support branches (#3970)
9791e0974 is described below

commit 9791e0974203394659b6c5a1b5af0d7229f2d79e
Author: tsreaper <[email protected]>
AuthorDate: Fri Aug 16 12:04:10 2024 +0800

    [core] OrphanFilesClean now support branches (#3970)
---
 .../apache/paimon/operation/OrphanFilesClean.java  | 106 +++++++++++++--------
 .../paimon/privilege/PrivilegedFileStoreTable.java |   6 ++
 .../paimon/table/AbstractFileStoreTable.java       |  26 +++--
 .../java/org/apache/paimon/table/DataTable.java    |   6 ++
 .../paimon/table/FallbackReadFileStoreTable.java   |  11 +++
 .../org/apache/paimon/table/FileStoreTable.java    |   7 ++
 .../apache/paimon/table/FileStoreTableFactory.java |   2 +-
 .../apache/paimon/table/system/AuditLogTable.java  |   5 +
 .../apache/paimon/table/system/BucketsTable.java   |   5 +
 .../paimon/table/system/FileMonitorTable.java      |   5 +
 .../paimon/table/system/ReadOptimizedTable.java    |   5 +
 .../org/apache/paimon/utils/BranchManager.java     |  11 ---
 .../org/apache/paimon/utils/SnapshotManager.java   |  38 +-------
 .../paimon/operation/OrphanFilesCleanTest.java     |   3 +
 .../action/RemoveOrphanFilesActionITCase.java      |  60 ++++++++++++
 .../java/org/apache/paimon/hive/HiveCatalog.java   |   5 +-
 16 files changed, 202 insertions(+), 99 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index f0d49964c..dd57f269c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.operation;
 
 import org.apache.paimon.Changelog;
-import org.apache.paimon.FileStore;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
@@ -33,8 +32,10 @@ import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.BranchManager;
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
@@ -102,30 +103,21 @@ public class OrphanFilesClean {
     private static final int READ_FILE_RETRY_INTERVAL = 5;
     private static final int SHOW_LIMIT = 200;
 
-    private final SnapshotManager snapshotManager;
-    private final TagManager tagManager;
+    private final FileStoreTable table;
     private final FileIO fileIO;
     private final Path location;
     private final int partitionKeysNum;
-    private final ManifestList manifestList;
-    private final ManifestFile manifestFile;
-    private final IndexFileHandler indexFileHandler;
 
     private final List<Path> deleteFiles;
     private long olderThanMillis = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
     private Consumer<Path> fileCleaner;
 
     public OrphanFilesClean(FileStoreTable table) {
-        this.snapshotManager = table.snapshotManager();
-        this.tagManager = table.tagManager();
+        this.table = table;
         this.fileIO = table.fileIO();
         this.location = table.location();
         this.partitionKeysNum = table.partitionKeys().size();
 
-        FileStore<?> store = table.store();
-        this.manifestList = store.manifestListFactory().create();
-        this.manifestFile = store.manifestFileFactory().create();
-        this.indexFileHandler = store.newIndexFileHandler();
         this.deleteFiles = new ArrayList<>();
         this.fileCleaner =
                 path -> {
@@ -154,23 +146,42 @@ public class OrphanFilesClean {
     }
 
     public List<Path> clean() throws IOException, ExecutionException, InterruptedException {
-        if (snapshotManager.earliestSnapshotId() == null) {
-            LOG.info("No snapshot found, skip removing.");
+        List<String> branches = table.branchManager().branches();
+        branches.add(BranchManager.DEFAULT_MAIN_BRANCH);
+
+        List<String> abnormalBranches = new ArrayList<>();
+        for (String branch : branches) {
+            if (!new SchemaManager(table.fileIO(), table.location(), branch).latest().isPresent()) {
+                abnormalBranches.add(branch);
+            }
+        }
+        if (!abnormalBranches.isEmpty()) {
+            LOG.warn(
+                    "Branches {} have no schemas. Orphan files cleaning aborted. "
+                            + "Please check these branches manually.",
+                    abnormalBranches);
             return Collections.emptyList();
         }
 
-        // specially handle the snapshot directory
-        List<Path> nonSnapshotFiles = snapshotManager.tryGetNonSnapshotFiles(this::oldEnough);
-        nonSnapshotFiles.forEach(fileCleaner);
-        deleteFiles.addAll(nonSnapshotFiles);
+        Map<String, Path> candidates = getCandidateDeletingFiles();
+        Set<String> usedFiles = new HashSet<>();
 
-        // specially handle the changelog directory
-        List<Path> nonChangelogFiles = snapshotManager.tryGetNonChangelogFiles(this::oldEnough);
-        nonChangelogFiles.forEach(fileCleaner);
-        deleteFiles.addAll(nonChangelogFiles);
+        for (String branch : branches) {
+            FileStoreTable branchTable = table.switchToBranch(branch);
+            SnapshotManager snapshotManager = branchTable.snapshotManager();
 
-        Map<String, Path> candidates = getCandidateDeletingFiles();
-        Set<String> usedFiles = getUsedFiles();
+            // specially handle the snapshot directory
+            List<Path> nonSnapshotFiles = snapshotManager.tryGetNonSnapshotFiles(this::oldEnough);
+            nonSnapshotFiles.forEach(fileCleaner);
+            deleteFiles.addAll(nonSnapshotFiles);
+
+            // specially handle the changelog directory
+            List<Path> nonChangelogFiles = snapshotManager.tryGetNonChangelogFiles(this::oldEnough);
+            nonChangelogFiles.forEach(fileCleaner);
+            deleteFiles.addAll(nonChangelogFiles);
+
+            usedFiles.addAll(getUsedFiles(branchTable));
+        }
 
         Set<String> deleted = new HashSet<>(candidates.keySet());
         deleted.removeAll(usedFiles);
@@ -181,21 +192,33 @@ public class OrphanFilesClean {
     }
 
     /** Get all the files used by snapshots and tags. */
-    private Set<String> getUsedFiles()
-            throws IOException, ExecutionException, InterruptedException {
+    private Set<String> getUsedFiles(FileStoreTable branchTable) throws IOException {
+        SnapshotManager snapshotManager = branchTable.snapshotManager();
+        TagManager tagManager = branchTable.tagManager();
+
         // safely get all snapshots to be read
         Set<Snapshot> readSnapshots = new HashSet<>(snapshotManager.safelyGetAllSnapshots());
         List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
         readSnapshots.addAll(taggedSnapshots);
         readSnapshots.addAll(snapshotManager.safelyGetAllChangelogs());
-        return Sets.newHashSet(randomlyExecute(EXECUTOR, this::getUsedFiles, readSnapshots));
+
+        return Sets.newHashSet(
+                randomlyExecute(
+                        EXECUTOR, snapshot -> getUsedFiles(branchTable, snapshot), readSnapshots));
     }
 
-    private List<String> getUsedFiles(Snapshot snapshot) {
+    private List<String> getUsedFiles(FileStoreTable branchTable, Snapshot snapshot) {
+        ManifestList manifestList = branchTable.store().manifestListFactory().create();
+        ManifestFile manifestFile = branchTable.store().manifestFileFactory().create();
+
         if (snapshot instanceof Changelog) {
-            return getUsedFilesForChangelog((Changelog) snapshot);
+            return getUsedFilesForChangelog(manifestList, manifestFile, (Changelog) snapshot);
         } else {
-            return getUsedFilesForSnapshot(snapshot);
+            return getUsedFilesForSnapshot(
+                    manifestList,
+                    manifestFile,
+                    branchTable.store().newIndexFileHandler(),
+                    snapshot);
         }
     }
 
@@ -220,7 +243,8 @@ public class OrphanFilesClean {
         return result;
     }
 
-    private List<String> getUsedFilesForChangelog(Changelog changelog) {
+    private List<String> getUsedFilesForChangelog(
+            ManifestList manifestList, ManifestFile manifestFile, Changelog changelog) {
         List<String> files = new ArrayList<>();
         List<ManifestFileMeta> manifestFileMetas = new ArrayList<>();
         try {
@@ -290,7 +314,7 @@ public class OrphanFilesClean {
             }
 
             // try to read data files
-            List<String> dataFiles = retryReadingDataFiles(manifestFileName);
+            List<String> dataFiles = retryReadingDataFiles(manifestFile, manifestFileName);
             if (dataFiles == null) {
                 return Collections.emptyList();
             }
@@ -306,14 +330,19 @@ public class OrphanFilesClean {
      * If getting null when reading some files, the snapshot/tag is being deleted, so just return an
      * empty result.
      */
-    private List<String> getUsedFilesForSnapshot(Snapshot snapshot) {
+    private List<String> getUsedFilesForSnapshot(
+            ManifestList manifestList,
+            ManifestFile manifestFile,
+            IndexFileHandler indexFileHandler,
+            Snapshot snapshot) {
         List<String> files = new ArrayList<>();
         addManifestList(files, snapshot);
 
         try {
             // try to read manifests
             List<ManifestFileMeta> manifestFileMetas =
-                    retryReadingFiles(() -> readAllManifestsWithIOException(snapshot));
+                    retryReadingFiles(
+                            () -> readAllManifestsWithIOException(manifestList, snapshot));
             if (manifestFileMetas == null) {
                 return Collections.emptyList();
             }
@@ -324,7 +353,7 @@ public class OrphanFilesClean {
             files.addAll(manifestFileName);
 
             // try to read data files
-            List<String> dataFiles = retryReadingDataFiles(manifestFileName);
+            List<String> dataFiles = retryReadingDataFiles(manifestFile, manifestFileName);
             if (dataFiles == null) {
                 return Collections.emptyList();
             }
@@ -396,8 +425,8 @@ public class OrphanFilesClean {
         throw caught;
     }
 
-    private List<ManifestFileMeta> readAllManifestsWithIOException(Snapshot snapshot)
-            throws IOException {
+    private List<ManifestFileMeta> readAllManifestsWithIOException(
+            ManifestList manifestList, Snapshot snapshot) throws IOException {
         List<ManifestFileMeta> result = new ArrayList<>();
 
         
result.addAll(manifestList.readWithIOException(snapshot.baseManifestList()));
@@ -412,7 +441,8 @@ public class OrphanFilesClean {
     }
 
     @Nullable
-    private List<String> retryReadingDataFiles(List<String> manifestNames) throws IOException {
+    private List<String> retryReadingDataFiles(
+            ManifestFile manifestFile, List<String> manifestNames) throws IOException {
         List<String> dataFiles = new ArrayList<>();
         for (String manifestName : manifestNames) {
             List<ManifestEntry> manifestEntries =
diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index f412d4e5e..6842f0142 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -235,6 +235,12 @@ public class PrivilegedFileStoreTable extends DelegatedFileStoreTable {
         return wrapped.newLocalTableQuery();
     }
 
+    @Override
+    public FileStoreTable switchToBranch(String branchName) {
+        return new PrivilegedFileStoreTable(
+                wrapped.switchToBranch(branchName), privilegeChecker, identifier);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 95b9d8dcf..a3b1735dd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -58,6 +58,7 @@ import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanne
 import org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner;
 import org.apache.paimon.tag.TagPreview;
 import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.SnapshotNotExistException;
@@ -270,15 +271,7 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
         CoreOptions.setDefaultValues(newOptions);
 
         // copy a new table schema to contain dynamic options
-        TableSchema newTableSchema = tableSchema;
-        if (newOptions.contains(CoreOptions.BRANCH)) {
-            newTableSchema =
-                    schemaManager()
-                            .copyWithBranch(new CoreOptions(newOptions).branch())
-                            .latest()
-                            .get();
-        }
-        newTableSchema = newTableSchema.copy(newOptions.toMap());
+        TableSchema newTableSchema = tableSchema.copy(newOptions.toMap());
 
         if (tryTimeTravel) {
             // see if merged options contain time travel option
@@ -624,6 +617,21 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
         return new BranchManager(fileIO, path, snapshotManager(), tagManager(), schemaManager());
     }
 
+    @Override
+    public FileStoreTable switchToBranch(String branchName) {
+        Optional<TableSchema> optionalSchema =
+                new SchemaManager(fileIO(), location(), branchName).latest();
+        Preconditions.checkArgument(
+                optionalSchema.isPresent(), "Branch " + branchName + " does not exist");
+
+        TableSchema branchSchema = optionalSchema.get();
+        Options branchOptions = new Options(branchSchema.options());
+        branchOptions.set(CoreOptions.BRANCH, branchName);
+        branchSchema = branchSchema.copy(branchOptions.toMap());
+        return FileStoreTableFactory.create(
+                fileIO(), location(), branchSchema, new Options(), catalogEnvironment());
+    }
+
     private RollbackHelper rollbackHelper() {
         return new RollbackHelper(
                 snapshotManager(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
index 3c56b4b3b..e330db0e0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
@@ -43,6 +43,12 @@ public interface DataTable extends InnerTable {
 
     BranchManager branchManager();
 
+    /**
+     * Get {@link DataTable} with branch identified by {@code branchName}. Note that this method
+     * does not keep dynamic options in current table.
+     */
+    DataTable switchToBranch(String branchName);
+
     Path location();
 
     FileIO fileIO();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index 0cd991b7f..f7238f033 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -147,9 +147,20 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
                 wrapped.copyWithLatestSchema(), fallback.copyWithLatestSchema());
     }
 
+    @Override
+    public FileStoreTable switchToBranch(String branchName) {
+        return new FallbackReadFileStoreTable(wrapped.switchToBranch(branchName), fallback);
+    }
+
     private Map<String, String> rewriteFallbackOptions(Map<String, String> options) {
         Map<String, String> result = new HashMap<>(options);
 
+        // branch of fallback table should never change
+        String branchKey = CoreOptions.BRANCH.key();
+        if (options.containsKey(branchKey)) {
+            result.put(branchKey, fallback.options().get(branchKey));
+        }
+
         // snapshot ids may be different between the main branch and the fallback branch,
         // so we need to convert main branch snapshot id to millisecond,
         // then convert millisecond to fallback branch snapshot id
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index ed1ba1da5..3bd337294 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -111,4 +111,11 @@ public interface FileStoreTable extends DataTable {
     boolean supportStreamingReadOverwrite();
 
     RowKeyExtractor createRowKeyExtractor();
+
+    /**
+     * Get {@link DataTable} with branch identified by {@code branchName}. Note that this method
+     * does not keep dynamic options in current table.
+     */
+    @Override
+    FileStoreTable switchToBranch(String branchName);
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
index 58449c9d7..d0753259f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
@@ -91,7 +91,7 @@ public class FileStoreTableFactory {
         Options options = new Options(table.options());
         String fallbackBranch = options.get(CoreOptions.SCAN_FALLBACK_BRANCH);
         if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)) {
-            Options branchOptions = new Options();
+            Options branchOptions = new Options(dynamicOptions.toMap());
             branchOptions.set(CoreOptions.BRANCH, fallbackBranch);
             FileStoreTable fallbackTable =
                     createWithoutFallbackBranch(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 7192c3630..559649331 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -174,6 +174,11 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
         return wrapped.branchManager();
     }
 
+    @Override
+    public DataTable switchToBranch(String branchName) {
+        return new AuditLogTable(wrapped.switchToBranch(branchName));
+    }
+
     @Override
     public InnerTableRead newRead() {
         return new AuditLogRead(wrapped.newRead());
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
index 7f1e8fb0c..7b67f00bf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
@@ -129,6 +129,11 @@ public class BucketsTable implements DataTable, ReadonlyTable {
         return wrapped.branchManager();
     }
 
+    @Override
+    public DataTable switchToBranch(String branchName) {
+        return new BucketsTable(wrapped.switchToBranch(branchName), isContinuous, databaseName);
+    }
+
     @Override
     public String name() {
         return "__internal_buckets_" + wrapped.location().getName();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
index 8eb97130e..bd4efc80e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
@@ -116,6 +116,11 @@ public class FileMonitorTable implements DataTable, ReadonlyTable {
         return wrapped.branchManager();
     }
 
+    @Override
+    public DataTable switchToBranch(String branchName) {
+        return new FileMonitorTable(wrapped.switchToBranch(branchName));
+    }
+
     @Override
     public String name() {
         return "__internal_file_monitor_" + wrapped.location().getName();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index 6be556f0b..e7b3bd171 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -145,6 +145,11 @@ public class ReadOptimizedTable implements DataTable, ReadonlyTable {
         return wrapped.branchManager();
     }
 
+    @Override
+    public DataTable switchToBranch(String branchName) {
+        return new ReadOptimizedTable(wrapped.switchToBranch(branchName));
+    }
+
     @Override
     public InnerTableRead newRead() {
         return wrapped.newRead();
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index 0e905cd68..5a439a8ec 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -33,7 +33,6 @@ import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.paimon.utils.FileUtils.listOriginalVersionedFiles;
 import static org.apache.paimon.utils.FileUtils.listVersionedDirectories;
 import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -70,16 +69,6 @@ public class BranchManager {
         return new Path(tablePath + "/branch");
     }
 
-    /** Return the root Directory of branch by given tablePath. */
-    public static Path branchDirectory(Path tablePath) {
-        return new Path(tablePath + "/branch");
-    }
-
-    public static List<String> branchNames(FileIO fileIO, Path tablePath) throws IOException {
-        return listOriginalVersionedFiles(fileIO, branchDirectory(tablePath), BRANCH_PREFIX)
-                .collect(Collectors.toList());
-    }
-
     public static boolean isMainBranch(String branch) {
         return branch.equals(DEFAULT_MAIN_BRANCH);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index c5fdb042e..9cce9233f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -50,7 +50,6 @@ import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
 import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
-import static org.apache.paimon.utils.BranchManager.branchNames;
 import static org.apache.paimon.utils.BranchManager.branchPath;
 import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
 
@@ -117,10 +116,6 @@ public class SnapshotManager implements Serializable {
         return new Path(branchPath(tablePath, branch) + "/snapshot");
     }
 
-    public static Path snapshotDirectory(Path tablePath, String branch) {
-        return new Path(branchPath(tablePath, branch) + "/snapshot");
-    }
-
     public Snapshot snapshot(long snapshotId) {
         Path snapshotPath = snapshotPath(snapshotId);
         return Snapshot.fromPath(fileIO, snapshotPath);
@@ -405,25 +400,11 @@ public class SnapshotManager implements Serializable {
      * be deleted by other processes, so just skip this snapshot.
      */
     public List<Snapshot> safelyGetAllSnapshots() throws IOException {
-        // For main branch
         List<Path> paths =
                 listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX)
                         .map(id -> snapshotPath(id))
                         .collect(Collectors.toList());
 
-        // For other branch
-        List<String> allBranchNames = branchNames(fileIO, tablePath);
-        for (String branchName : allBranchNames) {
-            List<Path> branchPaths =
-                    listVersionedFiles(
-                                    fileIO,
-                                    snapshotDirectory(tablePath, branchName),
-                                    SNAPSHOT_PREFIX)
-                            .map(this::snapshotPath)
-                            .collect(Collectors.toList());
-            paths.addAll(branchPaths);
-        }
-
         List<Snapshot> snapshots = new ArrayList<>();
         for (Path path : paths) {
             Snapshot snapshot = Snapshot.safelyFromPath(fileIO, path);
@@ -457,23 +438,8 @@ public class SnapshotManager implements Serializable {
      * Try to get non snapshot files. If any error occurred, just ignore it and return an empty
      * result.
      */
-    public List<Path> tryGetNonSnapshotFiles(Predicate<FileStatus> fileStatusFilter)
-            throws IOException {
-        // For main branch
-        List<Path> nonSnapshotFiles =
-                listPathWithFilter(snapshotDirectory(), fileStatusFilter, nonSnapshotFileFilter());
-
-        // For other branch
-        List<String> allBranchNames = branchNames(fileIO, tablePath);
-        allBranchNames.stream()
-                .map(
-                        branchName ->
-                                listPathWithFilter(
-                                        snapshotDirectory(tablePath, branchName),
-                                        fileStatusFilter,
-                                        nonSnapshotFileFilter()))
-                .forEach(nonSnapshotFiles::addAll);
-        return nonSnapshotFiles;
+    public List<Path> tryGetNonSnapshotFiles(Predicate<FileStatus> fileStatusFilter) {
+        return listPathWithFilter(snapshotDirectory(), fileStatusFilter, nonSnapshotFileFilter());
     }
 
     public List<Path> tryGetNonChangelogFiles(Predicate<FileStatus> fileStatusFilter) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
index ff67dbc27..7e912a839 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
@@ -361,6 +361,9 @@ public class OrphanFilesCleanTest {
         SnapshotManager snapshotManager = table.snapshotManager();
         writeData(snapshotManager, committedData, snapshotData, changelogData, commitTimes);
 
+        // create empty branch with same schema
+        table.createBranch("branch1");
+
         // generate non used files
         int shouldBeDeleted = generateUnUsedFile();
         assertThat(manuallyAddedFiles.size()).isEqualTo(shouldBeDeleted);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
index 9e7b6a7c3..653e97cc6 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
@@ -18,10 +18,19 @@
 
 package org.apache.paimon.flink.action;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
@@ -37,6 +46,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
@@ -191,4 +201,54 @@ public class RemoveOrphanFilesActionITCase extends ActionITCaseBase {
                         Row.of(orphanFile21.toUri().getPath()),
                         Row.of(orphanFile22.toUri().getPath()));
     }
+
+    @Test
+    public void testCleanWithBranch() throws Exception {
+        // create main branch
+        FileStoreTable table = createTableAndWriteData(tableName);
+        Path orphanFile1 = getOrphanFilePath(table, ORPHAN_FILE_1);
+        Path orphanFile2 = getOrphanFilePath(table, ORPHAN_FILE_2);
+
+        // create first branch and write some data
+        table.createBranch("br");
+        SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location(), "br");
+        TableSchema branchSchema =
+                schemaManager.commitChanges(SchemaChange.addColumn("v2", DataTypes.INT()));
+        Options branchOptions = new Options(branchSchema.options());
+        branchOptions.set(CoreOptions.BRANCH, "br");
+        branchSchema = branchSchema.copy(branchOptions.toMap());
+        FileStoreTable branchTable =
+                FileStoreTableFactory.create(table.fileIO(), table.location(), branchSchema);
+
+        String commitUser = UUID.randomUUID().toString();
+        StreamTableWrite write = branchTable.newWrite(commitUser);
+        StreamTableCommit commit = branchTable.newCommit(commitUser);
+        write.write(GenericRow.of(2L, BinaryString.fromString("Hello"), 20));
+        commit.commit(1, write.prepareCommit(false, 1));
+        write.close();
+        commit.close();
+
+        // create orphan file in snapshot directory of first branch
+        Path orphanFile3 = new Path(table.location(), "branch/branch-br/snapshot/orphan_file3");
+        branchTable.fileIO().writeFile(orphanFile3, "x", true);
+
+        // create second branch, which is empty
+        table.createBranch("br2");
+
+        // create orphan file in snapshot directory of second branch
+        Path orphanFile4 = new Path(table.location(), "branch/branch-br2/snapshot/orphan_file4");
+        branchTable.fileIO().writeFile(orphanFile4, "y", true);
+
+        String procedure =
+                String.format(
+                        "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59')",
+                        database, "*");
+        ImmutableList<Row> actualDeleteFile = ImmutableList.copyOf(callProcedure(procedure));
+        assertThat(actualDeleteFile)
+                .containsExactlyInAnyOrder(
+                        Row.of(orphanFile1.toUri().getPath()),
+                        Row.of(orphanFile2.toUri().getPath()),
+                        Row.of(orphanFile3.toUri().getPath()),
+                        Row.of(orphanFile4.toUri().getPath()));
+    }
 }
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 36a11cdc7..58289b179 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -41,7 +41,6 @@ import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.TableType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
@@ -346,9 +345,7 @@ public class HiveCatalog extends AbstractCatalog {
                 continue;
             }
 
-            FileStoreTable table =
-                    FileStoreTableFactory.create(
-                            mainTable.fileIO(), mainTable.location(), branchSchema.get());
+            FileStoreTable table = mainTable.switchToBranch(branchName);
             if (!table.newScan().withPartitionFilter(partitionSpec).listPartitions().isEmpty()) {
                 return true;
             }

Reply via email to