This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6772bcdb05 [core] Optimize snapshots collect in ExpireSnapshots (#7397)
6772bcdb05 is described below
commit 6772bcdb0549d7a4d95f8108aa0e6eb8462316f8
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Mar 11 10:35:02 2026 +0800
[core] Optimize snapshots collect in ExpireSnapshots (#7397)
---
.../apache/paimon/operation/FileDeletionBase.java | 10 ++-
.../apache/paimon/table/ExpireSnapshotsImpl.java | 95 +++++++++++++++-------
2 files changed, 73 insertions(+), 32 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index 98a761b471..231636355a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -75,7 +75,7 @@ public abstract class FileDeletionBase<T extends Snapshot> {
private final boolean cleanEmptyDirectories;
protected final Map<BinaryRow, Set<Integer>> deletionBuckets;
- private final Executor deleteFileExecutor;
+ private final Executor fileExecutor;
protected boolean changelogDecoupled;
@@ -102,7 +102,11 @@ public abstract class FileDeletionBase<T extends Snapshot>
{
this.statsFileHandler = statsFileHandler;
this.cleanEmptyDirectories = cleanEmptyDirectories;
this.deletionBuckets = new HashMap<>();
- this.deleteFileExecutor =
FileOperationThreadPool.getExecutorService(deleteFileThreadNum);
+ this.fileExecutor =
FileOperationThreadPool.getExecutorService(deleteFileThreadNum);
+ }
+
+    /**
+     * Exposes the executor this class uses to run file operations asynchronously, so that other
+     * components can schedule their own file work on the same executor.
+     *
+     * @return the executor obtained from {@code FileOperationThreadPool} in the constructor
+     */
+    public Executor fileExecutor() {
+        return this.fileExecutor;
     }
/**
@@ -461,7 +465,7 @@ public abstract class FileDeletionBase<T extends Snapshot> {
List<CompletableFuture<Void>> deletionFutures = new
ArrayList<>(files.size());
for (F file : files) {
deletionFutures.add(
- CompletableFuture.runAsync(() -> deletion.accept(file),
deleteFileExecutor));
+ CompletableFuture.runAsync(() -> deletion.accept(file),
fileExecutor));
}
try {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
index 01c7e74966..8cef1673a5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
@@ -39,7 +39,12 @@ import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.function.Predicate;
import static
org.apache.paimon.utils.SnapshotManager.findPreviousOrEqualSnapshot;
@@ -54,6 +59,7 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
private final ChangelogManager changelogManager;
private final ConsumerManager consumerManager;
private final SnapshotDeletion snapshotDeletion;
+ private final Executor fileExecutor;
private final TagManager tagManager;
private ExpireConfig expireConfig;
@@ -73,6 +79,7 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
this.snapshotDeletion = snapshotDeletion;
this.tagManager = tagManager;
this.expireConfig = ExpireConfig.builder().build();
+ this.fileExecutor = snapshotDeletion.fileExecutor();
}
@Override
@@ -127,9 +134,14 @@ public class ExpireSnapshotsImpl implements
ExpireSnapshots {
for (long id = min; id < maxExclusive; id++) {
// Early exit the loop for 'snapshot.time-retained'
// (the maximum time of snapshots to retain)
- if (snapshotManager.snapshotExists(id)
- && olderThanMills <=
snapshotManager.snapshot(id).timeMillis()) {
- return expireUntil(earliest, id);
+ try {
+ Snapshot snapshot = snapshotManager.tryGetSnapshot(id);
+ if (olderThanMills <= snapshot.timeMillis()) {
+ return expireUntil(earliest, id);
+ }
+ } catch (FileNotFoundException e) {
+ // ignore
+ // snapshot may have been deleted by another process
}
}
@@ -138,6 +150,15 @@ public class ExpireSnapshotsImpl implements
ExpireSnapshots {
@VisibleForTesting
public int expireUntil(long earliestId, long endExclusiveId) {
+ try {
+ return innerExpireUntil(earliestId, endExclusiveId);
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private int innerExpireUntil(long earliestId, long endExclusiveId)
+ throws ExecutionException, InterruptedException {
long startTime = System.currentTimeMillis();
if (endExclusiveId <= earliestId) {
@@ -152,10 +173,13 @@ public class ExpireSnapshotsImpl implements
ExpireSnapshots {
return 0;
}
+ // collect all snapshots
+ Map<Long, Snapshot> snapshots = collectSnapshots(earliestId,
endExclusiveId);
+
// find first snapshot to expire
long beginInclusiveId = earliestId;
for (long id = endExclusiveId - 1; id >= earliestId; id--) {
- if (!snapshotManager.snapshotExists(id)) {
+ if (!snapshots.containsKey(id)) {
// only latest snapshots are retained, as we cannot find this
snapshot, we can
// assume that all snapshots preceding it have been removed
beginInclusiveId = id + 1;
@@ -170,12 +194,10 @@ public class ExpireSnapshotsImpl implements
ExpireSnapshots {
// id should be (beginInclusiveId, endExclusiveId]
for (long id = beginInclusiveId + 1; id <= endExclusiveId; id++) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Ready to delete merge tree files not used by
snapshot #" + id);
+ LOG.debug("Ready to delete merge tree files not used by
snapshot #{}", id);
}
- Snapshot snapshot;
- try {
- snapshot = snapshotManager.tryGetSnapshot(id);
- } catch (FileNotFoundException e) {
+ Snapshot snapshot = snapshots.get(id);
+ if (snapshot == null) {
beginInclusiveId = id + 1;
continue;
}
@@ -185,9 +207,8 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots
{
skipper =
snapshotDeletion.createDataFileSkipperForTags(taggedSnapshots, id);
} catch (Exception e) {
LOG.info(
- String.format(
- "Skip cleaning data files of snapshot '%s' due
to failed to build skipping set.",
- id),
+ "Skip cleaning data files of snapshot '{}' due to
failed to build skipping set.",
+ id,
e);
continue;
}
@@ -199,12 +220,10 @@ public class ExpireSnapshotsImpl implements
ExpireSnapshots {
if (!expireConfig.isChangelogDecoupled()) {
for (long id = beginInclusiveId; id < endExclusiveId; id++) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Ready to delete changelog files from snapshot
#" + id);
+ LOG.debug("Ready to delete changelog files from snapshot
#{}", id);
}
- Snapshot snapshot;
- try {
- snapshot = snapshotManager.tryGetSnapshot(id);
- } catch (FileNotFoundException e) {
+ Snapshot snapshot = snapshots.get(id);
+ if (snapshot == null) {
beginInclusiveId = id + 1;
continue;
}
@@ -222,13 +241,13 @@ public class ExpireSnapshotsImpl implements
ExpireSnapshots {
List<Snapshot> skippingSnapshots =
findSkippingTags(taggedSnapshots, beginInclusiveId,
endExclusiveId);
- try {
-
skippingSnapshots.add(snapshotManager.tryGetSnapshot(endExclusiveId));
- } catch (FileNotFoundException e) {
+ Snapshot endExclusiveSnapshot = snapshots.get(endExclusiveId);
+ if (endExclusiveSnapshot == null) {
// the end exclusive snapshot is gone
// there is no need to proceed
return 0;
}
+ skippingSnapshots.add(endExclusiveSnapshot);
Set<String> skippingSet = null;
try {
@@ -239,13 +258,11 @@ public class ExpireSnapshotsImpl implements
ExpireSnapshots {
if (skippingSet != null) {
for (long id = beginInclusiveId; id < endExclusiveId; id++) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Ready to delete manifests in snapshot #" + id);
+ LOG.debug("Ready to delete manifests in snapshot #{}", id);
}
- Snapshot snapshot;
- try {
- snapshot = snapshotManager.tryGetSnapshot(id);
- } catch (FileNotFoundException e) {
+ Snapshot snapshot = snapshots.get(id);
+ if (snapshot == null) {
beginInclusiveId = id + 1;
continue;
}
@@ -255,10 +272,8 @@ public class ExpireSnapshotsImpl implements
ExpireSnapshots {
// delete snapshot file finally
for (long id = beginInclusiveId; id < endExclusiveId; id++) {
- Snapshot snapshot;
- try {
- snapshot = snapshotManager.tryGetSnapshot(id);
- } catch (FileNotFoundException e) {
+ Snapshot snapshot = snapshots.get(id);
+ if (snapshot == null) {
beginInclusiveId = id + 1;
continue;
}
@@ -278,6 +293,28 @@ public class ExpireSnapshotsImpl implements
ExpireSnapshots {
return (int) (endExclusiveId - beginInclusiveId);
}
+ private Map<Long, Snapshot> collectSnapshots(long earliestId, long
endExclusiveId)
+ throws InterruptedException, ExecutionException {
+ Map<Long, Snapshot> snapshots = new ConcurrentHashMap<>();
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ for (long id = earliestId; id <= endExclusiveId; id++) {
+ long snapshotId = id;
+ CompletableFuture<Void> future =
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ Snapshot snapshot =
snapshotManager.tryGetSnapshot(snapshotId);
+ snapshots.put(snapshotId, snapshot);
+ } catch (FileNotFoundException ignored) {
+ }
+ },
+ fileExecutor);
+ futures.add(future);
+ }
+ CompletableFuture.allOf(futures.toArray(new
CompletableFuture[0])).get();
+ return snapshots;
+ }
+
private void commitChangelog(Changelog changelog) {
try {
changelogManager.commitChangelog(changelog, changelog.id());