This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 923cfb5db [core] Expiring snapshot shouldn't delete manifest and data
files used by existing tags (#1278)
923cfb5db is described below
commit 923cfb5db60bf108cb79aa72607dc30f7bcace6d
Author: yuzelin <[email protected]>
AuthorDate: Fri Jun 9 16:33:27 2023 +0800
[core] Expiring snapshot shouldn't delete manifest and data files used by
existing tags (#1278)
---
.../java/org/apache/paimon/AbstractFileStore.java | 8 +-
.../paimon/operation/FileStoreExpireImpl.java | 33 +++-
.../apache/paimon/operation/SnapshotDeletion.java | 61 ++++---
.../org/apache/paimon/operation/TagFileKeeper.java | 166 +++++++++++++++++++
.../org/apache/paimon/utils/SnapshotManager.java | 7 +-
.../java/org/apache/paimon/utils/TagManager.java | 41 +++++
.../operation/CleanedFileStoreExpireTest.java | 2 +-
...ireDeleteDirTest.java => FileDeletionTest.java} | 182 ++++++++++++++++++++-
8 files changed, 463 insertions(+), 37 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index d697c6473..26e711121 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -27,12 +27,14 @@ import org.apache.paimon.operation.FileStoreCommitImpl;
import org.apache.paimon.operation.FileStoreExpireImpl;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.SnapshotDeletion;
+import org.apache.paimon.operation.TagFileKeeper;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
import javax.annotation.Nullable;
@@ -151,7 +153,11 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
options.snapshotNumRetainMax(),
options.snapshotTimeRetain().toMillis(),
snapshotManager(),
- newSnapshotDeletion());
+ newSnapshotDeletion(),
+ new TagFileKeeper(
+ manifestListFactory().create(),
+ manifestFileFactory().create(),
+ new TagManager(fileIO, options.path())));
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
index 2ad6942e1..c321523d6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
@@ -28,6 +28,7 @@ import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +39,7 @@ import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
/**
* Default implementation of {@link FileStoreExpire}. It retains a certain
number or period of
@@ -61,6 +63,8 @@ public class FileStoreExpireImpl implements FileStoreExpire {
private final ConsumerManager consumerManager;
private final SnapshotDeletion snapshotDeletion;
+ private final TagFileKeeper tagFileKeeper;
+
private Lock lock;
public FileStoreExpireImpl(
@@ -81,7 +85,11 @@ public class FileStoreExpireImpl implements FileStoreExpire {
fileIO,
pathFactory,
manifestFileFactory.create(),
- manifestListFactory.create()));
+ manifestListFactory.create()),
+ new TagFileKeeper(
+ manifestListFactory.create(),
+ manifestFileFactory.create(),
+ new TagManager(fileIO, snapshotManager.tablePath())));
}
public FileStoreExpireImpl(
@@ -89,7 +97,8 @@ public class FileStoreExpireImpl implements FileStoreExpire {
int numRetainedMax,
long millisRetained,
SnapshotManager snapshotManager,
- SnapshotDeletion snapshotDeletion) {
+ SnapshotDeletion snapshotDeletion,
+ TagFileKeeper tagFileKeeper) {
this.numRetainedMin = numRetainedMin;
this.numRetainedMax = numRetainedMax;
this.millisRetained = millisRetained;
@@ -97,6 +106,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
this.consumerManager =
new ConsumerManager(snapshotManager.fileIO(),
snapshotManager.tablePath());
this.snapshotDeletion = snapshotDeletion;
+ this.tagFileKeeper = tagFileKeeper;
}
@Override
@@ -171,6 +181,8 @@ public class FileStoreExpireImpl implements FileStoreExpire
{
"Snapshot expire range is [" + beginInclusiveId + ", " +
endExclusiveId + ")");
}
+ tagFileKeeper.reloadTags();
+
// delete merge tree files
// deleted merge tree files in a snapshot are not used by the next
snapshot, so the range of
// id should be (beginInclusiveId, endExclusiveId]
@@ -181,7 +193,10 @@ public class FileStoreExpireImpl implements
FileStoreExpire {
}
Snapshot snapshot = snapshotManager.snapshot(id);
// expire merge tree files and collect changed buckets
-
snapshotDeletion.deleteExpiredDataFiles(snapshot.deltaManifestList(),
deletionBuckets);
+ snapshotDeletion.deleteExpiredDataFiles(
+ snapshot.deltaManifestList(),
+ deletionBuckets,
+ tagFileKeeper.tagDataFileSkipper(id));
}
// delete changelog files
@@ -201,11 +216,13 @@ public class FileStoreExpireImpl implements
FileStoreExpire {
snapshotDeletion.tryDeleteDirectories(deletionBuckets);
// delete manifests
- Set<ManifestFileMeta> skipDeletion =
- new HashSet<>(
- snapshotManager
- .snapshot(endExclusiveId)
-
.dataManifests(snapshotDeletion.manifestList()));
+ Set<String> skipDeletion =
+ snapshotManager.snapshot(endExclusiveId)
+
.dataManifests(snapshotDeletion().manifestList()).stream()
+ .map(ManifestFileMeta::fileName)
+ .collect(Collectors.toCollection(HashSet::new));
+ skipDeletion.addAll(
+ tagFileKeeper.collectManifestSkippingSet(beginInclusiveId,
endExclusiveId));
for (long id = beginInclusiveId; id < endExclusiveId; id++) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ready to delete manifests in snapshot #" + id);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
index b13c4f4f7..8b0a4c48f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
@@ -29,7 +29,7 @@ import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.Triple;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterables;
@@ -47,6 +47,7 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
/** Delete snapshot files. */
@@ -77,11 +78,17 @@ public class SnapshotDeletion {
*
* @param manifestListName name of manifest list
* @param deletionBuckets partition-buckets of which some data files have
been deleted
+ * @param dataFileSkipper if the test result of a data file is true, the
data file will be
+ * skipped when deleting
*/
public void deleteExpiredDataFiles(
- String manifestListName, Map<BinaryRow, Set<Integer>>
deletionBuckets) {
+ String manifestListName,
+ Map<BinaryRow, Set<Integer>> deletionBuckets,
+ Predicate<TagFileKeeper.DataFileInfo> dataFileSkipper) {
doDeleteExpiredDataFiles(
- getManifestEntriesFromManifestList(manifestListName),
deletionBuckets);
+ getManifestEntriesFromManifestList(manifestListName),
+ deletionBuckets,
+ dataFileSkipper);
}
/**
@@ -148,9 +155,9 @@ public class SnapshotDeletion {
* file.
*
* @param skipped manifest file deletion skipping set, deleted manifest
file will be added to
- * this set too.
+ * this set too. NOTE: changelog manifests won't be checked.
*/
- public void deleteManifestFiles(Set<ManifestFileMeta> skipped, Snapshot
snapshot) {
+ public void deleteManifestFiles(Set<String> skipped, Snapshot snapshot) {
// cannot call `toExpire.dataManifests` directly, it is possible that
a job is
// killed during expiration, so some manifest files may have been
deleted
List<ManifestFileMeta> toExpireManifests = new ArrayList<>();
@@ -159,9 +166,10 @@ public class SnapshotDeletion {
// delete manifest
for (ManifestFileMeta manifest : toExpireManifests) {
- if (!skipped.contains(manifest)) {
- manifestFile.delete(manifest.fileName());
- skipped.add(manifest);
+ String fileName = manifest.fileName();
+ if (!skipped.contains(fileName)) {
+ manifestFile.delete(fileName);
+ skipped.add(fileName);
}
}
if (snapshot.changelogManifestList() != null) {
@@ -172,8 +180,12 @@ public class SnapshotDeletion {
}
// delete manifest lists
- manifestList.delete(snapshot.baseManifestList());
- manifestList.delete(snapshot.deltaManifestList());
+ if (!skipped.contains(snapshot.baseManifestList())) {
+ manifestList.delete(snapshot.baseManifestList());
+ }
+ if (!skipped.contains(snapshot.deltaManifestList())) {
+ manifestList.delete(snapshot.deltaManifestList());
+ }
if (snapshot.changelogManifestList() != null) {
manifestList.delete(snapshot.changelogManifestList());
}
@@ -192,11 +204,13 @@ public class SnapshotDeletion {
@VisibleForTesting
void doDeleteExpiredDataFiles(
- Iterable<ManifestEntry> dataFileLog, Map<BinaryRow, Set<Integer>>
deletionBuckets) {
+ Iterable<ManifestEntry> dataFileLog,
+ Map<BinaryRow, Set<Integer>> deletionBuckets,
+ Predicate<TagFileKeeper.DataFileInfo> dataFileSkipper) {
// we cannot delete a data file directly when we meet a DELETE entry,
because that
// file might be upgraded
- // data file path -> (partition, bucket, extra file paths)
- Map<Path, Triple<BinaryRow, Integer, List<Path>>> dataFileToDelete =
new HashMap<>();
+ // data file path -> (data file info, extra file paths)
+ Map<Path, Pair<TagFileKeeper.DataFileInfo, List<Path>>>
dataFileToDelete = new HashMap<>();
for (ManifestEntry entry : dataFileLog) {
Path bucketPath = pathFactory.bucketPath(entry.partition(),
entry.bucket());
Path dataFilePath = new Path(bucketPath, entry.file().fileName());
@@ -210,7 +224,8 @@ public class SnapshotDeletion {
extraFiles.add(new Path(bucketPath, file));
}
dataFileToDelete.put(
- dataFilePath, Triple.of(entry.partition(),
entry.bucket(), extraFiles));
+ dataFilePath,
+ Pair.of(TagFileKeeper.DataFileInfo.of(entry),
extraFiles));
break;
default:
throw new UnsupportedOperationException(
@@ -219,12 +234,18 @@ public class SnapshotDeletion {
}
dataFileToDelete.forEach(
- (path, triple) -> {
- // delete data files
- fileIO.deleteQuietly(path);
- triple.f2.forEach(fileIO::deleteQuietly);
- // record changed buckets
- deletionBuckets.computeIfAbsent(triple.f0, p -> new
HashSet<>()).add(triple.f1);
+ (path, pair) -> {
+ TagFileKeeper.DataFileInfo info = pair.getLeft();
+ // check whether we should skip the data file
+ if (!dataFileSkipper.test(info)) {
+ // delete data files
+ fileIO.deleteQuietly(path);
+ pair.getRight().forEach(fileIO::deleteQuietly);
+ // record changed buckets
+ deletionBuckets
+ .computeIfAbsent(info.partition, p -> new
HashSet<>())
+ .add(info.bucket);
+ }
});
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/TagFileKeeper.java
b/paimon-core/src/main/java/org/apache/paimon/operation/TagFileKeeper.java
new file mode 100644
index 000000000..20e2d32aa
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagFileKeeper.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.utils.ParallellyExecuteUtils;
+import org.apache.paimon.utils.TagManager;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/** Util class providing methods that prevent tag files from being deleted when
expiring snapshots. */
+public class TagFileKeeper {
+
+ private final ManifestList manifestList;
+ private final ManifestFile manifestFile;
+ private final TagManager tagManager;
+
+ private long cachedTag = -1;
+ private final Map<BinaryRow, Map<Integer, Set<String>>> cachedTagDataFiles;
+
+ private List<Snapshot> taggedSnapshots;
+
+ public TagFileKeeper(
+ ManifestList manifestList, ManifestFile manifestFile, TagManager
tagManager) {
+ this.manifestList = manifestList;
+ this.manifestFile = manifestFile;
+ this.tagManager = tagManager;
+ this.cachedTagDataFiles = new HashMap<>();
+ }
+
+ /** Caller should determine whether to reload. */
+ public void reloadTags() {
+ taggedSnapshots = tagManager.taggedSnapshots();
+ }
+
+ public Predicate<DataFileInfo> tagDataFileSkipper(long expiringSnapshotId)
{
+ int index = findPreviousTag(expiringSnapshotId, taggedSnapshots);
+ if (index >= 0) {
+ tryRefresh(taggedSnapshots.get(index));
+ }
+ return dataFileInfo -> index >= 0 && contains(dataFileInfo);
+ }
+
+ public Set<String> collectManifestSkippingSet(long beginInclusive, long
endExclusive) {
+ Set<String> manifests = new HashSet<>();
+ int right = findPreviousTag(endExclusive, taggedSnapshots);
+ if (right >= 0) {
+ int left = Math.max(findPreviousOrEqualTag(beginInclusive,
taggedSnapshots), 0);
+ for (int i = left; i <= right; i++) {
+ Snapshot snapshot = taggedSnapshots.get(i);
+
+ for (ManifestFileMeta file :
snapshot.dataManifests(manifestList)) {
+ manifests.add(file.fileName());
+ }
+
+ manifests.add(snapshot.baseManifestList());
+ manifests.add(snapshot.deltaManifestList());
+ }
+ }
+ return manifests;
+ }
+
+ private void tryRefresh(Snapshot taggedSnapshot) {
+ if (cachedTag != taggedSnapshot.id()) {
+ refresh(taggedSnapshot);
+ cachedTag = taggedSnapshot.id();
+ }
+ }
+
+ private void refresh(Snapshot taggedSnapshot) {
+ cachedTagDataFiles.clear();
+
+ Iterable<ManifestEntry> entries =
+ ParallellyExecuteUtils.parallelismBatchIterable(
+ files ->
+ files.parallelStream()
+ .flatMap(m ->
manifestFile.read(m.fileName()).stream())
+ .collect(Collectors.toList()),
+ taggedSnapshot.dataManifests(manifestList),
+ null);
+
+ for (ManifestEntry entry : ManifestEntry.mergeEntries(entries)) {
+ cachedTagDataFiles
+ .computeIfAbsent(entry.partition(), p -> new HashMap<>())
+ .computeIfAbsent(entry.bucket(), b -> new HashSet<>())
+ .add(entry.file().fileName());
+ }
+ }
+
+ private boolean contains(DataFileInfo dataFileInfo) {
+ Map<Integer, Set<String>> buckets =
cachedTagDataFiles.get(dataFileInfo.partition);
+ if (buckets != null) {
+ Set<String> fileNames = buckets.get(dataFileInfo.bucket);
+ if (fileNames != null) {
+ return fileNames.contains(dataFileInfo.fileName);
+ }
+ }
+ return false;
+ }
+
+ private int findPreviousTag(long targetSnapshotId, List<Snapshot>
taggedSnapshots) {
+ for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {
+ if (taggedSnapshots.get(i).id() < targetSnapshotId) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ private int findPreviousOrEqualTag(long targetSnapshotId, List<Snapshot>
taggedSnapshots) {
+ for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {
+ if (taggedSnapshots.get(i).id() <= targetSnapshotId) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ /** To accommodate information of a data file. */
+ static class DataFileInfo {
+
+ public final BinaryRow partition;
+ public final int bucket;
+ public final String fileName;
+
+ DataFileInfo(BinaryRow partition, int bucket, String fileName) {
+ this.partition = partition;
+ this.bucket = bucket;
+ this.fileName = fileName;
+ }
+
+ static DataFileInfo of(ManifestEntry manifestEntry) {
+ return new DataFileInfo(
+ manifestEntry.partition(),
+ manifestEntry.bucket(),
+ manifestEntry.file().fileName());
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 3ab7e6bf6..f6cea1dfc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -43,6 +43,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
@@ -369,8 +370,10 @@ public class SnapshotManager implements Serializable {
deletion.tryDeleteDirectories(deletionBuckets);
// delete manifest files.
- Set<ManifestFileMeta> manifestSkipped =
- new
HashSet<>(snapshot(snapshotId).dataManifests(deletion.manifestList()));
+ Set<String> manifestSkipped =
+
snapshot(snapshotId).dataManifests(deletion.manifestList()).stream()
+ .map(ManifestFileMeta::fileName)
+ .collect(Collectors.toCollection(HashSet::new));
for (Snapshot snapshot : snapshots) {
deletion.deleteManifestFiles(manifestSkipped, snapshot);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index b2a14c247..60133d456 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -20,9 +20,15 @@ package org.apache.paimon.utils;
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -37,6 +43,11 @@ public class TagManager {
this.tablePath = tablePath;
}
+ /** Return the root directory of tags. */
+ public Path tagDirectory() {
+ return new Path(tablePath + "/tag");
+ }
+
/** Return the path of a tag. */
public Path tagPath(String tagName) {
return new Path(tablePath + "/tag/" + tagName);
@@ -82,4 +93,34 @@ public class TagManager {
checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName);
return Snapshot.fromPath(fileIO, tagPath(tagName));
}
+
+ /** Get all tagged snapshots sorted by snapshot id. */
+ public List<Snapshot> taggedSnapshots() {
+ Path tagDirectory = tagDirectory();
+ try {
+ if (!fileIO.exists(tagDirectory)) {
+ return Collections.emptyList();
+ }
+
+ FileStatus[] statuses = fileIO.listStatus(tagDirectory);
+
+ if (statuses == null) {
+ throw new RuntimeException(
+ String.format(
+ "The return value is null of the listStatus
for the '%s' directory.",
+ tagDirectory));
+ }
+
+ return Arrays.stream(statuses)
+ .map(FileStatus::getPath)
+ .map(path -> Snapshot.fromPath(fileIO, path))
+ .sorted(Comparator.comparingLong(Snapshot::id))
+ .collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format(
+ "Failed to get tagged snapshots in tag directory
'%s'.", tagDirectory),
+ e);
+ }
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
index f1f5eb173..20612d344 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
@@ -90,7 +90,7 @@ public class CleanedFileStoreExpireTest extends
FileStoreExpireTestBase {
// expire
expire.snapshotDeletion()
- .doDeleteExpiredDataFiles(Arrays.asList(add, delete), new
HashMap<>());
+ .doDeleteExpiredDataFiles(Arrays.asList(add, delete), new
HashMap<>(), f -> false);
// check
assertThat(fileIO.exists(myDataFile)).isFalse();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireDeleteDirTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
similarity index 58%
rename from
paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireDeleteDirTest.java
rename to
paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index f9a48e069..c7469c735 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireDeleteDirTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -30,12 +30,16 @@ import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.RecordWriter;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -54,13 +58,14 @@ import static
org.apache.paimon.operation.FileStoreTestUtils.assertPathExists;
import static
org.apache.paimon.operation.FileStoreTestUtils.assertPathNotExists;
import static org.apache.paimon.operation.FileStoreTestUtils.commitData;
import static org.apache.paimon.operation.FileStoreTestUtils.partitionedData;
+import static org.assertj.core.api.Assertions.assertThat;
/**
- * Tests for {@link FileStoreExpireImpl}. After expiration, empty data file
directories (buckets and
- * partitions) are deleted. It didn't extend {@link FileStoreExpireTestBase}
because there are not
- * too many codes can be reused.
+ * Tests for file deletion when expiring snapshots and deleting tags. It also
tests that after
+ * expiration, empty data file directories (buckets and partitions) are
deleted. It doesn't extend
+ * {@link FileStoreExpireTestBase} because little of that code can be
reused.
*/
-public class FileStoreExpireDeleteDirTest {
+public class FileDeletionTest {
@TempDir java.nio.file.Path tempDir;
@@ -180,7 +185,174 @@ public class FileStoreExpireDeleteDirTest {
assertPathExists(fileIO, pathFactory.bucketPath(partition, 1));
}
+ /**
+ * This test checks FileStoreExpire won't delete data files and manifests
that are used by tags.
+ * Test process:
+ *
+ * <ul>
+ * <li>1. Generate snapshot 1 with +A, +B.
+ * <li>2. Generate snapshot 2 with -A, then create tag1.
+ * <li>3. Generate snapshot 3 with +C.
+ * <li>4. Generate snapshot 4 with -B, then create tag2.
+ * <li>5. Generate snapshot 5 with +D.
+ * <li>6. Generate snapshot 6 with -D.
+ * <li>7. Expire snapshot 1 to 5 respectively.
+ * </ul>
+ *
+ * <p>To identify different data files, this test uses 4 buckets to store
A, B, C and D, so we
+ * can check the bucket path to assert whether the data file is retained. The
expiration result
+ * should be:
+ *
+ * <ul>
+ * <li>Expiring snapshot 1 will delete file A because snapshot 2 has
committed -A and A is not
+ * used by tag1.
+ * <li>Expiring snapshot 2 won't delete any file because snapshot 3
hasn't committed DELETE
+ * files.
+ * <li>Expiring snapshot 3 won't delete B, although snapshot 4 has
committed -B and B is not used
+ * by tag2, because B is still used by tag1.
+ * <li>Expiring snapshot 4 won't delete any file like snapshot 2.
+ * <li>Expiring snapshot 5 will delete file D because snapshot 6 has
committed -D and D is not
+ * used by tag2.
+ * </ul>
+ */
+ @Test
+ public void testExpireWithExistingTags() throws Exception {
+ TestFileStore store =
createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED, 4);
+ TagManager tagManager = new TagManager(fileIO, store.options().path());
+ SnapshotManager snapshotManager = store.snapshotManager();
+ TestKeyValueGenerator gen =
+ new
TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
+ BinaryRow partition = gen.getPartition(gen.next());
+
+ // step 1: commit A to bucket 0 and B to bucket 1
+ Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers = new
HashMap<>();
+ for (int bucket : Arrays.asList(0, 1)) {
+ List<KeyValue> kvs = partitionedData(5, gen);
+ writeData(store, kvs, partition, bucket, writers);
+ }
+ commitData(store, commitIdentifier++, writers);
+
+ // step 2: commit -A (by clean bucket 0) and create tag1
+ cleanBucket(store, gen.getPartition(gen.next()), 0);
+ tagManager.createTag(snapshotManager.snapshot(2), "tag1");
+ assertThat(tagManager.tagExists("tag1")).isTrue();
+
+ // step 3: commit C to bucket 2
+ writers.clear();
+ List<KeyValue> kvs = partitionedData(5, gen);
+ writeData(store, kvs, partition, 2, writers);
+ commitData(store, commitIdentifier++, writers);
+
+ // step 4: commit -B (by clean bucket 1) and create tag2
+ cleanBucket(store, partition, 1);
+ tagManager.createTag(snapshotManager.snapshot(4), "tag2");
+ assertThat(tagManager.tagExists("tag2")).isTrue();
+
+ // step 5: commit D to bucket 3
+ writers.clear();
+ kvs = partitionedData(5, gen);
+ writeData(store, kvs, partition, 3, writers);
+ commitData(store, commitIdentifier++, writers);
+
+ // step 6: commit -D (by clean bucket 3)
+ cleanBucket(store, partition, 3);
+
+ // check before expiring
+ FileStorePathFactory pathFactory = store.pathFactory();
+ for (int i = 0; i < 4; i++) {
+ assertPathExists(fileIO, pathFactory.bucketPath(partition, i));
+ }
+
+ // check expiring results
+ store.newExpire(1, 1, Long.MAX_VALUE).expire();
+
+ // expiring snapshot 1 will delete file A
+ assertPathNotExists(fileIO, pathFactory.bucketPath(partition, 0));
+ // expiring snapshot 2 & 3 won't delete file B
+ assertPathExists(fileIO, pathFactory.bucketPath(partition, 1));
+ // expiring snapshot 4 & 5 will delete file D
+ assertPathNotExists(fileIO, pathFactory.bucketPath(partition, 3));
+ // file C survives
+ assertPathExists(fileIO, pathFactory.bucketPath(partition, 2));
+
+ // check manifests
+ ManifestList manifestList = store.manifestListFactory().create();
+ for (String tagName : Arrays.asList("tag1", "tag2")) {
+ Snapshot snapshot = tagManager.taggedSnapshot(tagName);
+ List<Path> manifestFilePaths =
+ snapshot.dataManifests(manifestList).stream()
+ .map(ManifestFileMeta::fileName)
+ .map(pathFactory::toManifestFilePath)
+ .collect(Collectors.toList());
+ for (Path path : manifestFilePaths) {
+ assertPathExists(fileIO, path);
+ }
+
+ assertPathExists(fileIO,
pathFactory.toManifestListPath(snapshot.baseManifestList()));
+ assertPathExists(fileIO,
pathFactory.toManifestListPath(snapshot.deltaManifestList()));
+ }
+ }
+
+ @Test
+ public void testExpireWithUpgradeAndTags() throws Exception {
+ TestFileStore store =
createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
+ TagManager tagManager = new TagManager(fileIO, store.options().path());
+ SnapshotManager snapshotManager = store.snapshotManager();
+ TestKeyValueGenerator gen =
+ new
TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
+ BinaryRow partition = gen.getPartition(gen.next());
+
+ // snapshot 1: commit A to bucket 0
+ Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers = new
HashMap<>();
+ List<KeyValue> kvs = partitionedData(5, gen);
+ writeData(store, kvs, partition, 0, writers);
+ commitData(store, commitIdentifier++, writers);
+
+ // snapshot 2: compact
+ writers.values().stream()
+ .flatMap(m -> m.values().stream())
+ .forEach(
+ writer -> {
+ try {
+ writer.compact(true);
+ writer.sync();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ FileStoreTestUtils.commitData(store, commitIdentifier++, writers);
+
+ // snapshot 3: commit -A (by clean bucket 0)
+ cleanBucket(store, gen.getPartition(gen.next()), 0);
+
+ tagManager.createTag(snapshotManager.snapshot(1), "tag1");
+ store.newExpire(1, 1, Long.MAX_VALUE).expire();
+
+ // check data file and manifests
+ FileStorePathFactory pathFactory = store.pathFactory();
+ assertPathExists(fileIO, pathFactory.bucketPath(partition, 0));
+
+ Snapshot tag1 = tagManager.taggedSnapshot("tag1");
+ ManifestList manifestList = store.manifestListFactory().create();
+ List<Path> manifestFilePaths =
+ tag1.dataManifests(manifestList).stream()
+ .map(ManifestFileMeta::fileName)
+ .map(pathFactory::toManifestFilePath)
+ .collect(Collectors.toList());
+ for (Path path : manifestFilePaths) {
+ assertPathExists(fileIO, path);
+ }
+
+ assertPathExists(fileIO,
pathFactory.toManifestListPath(tag1.baseManifestList()));
+ assertPathExists(fileIO,
pathFactory.toManifestListPath(tag1.deltaManifestList()));
+ }
+
private TestFileStore createStore(TestKeyValueGenerator.GeneratorMode
mode) throws Exception {
+ return createStore(mode, 2);
+ }
+
+ private TestFileStore createStore(TestKeyValueGenerator.GeneratorMode
mode, int buckets)
+ throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
CoreOptions.ChangelogProducer changelogProducer;
@@ -220,7 +392,7 @@ public class FileStoreExpireDeleteDirTest {
return new TestFileStore.Builder(
"avro",
root,
- 2,
+ buckets,
partitionType,
TestKeyValueGenerator.KEY_TYPE,
rowType,