This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 937feb2bf [core] Introduce expire_snapshots procedure (#2738)
937feb2bf is described below

commit 937feb2bf9eea33e6faf49d107ae5fedac106927
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jan 19 15:23:37 2024 +0800

    [core] Introduce expire_snapshots procedure (#2738)
---
 docs/content/engines/flink.md                      |  13 +++
 docs/content/engines/spark.md                      |  16 ++-
 .../java/org/apache/paimon/AbstractFileStore.java  |  14 ---
 .../src/main/java/org/apache/paimon/FileStore.java |   3 -
 .../paimon/table/AbstractFileStoreTable.java       |  32 ++++-
 .../ExpireSnapshots.java}                          |  18 +--
 .../ExpireSnapshotsImpl.java}                      | 130 +++++++++------------
 .../org/apache/paimon/table/ReadonlyTable.java     |   8 ++
 .../main/java/org/apache/paimon/table/Table.java   |   2 +
 .../apache/paimon/table/sink/TableCommitImpl.java  |  25 ++--
 .../test/java/org/apache/paimon/TestFileStore.java |  25 ++--
 .../operation/CleanedFileStoreExpireTest.java      |  24 ++--
 .../paimon/operation/OrphanFilesCleanTest.java     |   2 +-
 .../operation/UncleanedFileStoreExpireTest.java    |   7 +-
 .../paimon/table/FileStoreTableTestBase.java       |  11 +-
 .../paimon/table/IndexFileExpireTableTest.java     |   7 +-
 .../flink/procedure/ExpireSnapshotsProcedure.java  |  26 +++--
 .../paimon/flink/procedure/ProcedureBase.java      |   6 +
 .../services/org.apache.paimon.factories.Factory   |   1 +
 .../apache/paimon/flink/BatchFileStoreITCase.java  |   2 +-
 .../org/apache/paimon/spark/SparkProcedures.java   |   2 +
 .../spark/procedure/ExpireSnapshotsProcedure.java  | 108 +++++++++++++++++
 .../apache/paimon/spark/SparkTimeTravelITCase.java |   2 +-
 .../procedure/ExpireSnapshotsProcedureTest.scala   |  85 ++++++++++++++
 24 files changed, 398 insertions(+), 171 deletions(-)

diff --git a/docs/content/engines/flink.md b/docs/content/engines/flink.md
index 7175269ab..822cbd7d7 100644
--- a/docs/content/engines/flink.md
+++ b/docs/content/engines/flink.md
@@ -502,5 +502,18 @@ table options syntax: we use string to represent table 
options. The format is 'k
       </td>
       <td>CALL sys.rollback_to('default.T', 10)</td>
    </tr>
+   <tr>
+      <td>expire_snapshots</td>
+      <td>
+         -- expires snapshot<br/>
+         CALL sys.expire_snapshots('identifier', retainMax)<br/><br/>
+      </td>
+      <td>
+         To expire snapshots. Argument:
+            <li>identifier: the target table identifier. Cannot be empty.</li>
+            <li>retainMax: the maximum number of completed snapshots to 
retain.</li>
+      </td>
+      <td>CALL sys.expire_snapshots('default.T', 2)</td>
+   </tr>
    </tbody>
 </table>
diff --git a/docs/content/engines/spark.md b/docs/content/engines/spark.md
index a583d5a24..1b2d2f1a4 100644
--- a/docs/content/engines/spark.md
+++ b/docs/content/engines/spark.md
@@ -549,12 +549,11 @@ Here list the configurations.
 ## Spark Procedure
 
 This section introduce all available spark procedures about paimon.
-
+s
 <table class="table table-bordered">
     <thead>
     <tr>
       <th class="text-left" style="width: 4%">Procedure Name</th>
-      <th class="text-left" style="width: 4%">Usage</th>
       <th class="text-left" style="width: 20%">Explaination</th>
       <th class="text-left" style="width: 4%">Example</th>
     </tr>
@@ -562,10 +561,21 @@ This section introduce all available spark procedures 
about paimon.
     <tbody style="font-size: 12px; ">
     <tr>
       <td>compact</td>
-      <td><nobr>CALL [paimon.]sys.compact(table => '&ltidentifier&gt' 
[,partitions => '&ltpartitions&gt'] </nobr><br>[, order_strategy 
=>'&ltsort_type&gt'] [,order_by => '&ltcolumns&gt'])</td>
       <td>identifier: the target table identifier. Cannot be 
empty.<br><br><nobr>partitions: partition filter. Left empty for all 
partitions.<br> "," means "AND"<br>";" means "OR"</nobr><br><br>order_strategy: 
'order' or 'zorder' or 'hilbert' or 'none'. Left empty for 'none'. 
<br><br><nobr>order_columns: the columns need to be sort. Left empty if 
'order_strategy' is 'none'. </nobr><br><br>If you want sort compact two 
partitions date=01 and date=02, you need to write 'date=01;date=02'<br> [...]
       <td><nobr>SET spark.sql.shuffle.partitions=10; --set the compact 
parallelism</nobr><br><nobr>CALL sys.compact(table => 'T', partitions => 'p=0', 
 order_strategy => 'zorder', order_by => 'a,b')</nobr></td>
     </tr>
