This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 937feb2bf [core] Introduce expire_snapshots procedure (#2738)
937feb2bf is described below
commit 937feb2bf9eea33e6faf49d107ae5fedac106927
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jan 19 15:23:37 2024 +0800
[core] Introduce expire_snapshots procedure (#2738)
---
docs/content/engines/flink.md | 13 +++
docs/content/engines/spark.md | 16 ++-
.../java/org/apache/paimon/AbstractFileStore.java | 14 ---
.../src/main/java/org/apache/paimon/FileStore.java | 3 -
.../paimon/table/AbstractFileStoreTable.java | 32 ++++-
.../ExpireSnapshots.java} | 18 +--
.../ExpireSnapshotsImpl.java} | 130 +++++++++------------
.../org/apache/paimon/table/ReadonlyTable.java | 8 ++
.../main/java/org/apache/paimon/table/Table.java | 2 +
.../apache/paimon/table/sink/TableCommitImpl.java | 25 ++--
.../test/java/org/apache/paimon/TestFileStore.java | 25 ++--
.../operation/CleanedFileStoreExpireTest.java | 24 ++--
.../paimon/operation/OrphanFilesCleanTest.java | 2 +-
.../operation/UncleanedFileStoreExpireTest.java | 7 +-
.../paimon/table/FileStoreTableTestBase.java | 11 +-
.../paimon/table/IndexFileExpireTableTest.java | 7 +-
.../flink/procedure/ExpireSnapshotsProcedure.java | 26 +++--
.../paimon/flink/procedure/ProcedureBase.java | 6 +
.../services/org.apache.paimon.factories.Factory | 1 +
.../apache/paimon/flink/BatchFileStoreITCase.java | 2 +-
.../org/apache/paimon/spark/SparkProcedures.java | 2 +
.../spark/procedure/ExpireSnapshotsProcedure.java | 108 +++++++++++++++++
.../apache/paimon/spark/SparkTimeTravelITCase.java | 2 +-
.../procedure/ExpireSnapshotsProcedureTest.scala | 85 ++++++++++++++
24 files changed, 398 insertions(+), 171 deletions(-)
diff --git a/docs/content/engines/flink.md b/docs/content/engines/flink.md
index 7175269ab..822cbd7d7 100644
--- a/docs/content/engines/flink.md
+++ b/docs/content/engines/flink.md
@@ -502,5 +502,18 @@ table options syntax: we use string to represent table
options. The format is 'k
</td>
<td>CALL sys.rollback_to('default.T', 10)</td>
</tr>
+ <tr>
+ <td>expire_snapshots</td>
+ <td>
+ -- expires snapshot<br/>
+ CALL sys.expire_snapshots('identifier', retainMax)<br/><br/>
+ </td>
+ <td>
+ To expire snapshots. Arguments:
+ <li>identifier: the target table identifier. Cannot be empty.</li>
+ <li>retainMax: the maximum number of completed snapshots to
retain.</li>
+ </td>
+ <td>CALL sys.expire_snapshots('default.T', 2)</td>
+ </tr>
</tbody>
</table>
diff --git a/docs/content/engines/spark.md b/docs/content/engines/spark.md
index a583d5a24..1b2d2f1a4 100644
--- a/docs/content/engines/spark.md
+++ b/docs/content/engines/spark.md
@@ -549,12 +549,11 @@ Here list the configurations.
## Spark Procedure
This section introduce all available spark procedures about paimon.
-
+
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 4%">Procedure Name</th>
- <th class="text-left" style="width: 4%">Usage</th>
<th class="text-left" style="width: 20%">Explaination</th>
<th class="text-left" style="width: 4%">Example</th>
</tr>
@@ -562,10 +561,21 @@ This section introduce all available spark procedures
about paimon.
<tbody style="font-size: 12px; ">
<tr>
<td>compact</td>
- <td><nobr>CALL [paimon.]sys.compact(table => '<identifier>'
[,partitions => '<partitions>'] </nobr><br>[, order_strategy
=>'<sort_type>'] [,order_by => '<columns>'])</td>
<td>identifier: the target table identifier. Cannot be
empty.<br><br><nobr>partitions: partition filter. Left empty for all
partitions.<br> "," means "AND"<br>";" means "OR"</nobr><br><br>order_strategy:
'order' or 'zorder' or 'hilbert' or 'none'. Left empty for 'none'.
<br><br><nobr>order_columns: the columns need to be sort. Left empty if
'order_strategy' is 'none'. </nobr><br><br>If you want sort compact two
partitions date=01 and date=02, you need to write 'date=01;date=02'<br> [...]
<td><nobr>SET spark.sql.shuffle.partitions=10; --set the compact
parallelism</nobr><br><nobr>CALL sys.compact(table => 'T', partitions => 'p=0',
order_strategy => 'zorder', order_by => 'a,b')</nobr></td>
</tr>
+ <tr>
+ <td>expire_snapshots</td>
+ <td>
+ To expire snapshots. Arguments:
+ <li>table: the target table identifier. Cannot be empty.</li>
+ <li>retainMax: the maximum number of completed snapshots to
retain.</li>
+ <li>retainMin: the minimum number of completed snapshots to
retain.</li>
+ <li>older_than: timestamp before which snapshots will be
removed.</li>
+ <li>max_deletes: the maximum number of snapshots that can be
deleted at once.</li>
+ </td>
+ <td>CALL sys.expire_snapshots(table => 'default.T', retainMax => 10)</td>
+ </tr>
</tbody>
</table>
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 0bd878c96..28672bf4f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -28,7 +28,6 @@ import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.metastore.AddPartitionTagCallback;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.FileStoreCommitImpl;
-import org.apache.paimon.operation.FileStoreExpireImpl;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.operation.TagDeletion;
@@ -179,19 +178,6 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
newKeyComparator());
}
- @Override
- public FileStoreExpireImpl newExpire() {
- return new FileStoreExpireImpl(
- options.snapshotNumRetainMin(),
- options.snapshotNumRetainMax(),
- options.snapshotTimeRetain().toMillis(),
- snapshotManager(),
- newSnapshotDeletion(),
- newTagManager(),
- options.snapshotExpireLimit(),
- options.snapshotExpireCleanEmptyDirectories());
- }
-
@Override
public SnapshotDeletion newSnapshotDeletion() {
return new SnapshotDeletion(
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 1b7f9f1c9..5406828d7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -23,7 +23,6 @@ import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.operation.FileStoreCommit;
-import org.apache.paimon.operation.FileStoreExpire;
import org.apache.paimon.operation.FileStoreRead;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.FileStoreWrite;
@@ -77,8 +76,6 @@ public interface FileStore<T> extends Serializable {
FileStoreCommit newCommit(String commitUser);
- FileStoreExpire newExpire();
-
SnapshotDeletion newSnapshotDeletion();
TagManager newTagManager();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index f9da880d2..5929bc1ab 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -269,18 +269,42 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
return store().snapshotManager();
}
+ @Override
+ public ExpireSnapshots newExpireSnapshots() {
+ return new ExpireSnapshotsImpl(
+ snapshotManager(),
+ store().newSnapshotDeletion(),
+ store().newTagManager(),
+ coreOptions().snapshotExpireCleanEmptyDirectories());
+ }
+
@Override
public TableCommitImpl newCommit(String commitUser) {
+ CoreOptions options = coreOptions();
+ Runnable snapshotExpire = null;
+ if (!options.writeOnly()) {
+ ExpireSnapshots expireSnapshots =
+ newExpireSnapshots()
+ .retainMax(options.snapshotNumRetainMax())
+ .retainMin(options.snapshotNumRetainMin())
+ .maxDeletes(options.snapshotExpireLimit());
+ long snapshotTimeRetain = options.snapshotTimeRetain().toMillis();
+ snapshotExpire =
+ () ->
+ expireSnapshots
+ .olderThanMills(System.currentTimeMillis() - snapshotTimeRetain)
+ .expire();
+ }
return new TableCommitImpl(
store().newCommit(commitUser),
createCommitCallbacks(),
- coreOptions().writeOnly() ? null : store().newExpire(),
- coreOptions().writeOnly() ? null :
store().newPartitionExpire(commitUser),
- coreOptions().writeOnly() ? null :
store().newTagCreationManager(),
+ snapshotExpire,
+ options.writeOnly() ? null :
store().newPartitionExpire(commitUser),
+ options.writeOnly() ? null : store().newTagCreationManager(),
catalogEnvironment.lockFactory().create(),
CoreOptions.fromMap(options()).consumerExpireTime(),
new ConsumerManager(fileIO, path),
- coreOptions().snapshotExpireExecutionMode(),
+ options.snapshotExpireExecutionMode(),
name());
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpire.java
b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshots.java
similarity index 72%
copy from
paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpire.java
copy to paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshots.java
index 6827740ce..8aa9a3e2a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshots.java
@@ -16,14 +16,18 @@
* limitations under the License.
*/
-package org.apache.paimon.operation;
+package org.apache.paimon.table;
-/** Expire operation which provides snapshots expire. */
-public interface FileStoreExpire {
+/** Expire snapshots. */
+public interface ExpireSnapshots {
- /** With global lock. */
- FileStoreExpire withLock(Lock lock);
+ ExpireSnapshots retainMax(int retainMax);
- /** Expire snapshots. */
- void expire();
+ ExpireSnapshots retainMin(int retainMin);
+
+ ExpireSnapshots olderThanMills(long olderThanMills);
+
+ ExpireSnapshots maxDeletes(int maxDeletes);
+
+ int expire();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
similarity index 69%
rename from
paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
rename to
paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
index 522907587..00c40d0fe 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
@@ -16,112 +16,99 @@
* limitations under the License.
*/
-package org.apache.paimon.operation;
+package org.apache.paimon.table;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.function.Predicate;
-/**
- * Default implementation of {@link FileStoreExpire}. It retains a certain
number or period of
- * latest snapshots.
- *
- * <p>NOTE: This implementation will keep at least one snapshot so that users
will not accidentally
- * clear all snapshots.
- *
- * <p>TODO: add concurrent tests.
- */
-public class FileStoreExpireImpl implements FileStoreExpire {
+/** An implementation for {@link ExpireSnapshots}. */
+public class ExpireSnapshotsImpl implements ExpireSnapshots {
- private static final Logger LOG =
LoggerFactory.getLogger(FileStoreExpireImpl.class);
-
- private final int numRetainedMin;
- // snapshots exceeding any constraint will be expired
- private final int numRetainedMax;
- private final long millisRetained;
+ private static final Logger LOG =
LoggerFactory.getLogger(ExpireSnapshotsImpl.class);
private final SnapshotManager snapshotManager;
private final ConsumerManager consumerManager;
private final SnapshotDeletion snapshotDeletion;
-
private final TagManager tagManager;
- private final int expireLimit;
- private final boolean snapshotExpireCleanEmptyDirectories;
+ private final boolean cleanEmptyDirectories;
- private Lock lock;
+ private int retainMax = Integer.MAX_VALUE;
+ private int retainMin = 1;
+ private long olderThanMills = 0;
+ private int maxDeletes = Integer.MAX_VALUE;
- public FileStoreExpireImpl(
- int numRetainedMin,
- int numRetainedMax,
- long millisRetained,
+ public ExpireSnapshotsImpl(
SnapshotManager snapshotManager,
SnapshotDeletion snapshotDeletion,
TagManager tagManager,
- int expireLimit,
- boolean snapshotExpireCleanEmptyDirectories) {
- Preconditions.checkArgument(
- numRetainedMin >= 1,
- "The minimum number of completed snapshots to retain should be
>= 1.");
- Preconditions.checkArgument(
- numRetainedMax >= numRetainedMin,
- "The maximum number of snapshots to retain should be >= the
minimum number.");
- Preconditions.checkArgument(
- expireLimit > 1,
- String.format("The %s should be > 1.",
CoreOptions.SNAPSHOT_EXPIRE_LIMIT.key()));
- this.numRetainedMin = numRetainedMin;
- this.numRetainedMax = numRetainedMax;
- this.millisRetained = millisRetained;
+ boolean cleanEmptyDirectories) {
this.snapshotManager = snapshotManager;
this.consumerManager =
new ConsumerManager(snapshotManager.fileIO(),
snapshotManager.tablePath());
this.snapshotDeletion = snapshotDeletion;
this.tagManager = tagManager;
- this.expireLimit = expireLimit;
- this.snapshotExpireCleanEmptyDirectories =
snapshotExpireCleanEmptyDirectories;
+ this.cleanEmptyDirectories = cleanEmptyDirectories;
}
@Override
- public FileStoreExpire withLock(Lock lock) {
- this.lock = lock;
+ public ExpireSnapshotsImpl retainMax(int retainMax) {
+ this.retainMax = retainMax;
return this;
}
@Override
- public void expire() {
+ public ExpireSnapshotsImpl retainMin(int retainMin) {
+ this.retainMin = retainMin;
+ return this;
+ }
+
+ @Override
+ public ExpireSnapshotsImpl olderThanMills(long olderThanMills) {
+ this.olderThanMills = olderThanMills;
+ return this;
+ }
+
+ @Override
+ public ExpireSnapshotsImpl maxDeletes(int maxDeletes) {
+ this.maxDeletes = maxDeletes;
+ return this;
+ }
+
+ @Override
+ public int expire() {
Long latestSnapshotId = snapshotManager.latestSnapshotId();
if (latestSnapshotId == null) {
// no snapshot, nothing to expire
- return;
+ return 0;
}
- long currentMillis = System.currentTimeMillis();
-
Long earliest = snapshotManager.earliestSnapshotId();
if (earliest == null) {
- return;
+ return 0;
}
// the min snapshot to retain from 'snapshot.num-retained.max'
// (the maximum number of snapshots to retain)
- long min = Math.max(latestSnapshotId - numRetainedMax + 1, earliest);
+ long min = Math.max(latestSnapshotId - retainMax + 1, earliest);
// the max exclusive snapshot to expire until
// protected by 'snapshot.num-retained.min'
// (the minimum number of completed snapshots to retain)
- long maxExclusive = latestSnapshotId - numRetainedMin + 1;
+ long maxExclusive = latestSnapshotId - retainMin + 1;
// the snapshot being read by the consumer cannot be deleted
maxExclusive =
@@ -129,24 +116,22 @@ public class FileStoreExpireImpl implements
FileStoreExpire {
// protected by 'snapshot.expire.limit'
// (the maximum number of snapshots allowed to expire at a time)
- maxExclusive = Math.min(maxExclusive, earliest + expireLimit);
+ maxExclusive = Math.min(maxExclusive, earliest + maxDeletes);
for (long id = min; id < maxExclusive; id++) {
// Early exit the loop for 'snapshot.time-retained'
// (the maximum time of snapshots to retain)
if (snapshotManager.snapshotExists(id)
- && currentMillis -
snapshotManager.snapshot(id).timeMillis()
- <= millisRetained) {
- expireUntil(earliest, id);
- return;
+ && olderThanMills <=
snapshotManager.snapshot(id).timeMillis()) {
+ return expireUntil(earliest, id);
}
}
- expireUntil(earliest, maxExclusive);
+ return expireUntil(earliest, maxExclusive);
}
@VisibleForTesting
- public void expireUntil(long earliestId, long endExclusiveId) {
+ public int expireUntil(long earliestId, long endExclusiveId) {
if (endExclusiveId <= earliestId) {
// No expire happens:
// write the hint file in order to see the earliest snapshot
directly next time
@@ -156,7 +141,7 @@ public class FileStoreExpireImpl implements FileStoreExpire
{
}
// fast exit
- return;
+ return 0;
}
// find first snapshot to expire
@@ -214,7 +199,7 @@ public class FileStoreExpireImpl implements FileStoreExpire
{
// data files and changelog files in bucket directories has been
deleted
// then delete changed bucket directories if they are empty
- if (snapshotExpireCleanEmptyDirectories) {
+ if (cleanEmptyDirectories) {
snapshotDeletion.cleanDataDirectories();
}
@@ -237,30 +222,19 @@ public class FileStoreExpireImpl implements
FileStoreExpire {
}
writeEarliestHint(endExclusiveId);
+ return (int) (endExclusiveId - beginInclusiveId);
}
private void writeEarliestHint(long earliest) {
- // update earliest hint file
-
- Callable<Void> callable =
- () -> {
- snapshotManager.commitEarliestHint(earliest);
- return null;
- };
-
try {
- if (lock != null) {
- lock.runWithLock(callable);
- } else {
- callable.call();
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
+ snapshotManager.commitEarliestHint(earliest);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
}
}
@VisibleForTesting
- SnapshotDeletion snapshotDeletion() {
+ public SnapshotDeletion snapshotDeletion() {
return snapshotDeletion;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index 3e3bcb2d7..66b9142d2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -142,4 +142,12 @@ public interface ReadonlyTable extends InnerTable {
"Readonly Table %s does not support deleteBranch.",
this.getClass().getSimpleName()));
}
+
+ @Override
+ default ExpireSnapshots newExpireSnapshots() {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Readonly Table %s does not support expireSnapshots.",
+ this.getClass().getSimpleName()));
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index 4f713e992..b9c28cd69 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -91,6 +91,8 @@ public interface Table extends Serializable {
@Experimental
void deleteBranch(String branchName);
+ ExpireSnapshots newExpireSnapshots();
+
// =============== Read & Write Operations ==================
/** Returns a new read builder. */
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index bfc52ec5e..fb7ab7974 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -28,7 +28,6 @@ import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.FileStoreCommit;
-import org.apache.paimon.operation.FileStoreExpire;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.metrics.CommitMetrics;
@@ -68,16 +67,13 @@ import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.ExpireExecutionMode;
import static org.apache.paimon.utils.Preconditions.checkState;
-/**
- * An abstraction layer above {@link FileStoreCommit} and {@link
FileStoreExpire} to provide
- * snapshot commit and expiration.
- */
+/** An abstraction layer above {@link FileStoreCommit} to provide snapshot
commit and expiration. */
public class TableCommitImpl implements InnerTableCommit {
private static final Logger LOG =
LoggerFactory.getLogger(TableCommitImpl.class);
private final FileStoreCommit commit;
private final List<CommitCallback> commitCallbacks;
- @Nullable private final FileStoreExpire expire;
+ @Nullable private final Runnable expireSnapshots;
@Nullable private final PartitionExpire partitionExpire;
@Nullable private final TagAutoCreation tagAutoCreation;
private final Lock lock;
@@ -96,7 +92,7 @@ public class TableCommitImpl implements InnerTableCommit {
public TableCommitImpl(
FileStoreCommit commit,
List<CommitCallback> commitCallbacks,
- @Nullable FileStoreExpire expire,
+ @Nullable Runnable expireSnapshots,
@Nullable PartitionExpire partitionExpire,
@Nullable TagAutoCreation tagAutoCreation,
Lock lock,
@@ -105,16 +101,13 @@ public class TableCommitImpl implements InnerTableCommit {
ExpireExecutionMode expireExecutionMode,
String tableName) {
commit.withLock(lock);
- if (expire != null) {
- expire.withLock(lock);
- }
if (partitionExpire != null) {
partitionExpire.withLock(lock);
}
this.commit = commit;
this.commitCallbacks = commitCallbacks;
- this.expire = expire;
+ this.expireSnapshots = expireSnapshots;
this.partitionExpire = partitionExpire;
this.tagAutoCreation = tagAutoCreation;
this.lock = lock;
@@ -340,9 +333,7 @@ public class TableCommitImpl implements InnerTableCommit {
consumerManager.expire(LocalDateTime.now().minus(consumerExpireTime));
}
- if (expire != null) {
- expire.expire();
- }
+ expireSnapshots();
if (partitionExpire != null) {
partitionExpire.expire(partitionExpireIdentifier);
@@ -353,6 +344,12 @@ public class TableCommitImpl implements InnerTableCommit {
}
}
+ public void expireSnapshots() {
+ if (expireSnapshots != null) {
+ expireSnapshots.run();
+ }
+ }
+
@Override
public void close() throws Exception {
for (CommitCallback commitCallback : commitCallbacks) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 0a9b2bf5b..d9cc7f878 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -36,7 +36,6 @@ import
org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.FileStoreCommitImpl;
-import org.apache.paimon.operation.FileStoreExpireImpl;
import org.apache.paimon.operation.FileStoreRead;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.Lock;
@@ -46,6 +45,8 @@ import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.ExpireSnapshots;
+import org.apache.paimon.table.ExpireSnapshotsImpl;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ScanMode;
@@ -134,25 +135,23 @@ public class TestFileStore extends KeyValueFileStore {
return super.newCommit(commitUser);
}
- public FileStoreExpireImpl newExpire(
- int numRetainedMin, int numRetainedMax, long millisRetained) {
+ public ExpireSnapshots newExpire(int numRetainedMin, int numRetainedMax,
long millisRetained) {
return newExpire(numRetainedMin, numRetainedMax, millisRetained, true);
}
- public FileStoreExpireImpl newExpire(
+ public ExpireSnapshots newExpire(
int numRetainedMin,
int numRetainedMax,
long millisRetained,
boolean snapshotExpireCleanEmptyDirectories) {
- return new FileStoreExpireImpl(
- numRetainedMin,
- numRetainedMax,
- millisRetained,
- snapshotManager(),
- newSnapshotDeletion(),
- new TagManager(fileIO, options.path()),
- Integer.MAX_VALUE,
- snapshotExpireCleanEmptyDirectories);
+ return new ExpireSnapshotsImpl(
+ snapshotManager(),
+ newSnapshotDeletion(),
+ new TagManager(fileIO, options.path()),
+ snapshotExpireCleanEmptyDirectories)
+ .retainMax(numRetainedMax)
+ .retainMin(numRetainedMin)
+ .olderThanMills(System.currentTimeMillis() - millisRetained);
}
public List<Snapshot> commitData(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
index 1d5eb7329..463e8ba08 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
@@ -26,6 +26,8 @@ import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.table.ExpireSnapshots;
+import org.apache.paimon.table.ExpireSnapshotsImpl;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SnapshotManager;
@@ -44,7 +46,7 @@ import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
import static org.assertj.core.api.Assertions.assertThat;
/**
- * Tests for {@link FileStoreExpireImpl}. After expiration, only useful files
should be retained.
+ * Tests for {@link ExpireSnapshotsImpl}. After expiration, only useful files
should be retained.
*/
public class CleanedFileStoreExpireTest extends FileStoreExpireTestBase {
@@ -55,7 +57,7 @@ public class CleanedFileStoreExpireTest extends
FileStoreExpireTestBase {
@Test
public void testExpireExtraFiles() throws IOException {
- FileStoreExpireImpl expire = store.newExpire(1, 3, Long.MAX_VALUE);
+ ExpireSnapshotsImpl expire = (ExpireSnapshotsImpl) store.newExpire(1,
3, Long.MAX_VALUE);
// write test files
BinaryRow partition = gen.getPartition(gen.next());
@@ -98,7 +100,7 @@ public class CleanedFileStoreExpireTest extends
FileStoreExpireTestBase {
@Test
public void testNoSnapshot() {
- FileStoreExpire expire = store.newExpire(1, 3, Long.MAX_VALUE);
+ ExpireSnapshots expire = store.newExpire(1, 3, Long.MAX_VALUE);
expire.expire();
assertThat(snapshotManager.latestSnapshotId()).isNull();
@@ -110,7 +112,7 @@ public class CleanedFileStoreExpireTest extends
FileStoreExpireTestBase {
List<Integer> snapshotPositions = new ArrayList<>();
commit(2, allData, snapshotPositions);
int latestSnapshotId = snapshotManager.latestSnapshotId().intValue();
- FileStoreExpire expire = store.newExpire(1, latestSnapshotId + 1,
Long.MAX_VALUE);
+ ExpireSnapshots expire = store.newExpire(1, latestSnapshotId + 1,
Long.MAX_VALUE);
expire.expire();
for (int i = 1; i <= latestSnapshotId; i++) {
@@ -125,7 +127,7 @@ public class CleanedFileStoreExpireTest extends
FileStoreExpireTestBase {
List<Integer> snapshotPositions = new ArrayList<>();
commit(5, allData, snapshotPositions);
int latestSnapshotId = snapshotManager.latestSnapshotId().intValue();
- FileStoreExpire expire = store.newExpire(1, Integer.MAX_VALUE,
Long.MAX_VALUE);
+ ExpireSnapshots expire = store.newExpire(1, Integer.MAX_VALUE,
Long.MAX_VALUE);
expire.expire();
for (int i = 1; i <= latestSnapshotId; i++) {
@@ -144,7 +146,7 @@ public class CleanedFileStoreExpireTest extends
FileStoreExpireTestBase {
commit(numRetainedMin + random.nextInt(5), allData, snapshotPositions);
int latestSnapshotId = snapshotManager.latestSnapshotId().intValue();
Thread.sleep(100);
- FileStoreExpire expire = store.newExpire(numRetainedMin,
Integer.MAX_VALUE, 1);
+ ExpireSnapshots expire = store.newExpire(numRetainedMin,
Integer.MAX_VALUE, 1);
expire.expire();
for (int i = 1; i <= latestSnapshotId - numRetainedMin; i++) {
@@ -158,7 +160,7 @@ public class CleanedFileStoreExpireTest extends
FileStoreExpireTestBase {
@Test
public void testExpireWithNumber() throws Exception {
- FileStoreExpire expire = store.newExpire(1, 3, Long.MAX_VALUE);
+ ExpireSnapshots expire = store.newExpire(1, 3, Long.MAX_VALUE);
List<KeyValue> allData = new ArrayList<>();
List<Integer> snapshotPositions = new ArrayList<>();
@@ -194,7 +196,7 @@ public class CleanedFileStoreExpireTest extends
FileStoreExpireTestBase {
@Test
public void testExpireWithTime() throws Exception {
- FileStoreExpire expire = store.newExpire(1, Integer.MAX_VALUE, 1000);
+ ExpireSnapshots expire = store.newExpire(1, Integer.MAX_VALUE, 1000);
List<KeyValue> allData = new ArrayList<>();
List<Integer> snapshotPositions = new ArrayList<>();
@@ -203,8 +205,8 @@ public class CleanedFileStoreExpireTest extends
FileStoreExpireTestBase {
commit(5, allData, snapshotPositions);
long expireMillis = System.currentTimeMillis();
// expire twice to check for idempotence
- expire.expire();
- expire.expire();
+ expire.olderThanMills(expireMillis - 1000).expire();
+ expire.olderThanMills(expireMillis - 1000).expire();
int latestSnapshotId = snapshotManager.latestSnapshotId().intValue();
for (int i = 1; i <= latestSnapshotId; i++) {
@@ -252,7 +254,7 @@ public class CleanedFileStoreExpireTest extends
FileStoreExpireTestBase {
FileStoreTestUtils.assertPathExists(fileIO, dataFilePath2);
// the data file still exists after expire
- FileStoreExpire expire = store.newExpire(1, 1, Long.MAX_VALUE);
+ ExpireSnapshots expire = store.newExpire(1, 1, Long.MAX_VALUE);
expire.expire();
FileStoreTestUtils.assertPathExists(fileIO, dataFilePath2);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
index 759deb909..cf7535f76 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
@@ -195,7 +195,7 @@ public class OrphanFilesCleanTest {
expireOptions.set(CoreOptions.SNAPSHOT_EXPIRE_LIMIT, snapshotCount);
expireOptions.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, snapshotCount - expired);
expireOptions.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, snapshotCount - expired);
- table.copy(expireOptions.toMap()).store().newExpire().expire();
+ table.copy(expireOptions.toMap()).newCommit("").expireSnapshots();
// randomly delete tags
List<String> deleteTags = Collections.emptyList();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
index a0794168f..ce93166a5 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.operation;
import org.apache.paimon.KeyValue;
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.ExpireSnapshots;
import org.apache.paimon.utils.TagManager;
import org.junit.jupiter.api.Test;
@@ -38,14 +39,14 @@ import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
/**
- * Tests for {@link FileStoreExpireImpl}. Some files not in use may still
remain after the test due
- * to the testing methods.
+ * Tests for {@link ExpireSnapshots}. Some files not in use may still remain
after the test due to
+ * the testing methods.
*/
public class UncleanedFileStoreExpireTest extends FileStoreExpireTestBase {
@Test
public void testExpireWithMissingFiles() throws Exception {
- FileStoreExpire expire = store.newExpire(1, 1, 1);
+ ExpireSnapshots expire = store.newExpire(1, 1, 1);
List<KeyValue> allData = new ArrayList<>();
List<Integer> snapshotPositions = new ArrayList<>();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index f8f448814..c07fc2029 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -37,7 +37,6 @@ import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
-import org.apache.paimon.operation.FileStoreExpire;
import org.apache.paimon.operation.FileStoreTestUtils;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
@@ -841,7 +840,7 @@ public abstract class FileStoreTableTestBase {
Options options = new Options();
options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 5);
options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 5);
- table.copy(options.toMap()).store().newExpire().expire();
+ table.copy(options.toMap()).newCommit("").expireSnapshots();
}
table.rollbackTo("test1");
@@ -1178,7 +1177,7 @@ public abstract class FileStoreTableTestBase {
TableCommitImpl commit =
table.copy(Collections.singletonMap(WRITE_ONLY.key(), "true"))
.newCommit(commitUser);
- FileStoreExpire expire = table.store().newExpire();
+ TableCommitImpl expire = table.newCommit(commitUser);
try (StreamTableWrite write = table.newWrite(commitUser)) {
for (int i = 0; i < 10; i++) {
@@ -1194,7 +1193,7 @@ public abstract class FileStoreTableTestBase {
int index = 0;
// trigger the first expire and the first two snapshots expired
- expire.expire();
+ expire.expireSnapshots();
assertThat(snapshotManager.snapshotExists(remainingSnapshot.get(index++).id())).isFalse();
assertThat(snapshotManager.snapshotExists(remainingSnapshot.get(index++).id())).isFalse();
for (int i = index; i < remainingSnapshot.size(); i++) {
@@ -1204,7 +1203,7 @@ public abstract class FileStoreTableTestBase {
.isEqualTo(remainingSnapshot.get(index).id());
// trigger the second expire and the second two snapshots expired
- expire.expire();
+ expire.expireSnapshots();
assertThat(snapshotManager.snapshotExists(remainingSnapshot.get(index++).id())).isFalse();
assertThat(snapshotManager.snapshotExists(remainingSnapshot.get(index++).id())).isFalse();
for (int i = index; i < remainingSnapshot.size(); i++) {
@@ -1215,7 +1214,7 @@ public abstract class FileStoreTableTestBase {
// trigger all remaining expires and only the last snapshot remaining
for (int i = 0; i < 5; i++) {
- expire.expire();
+ expire.expireSnapshots();
}
for (int i = 0; i < remainingSnapshot.size() - 1; i++) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
index e4e890850..98816dd04 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
@@ -26,7 +26,6 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.manifest.IndexManifestEntry;
-import org.apache.paimon.operation.FileStoreExpireImpl;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
@@ -60,7 +59,7 @@ public class IndexFileExpireTableTest extends
PrimaryKeyTableTestBase {
@Test
public void testIndexFileExpiration() throws Exception {
prepareExpireTable();
- FileStoreExpireImpl expire = (FileStoreExpireImpl) table.store().newExpire();
+ ExpireSnapshotsImpl expire = (ExpireSnapshotsImpl) table.newExpireSnapshots();
long indexFileSize = indexFileSize();
long indexManifestSize = indexManifestSize();
@@ -89,7 +88,7 @@ public class IndexFileExpireTableTest extends
PrimaryKeyTableTestBase {
@Test
public void testIndexFileExpirationWithTag() throws Exception {
prepareExpireTable();
- FileStoreExpireImpl expire = (FileStoreExpireImpl) table.store().newExpire();
+ ExpireSnapshotsImpl expire = (ExpireSnapshotsImpl) table.newExpireSnapshots();
table.createTag("tag3", 3);
table.createTag("tag5", 5);
@@ -115,7 +114,7 @@ public class IndexFileExpireTableTest extends
PrimaryKeyTableTestBase {
@Test
public void testIndexFileExpirationWhenDeletingTag() throws Exception {
prepareExpireTable();
- FileStoreExpireImpl expire = (FileStoreExpireImpl) table.store().newExpire();
+ ExpireSnapshotsImpl expire = (ExpireSnapshotsImpl) table.newExpireSnapshots();
table.createTag("tag3", 3);
table.createTag("tag5", 5);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpire.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java
similarity index 53%
rename from
paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpire.java
rename to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java
index 6827740ce..db1d52170 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpire.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,14 +16,24 @@
* limitations under the License.
*/
-package org.apache.paimon.operation;
+package org.apache.paimon.flink.procedure;
-/** Expire operation which provides snapshots expire. */
-public interface FileStoreExpire {
+import org.apache.paimon.catalog.Catalog;
- /** With global lock. */
- FileStoreExpire withLock(Lock lock);
+import org.apache.flink.table.procedure.ProcedureContext;
- /** Expire snapshots. */
- void expire();
+/** A procedure to expire snapshots. */
+public class ExpireSnapshotsProcedure extends ProcedureBase {
+
+ @Override
+ public String identifier() {
+ return "expire_snapshots";
+ }
+
+ public String[] call(ProcedureContext procedureContext, String tableId, int retainMax)
+ throws Catalog.TableNotExistException {
+ return new String[] {
+ table(tableId).newExpireSnapshots().retainMax(retainMax).expire()
+ ""
+ };
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
index fed5c43ac..fd7f74148 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
@@ -19,9 +19,11 @@
package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.factories.Factory;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
+import org.apache.paimon.table.Table;
import org.apache.paimon.utils.StringUtils;
import org.apache.flink.configuration.PipelineOptions;
@@ -46,6 +48,10 @@ public abstract class ProcedureBase implements Procedure,
Factory {
return this;
}
+ protected Table table(String tableId) throws Catalog.TableNotExistException {
+ return catalog.getTable(Identifier.fromString(tableId));
+ }
+
@Nullable
protected String nullable(String arg) {
return StringUtils.isBlank(arg) ? null : arg;
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 634bd2768..e4b74f5cd 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -42,3 +42,4 @@ org.apache.paimon.flink.procedure.MigrateTableProcedure
org.apache.paimon.flink.procedure.MigrateFileProcedure
org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure
org.apache.paimon.flink.procedure.QueryServiceProcedure
+org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 211fe1411..7e2432d08 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -233,7 +233,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase
{
expireOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "1");
expireOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1");
FileStoreTable table = (FileStoreTable) paimonTable("T");
- table.copy(expireOptions).store().newExpire().expire();
+ table.copy(expireOptions).newCommit("").expireSnapshots();
assertThat(table.snapshotManager().snapshotCount()).isEqualTo(1);
assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='tag1') */"))
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index e0921ed86..af512da2a 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -21,6 +21,7 @@ package org.apache.paimon.spark;
import org.apache.paimon.spark.procedure.CompactProcedure;
import org.apache.paimon.spark.procedure.CreateTagProcedure;
import org.apache.paimon.spark.procedure.DeleteTagProcedure;
+import org.apache.paimon.spark.procedure.ExpireSnapshotsProcedure;
import org.apache.paimon.spark.procedure.MigrateFileProcedure;
import org.apache.paimon.spark.procedure.MigrateTableProcedure;
import org.apache.paimon.spark.procedure.Procedure;
@@ -56,6 +57,7 @@ public class SparkProcedures {
procedureBuilders.put("migrate_table", MigrateTableProcedure::builder);
procedureBuilders.put("migrate_file", MigrateFileProcedure::builder);
procedureBuilders.put("remove_orphan_files", RemoveOrphanFilesProcedure::builder);
+ procedureBuilders.put("expire_snapshots", ExpireSnapshotsProcedure::builder);
return procedureBuilders.build();
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java
new file mode 100644
index 000000000..4786fc6b5
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.table.ExpireSnapshots;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+import static org.apache.spark.sql.types.DataTypes.TimestampType;
+
+/** A procedure to expire snapshots. */
+public class ExpireSnapshotsProcedure extends BaseProcedure {
+
+ private static final ProcedureParameter[] PARAMETERS =
+ new ProcedureParameter[] {
+ ProcedureParameter.required("table", StringType),
+ ProcedureParameter.optional("retain_max", IntegerType),
+ ProcedureParameter.optional("retain_min", IntegerType),
+ ProcedureParameter.optional("older_than", TimestampType),
+ ProcedureParameter.optional("max_deletes", IntegerType)
+ };
+
+ private static final StructType OUTPUT_TYPE =
+ new StructType(
+ new StructField[] {
+ new StructField(
+ "deleted_snapshots_count", IntegerType, false, Metadata.empty())
+ });
+
+ protected ExpireSnapshotsProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+ Integer retainMax = args.isNullAt(1) ? null : args.getInt(1);
+ Integer retainMin = args.isNullAt(2) ? null : args.getInt(2);
+ Long olderThanMills = args.isNullAt(3) ? null : args.getLong(3) / 1000;
+ Integer maxDeletes = args.isNullAt(4) ? null : args.getInt(4);
+ return modifyPaimonTable(
+ tableIdent,
+ table -> {
+ ExpireSnapshots expireSnapshots = table.newExpireSnapshots();
+ if (retainMax != null) {
+ expireSnapshots.retainMax(retainMax);
+ }
+ if (retainMin != null) {
+ expireSnapshots.retainMin(retainMin);
+ }
+ if (olderThanMills != null) {
+ expireSnapshots.olderThanMills(olderThanMills);
+ }
+ if (maxDeletes != null) {
+ expireSnapshots.maxDeletes(maxDeletes);
+ }
+ int deleted = expireSnapshots.expire();
+ return new InternalRow[] {newInternalRow(deleted)};
+ });
+ }
+
+ public static ProcedureBuilder builder() {
+ return new BaseProcedure.Builder<ExpireSnapshotsProcedure>() {
+ @Override
+ public ExpireSnapshotsProcedure doBuild() {
+ return new ExpireSnapshotsProcedure(tableCatalog());
+ }
+ };
+ }
+
+ @Override
+ public String description() {
+ return "ExpireSnapshotsProcedure";
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
index ce9e72386..2c31664c2 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
@@ -268,7 +268,7 @@ public class SparkTimeTravelITCase extends
SparkReadTestBase {
Map<String, String> expireOptions = new HashMap<>();
expireOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "1");
expireOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1");
- table.copy(expireOptions).store().newExpire().expire();
+ table.copy(expireOptions).newCommit("").expireSnapshots();
assertThat(table.snapshotManager().snapshotCount()).isEqualTo(1);
// time travel to tag2
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
new file mode 100644
index 000000000..fa6de113b
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming.StreamTest
+
+class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest {
+
+ import testImplicits._
+
+ test("Paimon Procedure: expire snapshots") {
+ failAfter(streamingTimeout) {
+ withTempDir {
+ checkpointDir =>
+ // define a change-log table and test `forEachBatch` api
+ spark.sql(s"""
+ |CREATE TABLE T (a INT, b STRING)
+ |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
+ |""".stripMargin)
+ val location = loadTable("T").location().toString
+
+ val inputData = MemoryStream[(Int, String)]
+ val stream = inputData
+ .toDS()
+ .toDF("a", "b")
+ .writeStream
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .foreachBatch {
+ (batch: Dataset[Row], _: Long) =>
+ batch.write.format("paimon").mode("append").save(location)
+ }
+ .start()
+
+ val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+ try {
+ // snapshot-1
+ inputData.addData((1, "a"))
+ stream.processAllAvailable()
+ checkAnswer(query(), Row(1, "a") :: Nil)
+
+ // snapshot-2
+ inputData.addData((2, "b"))
+ stream.processAllAvailable()
+ checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+
+ // snapshot-3
+ inputData.addData((2, "b2"))
+ stream.processAllAvailable()
+ checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
+
+ // expire
+ checkAnswer(
+ spark.sql("CALL paimon.sys.expire_snapshots(table => 'test.T', retain_max => 2)"),
+ Row(1) :: Nil)
+
+ checkAnswer(
+ spark.sql("SELECT snapshot_id FROM paimon.test.`T$snapshots`"),
+ Row(2L) :: Row(3L) :: Nil)
+ } finally {
+ stream.stop()
+ }
+ }
+ }
+ }
+}