This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d9326c667 [core] supports asynchronous deletion of expired snapshot
files (#1823)
d9326c667 is described below
commit d9326c6670f2af9af0ccc80106fb22d3ad961d78
Author: liming.1018 <[email protected]>
AuthorDate: Fri Sep 15 11:58:44 2023 +0800
[core] supports asynchronous deletion of expired snapshot files (#1823)
---
.../shortcodes/generated/core_configuration.html | 13 ++
.../main/java/org/apache/paimon/CoreOptions.java | 49 +++++++
.../java/org/apache/paimon/AbstractFileStore.java | 3 +-
.../apache/paimon/operation/FileDeletionBase.java | 56 +++++--
.../paimon/operation/FileStoreExpireImpl.java | 12 +-
.../apache/paimon/operation/SnapshotDeletion.java | 10 +-
.../org/apache/paimon/operation/TagDeletion.java | 7 +-
.../paimon/table/AbstractFileStoreTable.java | 3 +-
.../apache/paimon/table/sink/TableCommitImpl.java | 52 ++++++-
.../test/java/org/apache/paimon/TestFileStore.java | 24 ++-
.../paimon/table/FileStoreTableTestBase.java | 161 +++++++++++++++++++++
11 files changed, 360 insertions(+), 30 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index cf02d0bb9..6ade8c0b5 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -491,6 +491,19 @@ This config option does not affect the default filesystem
metastore.</td>
<td>Duration</td>
<td>The maximum time of completed snapshots to retain.</td>
</tr>
+ <tr>
+ <td><h5>snapshot.expire.execution-mode</h5></td>
+ <td style="word-wrap: break-word;">sync</td>
+ <td>Enum</td>
+ <td>Specifies the execution mode of expire.<br /><br />Possible
values:<ul><li>"sync": Execute expire synchronously. If there are too many
files, it may take a long time and block stream processing.</li><li>"async":
Execute expire asynchronously. If the generation of snapshots is greater than
the deletion, there will be a backlog of files.</li></ul></td>
+ </tr>
+ </tr>
+ <tr>
+ <td><h5>snapshot.expire.limit</h5></td>
+ <td style="word-wrap: break-word;">10</td>
+ <td>Integer</td>
+ <td>The maximum number of snapshots allowed to expire at a
time.</td>
+ </tr>
<tr>
<td><h5>sort-engine</h5></td>
<td style="word-wrap: break-word;">loser-tree</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 13cf60ec3..7e1f8498b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -194,6 +194,19 @@ public class CoreOptions implements Serializable {
.defaultValue(Duration.ofHours(1))
.withDescription("The maximum time of completed snapshots
to retain.");
+ public static final ConfigOption<ExpireExecutionMode>
SNAPSHOT_EXPIRE_EXECUTION_MODE =
+ key("snapshot.expire.execution-mode")
+ .enumType(ExpireExecutionMode.class)
+ .defaultValue(ExpireExecutionMode.SYNC)
+ .withDescription("Specifies the execution mode of
expire.");
+
+ public static final ConfigOption<Integer> SNAPSHOT_EXPIRE_LIMIT =
+ key("snapshot.expire.limit")
+ .intType()
+ .defaultValue(10)
+ .withDescription(
+ "The maximum number of snapshots allowed to expire
at a time.");
+
public static final ConfigOption<Duration> CONTINUOUS_DISCOVERY_INTERVAL =
key("continuous.discovery-interval")
.durationType()
@@ -983,6 +996,14 @@ public class CoreOptions implements Serializable {
return options.get(SNAPSHOT_TIME_RETAINED);
}
+ public ExpireExecutionMode snapshotExpireExecutionMode() {
+ return options.get(SNAPSHOT_EXPIRE_EXECUTION_MODE);
+ }
+
+ public int snapshotExpireLimit() {
+ return options.get(SNAPSHOT_EXPIRE_LIMIT);
+ }
+
public int manifestMergeMinCount() {
return options.get(MANIFEST_MERGE_MIN_COUNT);
}
@@ -1849,4 +1870,32 @@ public class CoreOptions implements Serializable {
return text(description);
}
}
+
+ /** The execution mode for expire. */
+ public enum ExpireExecutionMode implements DescribedEnum {
+ SYNC(
+ "sync",
+ "Execute expire synchronously. If there are too many files, it
may take a long time and block stream processing."),
+ ASYNC(
+ "async",
+ "Execute expire asynchronously. If the generation of snapshots
is greater than the deletion, there will be a backlog of files.");
+
+ private final String value;
+ private final String description;
+
+ ExpireExecutionMode(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return text(description);
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 48cb6e231..d78013ffd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -176,7 +176,8 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
options.snapshotTimeRetain().toMillis(),
snapshotManager(),
newSnapshotDeletion(),
- newTagManager());
+ newTagManager(),
+ options.snapshotExpireLimit());
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index a9ad4cac2..8de8dfa85 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -30,6 +30,7 @@ import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterables;
@@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -47,6 +49,9 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -64,6 +69,7 @@ public abstract class FileDeletionBase {
protected final ManifestList manifestList;
protected final IndexFileHandler indexFileHandler;
protected final Map<BinaryRow, Set<Integer>> deletionBuckets;
+ protected final Executor ioExecutor;
public FileDeletionBase(
FileIO fileIO,
@@ -78,6 +84,7 @@ public abstract class FileDeletionBase {
this.indexFileHandler = indexFileHandler;
this.deletionBuckets = new HashMap<>();
+ this.ioExecutor = FileUtils.COMMON_IO_FORK_JOIN_POOL;
}
/**
@@ -107,10 +114,12 @@ public abstract class FileDeletionBase {
// All directory paths are deduplicated and sorted by hierarchy level
Map<Integer, Set<Path>> deduplicate = new HashMap<>();
for (Map.Entry<BinaryRow, Set<Integer>> entry :
deletionBuckets.entrySet()) {
+ List<Path> toDeleteEmptyDirectory = new ArrayList<>();
// try to delete bucket directories
for (Integer bucket : entry.getValue()) {
- tryDeleteEmptyDirectory(pathFactory.bucketPath(entry.getKey(),
bucket));
+
toDeleteEmptyDirectory.add(pathFactory.bucketPath(entry.getKey(), bucket));
}
+ deleteFiles(toDeleteEmptyDirectory, this::tryDeleteEmptyDirectory);
List<Path> hierarchicalPaths =
pathFactory.getHierarchicalPartitionPath(entry.getKey());
int hierarchies = hierarchicalPaths.size();
@@ -144,31 +153,34 @@ public abstract class FileDeletionBase {
protected void cleanUnusedManifests(
Snapshot snapshot, Set<String> skippingSet, boolean
deleteChangelog) {
// clean base and delta manifests
+ List<String> toDeleteManifests = new ArrayList<>();
List<ManifestFileMeta> toExpireManifests = new ArrayList<>();
toExpireManifests.addAll(tryReadManifestList(snapshot.baseManifestList()));
toExpireManifests.addAll(tryReadManifestList(snapshot.deltaManifestList()));
for (ManifestFileMeta manifest : toExpireManifests) {
String fileName = manifest.fileName();
if (!skippingSet.contains(fileName)) {
- manifestFile.delete(fileName);
+ toDeleteManifests.add(fileName);
// to avoid other snapshots trying to delete again
skippingSet.add(fileName);
}
}
+ deleteFiles(toDeleteManifests, manifestFile::delete);
+ toDeleteManifests.clear();
if (!skippingSet.contains(snapshot.baseManifestList())) {
- manifestList.delete(snapshot.baseManifestList());
+ toDeleteManifests.add(snapshot.baseManifestList());
}
if (!skippingSet.contains(snapshot.deltaManifestList())) {
- manifestList.delete(snapshot.deltaManifestList());
+ toDeleteManifests.add(snapshot.deltaManifestList());
}
+ deleteFiles(toDeleteManifests, manifestList::delete);
// clean changelog manifests
if (deleteChangelog && snapshot.changelogManifestList() != null) {
- for (ManifestFileMeta manifest :
- tryReadManifestList(snapshot.changelogManifestList())) {
- manifestFile.delete(manifest.fileName());
- }
+ deleteFiles(
+ tryReadManifestList(snapshot.changelogManifestList()),
+ manifest -> manifestFile.delete(manifest.fileName()));
manifestList.delete(snapshot.changelogManifestList());
}
@@ -176,11 +188,11 @@ public abstract class FileDeletionBase {
String indexManifest = snapshot.indexManifest();
// check exists, it may have been deleted by other snapshots
if (indexManifest != null &&
indexFileHandler.existsManifest(indexManifest)) {
- for (IndexManifestEntry entry :
indexFileHandler.readManifest(indexManifest)) {
- if (!skippingSet.contains(entry.indexFile().fileName())) {
- indexFileHandler.deleteIndexFile(entry);
- }
- }
+ List<IndexManifestEntry> indexManifestEntries =
+ indexFileHandler.readManifest(indexManifest);
+ indexManifestEntries.removeIf(
+ entry ->
skippingSet.contains(entry.indexFile().fileName()));
+ deleteFiles(indexManifestEntries,
indexFileHandler::deleteIndexFile);
if (!skippingSet.contains(indexManifest)) {
indexFileHandler.deleteManifest(indexManifest);
@@ -305,4 +317,22 @@ public abstract class FileDeletionBase {
return false;
}
}
+
+ protected <T> void deleteFiles(Collection<T> files, Consumer<T> deletion) {
+ if (files.isEmpty()) {
+ return;
+ }
+
+ List<CompletableFuture<Void>> deletionFutures = new
ArrayList<>(files.size());
+ for (T file : files) {
+ deletionFutures.add(
+ CompletableFuture.runAsync(() -> deletion.accept(file),
ioExecutor));
+ }
+
+ try {
+ CompletableFuture.allOf(deletionFutures.toArray(new
CompletableFuture[0])).get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
index cd52589d0..8b177bdf9 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
@@ -18,6 +18,7 @@
package org.apache.paimon.operation;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.ConsumerManager;
@@ -56,6 +57,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
private final SnapshotDeletion snapshotDeletion;
private final TagManager tagManager;
+ private final int expireLimit;
private Lock lock;
@@ -65,13 +67,17 @@ public class FileStoreExpireImpl implements FileStoreExpire
{
long millisRetained,
SnapshotManager snapshotManager,
SnapshotDeletion snapshotDeletion,
- TagManager tagManager) {
+ TagManager tagManager,
+ int expireLimit) {
Preconditions.checkArgument(
numRetainedMin >= 1,
"The minimum number of completed snapshots to retain should be
>= 1.");
Preconditions.checkArgument(
numRetainedMax >= numRetainedMin,
"The maximum number of snapshots to retain should be >= the
minimum number.");
+ Preconditions.checkArgument(
+ expireLimit > 1,
+ String.format("The %s should be > 1.",
CoreOptions.SNAPSHOT_EXPIRE_LIMIT.key()));
this.numRetainedMin = numRetainedMin;
this.numRetainedMax = numRetainedMax;
this.millisRetained = millisRetained;
@@ -80,6 +86,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
new ConsumerManager(snapshotManager.fileIO(),
snapshotManager.tablePath());
this.snapshotDeletion = snapshotDeletion;
this.tagManager = tagManager;
+ this.expireLimit = expireLimit;
}
@Override
@@ -152,6 +159,9 @@ public class FileStoreExpireImpl implements FileStoreExpire
{
break;
}
}
+
+ endExclusiveId = Math.min(beginInclusiveId + expireLimit,
endExclusiveId);
+
if (LOG.isDebugEnabled()) {
LOG.debug(
"Snapshot expire range is [" + beginInclusiveId + ", " +
endExclusiveId + ")");
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
index 123fdabc5..8b9740c33 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
@@ -94,18 +94,20 @@ public class SnapshotDeletion extends FileDeletionBase {
}
}
+ List<Path> actualDataFileToDelete = new ArrayList<>();
dataFileToDelete.forEach(
(path, pair) -> {
ManifestEntry entry = pair.getLeft();
// check whether we should skip the data file
if (!skipper.test(entry)) {
// delete data files
- fileIO.deleteQuietly(path);
- pair.getRight().forEach(fileIO::deleteQuietly);
+ actualDataFileToDelete.add(path);
+ actualDataFileToDelete.addAll(pair.getRight());
recordDeletionBuckets(entry);
}
});
+ deleteFiles(actualDataFileToDelete, fileIO::deleteQuietly);
}
/**
@@ -118,15 +120,17 @@ public class SnapshotDeletion extends FileDeletionBase {
}
public void deleteAddedDataFiles(Iterable<ManifestEntry> manifestEntries) {
+ List<Path> dataFileToDelete = new ArrayList<>();
for (ManifestEntry entry : manifestEntries) {
if (entry.kind() == FileKind.ADD) {
- fileIO.deleteQuietly(
+ dataFileToDelete.add(
new Path(
pathFactory.bucketPath(entry.partition(),
entry.bucket()),
entry.file().fileName()));
recordDeletionBuckets(entry);
}
}
+ deleteFiles(dataFileToDelete, fileIO::deleteQuietly);
}
public Predicate<ManifestEntry> dataFileSkipper(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
index 8615cbc86..77e3d7c70 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
@@ -28,6 +28,7 @@ import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.utils.FileStorePathFactory;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -60,17 +61,19 @@ public class TagDeletion extends FileDeletionBase {
public void cleanUnusedDataFiles(
Iterable<ManifestEntry> entries, Predicate<ManifestEntry> skipper)
{
+ List<Path> dataFileToDelete = new ArrayList<>();
for (ManifestEntry entry : ManifestEntry.mergeEntries(entries)) {
if (!skipper.test(entry)) {
Path bucketPath = pathFactory.bucketPath(entry.partition(),
entry.bucket());
- fileIO.deleteQuietly(new Path(bucketPath,
entry.file().fileName()));
+ dataFileToDelete.add(new Path(bucketPath,
entry.file().fileName()));
for (String file : entry.file().extraFiles()) {
- fileIO.deleteQuietly(new Path(bucketPath, file));
+ dataFileToDelete.add(new Path(bucketPath, file));
}
recordDeletionBuckets(entry);
}
}
+ deleteFiles(dataFileToDelete, fileIO::deleteQuietly);
}
public Predicate<ManifestEntry> dataFileSkipper(Snapshot fromSnapshot) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index ea939bf49..eb15566b2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -264,7 +264,8 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
coreOptions().writeOnly() ? null :
store().newTagCreationManager(),
catalogEnvironment.lockFactory().create(),
CoreOptions.fromMap(options()).consumerExpireTime(),
- new ConsumerManager(fileIO, path));
+ new ConsumerManager(fileIO, path),
+ coreOptions().snapshotExpireExecutionMode());
}
private List<CommitCallback> createCommitCallbacks() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index e5ba0eba1..7293dce3a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table.sink;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.operation.FileStoreCommit;
@@ -25,8 +26,14 @@ import org.apache.paimon.operation.FileStoreExpire;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.tag.TagAutoCreation;
+import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.IOUtils;
+import
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
import java.time.Duration;
@@ -37,8 +44,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import static org.apache.paimon.CoreOptions.ExpireExecutionMode;
import static org.apache.paimon.utils.Preconditions.checkState;
/**
@@ -46,6 +57,7 @@ import static
org.apache.paimon.utils.Preconditions.checkState;
* snapshot commit and expiration.
*/
public class TableCommitImpl implements InnerTableCommit {
+ private static final Logger LOG =
LoggerFactory.getLogger(TableCommitImpl.class);
private final FileStoreCommit commit;
private final List<CommitCallback> commitCallbacks;
@@ -61,6 +73,9 @@ public class TableCommitImpl implements InnerTableCommit {
private boolean batchCommitted = false;
+ private ExecutorService expireMainExecutor;
+ private AtomicReference<Throwable> expireError;
+
public TableCommitImpl(
FileStoreCommit commit,
List<CommitCallback> commitCallbacks,
@@ -69,7 +84,8 @@ public class TableCommitImpl implements InnerTableCommit {
@Nullable TagAutoCreation tagAutoCreation,
Lock lock,
@Nullable Duration consumerExpireTime,
- ConsumerManager consumerManager) {
+ ConsumerManager consumerManager,
+ ExpireExecutionMode expireExecutionMode) {
commit.withLock(lock);
if (expire != null) {
expire.withLock(lock);
@@ -87,6 +103,14 @@ public class TableCommitImpl implements InnerTableCommit {
this.consumerExpireTime = consumerExpireTime;
this.consumerManager = consumerManager;
+
+ this.expireMainExecutor =
+ expireExecutionMode == ExpireExecutionMode.SYNC
+ ? MoreExecutors.newDirectExecutorService()
+ : Executors.newSingleThreadExecutor(
+ new ExecutorThreadFactory(
+ Thread.currentThread().getName() +
"expire-main-thread"));
+ this.expireError = new AtomicReference<>(null);
}
@Override
@@ -145,7 +169,7 @@ public class TableCommitImpl implements InnerTableCommit {
commit.commit(committable, new HashMap<>());
}
if (!committables.isEmpty()) {
- expire(committables.get(committables.size() - 1).identifier());
+ expire(committables.get(committables.size() - 1).identifier(),
expireMainExecutor);
}
} else {
ManifestCommittable committable;
@@ -162,7 +186,7 @@ public class TableCommitImpl implements InnerTableCommit {
committable = new ManifestCommittable(Long.MAX_VALUE);
}
commit.overwrite(overwritePartition, committable,
Collections.emptyMap());
- expire(committable.identifier());
+ expire(committable.identifier(), expireMainExecutor);
}
commitCallbacks.forEach(c -> c.call(committables));
@@ -195,6 +219,22 @@ public class TableCommitImpl implements InnerTableCommit {
return retryCommittables.size();
}
+ private void expire(long partitionExpireIdentifier, ExecutorService
executor) {
+ if (expireError.get() != null) {
+ throw new RuntimeException(expireError.get());
+ }
+
+ executor.execute(
+ () -> {
+ try {
+ expire(partitionExpireIdentifier);
+ } catch (Throwable t) {
+ LOG.error("Executing expire encountered an error.", t);
+ expireError.compareAndSet(null, t);
+ }
+ });
+ }
+
private void expire(long partitionExpireIdentifier) {
// expire consumer first to avoid preventing snapshot expiration
if (consumerExpireTime != null) {
@@ -220,10 +260,16 @@ public class TableCommitImpl implements InnerTableCommit {
IOUtils.closeQuietly(commitCallback);
}
IOUtils.closeQuietly(lock);
+ expireMainExecutor.shutdownNow();
}
@Override
public void abort(List<CommitMessage> commitMessages) {
commit.abort(commitMessages);
}
+
+ @VisibleForTesting
+ public ExecutorService getExpireMainExecutor() {
+ return expireMainExecutor;
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index f545f7a28..6b3b3f8a0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -138,7 +138,8 @@ public class TestFileStore extends KeyValueFileStore {
millisRetained,
snapshotManager(),
newSnapshotDeletion(),
- new TagManager(fileIO, options.path()));
+ new TagManager(fileIO, options.path()),
+ Integer.MAX_VALUE);
}
public List<Snapshot> commitData(
@@ -492,12 +493,23 @@ public class TestFileStore extends KeyValueFileStore {
}
public Set<Path> getFilesInUse(long snapshotId) {
- Set<Path> result = new HashSet<>();
+ return getFilesInUse(
+ snapshotId,
+ snapshotManager(),
+ newScan(),
+ fileIO,
+ pathFactory(),
+ manifestListFactory().create());
+ }
- SnapshotManager snapshotManager = snapshotManager();
- FileStorePathFactory pathFactory = pathFactory();
- ManifestList manifestList = manifestListFactory().create();
- FileStoreScan scan = newScan();
+ public static Set<Path> getFilesInUse(
+ long snapshotId,
+ SnapshotManager snapshotManager,
+ FileStoreScan scan,
+ FileIO fileIO,
+ FileStorePathFactory pathFactory,
+ ManifestList manifestList) {
+ Set<Path> result = new HashSet<>();
Path snapshotPath = snapshotManager.snapshotPath(snapshotId);
Snapshot snapshot = Snapshot.fromPath(fileIO, snapshotPath);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index 8d7f94140..b87311c4c 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -18,8 +18,10 @@
package org.apache.paimon.table;
+import org.apache.paimon.AbstractFileStore;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.TestFileStore;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
@@ -35,6 +37,8 @@ import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
+import org.apache.paimon.operation.FileStoreExpire;
+import org.apache.paimon.operation.FileStoreTestUtils;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -48,6 +52,7 @@ import org.apache.paimon.table.sink.InnerTableCommit;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.OutOfRangeException;
import org.apache.paimon.table.source.ReadBuilder;
@@ -66,6 +71,7 @@ import org.apache.paimon.utils.TraceableFileIO;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -75,13 +81,17 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -91,7 +101,10 @@ import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
import static org.apache.paimon.CoreOptions.COMPACTION_MAX_FILE_NUM;
+import static org.apache.paimon.CoreOptions.ExpireExecutionMode;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
+import static org.apache.paimon.CoreOptions.SNAPSHOT_EXPIRE_EXECUTION_MODE;
+import static org.apache.paimon.CoreOptions.SNAPSHOT_EXPIRE_LIMIT;
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
import static org.apache.paimon.CoreOptions.WRITE_ONLY;
@@ -907,6 +920,154 @@ public abstract class FileStoreTableTestBase {
IllegalArgumentException.class, "Tag 'tag1'
doesn't exist."));
}
+ @Test
+ @Timeout(120)
+ public void testAsyncExpireExecutionMode() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+
+ Map<String, String> options = new HashMap<>();
+ options.put(SNAPSHOT_EXPIRE_EXECUTION_MODE.key(),
ExpireExecutionMode.ASYNC.toString());
+ options.put(SNAPSHOT_NUM_RETAINED_MIN.key(), "1");
+ options.put(SNAPSHOT_NUM_RETAINED_MAX.key(), "1");
+ options.put(SNAPSHOT_EXPIRE_LIMIT.key(), "2");
+
+ TableCommitImpl commit = table.copy(options).newCommit(commitUser);
+ ExecutorService executor = commit.getExpireMainExecutor();
+ CountDownLatch before = new CountDownLatch(1);
+ CountDownLatch after = new CountDownLatch(1);
+
+ executor.execute(
+ () -> {
+ try {
+ before.await();
+ } catch (Exception ignore) {
+ // ignore
+ }
+ });
+
+ try (StreamTableWrite write = table.newWrite(commitUser)) {
+ for (int i = 0; i < 10; i++) {
+ write.write(rowData(i, 10 * i, 100L * i));
+ commit.commit(i, write.prepareCommit(false, i));
+ }
+ }
+
+ executor.execute(after::countDown);
+
+ SnapshotManager snapshotManager = table.snapshotManager();
+ long latestSnapshotId = snapshotManager.latestSnapshotId();
+
+ AbstractFileStore<?> store = (AbstractFileStore<?>) table.store();
+ Set<Path> filesInUse =
+ TestFileStore.getFilesInUse(
+ latestSnapshotId,
+ snapshotManager,
+ store.newScan(),
+ table.fileIO(),
+ store.pathFactory(),
+ store.manifestListFactory().create());
+
+ List<Path> unusedFileList =
+ Files.walk(Paths.get(tempDir.toString()))
+ .filter(Files::isRegularFile)
+ .filter(p ->
!p.getFileName().toString().startsWith("snapshot"))
+ .filter(p ->
!p.getFileName().toString().startsWith("schema"))
+ .filter(p ->
!p.getFileName().toString().equals(SnapshotManager.LATEST))
+ .filter(p ->
!p.getFileName().toString().equals(SnapshotManager.EARLIEST))
+ .map(p -> new Path(TraceableFileIO.SCHEME + "://" +
p.toString()))
+ .filter(p -> !filesInUse.contains(p))
+ .collect(Collectors.toList());
+
+ // no expire happens, all files are preserved
+ for (int i = 1; i <= latestSnapshotId; i++) {
+ assertThat(snapshotManager.snapshotExists(i)).isTrue();
+ }
+ for (Path file : unusedFileList) {
+ FileStoreTestUtils.assertPathExists(table.fileIO(), file);
+ }
+
+ // waiting for all expire, only keeping files in use.
+ before.countDown();
+ after.await();
+
+ for (int i = 1; i < latestSnapshotId - 1; i++) {
+ assertThat(snapshotManager.snapshotExists(i)).isFalse();
+ }
+ for (Path file : unusedFileList) {
+ FileStoreTestUtils.assertPathNotExists(table.fileIO(), file);
+ }
+ assertThat(snapshotManager.snapshotExists(latestSnapshotId)).isTrue();
+
assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(latestSnapshotId);
+
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(latestSnapshotId);
+
+ commit.close();
+ }
+
+ @Test
+ @Timeout(120)
+ public void testExpireWithLimit() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+
+ Map<String, String> options = new HashMap<>();
+ options.put(SNAPSHOT_EXPIRE_EXECUTION_MODE.key(),
ExpireExecutionMode.ASYNC.toString());
+ options.put(SNAPSHOT_NUM_RETAINED_MIN.key(), "1");
+ options.put(SNAPSHOT_NUM_RETAINED_MAX.key(), "1");
+ options.put(SNAPSHOT_EXPIRE_LIMIT.key(), "2");
+
+ table = table.copy(options);
+ TableCommitImpl commit =
+ table.copy(Collections.singletonMap(WRITE_ONLY.key(), "true"))
+ .newCommit(commitUser);
+ FileStoreExpire expire = table.store().newExpire();
+
+ try (StreamTableWrite write = table.newWrite(commitUser)) {
+ for (int i = 0; i < 10; i++) {
+ write.write(rowData(i, 10 * i, 100L * i));
+ commit.commit(i, write.prepareCommit(false, i));
+ }
+ }
+
+ SnapshotManager snapshotManager = table.snapshotManager();
+ List<Snapshot> remainingSnapshot = new ArrayList<>();
+ snapshotManager.snapshots().forEachRemaining(remainingSnapshot::add);
+ long latestSnapshotId = snapshotManager.latestSnapshotId();
+ int index = 0;
+
+ // trigger the first expire and the first two snapshots expired
+ expire.expire();
+
assertThat(snapshotManager.snapshotExists(remainingSnapshot.get(index++).id())).isFalse();
+
assertThat(snapshotManager.snapshotExists(remainingSnapshot.get(index++).id())).isFalse();
+ for (int i = index; i < remainingSnapshot.size(); i++) {
+
assertThat(snapshotManager.snapshotExists(remainingSnapshot.get(i).id())).isTrue();
+ }
+ assertThat(snapshotManager.earliestSnapshotId())
+ .isEqualTo(remainingSnapshot.get(index).id());
+
+ // trigger the second expire and the second two snapshots expired
+ expire.expire();
+
assertThat(snapshotManager.snapshotExists(remainingSnapshot.get(index++).id())).isFalse();
+
assertThat(snapshotManager.snapshotExists(remainingSnapshot.get(index++).id())).isFalse();
+ for (int i = index; i < remainingSnapshot.size(); i++) {
+
assertThat(snapshotManager.snapshotExists(remainingSnapshot.get(i).id())).isTrue();
+ }
+ assertThat(snapshotManager.earliestSnapshotId())
+ .isEqualTo(remainingSnapshot.get(index).id());
+
+ // trigger all remaining expires and only the last snapshot remaining
+ for (int i = 0; i < 5; i++) {
+ expire.expire();
+ }
+
+ for (int i = 0; i < remainingSnapshot.size() - 1; i++) {
+
assertThat(snapshotManager.snapshotExists(remainingSnapshot.get(i).id())).isFalse();
+ }
+ assertThat(snapshotManager.snapshotExists(latestSnapshotId)).isTrue();
+
assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(latestSnapshotId);
+
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(latestSnapshotId);
+
+ commit.close();
+ }
+
protected List<String> getResult(
TableRead read,
List<Split> splits,