This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4154c2bba [core] Clarify the purpose of method findOverlappedSnapshots
and move to snapshotManager (#3808)
4154c2bba is described below
commit 4154c2bbaa4ada107ce7acc4cf635b524ed62f92
Author: HunterXHunter <[email protected]>
AuthorDate: Thu Aug 1 13:56:25 2024 +0800
[core] Clarify the purpose of method findOverlappedSnapshots and move to
snapshotManager (#3808)
---
.../apache/paimon/operation/FileDeletionBase.java | 29 ++++++++---------
.../apache/paimon/table/ExpireChangelogImpl.java | 7 +++--
.../apache/paimon/table/ExpireSnapshotsImpl.java | 8 ++---
.../org/apache/paimon/utils/SnapshotManager.java | 36 ++++++++++++++++++++++
.../java/org/apache/paimon/utils/TagManager.java | 32 -------------------
5 files changed, 59 insertions(+), 53 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index 7c8d6656b..978735bab 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -35,7 +35,7 @@ import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.utils.FileDeletionThreadPool;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.TagManager;
+import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,11 +77,12 @@ public abstract class FileDeletionBase<T extends Snapshot> {
protected boolean changelogDecoupled;
- /** Used to record which tag is cached. */
- private long cachedTag = 0;
+ /** Used to record which snapshot is cached. */
+ private long cachedSnapshotId = 0;
- /** Used to cache data files used by current tag. */
- private final Map<BinaryRow, Map<Integer, Set<String>>> cachedTagDataFiles = new HashMap<>();
+ /** Used to cache data files used by current snapshot. */
+ private final Map<BinaryRow, Map<Integer, Set<String>>> cachedSnapshotDataFiles =
+ new HashMap<>();
public FileDeletionBase(
FileIO fileIO,
@@ -328,17 +329,17 @@ public abstract class FileDeletionBase<T extends Snapshot> {
}
public Predicate<ManifestEntry> dataFileSkipper(
- List<Snapshot> taggedSnapshots, long expiringSnapshotId) throws Exception {
- int index = TagManager.findPreviousTag(taggedSnapshots, expiringSnapshotId);
- // refresh tag data files
+ List<Snapshot> skippingSnapshots, long expiringSnapshotId) throws Exception {
+ int index = SnapshotManager.findPreviousSnapshot(skippingSnapshots, expiringSnapshotId);
+ // refresh snapshot data files
if (index >= 0) {
- Snapshot previousTag = taggedSnapshots.get(index);
- if (previousTag.id() != cachedTag) {
- cachedTag = previousTag.id();
- cachedTagDataFiles.clear();
- addMergedDataFiles(cachedTagDataFiles, previousTag);
+ Snapshot previousSnapshot = skippingSnapshots.get(index);
+ if (previousSnapshot.id() != cachedSnapshotId) {
+ cachedSnapshotId = previousSnapshot.id();
+ cachedSnapshotDataFiles.clear();
+ addMergedDataFiles(cachedSnapshotDataFiles, previousSnapshot);
}
- return entry -> containsDataFile(cachedTagDataFiles, entry);
+ return entry -> containsDataFile(cachedSnapshotDataFiles, entry);
}
return entry -> false;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
index 759088a06..72f655bba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
@@ -134,10 +134,11 @@ public class ExpireChangelogImpl implements ExpireSnapshots {
LOG.debug("Changelog expire range is [" + earliestId + ", " + endExclusiveId + ")");
}
- List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
+ List<Snapshot> referencedSnapshots = tagManager.taggedSnapshots();
List<Snapshot> skippingSnapshots =
- TagManager.findOverlappedSnapshots(taggedSnapshots, earliestId, endExclusiveId);
+ SnapshotManager.findOverlappedSnapshots(
+ referencedSnapshots, earliestId, endExclusiveId);
skippingSnapshots.add(snapshotManager.changelog(endExclusiveId));
Set<String> manifestSkippSet = changelogDeletion.manifestSkippingSet(skippingSnapshots);
for (long id = earliestId; id < endExclusiveId; id++) {
@@ -147,7 +148,7 @@ public class ExpireChangelogImpl implements ExpireSnapshots {
Changelog changelog = snapshotManager.longLivedChangelog(id);
Predicate<ManifestEntry> skipper;
try {
- skipper = changelogDeletion.dataFileSkipper(taggedSnapshots, id);
+ skipper = changelogDeletion.dataFileSkipper(referencedSnapshots, id);
} catch (Exception e) {
LOG.info(
String.format(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
index 170047297..c49db6498 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
@@ -153,7 +153,7 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
"Snapshot expire range is [" + beginInclusiveId + ", " + endExclusiveId + ")");
}
- List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
+ List<Snapshot> referencedSnapshots = tagManager.taggedSnapshots();
// delete merge tree files
// deleted merge tree files in a snapshot are not used by the next snapshot, so the range of
@@ -166,7 +166,7 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
// expire merge tree files and collect changed buckets
Predicate<ManifestEntry> skipper;
try {
- skipper = snapshotDeletion.dataFileSkipper(taggedSnapshots, id);
+ skipper = snapshotDeletion.dataFileSkipper(referencedSnapshots, id);
} catch (Exception e) {
LOG.info(
String.format(
@@ -198,8 +198,8 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
// delete manifests and indexFiles
List<Snapshot> skippingSnapshots =
- TagManager.findOverlappedSnapshots(
- taggedSnapshots, beginInclusiveId, endExclusiveId);
+ SnapshotManager.findOverlappedSnapshots(
+ referencedSnapshots, beginInclusiveId, endExclusiveId);
skippingSnapshots.add(snapshotManager.snapshot(endExclusiveId));
Set<String> skippingSet = snapshotDeletion.manifestSkippingSet(skippingSnapshots);
for (long id = beginInclusiveId; id < endExclusiveId; id++) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 48627957c..af83fab6a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -660,6 +660,42 @@ public class SnapshotManager implements Serializable {
return listVersionedFiles(fileIO, dir, prefix).reduce(reducer).orElse(null);
}
+ /**
+ * Find the overlapping snapshots between sortedSnapshots and range of [beginInclusive,
+ * endExclusive).
+ */
+ public static List<Snapshot> findOverlappedSnapshots(
+ List<Snapshot> sortedSnapshots, long beginInclusive, long endExclusive) {
+ List<Snapshot> overlappedSnapshots = new ArrayList<>();
+ int right = findPreviousSnapshot(sortedSnapshots, endExclusive);
+ if (right >= 0) {
+ int left = Math.max(findPreviousOrEqualSnapshot(sortedSnapshots, beginInclusive), 0);
+ for (int i = left; i <= right; i++) {
+ overlappedSnapshots.add(sortedSnapshots.get(i));
+ }
+ }
+ return overlappedSnapshots;
+ }
+
+ public static int findPreviousSnapshot(List<Snapshot> sortedSnapshots, long targetSnapshotId) {
+ for (int i = sortedSnapshots.size() - 1; i >= 0; i--) {
+ if (sortedSnapshots.get(i).id() < targetSnapshotId) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ private static int findPreviousOrEqualSnapshot(
+ List<Snapshot> sortedSnapshots, long targetSnapshotId) {
+ for (int i = sortedSnapshots.size() - 1; i >= 0; i--) {
+ if (sortedSnapshots.get(i).id() <= targetSnapshotId) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
public void deleteLatestHint() throws IOException {
Path snapshotDir = snapshotDirectory();
Path hintFile = new Path(snapshotDir, LATEST);
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 65c6c232d..259a5bdbc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -366,36 +366,4 @@ public class TagManager {
"Didn't find tag with snapshot id '%s'.This is unexpected.",
taggedSnapshot.id()));
}
-
- public static List<Snapshot> findOverlappedSnapshots(
- List<Snapshot> taggedSnapshots, long beginInclusive, long endExclusive) {
- List<Snapshot> snapshots = new ArrayList<>();
- int right = findPreviousTag(taggedSnapshots, endExclusive);
- if (right >= 0) {
- int left = Math.max(findPreviousOrEqualTag(taggedSnapshots, beginInclusive), 0);
- for (int i = left; i <= right; i++) {
- snapshots.add(taggedSnapshots.get(i));
- }
- }
- return snapshots;
- }
-
- public static int findPreviousTag(List<Snapshot> taggedSnapshots, long targetSnapshotId) {
- for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {
- if (taggedSnapshots.get(i).id() < targetSnapshotId) {
- return i;
- }
- }
- return -1;
- }
-
- private static int findPreviousOrEqualTag(
- List<Snapshot> taggedSnapshots, long targetSnapshotId) {
- for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {
- if (taggedSnapshots.get(i).id() <= targetSnapshotId) {
- return i;
- }
- }
- return -1;
- }
}