This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6772bcdb05 [core] Optimize snapshots collect in ExpireSnapshots (#7397)
6772bcdb05 is described below

commit 6772bcdb0549d7a4d95f8108aa0e6eb8462316f8
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Mar 11 10:35:02 2026 +0800

    [core] Optimize snapshots collect in ExpireSnapshots (#7397)
---
 .../apache/paimon/operation/FileDeletionBase.java  | 10 ++-
 .../apache/paimon/table/ExpireSnapshotsImpl.java   | 95 +++++++++++++++-------
 2 files changed, 73 insertions(+), 32 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index 98a761b471..231636355a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -75,7 +75,7 @@ public abstract class FileDeletionBase<T extends Snapshot> {
     private final boolean cleanEmptyDirectories;
     protected final Map<BinaryRow, Set<Integer>> deletionBuckets;
 
-    private final Executor deleteFileExecutor;
+    private final Executor fileExecutor;
 
     protected boolean changelogDecoupled;
 
@@ -102,7 +102,11 @@ public abstract class FileDeletionBase<T extends Snapshot> {
         this.statsFileHandler = statsFileHandler;
         this.cleanEmptyDirectories = cleanEmptyDirectories;
         this.deletionBuckets = new HashMap<>();
-        this.deleteFileExecutor = FileOperationThreadPool.getExecutorService(deleteFileThreadNum);
+        this.fileExecutor = FileOperationThreadPool.getExecutorService(deleteFileThreadNum);
+    }
+
+    public Executor fileExecutor() {
+        return fileExecutor;
     }
 
     /**
@@ -461,7 +465,7 @@ public abstract class FileDeletionBase<T extends Snapshot> {
         List<CompletableFuture<Void>> deletionFutures = new ArrayList<>(files.size());
         for (F file : files) {
             deletionFutures.add(
-                    CompletableFuture.runAsync(() -> deletion.accept(file), deleteFileExecutor));
+                    CompletableFuture.runAsync(() -> deletion.accept(file), fileExecutor));
         }
 
         try {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
index 01c7e74966..8cef1673a5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
@@ -39,7 +39,12 @@ import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.function.Predicate;
 
 import static org.apache.paimon.utils.SnapshotManager.findPreviousOrEqualSnapshot;
@@ -54,6 +59,7 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
     private final ChangelogManager changelogManager;
     private final ConsumerManager consumerManager;
     private final SnapshotDeletion snapshotDeletion;
+    private final Executor fileExecutor;
     private final TagManager tagManager;
 
     private ExpireConfig expireConfig;
@@ -73,6 +79,7 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
         this.snapshotDeletion = snapshotDeletion;
         this.tagManager = tagManager;
         this.expireConfig = ExpireConfig.builder().build();
+        this.fileExecutor = snapshotDeletion.fileExecutor();
     }
 
     @Override
@@ -127,9 +134,14 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
         for (long id = min; id < maxExclusive; id++) {
             // Early exit the loop for 'snapshot.time-retained'
             // (the maximum time of snapshots to retain)
-            if (snapshotManager.snapshotExists(id)
-                    && olderThanMills <= snapshotManager.snapshot(id).timeMillis()) {
-                return expireUntil(earliest, id);
+            try {
+                Snapshot snapshot = snapshotManager.tryGetSnapshot(id);
+                if (olderThanMills <= snapshot.timeMillis()) {
+                    return expireUntil(earliest, id);
+                }
+            } catch (FileNotFoundException e) {
+                // ignore
+                // snapshot may have been deleted by another process
             }
         }
 
@@ -138,6 +150,15 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
 
     @VisibleForTesting
     public int expireUntil(long earliestId, long endExclusiveId) {
+        try {
+            return innerExpireUntil(earliestId, endExclusiveId);
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private int innerExpireUntil(long earliestId, long endExclusiveId)
+            throws ExecutionException, InterruptedException {
         long startTime = System.currentTimeMillis();
 
         if (endExclusiveId <= earliestId) {
@@ -152,10 +173,13 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
             return 0;
         }
 
+        // collect all snapshots
+        Map<Long, Snapshot> snapshots = collectSnapshots(earliestId, endExclusiveId);
+
         // find first snapshot to expire
         long beginInclusiveId = earliestId;
         for (long id = endExclusiveId - 1; id >= earliestId; id--) {
-            if (!snapshotManager.snapshotExists(id)) {
+            if (!snapshots.containsKey(id)) {
                 // only latest snapshots are retained, as we cannot find this snapshot, we can
                 // assume that all snapshots preceding it have been removed
                 beginInclusiveId = id + 1;
@@ -170,12 +194,10 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
         // id should be (beginInclusiveId, endExclusiveId]
         for (long id = beginInclusiveId + 1; id <= endExclusiveId; id++) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Ready to delete merge tree files not used by snapshot #" + id);
+                LOG.debug("Ready to delete merge tree files not used by snapshot #{}", id);
             }
-            Snapshot snapshot;
-            try {
-                snapshot = snapshotManager.tryGetSnapshot(id);
-            } catch (FileNotFoundException e) {
+            Snapshot snapshot = snapshots.get(id);
+            if (snapshot == null) {
                 beginInclusiveId = id + 1;
                 continue;
             }
@@ -185,9 +207,8 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
                 skipper = snapshotDeletion.createDataFileSkipperForTags(taggedSnapshots, id);
             } catch (Exception e) {
                 LOG.info(
-                        String.format(
-                                "Skip cleaning data files of snapshot '%s' due to failed to build skipping set.",
-                                id),
+                        "Skip cleaning data files of snapshot '{}' due to failed to build skipping set.",
+                        id,
                         e);
                 continue;
             }
@@ -199,12 +220,10 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
         if (!expireConfig.isChangelogDecoupled()) {
             for (long id = beginInclusiveId; id < endExclusiveId; id++) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Ready to delete changelog files from snapshot #" + id);
+                    LOG.debug("Ready to delete changelog files from snapshot #{}", id);
                 }
-                Snapshot snapshot;
-                try {
-                    snapshot = snapshotManager.tryGetSnapshot(id);
-                } catch (FileNotFoundException e) {
+                Snapshot snapshot = snapshots.get(id);
+                if (snapshot == null) {
                     beginInclusiveId = id + 1;
                     continue;
                 }
@@ -222,13 +241,13 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
         List<Snapshot> skippingSnapshots =
                 findSkippingTags(taggedSnapshots, beginInclusiveId, endExclusiveId);
 
-        try {
-            skippingSnapshots.add(snapshotManager.tryGetSnapshot(endExclusiveId));
-        } catch (FileNotFoundException e) {
+        Snapshot endExclusiveSnapshot = snapshots.get(endExclusiveId);
+        if (endExclusiveSnapshot == null) {
             // the end exclusive snapshot is gone
             // there is no need to proceed
             return 0;
         }
+        skippingSnapshots.add(endExclusiveSnapshot);
 
         Set<String> skippingSet = null;
         try {
@@ -239,13 +258,11 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
         if (skippingSet != null) {
             for (long id = beginInclusiveId; id < endExclusiveId; id++) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Ready to delete manifests in snapshot #" + id);
+                    LOG.debug("Ready to delete manifests in snapshot #{}", id);
                 }
 
-                Snapshot snapshot;
-                try {
-                    snapshot = snapshotManager.tryGetSnapshot(id);
-                } catch (FileNotFoundException e) {
+                Snapshot snapshot = snapshots.get(id);
+                if (snapshot == null) {
                     beginInclusiveId = id + 1;
                     continue;
                 }
@@ -255,10 +272,8 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
 
         // delete snapshot file finally
         for (long id = beginInclusiveId; id < endExclusiveId; id++) {
-            Snapshot snapshot;
-            try {
-                snapshot = snapshotManager.tryGetSnapshot(id);
-            } catch (FileNotFoundException e) {
+            Snapshot snapshot = snapshots.get(id);
+            if (snapshot == null) {
                 beginInclusiveId = id + 1;
                 continue;
             }
@@ -278,6 +293,28 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
         return (int) (endExclusiveId - beginInclusiveId);
     }
 
+    private Map<Long, Snapshot> collectSnapshots(long earliestId, long endExclusiveId)
+            throws InterruptedException, ExecutionException {
+        Map<Long, Snapshot> snapshots = new ConcurrentHashMap<>();
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        for (long id = earliestId; id <= endExclusiveId; id++) {
+            long snapshotId = id;
+            CompletableFuture<Void> future =
+                    CompletableFuture.runAsync(
+                            () -> {
+                                try {
+                                    Snapshot snapshot = snapshotManager.tryGetSnapshot(snapshotId);
+                                    snapshots.put(snapshotId, snapshot);
+                                } catch (FileNotFoundException ignored) {
+                                }
+                            },
+                            fileExecutor);
+            futures.add(future);
+        }
+        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
+        return snapshots;
+    }
+
     private void commitChangelog(Changelog changelog) {
         try {
             changelogManager.commitChangelog(changelog, changelog.id());

Reply via email to