+    <tr>
+      <td>expire_snapshots</td>
+      <td>
+         To expire snapshots. Argument:
+            <li>table: the target table identifier. Cannot be empty.</li>
+            <li>retainMax: the maximum number of completed snapshots to 
retain.</li>
+            <li>retainMin: the minimum number of completed snapshots to 
retain.</li>
+            <li>older_than: timestamp before which snapshots will be 
removed.</li>
+            <li>max_deletes: the maximum number of snapshots that can be 
deleted at once.</li>
+      </td>
+      <td>CALL sys.expire_snapshots(table => 'default.T', retainMax => 10)</td>
+    </tr>
     </tbody>
 </table>
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 0bd878c96..28672bf4f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -28,7 +28,6 @@ import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.metastore.AddPartitionTagCallback;
 import org.apache.paimon.metastore.MetastoreClient;
 import org.apache.paimon.operation.FileStoreCommitImpl;
-import org.apache.paimon.operation.FileStoreExpireImpl;
 import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.operation.SnapshotDeletion;
 import org.apache.paimon.operation.TagDeletion;
@@ -179,19 +178,6 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {
                 newKeyComparator());
     }
 
-    @Override
-    public FileStoreExpireImpl newExpire() {
-        return new FileStoreExpireImpl(
-                options.snapshotNumRetainMin(),
-                options.snapshotNumRetainMax(),
-                options.snapshotTimeRetain().toMillis(),
-                snapshotManager(),
-                newSnapshotDeletion(),
-                newTagManager(),
-                options.snapshotExpireLimit(),
-                options.snapshotExpireCleanEmptyDirectories());
-    }
-
     @Override
     public SnapshotDeletion newSnapshotDeletion() {
         return new SnapshotDeletion(
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 1b7f9f1c9..5406828d7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -23,7 +23,6 @@ import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.operation.FileStoreCommit;
-import org.apache.paimon.operation.FileStoreExpire;
 import org.apache.paimon.operation.FileStoreRead;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.FileStoreWrite;
@@ -77,8 +76,6 @@ public interface FileStore<T> extends Serializable {
 
     FileStoreCommit newCommit(String commitUser);
 
-    FileStoreExpire newExpire();
-
     SnapshotDeletion newSnapshotDeletion();
 
     TagManager newTagManager();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index f9da880d2..5929bc1ab 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -269,18 +269,42 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
         return store().snapshotManager();
     }
 
+    @Override
+    public ExpireSnapshots newExpireSnapshots() {
+        return new ExpireSnapshotsImpl(
+                snapshotManager(),
+                store().newSnapshotDeletion(),
+                store().newTagManager(),
+                coreOptions().snapshotExpireCleanEmptyDirectories());
+    }
+
     @Override
     public TableCommitImpl newCommit(String commitUser) {
+        CoreOptions options = coreOptions();
+        Runnable snapshotExpire = null;
+        if (!options.writeOnly()) {
+            ExpireSnapshots expireSnapshots =
+                    newExpireSnapshots()
+                            .retainMax(options.snapshotNumRetainMax())
+                            .retainMin(options.snapshotNumRetainMin())
+                            .maxDeletes(options.snapshotExpireLimit());
+            long snapshotTimeRetain = options.snapshotTimeRetain().toMillis();
+            snapshotExpire =
+                    () ->
+                            expireSnapshots
+                                    .olderThanMills(System.currentTimeMillis() - snapshotTimeRetain)
+                                    .expire();
+        }
         return new TableCommitImpl(
                 store().newCommit(commitUser),
                 createCommitCallbacks(),
-                coreOptions().writeOnly() ? null : store().newExpire(),
-                coreOptions().writeOnly() ? null : store().newPartitionExpire(commitUser),
-                coreOptions().writeOnly() ? null : store().newTagCreationManager(),
+                snapshotExpire,
+                options.writeOnly() ? null : store().newPartitionExpire(commitUser),
+                options.writeOnly() ? null : store().newTagCreationManager(),
                 catalogEnvironment.lockFactory().create(),
                 CoreOptions.fromMap(options()).consumerExpireTime(),
                 new ConsumerManager(fileIO, path),
-                coreOptions().snapshotExpireExecutionMode(),
+                options.snapshotExpireExecutionMode(),
                 name());
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpire.java 
b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshots.java
similarity index 72%
copy from 
paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpire.java
copy to paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshots.java
index 6827740ce..8aa9a3e2a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshots.java
@@ -16,14 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.operation;
+package org.apache.paimon.table;
 
-/** Expire operation which provides snapshots expire. */
-public interface FileStoreExpire {
+/** Expire snapshots. */
+public interface ExpireSnapshots {
 
-    /** With global lock. */
-    FileStoreExpire withLock(Lock lock);
+    ExpireSnapshots retainMax(int retainMax);
 
-    /** Expire snapshots. */
-    void expire();
+    ExpireSnapshots retainMin(int retainMin);
+
+    ExpireSnapshots olderThanMills(long olderThanMills);
+
+    ExpireSnapshots maxDeletes(int maxDeletes);
+
+    int expire();
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
 b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
similarity index 69%
rename from 
paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
rename to 
paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
index 522907587..00c40d0fe 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
@@ -16,112 +16,99 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.operation;
+package org.apache.paimon.table;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.operation.SnapshotDeletion;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.function.Predicate;
 
-/**
- * Default implementation of {@link FileStoreExpire}. It retains a certain 
number or period of
- * latest snapshots.
- *
- * <p>NOTE: This implementation will keep at least one snapshot so that users 
will not accidentally
- * clear all snapshots.
- *
- * <p>TODO: add concurrent tests.
- */
-public class FileStoreExpireImpl implements FileStoreExpire {
+/** An implementation for {@link ExpireSnapshots}. */
+public class ExpireSnapshotsImpl implements ExpireSnapshots {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(FileStoreExpireImpl.class);
-
-    private final int numRetainedMin;
-    // snapshots exceeding any constraint will be expired
-    private final int numRetainedMax;
-    private final long millisRetained;
+    private static final Logger LOG = 
LoggerFactory.getLogger(ExpireSnapshotsImpl.class);
 
     private final SnapshotManager snapshotManager;
     private final ConsumerManager consumerManager;
     private final SnapshotDeletion snapshotDeletion;
-
     private final TagManager tagManager;
-    private final int expireLimit;
-    private final boolean snapshotExpireCleanEmptyDirectories;
+    private final boolean cleanEmptyDirectories;
 
-    private Lock lock;
+    private int retainMax = Integer.MAX_VALUE;
+    private int retainMin = 1;
+    private long olderThanMills = 0;
+    private int maxDeletes = Integer.MAX_VALUE;
 
-    public FileStoreExpireImpl(
-            int numRetainedMin,
-            int numRetainedMax,
-            long millisRetained,
+    public ExpireSnapshotsImpl(
             SnapshotManager snapshotManager,
             SnapshotDeletion snapshotDeletion,
             TagManager tagManager,
-            int expireLimit,
-            boolean snapshotExpireCleanEmptyDirectories) {
-        Preconditions.checkArgument(
-                numRetainedMin >= 1,
-                "The minimum number of completed snapshots to retain should be >= 1.");
-        Preconditions.checkArgument(
-                numRetainedMax >= numRetainedMin,
-                "The maximum number of snapshots to retain should be >= the 
minimum number.");
-        Preconditions.checkArgument(
-                expireLimit > 1,
-                String.format("The %s should be > 1.", 
CoreOptions.SNAPSHOT_EXPIRE_LIMIT.key()));
-        this.numRetainedMin = numRetainedMin;
-        this.numRetainedMax = numRetainedMax;
-        this.millisRetained = millisRetained;
+            boolean cleanEmptyDirectories) {
         this.snapshotManager = snapshotManager;
         this.consumerManager =
                 new ConsumerManager(snapshotManager.fileIO(), 
snapshotManager.tablePath());
         this.snapshotDeletion = snapshotDeletion;
         this.tagManager = tagManager;
-        this.expireLimit = expireLimit;
-        this.snapshotExpireCleanEmptyDirectories = 
snapshotExpireCleanEmptyDirectories;
+        this.cleanEmptyDirectories = cleanEmptyDirectories;
     }
 
     @Override
-    public FileStoreExpire withLock(Lock lock) {
-        this.lock = lock;
+    public ExpireSnapshotsImpl retainMax(int retainMax) {
+        this.retainMax = retainMax;
         return this;
     }
 
     @Override
-    public void expire() {
+    public ExpireSnapshotsImpl retainMin(int retainMin) {
+        this.retainMin = retainMin;
+        return this;
+    }
+
+    @Override
+    public ExpireSnapshotsImpl olderThanMills(long olderThanMills) {
+        this.olderThanMills = olderThanMills;
+        return this;
+    }
+
+    @Override
+    public ExpireSnapshotsImpl maxDeletes(int maxDeletes) {
+        this.maxDeletes = maxDeletes;
+        return this;
+    }
+
+    @Override
+    public int expire() {
         Long latestSnapshotId = snapshotManager.latestSnapshotId();
         if (latestSnapshotId == null) {
             // no snapshot, nothing to expire
-            return;
+            return 0;
         }
 
-        long currentMillis = System.currentTimeMillis();
-
         Long earliest = snapshotManager.earliestSnapshotId();
         if (earliest == null) {
-            return;
+            return 0;
         }
 
         // the min snapshot to retain from 'snapshot.num-retained.max'
         // (the maximum number of snapshots to retain)
-        long min = Math.max(latestSnapshotId - numRetainedMax + 1, earliest);
+        long min = Math.max(latestSnapshotId - retainMax + 1, earliest);
 
         // the max exclusive snapshot to expire until
         // protected by 'snapshot.num-retained.min'
         // (the minimum number of completed snapshots to retain)
-        long maxExclusive = latestSnapshotId - numRetainedMin + 1;
+        long maxExclusive = latestSnapshotId - retainMin + 1;
 
         // the snapshot being read by the consumer cannot be deleted
         maxExclusive =
@@ -129,24 +116,22 @@ public class FileStoreExpireImpl implements 
FileStoreExpire {
 
         // protected by 'snapshot.expire.limit'
         // (the maximum number of snapshots allowed to expire at a time)
-        maxExclusive = Math.min(maxExclusive, earliest + expireLimit);
+        maxExclusive = Math.min(maxExclusive, earliest + maxDeletes);
 
         for (long id = min; id < maxExclusive; id++) {
             // Early exit the loop for 'snapshot.time-retained'
             // (the maximum time of snapshots to retain)
             if (snapshotManager.snapshotExists(id)
-                    && currentMillis - snapshotManager.snapshot(id).timeMillis()
-                            <= millisRetained) {
-                expireUntil(earliest, id);
-                return;
+                    && olderThanMills <= snapshotManager.snapshot(id).timeMillis()) {
+                return expireUntil(earliest, id);
             }
         }
 
-        expireUntil(earliest, maxExclusive);
+        return expireUntil(earliest, maxExclusive);
     }
 
     @VisibleForTesting
-    public void expireUntil(long earliestId, long endExclusiveId) {
+    public int expireUntil(long earliestId, long endExclusiveId) {
         if (endExclusiveId <= earliestId) {
             // No expire happens:
             // write the hint file in order to see the earliest snapshot 
directly next time
@@ -156,7 +141,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
             }
 
             // fast exit
-            return;
+            return 0;
         }
 
         // find first snapshot to expire
@@ -214,7 +199,7 @@ public class FileStoreExpireImpl implements FileStoreExpire 
{
 
         // data files and changelog files in bucket directories has been 
deleted
         // then delete changed bucket directories if they are empty
-        if (snapshotExpireCleanEmptyDirectories) {
+        if (cleanEmptyDirectories) {
             snapshotDeletion.cleanDataDirectories();
         }
 
@@ -237,30 +222,19 @@ public class FileStoreExpireImpl implements 
FileStoreExpire {
         }
 
         writeEarliestHint(endExclusiveId);
+        return (int) (endExclusiveId - beginInclusiveId);
     }
 
     private void writeEarliestHint(long earliest) {
-        // update earliest hint file
-
-        Callable<Void> callable =
-                () -> {
-                    snapshotManager.commitEarliestHint(earliest);
-                    return null;
-                };
-
         try {
-            if (lock != null) {
-                lock.runWithLock(callable);
-            } else {
-                callable.call();
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
+            snapshotManager.commitEarliestHint(earliest);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
         }
     }
 
     @VisibleForTesting
-    SnapshotDeletion snapshotDeletion() {
+    public SnapshotDeletion snapshotDeletion() {
         return snapshotDeletion;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index 3e3bcb2d7..66b9142d2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -142,4 +142,12 @@ public interface ReadonlyTable extends InnerTable {
                         "Readonly Table %s does not support deleteBranch.",
                         this.getClass().getSimpleName()));
     }
+
+    @Override
+    default ExpireSnapshots newExpireSnapshots() {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Readonly Table %s does not support expireSnapshots.",
+                        this.getClass().getSimpleName()));
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java 
b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index 4f713e992..b9c28cd69 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -91,6 +91,8 @@ public interface Table extends Serializable {
     @Experimental
     void deleteBranch(String branchName);
 
+    ExpireSnapshots newExpireSnapshots();
+
     // =============== Read & Write Operations ==================
 
     /** Returns a new read builder. */
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index bfc52ec5e..fb7ab7974 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -28,7 +28,6 @@ import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.operation.FileStoreCommit;
-import org.apache.paimon.operation.FileStoreExpire;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.operation.metrics.CommitMetrics;
@@ -68,16 +67,13 @@ import java.util.stream.Collectors;
 import static org.apache.paimon.CoreOptions.ExpireExecutionMode;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
-/**
- * An abstraction layer above {@link FileStoreCommit} and {@link 
FileStoreExpire} to provide
- * snapshot commit and expiration.
- */
+/** An abstraction layer above {@link FileStoreCommit} to provide snapshot 
commit and expiration. */
 public class TableCommitImpl implements InnerTableCommit {
     private static final Logger LOG = 
LoggerFactory.getLogger(TableCommitImpl.class);
 
     private final FileStoreCommit commit;
     private final List<CommitCallback> commitCallbacks;
-    @Nullable private final FileStoreExpire expire;
+    @Nullable private final Runnable expireSnapshots;
     @Nullable private final PartitionExpire partitionExpire;
     @Nullable private final TagAutoCreation tagAutoCreation;
     private final Lock lock;
@@ -96,7 +92,7 @@ public class TableCommitImpl implements InnerTableCommit {
     public TableCommitImpl(
             FileStoreCommit commit,
             List<CommitCallback> commitCallbacks,
-            @Nullable FileStoreExpire expire,
+            @Nullable Runnable expireSnapshots,
             @Nullable PartitionExpire partitionExpire,
             @Nullable TagAutoCreation tagAutoCreation,
             Lock lock,
@@ -105,16 +101,13 @@ public class TableCommitImpl implements InnerTableCommit {
             ExpireExecutionMode expireExecutionMode,
             String tableName) {
         commit.withLock(lock);
-        if (expire != null) {
-            expire.withLock(lock);
-        }
         if (partitionExpire != null) {
             partitionExpire.withLock(lock);
         }
 
         this.commit = commit;
         this.commitCallbacks = commitCallbacks;
-        this.expire = expire;
+        this.expireSnapshots = expireSnapshots;
         this.partitionExpire = partitionExpire;
         this.tagAutoCreation = tagAutoCreation;
         this.lock = lock;
@@ -340,9 +333,7 @@ public class TableCommitImpl implements InnerTableCommit {
             
consumerManager.expire(LocalDateTime.now().minus(consumerExpireTime));
         }
 
-        if (expire != null) {
-            expire.expire();
-        }
+        expireSnapshots();
 
         if (partitionExpire != null) {
             partitionExpire.expire(partitionExpireIdentifier);
@@ -353,6 +344,12 @@ public class TableCommitImpl implements InnerTableCommit {
         }
     }
 
+    public void expireSnapshots() {
+        if (expireSnapshots != null) {
+            expireSnapshots.run();
+        }
+    }
+
     @Override
     public void close() throws Exception {
         for (CommitCallback commitCallback : commitCallbacks) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 0a9b2bf5b..d9cc7f878 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -36,7 +36,6 @@ import 
org.apache.paimon.mergetree.compact.MergeFunctionFactory;
 import org.apache.paimon.operation.AbstractFileStoreWrite;
 import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.operation.FileStoreCommitImpl;
-import org.apache.paimon.operation.FileStoreExpireImpl;
 import org.apache.paimon.operation.FileStoreRead;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.Lock;
@@ -46,6 +45,8 @@ import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.ExpireSnapshots;
+import org.apache.paimon.table.ExpireSnapshotsImpl;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ScanMode;
@@ -134,25 +135,23 @@ public class TestFileStore extends KeyValueFileStore {
         return super.newCommit(commitUser);
     }
 
-    public FileStoreExpireImpl newExpire(
-            int numRetainedMin, int numRetainedMax, long millisRetained) {
+    public ExpireSnapshots newExpire(int numRetainedMin, int numRetainedMax, 
long millisRetained) {
         return newExpire(numRetainedMin, numRetainedMax, millisRetained, true);
     }
 
-    public FileStoreExpireImpl newExpire(
+    public ExpireSnapshots newExpire(
             int numRetainedMin,
             int numRetainedMax,
             long millisRetained,
             boolean snapshotExpireCleanEmptyDirectories) {
-        return new FileStoreExpireImpl(
-                numRetainedMin,
-                numRetainedMax,
-                millisRetained,
-                snapshotManager(),
-                newSnapshotDeletion(),
-                new TagManager(fileIO, options.path()),
-                Integer.MAX_VALUE,
-                snapshotExpireCleanEmptyDirectories);
+        return new ExpireSnapshotsImpl(
+                        snapshotManager(),
+                        newSnapshotDeletion(),
+                        new TagManager(fileIO, options.path()),
+                        snapshotExpireCleanEmptyDirectories)
+                .retainMax(numRetainedMax)
+                .retainMin(numRetainedMin)
+                .olderThanMills(System.currentTimeMillis() - millisRetained);
     }
 
     public List<Snapshot> commitData(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
index 1d5eb7329..463e8ba08 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
@@ -26,6 +26,8 @@ import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.table.ExpireSnapshots;
+import org.apache.paimon.table.ExpireSnapshotsImpl;
 import org.apache.paimon.utils.RecordWriter;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -44,7 +46,7 @@ import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
- * Tests for {@link FileStoreExpireImpl}. After expiration, only useful files 
should be retained.
+ * Tests for {@link ExpireSnapshotsImpl}. After expiration, only useful files 
should be retained.
  */
 public class CleanedFileStoreExpireTest extends FileStoreExpireTestBase {
 
@@ -55,7 +57,7 @@ public class CleanedFileStoreExpireTest extends 
FileStoreExpireTestBase {
 
     @Test
     public void testExpireExtraFiles() throws IOException {
-        FileStoreExpireImpl expire = store.newExpire(1, 3, Long.MAX_VALUE);
+        ExpireSnapshotsImpl expire = (ExpireSnapshotsImpl) store.newExpire(1, 
3, Long.MAX_VALUE);
 
         // write test files
         BinaryRow partition = gen.getPartition(gen.next());
@@ -98,7 +100,7 @@ public class CleanedFileStoreExpireTest extends 
FileStoreExpireTestBase {
 
     @Test
     public void testNoSnapshot() {
-        FileStoreExpire expire = store.newExpire(1, 3, Long.MAX_VALUE);
+        ExpireSnapshots expire = store.newExpire(1, 3, Long.MAX_VALUE);
         expire.expire();
 
         assertThat(snapshotManager.latestSnapshotId()).isNull();
@@ -110,7 +112,7 @@ public class CleanedFileStoreExpireTest extends 
FileStoreExpireTestBase {
         List<Integer> snapshotPositions = new ArrayList<>();
         commit(2, allData, snapshotPositions);
         int latestSnapshotId = snapshotManager.latestSnapshotId().intValue();
-        FileStoreExpire expire = store.newExpire(1, latestSnapshotId + 1, 
Long.MAX_VALUE);
+        ExpireSnapshots expire = store.newExpire(1, latestSnapshotId + 1, 
Long.MAX_VALUE);
         expire.expire();
 
         for (int i = 1; i <= latestSnapshotId; i++) {
@@ -125,7 +127,7 @@ public class CleanedFileStoreExpireTest extends 
FileStoreExpireTestBase {
         List<Integer> snapshotPositions = new ArrayList<>();
         commit(5, allData, snapshotPositions);
         int latestSnapshotId = snapshotManager.latestSnapshotId().intValue();
-        FileStoreExpire expire = store.newExpire(1, Integer.MAX_VALUE, 
Long.MAX_VALUE);
+        ExpireSnapshots expire = store.newExpire(1, Integer.MAX_VALUE, 
Long.MAX_VALUE);
         expire.expire();
 
         for (int i = 1; i <= latestSnapshotId; i++) {
@@ -144,7 +146,7 @@ public class CleanedFileStoreExpireTest extends 
FileStoreExpireTestBase {
         commit(numRetainedMin + random.nextInt(5), allData, snapshotPositions);
         int latestSnapshotId = snapshotManager.latestSnapshotId().intValue();
         Thread.sleep(100);
-        FileStoreExpire expire = store.newExpire(numRetainedMin, 
Integer.MAX_VALUE, 1);
+        ExpireSnapshots expire = store.newExpire(numRetainedMin, 
Integer.MAX_VALUE, 1);
         expire.expire();
 
         for (int i = 1; i <= latestSnapshotId - numRetainedMin; i++) {
@@ -158,7 +160,7 @@ public class CleanedFileStoreExpireTest extends 
FileStoreExpireTestBase {
 
     @Test
     public void testExpireWithNumber() throws Exception {
-        FileStoreExpire expire = store.newExpire(1, 3, Long.MAX_VALUE);
+        ExpireSnapshots expire = store.newExpire(1, 3, Long.MAX_VALUE);
 
         List<KeyValue> allData = new ArrayList<>();
         List<Integer> snapshotPositions = new ArrayList<>();
@@ -194,7 +196,7 @@ public class CleanedFileStoreExpireTest extends 
FileStoreExpireTestBase {
 
     @Test
     public void testExpireWithTime() throws Exception {
-        FileStoreExpire expire = store.newExpire(1, Integer.MAX_VALUE, 1000);
+        ExpireSnapshots expire = store.newExpire(1, Integer.MAX_VALUE, 1000);
 
         List<KeyValue> allData = new ArrayList<>();
         List<Integer> snapshotPositions = new ArrayList<>();
@@ -203,8 +205,8 @@ public class CleanedFileStoreExpireTest extends 
FileStoreExpireTestBase {
         commit(5, allData, snapshotPositions);
         long expireMillis = System.currentTimeMillis();
         // expire twice to check for idempotence
-        expire.expire();
-        expire.expire();
+        expire.olderThanMills(expireMillis - 1000).expire();
+        expire.olderThanMills(expireMillis - 1000).expire();
 
         int latestSnapshotId = snapshotManager.latestSnapshotId().intValue();
         for (int i = 1; i <= latestSnapshotId; i++) {
@@ -252,7 +254,7 @@ public class CleanedFileStoreExpireTest extends 
FileStoreExpireTestBase {
         FileStoreTestUtils.assertPathExists(fileIO, dataFilePath2);
 
         // the data file still exists after expire
-        FileStoreExpire expire = store.newExpire(1, 1, Long.MAX_VALUE);
+        ExpireSnapshots expire = store.newExpire(1, 1, Long.MAX_VALUE);
         expire.expire();
         FileStoreTestUtils.assertPathExists(fileIO, dataFilePath2);
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
index 759deb909..cf7535f76 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
@@ -195,7 +195,7 @@ public class OrphanFilesCleanTest {
         expireOptions.set(CoreOptions.SNAPSHOT_EXPIRE_LIMIT, snapshotCount);
         expireOptions.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, snapshotCount 
- expired);
         expireOptions.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, snapshotCount 
- expired);
-        table.copy(expireOptions.toMap()).store().newExpire().expire();
+        table.copy(expireOptions.toMap()).newCommit("").expireSnapshots();
 
         // randomly delete tags
         List<String> deleteTags = Collections.emptyList();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
index a0794168f..ce93166a5 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.operation;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.ExpireSnapshots;
 import org.apache.paimon.utils.TagManager;
 
 import org.junit.jupiter.api.Test;
@@ -38,14 +39,14 @@ import java.util.stream.Collectors;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
- * Tests for {@link FileStoreExpireImpl}. Some files not in use may still 
remain after the test due
- * to the testing methods.
+ * Tests for {@link ExpireSnapshots}. Some files not in use may still remain 
after the test due to
+ * the testing methods.
  */
 public class UncleanedFileStoreExpireTest extends FileStoreExpireTestBase {
 
     @Test
     public void testExpireWithMissingFiles() throws Exception {
-        FileStoreExpire expire = store.newExpire(1, 1, 1);
+        ExpireSnapshots expire = store.newExpire(1, 1, 1);
 
         List<KeyValue> allData = new ArrayList<>();
         List<Integer> snapshotPositions = new ArrayList<>();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index f8f448814..c07fc2029 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -37,7 +37,6 @@ import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
-import org.apache.paimon.operation.FileStoreExpire;
 import org.apache.paimon.operation.FileStoreTestUtils;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
@@ -841,7 +840,7 @@ public abstract class FileStoreTableTestBase {
             Options options = new Options();
             options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 5);
             options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 5);
-            table.copy(options.toMap()).store().newExpire().expire();
+            table.copy(options.toMap()).newCommit("").expireSnapshots();
         }
 
         table.rollbackTo("test1");
@@ -1178,7 +1177,7 @@ public abstract class FileStoreTableTestBase {
         TableCommitImpl commit =
                 table.copy(Collections.singletonMap(WRITE_ONLY.key(), "true"))
                         .newCommit(commitUser);
-        FileStoreExpire expire = table.store().newExpire();
+        TableCommitImpl expire = table.newCommit(commitUser);
 
         try (StreamTableWrite write = table.newWrite(commitUser)) {
             for (int i = 0; i < 10; i++) {
@@ -1194,7 +1193,7 @@ public abstract class FileStoreTableTestBase {
         int index = 0;
 
         // trigger the first expire and the first two snapshots expired
-        expire.expire();
+        expire.expireSnapshots();
         
assertThat(snapshotManager.snapshotExists(remainingSnapshot.get(index++).id())).isFalse();
         
assertThat(snapshotManager.snapshotExists(remainingSnapshot.get(index++).id())).isFalse();
         for (int i = index; i < remainingSnapshot.size(); i++) {
@@ -1204,7 +1203,7 @@ public abstract class FileStoreTableTestBase {
                 .isEqualTo(remainingSnapshot.get(index).id());
 
         // trigger the second expire and the second two snapshots expired
-        expire.expire();
+        expire.expireSnapshots();
         
assertThat(snapshotManager.snapshotExists(remainingSnapshot.get(index++).id())).isFalse();
         
assertThat(snapshotManager.snapshotExists(remainingSnapshot.get(index++).id())).isFalse();
         for (int i = index; i < remainingSnapshot.size(); i++) {
@@ -1215,7 +1214,7 @@ public abstract class FileStoreTableTestBase {
 
         // trigger all remaining expires and only the last snapshot remaining
         for (int i = 0; i < 5; i++) {
-            expire.expire();
+            expire.expireSnapshots();
         }
 
         for (int i = 0; i < remainingSnapshot.size() - 1; i++) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
index e4e890850..98816dd04 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
@@ -26,7 +26,6 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.manifest.IndexManifestEntry;
-import org.apache.paimon.operation.FileStoreExpireImpl;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
@@ -60,7 +59,7 @@ public class IndexFileExpireTableTest extends 
PrimaryKeyTableTestBase {
     @Test
     public void testIndexFileExpiration() throws Exception {
         prepareExpireTable();
-        FileStoreExpireImpl expire = (FileStoreExpireImpl) 
table.store().newExpire();
+        ExpireSnapshotsImpl expire = (ExpireSnapshotsImpl) 
table.newExpireSnapshots();
 
         long indexFileSize = indexFileSize();
         long indexManifestSize = indexManifestSize();
@@ -89,7 +88,7 @@ public class IndexFileExpireTableTest extends 
PrimaryKeyTableTestBase {
     @Test
     public void testIndexFileExpirationWithTag() throws Exception {
         prepareExpireTable();
-        FileStoreExpireImpl expire = (FileStoreExpireImpl) 
table.store().newExpire();
+        ExpireSnapshotsImpl expire = (ExpireSnapshotsImpl) 
table.newExpireSnapshots();
 
         table.createTag("tag3", 3);
         table.createTag("tag5", 5);
@@ -115,7 +114,7 @@ public class IndexFileExpireTableTest extends 
PrimaryKeyTableTestBase {
     @Test
     public void testIndexFileExpirationWhenDeletingTag() throws Exception {
         prepareExpireTable();
-        FileStoreExpireImpl expire = (FileStoreExpireImpl) 
table.store().newExpire();
+        ExpireSnapshotsImpl expire = (ExpireSnapshotsImpl) 
table.newExpireSnapshots();
 
         table.createTag("tag3", 3);
         table.createTag("tag5", 5);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpire.java 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java
similarity index 53%
rename from 
paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpire.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java
index 6827740ce..db1d52170 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpire.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,14 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.operation;
+package org.apache.paimon.flink.procedure;
 
-/** Expire operation which provides snapshots expire. */
-public interface FileStoreExpire {
+import org.apache.paimon.catalog.Catalog;
 
-    /** With global lock. */
-    FileStoreExpire withLock(Lock lock);
+import org.apache.flink.table.procedure.ProcedureContext;
 
-    /** Expire snapshots. */
-    void expire();
+/** A procedure to expire snapshots. */
+public class ExpireSnapshotsProcedure extends ProcedureBase {
+
+    @Override
+    public String identifier() {
+        return "expire_snapshots";
+    }
+
+    public String[] call(ProcedureContext procedureContext, String tableId, 
int retainMax)
+            throws Catalog.TableNotExistException {
+        return new String[] {
+            table(tableId).newExpireSnapshots().retainMax(retainMax).expire() 
+ ""
+        };
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
index fed5c43ac..fd7f74148 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
@@ -19,9 +19,11 @@
 package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.factories.Factory;
 import org.apache.paimon.flink.action.ActionBase;
 import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.StringUtils;
 
 import org.apache.flink.configuration.PipelineOptions;
@@ -46,6 +48,10 @@ public abstract class ProcedureBase implements Procedure, 
Factory {
         return this;
     }
 
+    protected Table table(String tableId) throws 
Catalog.TableNotExistException {
+        return catalog.getTable(Identifier.fromString(tableId));
+    }
+
     @Nullable
     protected String nullable(String arg) {
         return StringUtils.isBlank(arg) ? null : arg;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 634bd2768..e4b74f5cd 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -42,3 +42,4 @@ org.apache.paimon.flink.procedure.MigrateTableProcedure
 org.apache.paimon.flink.procedure.MigrateFileProcedure
 org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure
 org.apache.paimon.flink.procedure.QueryServiceProcedure
+org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 211fe1411..7e2432d08 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -233,7 +233,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase 
{
         expireOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "1");
         expireOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1");
         FileStoreTable table = (FileStoreTable) paimonTable("T");
-        table.copy(expireOptions).store().newExpire().expire();
+        table.copy(expireOptions).newCommit("").expireSnapshots();
         assertThat(table.snapshotManager().snapshotCount()).isEqualTo(1);
 
         assertThat(batchSql("SELECT * FROM T /*+ 
OPTIONS('scan.tag-name'='tag1') */"))
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index e0921ed86..af512da2a 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -21,6 +21,7 @@ package org.apache.paimon.spark;
 import org.apache.paimon.spark.procedure.CompactProcedure;
 import org.apache.paimon.spark.procedure.CreateTagProcedure;
 import org.apache.paimon.spark.procedure.DeleteTagProcedure;
+import org.apache.paimon.spark.procedure.ExpireSnapshotsProcedure;
 import org.apache.paimon.spark.procedure.MigrateFileProcedure;
 import org.apache.paimon.spark.procedure.MigrateTableProcedure;
 import org.apache.paimon.spark.procedure.Procedure;
@@ -56,6 +57,7 @@ public class SparkProcedures {
         procedureBuilders.put("migrate_table", MigrateTableProcedure::builder);
         procedureBuilders.put("migrate_file", MigrateFileProcedure::builder);
         procedureBuilders.put("remove_orphan_files", 
RemoveOrphanFilesProcedure::builder);
+        procedureBuilders.put("expire_snapshots", 
ExpireSnapshotsProcedure::builder);
         return procedureBuilders.build();
     }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java
new file mode 100644
index 000000000..4786fc6b5
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.table.ExpireSnapshots;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+import static org.apache.spark.sql.types.DataTypes.TimestampType;
+
+/** A procedure to expire snapshots; every argument except {@code table} is optional. */
+public class ExpireSnapshotsProcedure extends BaseProcedure {
+
+    private static final ProcedureParameter[] PARAMETERS =
+            new ProcedureParameter[] { // order matters: call() reads these by position
+                ProcedureParameter.required("table", StringType),
+                ProcedureParameter.optional("retain_max", IntegerType),
+                ProcedureParameter.optional("retain_min", IntegerType),
+                ProcedureParameter.optional("older_than", TimestampType),
+                ProcedureParameter.optional("max_deletes", IntegerType)
+            };
+
+    private static final StructType OUTPUT_TYPE =
+            new StructType(
+                    new StructField[] {
+                        new StructField(
+                                "deleted_snapshots_count", IntegerType, false, 
Metadata.empty())
+                    });
+
+    protected ExpireSnapshotsProcedure(TableCatalog tableCatalog) {
+        super(tableCatalog);
+    }
+
+    @Override
+    public ProcedureParameter[] parameters() {
+        return PARAMETERS;
+    }
+
+    @Override
+    public StructType outputType() {
+        return OUTPUT_TYPE;
+    }
+
+    @Override
+    public InternalRow[] call(InternalRow args) {
+        Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
+        Integer retainMax = args.isNullAt(1) ? null : args.getInt(1);
+        Integer retainMin = args.isNullAt(2) ? null : args.getInt(2);
+        Long olderThanMills = args.isNullAt(3) ? null : args.getLong(3) / 1000; // presumably epoch micros (Spark TimestampType) -> millis; confirm
+        Integer maxDeletes = args.isNullAt(4) ? null : args.getInt(4);
+        return modifyPaimonTable(
+                tableIdent,
+                table -> { // configure only the limits that were passed as non-null
+                    ExpireSnapshots expireSnapshots = 
table.newExpireSnapshots();
+                    if (retainMax != null) {
+                        expireSnapshots.retainMax(retainMax);
+                    }
+                    if (retainMin != null) {
+                        expireSnapshots.retainMin(retainMin);
+                    }
+                    if (olderThanMills != null) {
+                        expireSnapshots.olderThanMills(olderThanMills);
+                    }
+                    if (maxDeletes != null) {
+                        expireSnapshots.maxDeletes(maxDeletes);
+                    }
+                    int deleted = expireSnapshots.expire();
+                    return new InternalRow[] {newInternalRow(deleted)}; // one row, matching OUTPUT_TYPE
+                });
+    }
+
+    public static ProcedureBuilder builder() {
+        return new BaseProcedure.Builder<ExpireSnapshotsProcedure>() {
+            @Override
+            public ExpireSnapshotsProcedure doBuild() {
+                return new ExpireSnapshotsProcedure(tableCatalog());
+            }
+        };
+    }
+
+    @Override
+    public String description() {
+        return "ExpireSnapshotsProcedure";
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
index ce9e72386..2c31664c2 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
+++ 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
@@ -268,7 +268,7 @@ public class SparkTimeTravelITCase extends 
SparkReadTestBase {
         Map<String, String> expireOptions = new HashMap<>();
         expireOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "1");
         expireOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1");
-        table.copy(expireOptions).store().newExpire().expire();
+        table.copy(expireOptions).newCommit("").expireSnapshots();
         assertThat(table.snapshotManager().snapshotCount()).isEqualTo(1);
 
         // time travel to tag2
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
new file mode 100644
index 000000000..fa6de113b
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming.StreamTest
+
+class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest 
{
+
+  import testImplicits._
+
+  test("Paimon Procedure: expire snapshots") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // define a change-log table and test `forEachBatch` api
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b STRING)
+                       |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().toString
+
+          val inputData = MemoryStream[(Int, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .foreachBatch {
+              (batch: Dataset[Row], _: Long) =>
+                batch.write.format("paimon").mode("append").save(location)
+            }
+            .start()
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+          try {
+            // snapshot-1: first micro-batch, a single row
+            inputData.addData((1, "a"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Nil)
+
+            // snapshot-2: second micro-batch adds key 2
+            inputData.addData((2, "b"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+
+            // snapshot-3: key 2 is written again, so the query shows "b2" in place of "b"
+            inputData.addData((2, "b2"))
+            stream.processAllAvailable()
+            checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
+
+            // expire: three snapshots with retain_max => 2 reports one deletion; ids 2 and 3 remain
+            checkAnswer(
+              spark.sql("CALL paimon.sys.expire_snapshots(table => 'test.T', 
retain_max => 2)"),
+              Row(1) :: Nil)
+
+            checkAnswer(
+              spark.sql("SELECT snapshot_id FROM paimon.test.`T$snapshots`"),
+              Row(2L) :: Row(3L) :: Nil)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+}

Reply via email to