This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9791e0974 [core] OrphanFilesClean now support branches (#3970)
9791e0974 is described below
commit 9791e0974203394659b6c5a1b5af0d7229f2d79e
Author: tsreaper <[email protected]>
AuthorDate: Fri Aug 16 12:04:10 2024 +0800
[core] OrphanFilesClean now support branches (#3970)
---
.../apache/paimon/operation/OrphanFilesClean.java | 106 +++++++++++++--------
.../paimon/privilege/PrivilegedFileStoreTable.java | 6 ++
.../paimon/table/AbstractFileStoreTable.java | 26 +++--
.../java/org/apache/paimon/table/DataTable.java | 6 ++
.../paimon/table/FallbackReadFileStoreTable.java | 11 +++
.../org/apache/paimon/table/FileStoreTable.java | 7 ++
.../apache/paimon/table/FileStoreTableFactory.java | 2 +-
.../apache/paimon/table/system/AuditLogTable.java | 5 +
.../apache/paimon/table/system/BucketsTable.java | 5 +
.../paimon/table/system/FileMonitorTable.java | 5 +
.../paimon/table/system/ReadOptimizedTable.java | 5 +
.../org/apache/paimon/utils/BranchManager.java | 11 ---
.../org/apache/paimon/utils/SnapshotManager.java | 38 +-------
.../paimon/operation/OrphanFilesCleanTest.java | 3 +
.../action/RemoveOrphanFilesActionITCase.java | 60 ++++++++++++
.../java/org/apache/paimon/hive/HiveCatalog.java | 5 +-
16 files changed, 202 insertions(+), 99 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index f0d49964c..dd57f269c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -19,7 +19,6 @@
package org.apache.paimon.operation;
import org.apache.paimon.Changelog;
-import org.apache.paimon.FileStore;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
@@ -33,8 +32,10 @@ import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
@@ -102,30 +103,21 @@ public class OrphanFilesClean {
private static final int READ_FILE_RETRY_INTERVAL = 5;
private static final int SHOW_LIMIT = 200;
- private final SnapshotManager snapshotManager;
- private final TagManager tagManager;
+ private final FileStoreTable table;
private final FileIO fileIO;
private final Path location;
private final int partitionKeysNum;
- private final ManifestList manifestList;
- private final ManifestFile manifestFile;
- private final IndexFileHandler indexFileHandler;
private final List<Path> deleteFiles;
private long olderThanMillis = System.currentTimeMillis() -
TimeUnit.DAYS.toMillis(1);
private Consumer<Path> fileCleaner;
public OrphanFilesClean(FileStoreTable table) {
- this.snapshotManager = table.snapshotManager();
- this.tagManager = table.tagManager();
+ this.table = table;
this.fileIO = table.fileIO();
this.location = table.location();
this.partitionKeysNum = table.partitionKeys().size();
- FileStore<?> store = table.store();
- this.manifestList = store.manifestListFactory().create();
- this.manifestFile = store.manifestFileFactory().create();
- this.indexFileHandler = store.newIndexFileHandler();
this.deleteFiles = new ArrayList<>();
this.fileCleaner =
path -> {
@@ -154,23 +146,42 @@ public class OrphanFilesClean {
}
public List<Path> clean() throws IOException, ExecutionException,
InterruptedException {
- if (snapshotManager.earliestSnapshotId() == null) {
- LOG.info("No snapshot found, skip removing.");
+ List<String> branches = table.branchManager().branches();
+ branches.add(BranchManager.DEFAULT_MAIN_BRANCH);
+
+ List<String> abnormalBranches = new ArrayList<>();
+ for (String branch : branches) {
+ if (!new SchemaManager(table.fileIO(), table.location(),
branch).latest().isPresent()) {
+ abnormalBranches.add(branch);
+ }
+ }
+ if (!abnormalBranches.isEmpty()) {
+ LOG.warn(
+ "Branches {} have no schemas. Orphan files cleaning
aborted. "
+ + "Please check these branches manually.",
+ abnormalBranches);
return Collections.emptyList();
}
- // specially handle the snapshot directory
- List<Path> nonSnapshotFiles =
snapshotManager.tryGetNonSnapshotFiles(this::oldEnough);
- nonSnapshotFiles.forEach(fileCleaner);
- deleteFiles.addAll(nonSnapshotFiles);
+ Map<String, Path> candidates = getCandidateDeletingFiles();
+ Set<String> usedFiles = new HashSet<>();
- // specially handle the changelog directory
- List<Path> nonChangelogFiles =
snapshotManager.tryGetNonChangelogFiles(this::oldEnough);
- nonChangelogFiles.forEach(fileCleaner);
- deleteFiles.addAll(nonChangelogFiles);
+ for (String branch : branches) {
+ FileStoreTable branchTable = table.switchToBranch(branch);
+ SnapshotManager snapshotManager = branchTable.snapshotManager();
- Map<String, Path> candidates = getCandidateDeletingFiles();
- Set<String> usedFiles = getUsedFiles();
+ // specially handle the snapshot directory
+ List<Path> nonSnapshotFiles =
snapshotManager.tryGetNonSnapshotFiles(this::oldEnough);
+ nonSnapshotFiles.forEach(fileCleaner);
+ deleteFiles.addAll(nonSnapshotFiles);
+
+ // specially handle the changelog directory
+ List<Path> nonChangelogFiles =
snapshotManager.tryGetNonChangelogFiles(this::oldEnough);
+ nonChangelogFiles.forEach(fileCleaner);
+ deleteFiles.addAll(nonChangelogFiles);
+
+ usedFiles.addAll(getUsedFiles(branchTable));
+ }
Set<String> deleted = new HashSet<>(candidates.keySet());
deleted.removeAll(usedFiles);
@@ -181,21 +192,33 @@ public class OrphanFilesClean {
}
/** Get all the files used by snapshots and tags. */
- private Set<String> getUsedFiles()
- throws IOException, ExecutionException, InterruptedException {
+ private Set<String> getUsedFiles(FileStoreTable branchTable) throws
IOException {
+ SnapshotManager snapshotManager = branchTable.snapshotManager();
+ TagManager tagManager = branchTable.tagManager();
+
// safely get all snapshots to be read
Set<Snapshot> readSnapshots = new
HashSet<>(snapshotManager.safelyGetAllSnapshots());
List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
readSnapshots.addAll(taggedSnapshots);
readSnapshots.addAll(snapshotManager.safelyGetAllChangelogs());
- return Sets.newHashSet(randomlyExecute(EXECUTOR, this::getUsedFiles,
readSnapshots));
+
+ return Sets.newHashSet(
+ randomlyExecute(
+ EXECUTOR, snapshot -> getUsedFiles(branchTable,
snapshot), readSnapshots));
}
- private List<String> getUsedFiles(Snapshot snapshot) {
+ private List<String> getUsedFiles(FileStoreTable branchTable, Snapshot
snapshot) {
+ ManifestList manifestList =
branchTable.store().manifestListFactory().create();
+ ManifestFile manifestFile =
branchTable.store().manifestFileFactory().create();
+
if (snapshot instanceof Changelog) {
- return getUsedFilesForChangelog((Changelog) snapshot);
+ return getUsedFilesForChangelog(manifestList, manifestFile,
(Changelog) snapshot);
} else {
- return getUsedFilesForSnapshot(snapshot);
+ return getUsedFilesForSnapshot(
+ manifestList,
+ manifestFile,
+ branchTable.store().newIndexFileHandler(),
+ snapshot);
}
}
@@ -220,7 +243,8 @@ public class OrphanFilesClean {
return result;
}
- private List<String> getUsedFilesForChangelog(Changelog changelog) {
+ private List<String> getUsedFilesForChangelog(
+ ManifestList manifestList, ManifestFile manifestFile, Changelog
changelog) {
List<String> files = new ArrayList<>();
List<ManifestFileMeta> manifestFileMetas = new ArrayList<>();
try {
@@ -290,7 +314,7 @@ public class OrphanFilesClean {
}
// try to read data files
- List<String> dataFiles = retryReadingDataFiles(manifestFileName);
+ List<String> dataFiles = retryReadingDataFiles(manifestFile,
manifestFileName);
if (dataFiles == null) {
return Collections.emptyList();
}
@@ -306,14 +330,19 @@ public class OrphanFilesClean {
* If getting null when reading some files, the snapshot/tag is being
deleted, so just return an
* empty result.
*/
- private List<String> getUsedFilesForSnapshot(Snapshot snapshot) {
+ private List<String> getUsedFilesForSnapshot(
+ ManifestList manifestList,
+ ManifestFile manifestFile,
+ IndexFileHandler indexFileHandler,
+ Snapshot snapshot) {
List<String> files = new ArrayList<>();
addManifestList(files, snapshot);
try {
// try to read manifests
List<ManifestFileMeta> manifestFileMetas =
- retryReadingFiles(() ->
readAllManifestsWithIOException(snapshot));
+ retryReadingFiles(
+ () ->
readAllManifestsWithIOException(manifestList, snapshot));
if (manifestFileMetas == null) {
return Collections.emptyList();
}
@@ -324,7 +353,7 @@ public class OrphanFilesClean {
files.addAll(manifestFileName);
// try to read data files
- List<String> dataFiles = retryReadingDataFiles(manifestFileName);
+ List<String> dataFiles = retryReadingDataFiles(manifestFile,
manifestFileName);
if (dataFiles == null) {
return Collections.emptyList();
}
@@ -396,8 +425,8 @@ public class OrphanFilesClean {
throw caught;
}
- private List<ManifestFileMeta> readAllManifestsWithIOException(Snapshot
snapshot)
- throws IOException {
+ private List<ManifestFileMeta> readAllManifestsWithIOException(
+ ManifestList manifestList, Snapshot snapshot) throws IOException {
List<ManifestFileMeta> result = new ArrayList<>();
result.addAll(manifestList.readWithIOException(snapshot.baseManifestList()));
@@ -412,7 +441,8 @@ public class OrphanFilesClean {
}
@Nullable
- private List<String> retryReadingDataFiles(List<String> manifestNames)
throws IOException {
+ private List<String> retryReadingDataFiles(
+ ManifestFile manifestFile, List<String> manifestNames) throws
IOException {
List<String> dataFiles = new ArrayList<>();
for (String manifestName : manifestNames) {
List<ManifestEntry> manifestEntries =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index f412d4e5e..6842f0142 100644
---
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -235,6 +235,12 @@ public class PrivilegedFileStoreTable extends
DelegatedFileStoreTable {
return wrapped.newLocalTableQuery();
}
+ @Override
+ public FileStoreTable switchToBranch(String branchName) {
+ return new PrivilegedFileStoreTable(
+ wrapped.switchToBranch(branchName), privilegeChecker,
identifier);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 95b9d8dcf..a3b1735dd 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -58,6 +58,7 @@ import
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanne
import
org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner;
import org.apache.paimon.tag.TagPreview;
import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.SnapshotNotExistException;
@@ -270,15 +271,7 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
CoreOptions.setDefaultValues(newOptions);
// copy a new table schema to contain dynamic options
- TableSchema newTableSchema = tableSchema;
- if (newOptions.contains(CoreOptions.BRANCH)) {
- newTableSchema =
- schemaManager()
- .copyWithBranch(new
CoreOptions(newOptions).branch())
- .latest()
- .get();
- }
- newTableSchema = newTableSchema.copy(newOptions.toMap());
+ TableSchema newTableSchema = tableSchema.copy(newOptions.toMap());
if (tryTimeTravel) {
// see if merged options contain time travel option
@@ -624,6 +617,21 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
return new BranchManager(fileIO, path, snapshotManager(),
tagManager(), schemaManager());
}
+ @Override
+ public FileStoreTable switchToBranch(String branchName) {
+ Optional<TableSchema> optionalSchema =
+ new SchemaManager(fileIO(), location(), branchName).latest();
+ Preconditions.checkArgument(
+ optionalSchema.isPresent(), "Branch " + branchName + " does
not exist");
+
+ TableSchema branchSchema = optionalSchema.get();
+ Options branchOptions = new Options(branchSchema.options());
+ branchOptions.set(CoreOptions.BRANCH, branchName);
+ branchSchema = branchSchema.copy(branchOptions.toMap());
+ return FileStoreTableFactory.create(
+ fileIO(), location(), branchSchema, new Options(),
catalogEnvironment());
+ }
+
private RollbackHelper rollbackHelper() {
return new RollbackHelper(
snapshotManager(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
index 3c56b4b3b..e330db0e0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
@@ -43,6 +43,12 @@ public interface DataTable extends InnerTable {
BranchManager branchManager();
+ /**
+ * Get {@link DataTable} with branch identified by {@code branchName}.
Note that this method
+ * does not keep dynamic options in current table.
+ */
+ DataTable switchToBranch(String branchName);
+
Path location();
FileIO fileIO();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index 0cd991b7f..f7238f033 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -147,9 +147,20 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
wrapped.copyWithLatestSchema(),
fallback.copyWithLatestSchema());
}
+ @Override
+ public FileStoreTable switchToBranch(String branchName) {
+ return new
FallbackReadFileStoreTable(wrapped.switchToBranch(branchName), fallback);
+ }
+
private Map<String, String> rewriteFallbackOptions(Map<String, String>
options) {
Map<String, String> result = new HashMap<>(options);
+ // branch of fallback table should never change
+ String branchKey = CoreOptions.BRANCH.key();
+ if (options.containsKey(branchKey)) {
+ result.put(branchKey, fallback.options().get(branchKey));
+ }
+
// snapshot ids may be different between the main branch and the
fallback branch,
// so we need to convert main branch snapshot id to millisecond,
// then convert millisecond to fallback branch snapshot id
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index ed1ba1da5..3bd337294 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -111,4 +111,11 @@ public interface FileStoreTable extends DataTable {
boolean supportStreamingReadOverwrite();
RowKeyExtractor createRowKeyExtractor();
+
+ /**
+ * Get {@link FileStoreTable} with branch identified by {@code branchName}.
Note that this method
+ * does not keep dynamic options in current table.
+ */
+ @Override
+ FileStoreTable switchToBranch(String branchName);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
index 58449c9d7..d0753259f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
@@ -91,7 +91,7 @@ public class FileStoreTableFactory {
Options options = new Options(table.options());
String fallbackBranch = options.get(CoreOptions.SCAN_FALLBACK_BRANCH);
if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)) {
- Options branchOptions = new Options();
+ Options branchOptions = new Options(dynamicOptions.toMap());
branchOptions.set(CoreOptions.BRANCH, fallbackBranch);
FileStoreTable fallbackTable =
createWithoutFallbackBranch(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 7192c3630..559649331 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -174,6 +174,11 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
return wrapped.branchManager();
}
+ @Override
+ public DataTable switchToBranch(String branchName) {
+ return new AuditLogTable(wrapped.switchToBranch(branchName));
+ }
+
@Override
public InnerTableRead newRead() {
return new AuditLogRead(wrapped.newRead());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
index 7f1e8fb0c..7b67f00bf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
@@ -129,6 +129,11 @@ public class BucketsTable implements DataTable,
ReadonlyTable {
return wrapped.branchManager();
}
+ @Override
+ public DataTable switchToBranch(String branchName) {
+ return new BucketsTable(wrapped.switchToBranch(branchName),
isContinuous, databaseName);
+ }
+
@Override
public String name() {
return "__internal_buckets_" + wrapped.location().getName();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
index 8eb97130e..bd4efc80e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
@@ -116,6 +116,11 @@ public class FileMonitorTable implements DataTable,
ReadonlyTable {
return wrapped.branchManager();
}
+ @Override
+ public DataTable switchToBranch(String branchName) {
+ return new FileMonitorTable(wrapped.switchToBranch(branchName));
+ }
+
@Override
public String name() {
return "__internal_file_monitor_" + wrapped.location().getName();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index 6be556f0b..e7b3bd171 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -145,6 +145,11 @@ public class ReadOptimizedTable implements DataTable,
ReadonlyTable {
return wrapped.branchManager();
}
+ @Override
+ public DataTable switchToBranch(String branchName) {
+ return new ReadOptimizedTable(wrapped.switchToBranch(branchName));
+ }
+
@Override
public InnerTableRead newRead() {
return wrapped.newRead();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index 0e905cd68..5a439a8ec 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -33,7 +33,6 @@ import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.paimon.utils.FileUtils.listOriginalVersionedFiles;
import static org.apache.paimon.utils.FileUtils.listVersionedDirectories;
import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -70,16 +69,6 @@ public class BranchManager {
return new Path(tablePath + "/branch");
}
- /** Return the root Directory of branch by given tablePath. */
- public static Path branchDirectory(Path tablePath) {
- return new Path(tablePath + "/branch");
- }
-
- public static List<String> branchNames(FileIO fileIO, Path tablePath)
throws IOException {
- return listOriginalVersionedFiles(fileIO, branchDirectory(tablePath),
BRANCH_PREFIX)
- .collect(Collectors.toList());
- }
-
public static boolean isMainBranch(String branch) {
return branch.equals(DEFAULT_MAIN_BRANCH);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index c5fdb042e..9cce9233f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -50,7 +50,6 @@ import java.util.stream.Collectors;
import java.util.stream.LongStream;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
-import static org.apache.paimon.utils.BranchManager.branchNames;
import static org.apache.paimon.utils.BranchManager.branchPath;
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
@@ -117,10 +116,6 @@ public class SnapshotManager implements Serializable {
return new Path(branchPath(tablePath, branch) + "/snapshot");
}
- public static Path snapshotDirectory(Path tablePath, String branch) {
- return new Path(branchPath(tablePath, branch) + "/snapshot");
- }
-
public Snapshot snapshot(long snapshotId) {
Path snapshotPath = snapshotPath(snapshotId);
return Snapshot.fromPath(fileIO, snapshotPath);
@@ -405,25 +400,11 @@ public class SnapshotManager implements Serializable {
* be deleted by other processes, so just skip this snapshot.
*/
public List<Snapshot> safelyGetAllSnapshots() throws IOException {
- // For main branch
List<Path> paths =
listVersionedFiles(fileIO, snapshotDirectory(),
SNAPSHOT_PREFIX)
.map(id -> snapshotPath(id))
.collect(Collectors.toList());
- // For other branch
- List<String> allBranchNames = branchNames(fileIO, tablePath);
- for (String branchName : allBranchNames) {
- List<Path> branchPaths =
- listVersionedFiles(
- fileIO,
- snapshotDirectory(tablePath, branchName),
- SNAPSHOT_PREFIX)
- .map(this::snapshotPath)
- .collect(Collectors.toList());
- paths.addAll(branchPaths);
- }
-
List<Snapshot> snapshots = new ArrayList<>();
for (Path path : paths) {
Snapshot snapshot = Snapshot.safelyFromPath(fileIO, path);
@@ -457,23 +438,8 @@ public class SnapshotManager implements Serializable {
* Try to get non snapshot files. If any error occurred, just ignore it
and return an empty
* result.
*/
- public List<Path> tryGetNonSnapshotFiles(Predicate<FileStatus>
fileStatusFilter)
- throws IOException {
- // For main branch
- List<Path> nonSnapshotFiles =
- listPathWithFilter(snapshotDirectory(), fileStatusFilter,
nonSnapshotFileFilter());
-
- // For other branch
- List<String> allBranchNames = branchNames(fileIO, tablePath);
- allBranchNames.stream()
- .map(
- branchName ->
- listPathWithFilter(
- snapshotDirectory(tablePath,
branchName),
- fileStatusFilter,
- nonSnapshotFileFilter()))
- .forEach(nonSnapshotFiles::addAll);
- return nonSnapshotFiles;
+ public List<Path> tryGetNonSnapshotFiles(Predicate<FileStatus>
fileStatusFilter) {
+ return listPathWithFilter(snapshotDirectory(), fileStatusFilter,
nonSnapshotFileFilter());
}
public List<Path> tryGetNonChangelogFiles(Predicate<FileStatus>
fileStatusFilter) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
index ff67dbc27..7e912a839 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
@@ -361,6 +361,9 @@ public class OrphanFilesCleanTest {
SnapshotManager snapshotManager = table.snapshotManager();
writeData(snapshotManager, committedData, snapshotData, changelogData,
commitTimes);
+ // create empty branch with same schema
+ table.createBranch("branch1");
+
// generate non used files
int shouldBeDeleted = generateUnUsedFile();
assertThat(manuallyAddedFiles.size()).isEqualTo(shouldBeDeleted);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
index 9e7b6a7c3..653e97cc6 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
@@ -18,10 +18,19 @@
package org.apache.paimon.flink.action;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -37,6 +46,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
@@ -191,4 +201,54 @@ public class RemoveOrphanFilesActionITCase extends
ActionITCaseBase {
Row.of(orphanFile21.toUri().getPath()),
Row.of(orphanFile22.toUri().getPath()));
}
+
+ @Test
+ public void testCleanWithBranch() throws Exception {
+ // create main branch
+ FileStoreTable table = createTableAndWriteData(tableName);
+ Path orphanFile1 = getOrphanFilePath(table, ORPHAN_FILE_1);
+ Path orphanFile2 = getOrphanFilePath(table, ORPHAN_FILE_2);
+
+ // create first branch and write some data
+ table.createBranch("br");
+ SchemaManager schemaManager = new SchemaManager(table.fileIO(),
table.location(), "br");
+ TableSchema branchSchema =
+ schemaManager.commitChanges(SchemaChange.addColumn("v2",
DataTypes.INT()));
+ Options branchOptions = new Options(branchSchema.options());
+ branchOptions.set(CoreOptions.BRANCH, "br");
+ branchSchema = branchSchema.copy(branchOptions.toMap());
+ FileStoreTable branchTable =
+ FileStoreTableFactory.create(table.fileIO(), table.location(),
branchSchema);
+
+ String commitUser = UUID.randomUUID().toString();
+ StreamTableWrite write = branchTable.newWrite(commitUser);
+ StreamTableCommit commit = branchTable.newCommit(commitUser);
+ write.write(GenericRow.of(2L, BinaryString.fromString("Hello"), 20));
+ commit.commit(1, write.prepareCommit(false, 1));
+ write.close();
+ commit.close();
+
+ // create orphan file in snapshot directory of first branch
+ Path orphanFile3 = new Path(table.location(),
"branch/branch-br/snapshot/orphan_file3");
+ branchTable.fileIO().writeFile(orphanFile3, "x", true);
+
+ // create second branch, which is empty
+ table.createBranch("br2");
+
+ // create orphan file in snapshot directory of second branch
+ Path orphanFile4 = new Path(table.location(),
"branch/branch-br2/snapshot/orphan_file4");
+ branchTable.fileIO().writeFile(orphanFile4, "y", true);
+
+ String procedure =
+ String.format(
+ "CALL sys.remove_orphan_files('%s.%s', '2999-12-31
23:59:59')",
+ database, "*");
+ ImmutableList<Row> actualDeleteFile =
ImmutableList.copyOf(callProcedure(procedure));
+ assertThat(actualDeleteFile)
+ .containsExactlyInAnyOrder(
+ Row.of(orphanFile1.toUri().getPath()),
+ Row.of(orphanFile2.toUri().getPath()),
+ Row.of(orphanFile3.toUri().getPath()),
+ Row.of(orphanFile4.toUri().getPath()));
+ }
}
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 36a11cdc7..58289b179 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -41,7 +41,6 @@ import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.TableType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
@@ -346,9 +345,7 @@ public class HiveCatalog extends AbstractCatalog {
continue;
}
- FileStoreTable table =
- FileStoreTableFactory.create(
- mainTable.fileIO(), mainTable.location(),
branchSchema.get());
+ FileStoreTable table = mainTable.switchToBranch(branchName);
if
(!table.newScan().withPartitionFilter(partitionSpec).listPartitions().isEmpty())
{
return true;
}