This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d9326c667 [core] supports asynchronous deletion of expired snapshot 
files (#1823)
d9326c667 is described below

commit d9326c6670f2af9af0ccc80106fb22d3ad961d78
Author: liming.1018 <[email protected]>
AuthorDate: Fri Sep 15 11:58:44 2023 +0800

    [core] supports asynchronous deletion of expired snapshot files (#1823)
---
 .../shortcodes/generated/core_configuration.html   |  13 ++
 .../main/java/org/apache/paimon/CoreOptions.java   |  49 +++++++
 .../java/org/apache/paimon/AbstractFileStore.java  |   3 +-
 .../apache/paimon/operation/FileDeletionBase.java  |  56 +++++--
 .../paimon/operation/FileStoreExpireImpl.java      |  12 +-
 .../apache/paimon/operation/SnapshotDeletion.java  |  10 +-
 .../org/apache/paimon/operation/TagDeletion.java   |   7 +-
 .../paimon/table/AbstractFileStoreTable.java       |   3 +-
 .../apache/paimon/table/sink/TableCommitImpl.java  |  52 ++++++-
 .../test/java/org/apache/paimon/TestFileStore.java |  24 ++-
 .../paimon/table/FileStoreTableTestBase.java       | 161 +++++++++++++++++++++
 11 files changed, 360 insertions(+), 30 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index cf02d0bb9..6ade8c0b5 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -491,6 +491,19 @@ This config option does not affect the default filesystem 
metastore.</td>
             <td>Duration</td>
             <td>The maximum time of completed snapshots to retain.</td>
         </tr>
+        <tr>
+            <td><h5>snapshot.expire.execution-mode</h5></td>
+            <td style="word-wrap: break-word;">sync</td>
+            <td>Enum</td>
+            <td>Specifies the execution mode of expire.<br /><br />Possible 
values:<ul><li>"sync": Execute expire synchronously. If there are too many 
files, it may take a long time and block stream processing.</li><li>"async": 
Execute expire asynchronously. If the generation of snapshots is greater than 
the deletion, there will be a backlog of files.</li></ul></td>
+        </tr>
+        <tr>
+            <td><h5>snapshot.expire.limit</h5></td>
+            <td style="word-wrap: break-word;">10</td>
+            <td>Integer</td>
+            <td>The maximum number of snapshots allowed to expire at a 
time.</td>
+        </tr>
         <tr>
             <td><h5>sort-engine</h5></td>
             <td style="word-wrap: break-word;">loser-tree</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 13cf60ec3..7e1f8498b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -194,6 +194,19 @@ public class CoreOptions implements Serializable {
                     .defaultValue(Duration.ofHours(1))
                     .withDescription("The maximum time of completed snapshots 
to retain.");
 
+    public static final ConfigOption<ExpireExecutionMode> 
SNAPSHOT_EXPIRE_EXECUTION_MODE =
+            key("snapshot.expire.execution-mode")
+                    .enumType(ExpireExecutionMode.class)
+                    .defaultValue(ExpireExecutionMode.SYNC)
+                    .withDescription("Specifies the execution mode of 
expire.");
+
+    public static final ConfigOption<Integer> SNAPSHOT_EXPIRE_LIMIT =
+            key("snapshot.expire.limit")
+                    .intType()
+                    .defaultValue(10)
+                    .withDescription(
+                            "The maximum number of snapshots allowed to expire 
at a time.");
+
     public static final ConfigOption<Duration> CONTINUOUS_DISCOVERY_INTERVAL =
             key("continuous.discovery-interval")
                     .durationType()
@@ -983,6 +996,14 @@ public class CoreOptions implements Serializable {
         return options.get(SNAPSHOT_TIME_RETAINED);
     }
 
+    public ExpireExecutionMode snapshotExpireExecutionMode() {
+        return options.get(SNAPSHOT_EXPIRE_EXECUTION_MODE);
+    }
+
+    public int snapshotExpireLimit() {
+        return options.get(SNAPSHOT_EXPIRE_LIMIT);
+    }
+
     public int manifestMergeMinCount() {
         return options.get(MANIFEST_MERGE_MIN_COUNT);
     }
@@ -1849,4 +1870,32 @@ public class CoreOptions implements Serializable {
             return text(description);
         }
     }
+
+    /** The execution mode for expire. */
+    public enum ExpireExecutionMode implements DescribedEnum {
+        SYNC(
+                "sync",
+                "Execute expire synchronously. If there are too many files, it 
may take a long time and block stream processing."),
+        ASYNC(
+                "async",
+                "Execute expire asynchronously. If the generation of snapshots 
is greater than the deletion, there will be a backlog of files.");
+
+        private final String value;
+        private final String description;
+
+        ExpireExecutionMode(String value, String description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(description);
+        }
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 48cb6e231..d78013ffd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -176,7 +176,8 @@ public abstract class AbstractFileStore<T> implements 
FileStore<T> {
                 options.snapshotTimeRetain().toMillis(),
                 snapshotManager(),
                 newSnapshotDeletion(),
-                newTagManager());
+                newTagManager(),
+                options.snapshotExpireLimit());
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index a9ad4cac2..8de8dfa85 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -30,6 +30,7 @@ import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.FileUtils;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Iterables;
 
@@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -47,6 +49,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -64,6 +69,7 @@ public abstract class FileDeletionBase {
     protected final ManifestList manifestList;
     protected final IndexFileHandler indexFileHandler;
     protected final Map<BinaryRow, Set<Integer>> deletionBuckets;
+    protected final Executor ioExecutor;
 
     public FileDeletionBase(
             FileIO fileIO,
@@ -78,6 +84,7 @@ public abstract class FileDeletionBase {
         this.indexFileHandler = indexFileHandler;
 
         this.deletionBuckets = new HashMap<>();
+        this.ioExecutor = FileUtils.COMMON_IO_FORK_JOIN_POOL;
     }
 
     /**
@@ -107,10 +114,12 @@ public abstract class FileDeletionBase {
         // All directory paths are deduplicated and sorted by hierarchy level
         Map<Integer, Set<Path>> deduplicate = new HashMap<>();
         for (Map.Entry<BinaryRow, Set<Integer>> entry : 
deletionBuckets.entrySet()) {
+            List<Path> toDeleteEmptyDirectory = new ArrayList<>();
             // try to delete bucket directories
             for (Integer bucket : entry.getValue()) {
-                tryDeleteEmptyDirectory(pathFactory.bucketPath(entry.getKey(), 
bucket));
+                
toDeleteEmptyDirectory.add(pathFactory.bucketPath(entry.getKey(), bucket));
             }
+            deleteFiles(toDeleteEmptyDirectory, this::tryDeleteEmptyDirectory);
 
             List<Path> hierarchicalPaths = 
pathFactory.getHierarchicalPartitionPath(entry.getKey());
             int hierarchies = hierarchicalPaths.size();
@@ -144,31 +153,34 @@ public abstract class FileDeletionBase {
     protected void cleanUnusedManifests(
             Snapshot snapshot, Set<String> skippingSet, boolean 
deleteChangelog) {
         // clean base and delta manifests
+        List<String> toDeleteManifests = new ArrayList<>();
         List<ManifestFileMeta> toExpireManifests = new ArrayList<>();
         
toExpireManifests.addAll(tryReadManifestList(snapshot.baseManifestList()));
         
toExpireManifests.addAll(tryReadManifestList(snapshot.deltaManifestList()));
         for (ManifestFileMeta manifest : toExpireManifests) {
             String fileName = manifest.fileName();
             if (!skippingSet.contains(fileName)) {
-                manifestFile.delete(fileName);
+                toDeleteManifests.add(fileName);
                 // to avoid other snapshots trying to delete again
                 skippingSet.add(fileName);
             }
         }
+        deleteFiles(toDeleteManifests, manifestFile::delete);
 
+        toDeleteManifests.clear();
         if (!skippingSet.contains(snapshot.baseManifestList())) {
-            manifestList.delete(snapshot.baseManifestList());
+            toDeleteManifests.add(snapshot.baseManifestList());
         }
         if (!skippingSet.contains(snapshot.deltaManifestList())) {
-            manifestList.delete(snapshot.deltaManifestList());
+            toDeleteManifests.add(snapshot.deltaManifestList());
         }
+        deleteFiles(toDeleteManifests, manifestList::delete);
 
         // clean changelog manifests
         if (deleteChangelog && snapshot.changelogManifestList() != null) {
-            for (ManifestFileMeta manifest :
-                    tryReadManifestList(snapshot.changelogManifestList())) {
-                manifestFile.delete(manifest.fileName());
-            }
+            deleteFiles(
+                    tryReadManifestList(snapshot.changelogManifestList()),
+                    manifest -> manifestFile.delete(manifest.fileName()));
             manifestList.delete(snapshot.changelogManifestList());
         }
 
@@ -176,11 +188,11 @@ public abstract class FileDeletionBase {
         String indexManifest = snapshot.indexManifest();
         // check exists, it may have been deleted by other snapshots
         if (indexManifest != null && 
indexFileHandler.existsManifest(indexManifest)) {
-            for (IndexManifestEntry entry : 
indexFileHandler.readManifest(indexManifest)) {
-                if (!skippingSet.contains(entry.indexFile().fileName())) {
-                    indexFileHandler.deleteIndexFile(entry);
-                }
-            }
+            List<IndexManifestEntry> indexManifestEntries =
+                    indexFileHandler.readManifest(indexManifest);
+            indexManifestEntries.removeIf(
+                    entry -> 
skippingSet.contains(entry.indexFile().fileName()));
+            deleteFiles(indexManifestEntries, 
indexFileHandler::deleteIndexFile);
 
             if (!skippingSet.contains(indexManifest)) {
                 indexFileHandler.deleteManifest(indexManifest);
@@ -305,4 +317,22 @@ public abstract class FileDeletionBase {
             return false;
         }
     }
+
+    protected <T> void deleteFiles(Collection<T> files, Consumer<T> deletion) {
+        if (files.isEmpty()) {
+            return;
+        }
+
+        List<CompletableFuture<Void>> deletionFutures = new 
ArrayList<>(files.size());
+        for (T file : files) {
+            deletionFutures.add(
+                    CompletableFuture.runAsync(() -> deletion.accept(file), 
ioExecutor));
+        }
+
+        try {
+            CompletableFuture.allOf(deletionFutures.toArray(new 
CompletableFuture[0])).get();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
index cd52589d0..8b177bdf9 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.operation;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.consumer.ConsumerManager;
@@ -56,6 +57,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
     private final SnapshotDeletion snapshotDeletion;
 
     private final TagManager tagManager;
+    private final int expireLimit;
 
     private Lock lock;
 
@@ -65,13 +67,17 @@ public class FileStoreExpireImpl implements FileStoreExpire 
{
             long millisRetained,
             SnapshotManager snapshotManager,
             SnapshotDeletion snapshotDeletion,
-            TagManager tagManager) {
+            TagManager tagManager,
+            int expireLimit) {
         Preconditions.checkArgument(
                 numRetainedMin >= 1,
                 "The minimum number of completed snapshots to retain should be 
>= 1.");
         Preconditions.checkArgument(
                 numRetainedMax >= numRetainedMin,
                 "The maximum number of snapshots to retain should be >= the 
minimum number.");
+        Preconditions.checkArgument(
+                expireLimit > 1,
+                String.format("The %s should be > 1.", 
CoreOptions.SNAPSHOT_EXPIRE_LIMIT.key()));
         this.numRetainedMin = numRetainedMin;
         this.numRetainedMax = numRetainedMax;
         this.millisRetained = millisRetained;
@@ -80,6 +86,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
                 new ConsumerManager(snapshotManager.fileIO(), 
snapshotManager.tablePath());
         this.snapshotDeletion = snapshotDeletion;
         this.tagManager = tagManager;
+        this.expireLimit = expireLimit;
     }
 
     @Override
@@ -152,6 +159,9 @@ public class FileStoreExpireImpl implements FileStoreExpire 
{
                 break;
             }
         }
+
+        endExclusiveId = Math.min(beginInclusiveId + expireLimit, 
endExclusiveId);
+
         if (LOG.isDebugEnabled()) {
             LOG.debug(
                     "Snapshot expire range is [" + beginInclusiveId + ", " + 
endExclusiveId + ")");
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
index 123fdabc5..8b9740c33 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
@@ -94,18 +94,20 @@ public class SnapshotDeletion extends FileDeletionBase {
             }
         }
 
+        List<Path> actualDataFileToDelete = new ArrayList<>();
         dataFileToDelete.forEach(
                 (path, pair) -> {
                     ManifestEntry entry = pair.getLeft();
                     // check whether we should skip the data file
                     if (!skipper.test(entry)) {
                         // delete data files
-                        fileIO.deleteQuietly(path);
-                        pair.getRight().forEach(fileIO::deleteQuietly);
+                        actualDataFileToDelete.add(path);
+                        actualDataFileToDelete.addAll(pair.getRight());
 
                         recordDeletionBuckets(entry);
                     }
                 });
+        deleteFiles(actualDataFileToDelete, fileIO::deleteQuietly);
     }
 
     /**
@@ -118,15 +120,17 @@ public class SnapshotDeletion extends FileDeletionBase {
     }
 
     public void deleteAddedDataFiles(Iterable<ManifestEntry> manifestEntries) {
+        List<Path> dataFileToDelete = new ArrayList<>();
         for (ManifestEntry entry : manifestEntries) {
             if (entry.kind() == FileKind.ADD) {
-                fileIO.deleteQuietly(
+                dataFileToDelete.add(
                         new Path(
                                 pathFactory.bucketPath(entry.partition(), 
entry.bucket()),
                                 entry.file().fileName()));
                 recordDeletionBuckets(entry);
             }
         }
+        deleteFiles(dataFileToDelete, fileIO::deleteQuietly);
     }
 
     public Predicate<ManifestEntry> dataFileSkipper(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
index 8615cbc86..77e3d7c70 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
@@ -28,6 +28,7 @@ import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.utils.FileStorePathFactory;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -60,17 +61,19 @@ public class TagDeletion extends FileDeletionBase {
 
     public void cleanUnusedDataFiles(
             Iterable<ManifestEntry> entries, Predicate<ManifestEntry> skipper) 
{
+        List<Path> dataFileToDelete = new ArrayList<>();
         for (ManifestEntry entry : ManifestEntry.mergeEntries(entries)) {
             if (!skipper.test(entry)) {
                 Path bucketPath = pathFactory.bucketPath(entry.partition(), 
entry.bucket());
-                fileIO.deleteQuietly(new Path(bucketPath, 
entry.file().fileName()));
+                dataFileToDelete.add(new Path(bucketPath, 
entry.file().fileName()));
                 for (String file : entry.file().extraFiles()) {
-                    fileIO.deleteQuietly(new Path(bucketPath, file));
+                    dataFileToDelete.add(new Path(bucketPath, file));
                 }
 
                 recordDeletionBuckets(entry);
             }
         }
+        deleteFiles(dataFileToDelete, fileIO::deleteQuietly);
     }
 
     public Predicate<ManifestEntry> dataFileSkipper(Snapshot fromSnapshot) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index ea939bf49..eb15566b2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -264,7 +264,8 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
                 coreOptions().writeOnly() ? null : 
store().newTagCreationManager(),
                 catalogEnvironment.lockFactory().create(),
                 CoreOptions.fromMap(options()).consumerExpireTime(),
-                new ConsumerManager(fileIO, path));
+                new ConsumerManager(fileIO, path),
+                coreOptions().snapshotExpireExecutionMode());
     }
 
     private List<CommitCallback> createCommitCallbacks() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index e5ba0eba1..7293dce3a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.table.sink;
 
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.operation.FileStoreCommit;
@@ -25,8 +26,14 @@ import org.apache.paimon.operation.FileStoreExpire;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.tag.TagAutoCreation;
+import org.apache.paimon.utils.ExecutorThreadFactory;
 import org.apache.paimon.utils.IOUtils;
 
+import 
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.time.Duration;
@@ -37,8 +44,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.CoreOptions.ExpireExecutionMode;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
 /**
@@ -46,6 +57,7 @@ import static 
org.apache.paimon.utils.Preconditions.checkState;
  * snapshot commit and expiration.
  */
 public class TableCommitImpl implements InnerTableCommit {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TableCommitImpl.class);
 
     private final FileStoreCommit commit;
     private final List<CommitCallback> commitCallbacks;
@@ -61,6 +73,9 @@ public class TableCommitImpl implements InnerTableCommit {
 
     private boolean batchCommitted = false;
 
+    private ExecutorService expireMainExecutor;
+    private AtomicReference<Throwable> expireError;
+
     public TableCommitImpl(
             FileStoreCommit commit,
             List<CommitCallback> commitCallbacks,
@@ -69,7 +84,8 @@ public class TableCommitImpl implements InnerTableCommit {
             @Nullable TagAutoCreation tagAutoCreation,
             Lock lock,
             @Nullable Duration consumerExpireTime,
-            ConsumerManager consumerManager) {
+            ConsumerManager consumerManager,
+            ExpireExecutionMode expireExecutionMode) {
         commit.withLock(lock);
         if (expire != null) {
             expire.withLock(lock);
@@ -87,6 +103,14 @@ public class TableCommitImpl implements InnerTableCommit {
 
         this.consumerExpireTime = consumerExpireTime;
         this.consumerManager = consumerManager;
+
+        this.expireMainExecutor =
+                expireExecutionMode == ExpireExecutionMode.SYNC
+                        ? MoreExecutors.newDirectExecutorService()
+                        : Executors.newSingleThreadExecutor(
+                                new ExecutorThreadFactory(
+                                        Thread.currentThread().getName() + 
"expire-main-thread"));
+        this.expireError = new AtomicReference<>(null);
     }
 
     @Override
@@ -145,7 +169,7 @@ public class TableCommitImpl implements InnerTableCommit {
                 commit.commit(committable, new HashMap<>());
             }
             if (!committables.isEmpty()) {
-                expire(committables.get(committables.size() - 1).identifier());
+                expire(committables.get(committables.size() - 1).identifier(), 
expireMainExecutor);
             }
         } else {
             ManifestCommittable committable;
@@ -162,7 +186,7 @@ public class TableCommitImpl implements InnerTableCommit {
                 committable = new ManifestCommittable(Long.MAX_VALUE);
             }
             commit.overwrite(overwritePartition, committable, 
Collections.emptyMap());
-            expire(committable.identifier());
+            expire(committable.identifier(), expireMainExecutor);
         }
 
         commitCallbacks.forEach(c -> c.call(committables));
@@ -195,6 +219,22 @@ public class TableCommitImpl implements InnerTableCommit {
         return retryCommittables.size();
     }
 
+    private void expire(long partitionExpireIdentifier, ExecutorService 
executor) {
+        if (expireError.get() != null) {
+            throw new RuntimeException(expireError.get());
+        }
+
+        executor.execute(
+                () -> {
+                    try {
+                        expire(partitionExpireIdentifier);
+                    } catch (Throwable t) {
+                        LOG.error("Executing expire encountered an error.", t);
+                        expireError.compareAndSet(null, t);
+                    }
+                });
+    }
+
     private void expire(long partitionExpireIdentifier) {
         // expire consumer first to avoid preventing snapshot expiration
         if (consumerExpireTime != null) {
@@ -220,10 +260,16 @@ public class TableCommitImpl implements InnerTableCommit {
             IOUtils.closeQuietly(commitCallback);
         }
         IOUtils.closeQuietly(lock);
+        expireMainExecutor.shutdownNow();
     }
 
     @Override
     public void abort(List<CommitMessage> commitMessages) {
         commit.abort(commitMessages);
     }
+
+    @VisibleForTesting
+    public ExecutorService getExpireMainExecutor() {
+        return expireMainExecutor;
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index f545f7a28..6b3b3f8a0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -138,7 +138,8 @@ public class TestFileStore extends KeyValueFileStore {
                 millisRetained,
                 snapshotManager(),
                 newSnapshotDeletion(),
-                new TagManager(fileIO, options.path()));
+                new TagManager(fileIO, options.path()),
+                Integer.MAX_VALUE);
     }
 
     public List<Snapshot> commitData(
@@ -492,12 +493,23 @@ public class TestFileStore extends KeyValueFileStore {
     }
 
     public Set<Path> getFilesInUse(long snapshotId) {
-        Set<Path> result = new HashSet<>();
+        return getFilesInUse(
+                snapshotId,
+                snapshotManager(),
+                newScan(),
+                fileIO,
+                pathFactory(),
+                manifestListFactory().create());
+    }
 
-        SnapshotManager snapshotManager = snapshotManager();
-        FileStorePathFactory pathFactory = pathFactory();
-        ManifestList manifestList = manifestListFactory().create();
-        FileStoreScan scan = newScan();
+    public static Set<Path> getFilesInUse(
+            long snapshotId,
+            SnapshotManager snapshotManager,
+            FileStoreScan scan,
+            FileIO fileIO,
+            FileStorePathFactory pathFactory,
+            ManifestList manifestList) {
+        Set<Path> result = new HashSet<>();
 
         Path snapshotPath = snapshotManager.snapshotPath(snapshotId);
         Snapshot snapshot = Snapshot.fromPath(fileIO, snapshotPath);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index 8d7f94140..b87311c4c 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -18,8 +18,10 @@
 
 package org.apache.paimon.table;
 
+import org.apache.paimon.AbstractFileStore;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.TestFileStore;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.BinaryString;
@@ -35,6 +37,8 @@ import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
+import org.apache.paimon.operation.FileStoreExpire;
+import org.apache.paimon.operation.FileStoreTestUtils;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -48,6 +52,7 @@ import org.apache.paimon.table.sink.InnerTableCommit;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.OutOfRangeException;
 import org.apache.paimon.table.source.ReadBuilder;
@@ -66,6 +71,7 @@ import org.apache.paimon.utils.TraceableFileIO;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -75,13 +81,17 @@ import org.junit.jupiter.params.provider.ValueSource;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -91,7 +101,10 @@ import java.util.stream.Collectors;
 import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.BUCKET_KEY;
 import static org.apache.paimon.CoreOptions.COMPACTION_MAX_FILE_NUM;
+import static org.apache.paimon.CoreOptions.ExpireExecutionMode;
 import static org.apache.paimon.CoreOptions.FILE_FORMAT;
+import static org.apache.paimon.CoreOptions.SNAPSHOT_EXPIRE_EXECUTION_MODE;
+import static org.apache.paimon.CoreOptions.SNAPSHOT_EXPIRE_LIMIT;
 import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
 import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
 import static org.apache.paimon.CoreOptions.WRITE_ONLY;
@@ -907,6 +920,154 @@ public abstract class FileStoreTableTestBase {
                                 IllegalArgumentException.class, "Tag 'tag1' 
doesn't exist."));
     }
 
+    @Test // Async expire mode: nothing is deleted while the expire executor is blocked; afterwards only the latest snapshot remains.
+    @Timeout(120) // 120 s (JUnit default unit): fail rather than hang if a latch is never released
+    public void testAsyncExpireExecutionMode() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        Map<String, String> options = new HashMap<>();
+        options.put(SNAPSHOT_EXPIRE_EXECUTION_MODE.key(), 
ExpireExecutionMode.ASYNC.toString());
+        options.put(SNAPSHOT_NUM_RETAINED_MIN.key(), "1");
+        options.put(SNAPSHOT_NUM_RETAINED_MAX.key(), "1");
+        options.put(SNAPSHOT_EXPIRE_LIMIT.key(), "2");
+
+        TableCommitImpl commit = table.copy(options).newCommit(commitUser); // commit built from the ASYNC-configured copy
+        ExecutorService executor = commit.getExpireMainExecutor(); // presumably the executor async expire tasks are submitted to; confirm in TableCommitImpl
+        CountDownLatch before = new CountDownLatch(1); // released by the test to unblock the executor
+        CountDownLatch after = new CountDownLatch(1); // counted down by a task queued after all commits
+
+        executor.execute(
+                () -> {
+                    try {
+                        before.await();
+                    } catch (Exception ignore) {
+                        // ignore: this task only parks the executor until the before latch is released
+                    }
+                });
+
+        try (StreamTableWrite write = table.newWrite(commitUser)) { // writes use the original table; commits use the ASYNC copy
+            for (int i = 0; i < 10; i++) {
+                write.write(rowData(i, 10 * i, 100L * i));
+                commit.commit(i, write.prepareCommit(false, i));
+            }
+        }
+
+        executor.execute(after::countDown); // runs once earlier queued tasks finish; assumes in-order, single-threaded execution (TODO confirm)
+
+        SnapshotManager snapshotManager = table.snapshotManager();
+        long latestSnapshotId = snapshotManager.latestSnapshotId();
+
+        AbstractFileStore<?> store = (AbstractFileStore<?>) table.store();
+        Set<Path> filesInUse =
+                TestFileStore.getFilesInUse(
+                        latestSnapshotId,
+                        snapshotManager,
+                        store.newScan(),
+                        table.fileIO(),
+                        store.pathFactory(),
+                        store.manifestListFactory().create());
+
+        List<Path> unusedFileList =
+                Files.walk(Paths.get(tempDir.toString())) // NOTE(review): the Files.walk stream is never closed; consider try-with-resources
+                        .filter(Files::isRegularFile)
+                        .filter(p -> 
!p.getFileName().toString().startsWith("snapshot"))
+                        .filter(p -> 
!p.getFileName().toString().startsWith("schema"))
+                        .filter(p -> 
!p.getFileName().toString().equals(SnapshotManager.LATEST))
+                        .filter(p -> 
!p.getFileName().toString().equals(SnapshotManager.EARLIEST))
+                        .map(p -> new Path(TraceableFileIO.SCHEME + "://" + 
p.toString()))
+                        .filter(p -> !filesInUse.contains(p)) // leaves only files not reported by getFilesInUse for the latest snapshot
+                        .collect(Collectors.toList());
+
+        // the executor is still blocked, so no expire has run yet: all snapshots and files are preserved
+        for (int i = 1; i <= latestSnapshotId; i++) {
+            assertThat(snapshotManager.snapshotExists(i)).isTrue();
+        }
+        for (Path file : unusedFileList) {
+            FileStoreTestUtils.assertPathExists(table.fileIO(), file);
+        }
+
+        // release the executor and wait for the queued expire work; only files in use should remain
+        before.countDown();
+        after.await();
+
+        for (int i = 1; i < latestSnapshotId - 1; i++) { // NOTE(review): bound skips latestSnapshotId - 1; presumably covered by the earliestSnapshotId assertion below
+            assertThat(snapshotManager.snapshotExists(i)).isFalse();
+        }
+        for (Path file : unusedFileList) {
+            FileStoreTestUtils.assertPathNotExists(table.fileIO(), file);
+        }
+        assertThat(snapshotManager.snapshotExists(latestSnapshotId)).isTrue();
+        
assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(latestSnapshotId);
+        
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(latestSnapshotId);
+
+        commit.close(); // NOTE(review): not reached if an assertion above fails; consider try/finally
+    }
+
+    @Test // With SNAPSHOT_EXPIRE_LIMIT=2, each explicit expire() call removes the next two snapshots until only the latest remains.
+    @Timeout(120) // 120 s (JUnit default unit)
+    public void testExpireWithLimit() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        Map<String, String> options = new HashMap<>();
+        options.put(SNAPSHOT_EXPIRE_EXECUTION_MODE.key(), 
ExpireExecutionMode.ASYNC.toString()); // NOTE(review): ASYNC is set, but expire() is invoked directly below; the option may not matter for this test
+        options.put(SNAPSHOT_NUM_RETAINED_MIN.key(), "1");
+        options.put(SNAPSHOT_NUM_RETAINED_MAX.key(), "1");
+        options.put(SNAPSHOT_EXPIRE_LIMIT.key(), "2");
+
+        table = table.copy(options);
+        TableCommitImpl commit =
+                table.copy(Collections.singletonMap(WRITE_ONLY.key(), "true"))
+                        .newCommit(commitUser); // presumably write-only keeps commits from expiring on their own, so expire() is driven manually; confirm
+        FileStoreExpire expire = table.store().newExpire(); // built from the copy that carries the retain/limit options
+
+        try (StreamTableWrite write = table.newWrite(commitUser)) {
+            for (int i = 0; i < 10; i++) {
+                write.write(rowData(i, 10 * i, 100L * i));
+                commit.commit(i, write.prepareCommit(false, i));
+            }
+        }
+
+        SnapshotManager snapshotManager = table.snapshotManager();
+        List<Snapshot> remainingSnapshot = new ArrayList<>();
+        snapshotManager.snapshots().forEachRemaining(remainingSnapshot::add); // snapshot list captured before any expire call
+        long latestSnapshotId = snapshotManager.latestSnapshotId();
+        int index = 0; // position in remainingSnapshot of the next snapshot expected to be expired
+
+        // first expire: the two oldest snapshots are removed
+        expire.expire();
+        
assertThat(snapshotManager.snapshotExists(remainingSnapshot.get(index++).id())).isFalse();
+        
assertThat(snapshotManager.snapshotExists(remainingSnapshot.get(index++).id())).isFalse();
+        for (int i = index; i < remainingSnapshot.size(); i++) {
+            
assertThat(snapshotManager.snapshotExists(remainingSnapshot.get(i).id())).isTrue();
+        }
+        assertThat(snapshotManager.earliestSnapshotId())
+                .isEqualTo(remainingSnapshot.get(index).id());
+
+        // second expire: the next two snapshots are removed
+        expire.expire();
+        
assertThat(snapshotManager.snapshotExists(remainingSnapshot.get(index++).id())).isFalse();
+        
assertThat(snapshotManager.snapshotExists(remainingSnapshot.get(index++).id())).isFalse();
+        for (int i = index; i < remainingSnapshot.size(); i++) {
+            
assertThat(snapshotManager.snapshotExists(remainingSnapshot.get(i).id())).isTrue();
+        }
+        assertThat(snapshotManager.earliestSnapshotId())
+                .isEqualTo(remainingSnapshot.get(index).id());
+
+        // run the remaining expires; only the latest snapshot should be left
+        for (int i = 0; i < 5; i++) { // more calls than needed at 2 per call, assuming one snapshot per commit (10 total); verify
+            expire.expire();
+        }
+
+        for (int i = 0; i < remainingSnapshot.size() - 1; i++) {
+            
assertThat(snapshotManager.snapshotExists(remainingSnapshot.get(i).id())).isFalse();
+        }
+        assertThat(snapshotManager.snapshotExists(latestSnapshotId)).isTrue();
+        
assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(latestSnapshotId);
+        
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(latestSnapshotId);
+
+        commit.close(); // NOTE(review): not reached if an assertion above fails; consider try/finally
+    }
+
     protected List<String> getResult(
             TableRead read,
             List<Split> splits,


Reply via email to