This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4573ae489c [hotfix] Move tryGetNonSnapshotFiles out of SnapshotManager
4573ae489c is described below
commit 4573ae489c31a2e05821290828062757702e1dc4
Author: JingsongLi <[email protected]>
AuthorDate: Fri Feb 28 15:55:39 2025 +0800
[hotfix] Move tryGetNonSnapshotFiles out of SnapshotManager
---
.../apache/paimon/operation/OrphanFilesClean.java | 59 +++++++++++++++++++++-
.../org/apache/paimon/utils/SnapshotManager.java | 54 +-------------------
2 files changed, 59 insertions(+), 54 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index e7778319c8..8be994414b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -61,6 +61,10 @@ import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
import static org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX;
+import static org.apache.paimon.utils.SnapshotManager.CHANGELOG_PREFIX;
+import static org.apache.paimon.utils.SnapshotManager.EARLIEST;
+import static org.apache.paimon.utils.SnapshotManager.LATEST;
+import static org.apache.paimon.utils.SnapshotManager.SNAPSHOT_PREFIX;
import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
/**
@@ -132,7 +136,7 @@ public abstract class OrphanFilesClean implements Serializable {
// specially handle the snapshot directory
List<Pair<Path, Long>> nonSnapshotFiles =
- snapshotManager.tryGetNonSnapshotFiles(this::oldEnough);
+
tryGetNonSnapshotFiles(snapshotManager.snapshotDirectory(), this::oldEnough);
nonSnapshotFiles.forEach(
nonSnapshotFile ->
cleanFile(
@@ -142,7 +146,7 @@ public abstract class OrphanFilesClean implements Serializable {
// specially handle the changelog directory
List<Pair<Path, Long>> nonChangelogFiles =
- snapshotManager.tryGetNonChangelogFiles(this::oldEnough);
+
tryGetNonChangelogFiles(snapshotManager.changelogDirectory(), this::oldEnough);
nonChangelogFiles.forEach(
nonChangelogFile ->
cleanFile(
@@ -152,6 +156,57 @@ public abstract class OrphanFilesClean implements Serializable {
}
}
+ /**
+ * Try to get non snapshot files in {@code snapshotDirectory}: entries accepted by {@code
+ * fileStatusFilter} whose names neither start with {@code SNAPSHOT_PREFIX} nor equal {@code
+ * EARLIEST} or {@code LATEST}. If listing the directory fails, the error is ignored and an
+ * empty result is returned.
+ *
+ * @param snapshotDirectory directory to scan
+ * @param fileStatusFilter additional predicate each entry's status must satisfy
+ * @return (path, {@code getLen()}) pairs of the matching entries
+ */
+ private List<Pair<Path, Long>> tryGetNonSnapshotFiles(
+ Path snapshotDirectory, Predicate<FileStatus> fileStatusFilter) {
+ return listPathWithFilter(
+ fileIO, snapshotDirectory, fileStatusFilter,
nonSnapshotFileFilter());
+ }
+
+ /**
+ * Try to get non changelog files in {@code changelogDirectory}: entries accepted by {@code
+ * fileStatusFilter} whose names neither start with {@code CHANGELOG_PREFIX} nor equal {@code
+ * EARLIEST} or {@code LATEST}. If listing the directory fails, the error is ignored and an
+ * empty result is returned.
+ *
+ * @param changelogDirectory directory to scan
+ * @param fileStatusFilter additional predicate each entry's status must satisfy
+ * @return (path, {@code getLen()}) pairs of the matching entries
+ */
+ private List<Pair<Path, Long>> tryGetNonChangelogFiles(
+ Path changelogDirectory, Predicate<FileStatus> fileStatusFilter) {
+ return listPathWithFilter(
+ fileIO, changelogDirectory, fileStatusFilter,
nonChangelogFileFilter());
+ }
+
+ /**
+ * Lists {@code directory} through {@code fileIO} and returns every entry accepted by both
+ * {@code fileStatusFilter} (tested on the entry's status) and {@code fileFilter} (tested on
+ * the entry's path).
+ *
+ * <p>Best effort: if the listing is {@code null} or throws an {@link IOException}, an empty
+ * list is returned instead of propagating the failure.
+ *
+ * @param fileIO file system used to list the directory
+ * @param directory directory whose entries are examined
+ * @param fileStatusFilter predicate on each entry's {@link FileStatus}
+ * @param fileFilter predicate on each entry's {@link Path}
+ * @return (path, {@code getLen()}) pairs of the accepted entries, in listing order
+ */
+ private static List<Pair<Path, Long>> listPathWithFilter(
+ FileIO fileIO,
+ Path directory,
+ Predicate<FileStatus> fileStatusFilter,
+ Predicate<Path> fileFilter) {
+ try {
+ FileStatus[] statuses = fileIO.listStatus(directory);
+ if (statuses == null) { // no listing available: report no candidate files
+ return Collections.emptyList();
+ }
+
+ return Arrays.stream(statuses)
+ .filter(fileStatusFilter)
+ .filter(status -> fileFilter.test(status.getPath()))
+ .map(status -> Pair.of(status.getPath(), status.getLen()))
+ .collect(Collectors.toList());
+ } catch (IOException ignored) { // deliberate best effort: a listing failure yields no candidates
+ return Collections.emptyList();
+ }
+ }
+
+ /**
+ * Returns a predicate accepting paths whose file name neither starts with {@code
+ * SNAPSHOT_PREFIX} nor equals {@code EARLIEST} or {@code LATEST}; that is, entries that are
+ * not snapshot files or the EARLIEST/LATEST files.
+ */
+ private static Predicate<Path> nonSnapshotFileFilter() {
+ return path -> {
+ String name = path.getName();
+ return !name.startsWith(SNAPSHOT_PREFIX)
+ && !name.equals(EARLIEST)
+ && !name.equals(LATEST);
+ };
+ }
+
+ /**
+ * Returns a predicate accepting paths whose file name neither starts with {@code
+ * CHANGELOG_PREFIX} nor equals {@code EARLIEST} or {@code LATEST}; that is, entries that are
+ * not changelog files or the EARLIEST/LATEST files.
+ *
+ * <p>NOTE(review): identical to nonSnapshotFileFilter except for the prefix; a single
+ * prefix-parameterized helper would remove the duplication.
+ */
+ private static Predicate<Path> nonChangelogFileFilter() {
+ return path -> {
+ String name = path.getName();
+ return !name.startsWith(CHANGELOG_PREFIX)
+ && !name.equals(EARLIEST)
+ && !name.equals(LATEST);
+ };
+ }
+
private void cleanFile(
Pair<Path, Long> deleteFileInfo,
Consumer<Path> deletedFilesConsumer,
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 5147a19838..fc2013f019 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -21,7 +21,6 @@ package org.apache.paimon.utils;
import org.apache.paimon.Changelog;
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
@@ -37,7 +36,6 @@ import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
@@ -67,8 +65,8 @@ public class SnapshotManager implements Serializable {
private static final Logger LOG =
LoggerFactory.getLogger(SnapshotManager.class);
- private static final String SNAPSHOT_PREFIX = "snapshot-";
- private static final String CHANGELOG_PREFIX = "changelog-";
+ /** File-name prefix of snapshot files; widened to public for use outside this class. */
+ public static final String SNAPSHOT_PREFIX = "snapshot-";
+ /** File-name prefix of changelog files; widened to public for use outside this class. */
+ public static final String CHANGELOG_PREFIX = "changelog-";
public static final String EARLIEST = "EARLIEST";
public static final String LATEST = "LATEST";
private static final int EARLIEST_SNAPSHOT_DEFAULT_RETRY_NUM = 3;
@@ -695,54 +693,6 @@ public class SnapshotManager implements Serializable {
}
}
- /**
- * Try to get non snapshot files. If any error occurred, just ignore it
and return an empty
- * result.
- */
- public List<Pair<Path, Long>> tryGetNonSnapshotFiles(Predicate<FileStatus>
fileStatusFilter) {
- return listPathWithFilter(snapshotDirectory(), fileStatusFilter,
nonSnapshotFileFilter());
- }
-
- public List<Pair<Path, Long>>
tryGetNonChangelogFiles(Predicate<FileStatus> fileStatusFilter) {
- return listPathWithFilter(changelogDirectory(), fileStatusFilter,
nonChangelogFileFilter());
- }
-
- private List<Pair<Path, Long>> listPathWithFilter(
- Path directory, Predicate<FileStatus> fileStatusFilter,
Predicate<Path> fileFilter) {
- try {
- FileStatus[] statuses = fileIO.listStatus(directory);
- if (statuses == null) {
- return Collections.emptyList();
- }
-
- return Arrays.stream(statuses)
- .filter(fileStatusFilter)
- .filter(status -> fileFilter.test(status.getPath()))
- .map(status -> Pair.of(status.getPath(), status.getLen()))
- .collect(Collectors.toList());
- } catch (IOException ignored) {
- return Collections.emptyList();
- }
- }
-
- private Predicate<Path> nonSnapshotFileFilter() {
- return path -> {
- String name = path.getName();
- return !name.startsWith(SNAPSHOT_PREFIX)
- && !name.equals(EARLIEST)
- && !name.equals(LATEST);
- };
- }
-
- private Predicate<Path> nonChangelogFileFilter() {
- return path -> {
- String name = path.getName();
- return !name.startsWith(CHANGELOG_PREFIX)
- && !name.equals(EARLIEST)
- && !name.equals(LATEST);
- };
- }
-
public Optional<Snapshot> latestSnapshotOfUser(String user) {
Long latestId = latestSnapshotId();
if (latestId == null) {