This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 923cfb5db [core] Expiring snapshot shouldn't delete manifest and data files used by existing tags (#1278)
923cfb5db is described below

commit 923cfb5db60bf108cb79aa72607dc30f7bcace6d
Author: yuzelin <[email protected]>
AuthorDate: Fri Jun 9 16:33:27 2023 +0800

    [core] Expiring snapshot shouldn't delete manifest and data files used by existing tags (#1278)
---
 .../java/org/apache/paimon/AbstractFileStore.java  |   8 +-
 .../paimon/operation/FileStoreExpireImpl.java      |  33 +++-
 .../apache/paimon/operation/SnapshotDeletion.java  |  61 ++++---
 .../org/apache/paimon/operation/TagFileKeeper.java | 166 +++++++++++++++++++
 .../org/apache/paimon/utils/SnapshotManager.java   |   7 +-
 .../java/org/apache/paimon/utils/TagManager.java   |  41 +++++
 .../operation/CleanedFileStoreExpireTest.java      |   2 +-
 ...ireDeleteDirTest.java => FileDeletionTest.java} | 182 ++++++++++++++++++++-
 8 files changed, 463 insertions(+), 37 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index d697c6473..26e711121 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -27,12 +27,14 @@ import org.apache.paimon.operation.FileStoreCommitImpl;
 import org.apache.paimon.operation.FileStoreExpireImpl;
 import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.operation.SnapshotDeletion;
+import org.apache.paimon.operation.TagFileKeeper;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
 
 import javax.annotation.Nullable;
 
@@ -151,7 +153,11 @@ public abstract class AbstractFileStore<T> implements 
FileStore<T> {
                 options.snapshotNumRetainMax(),
                 options.snapshotTimeRetain().toMillis(),
                 snapshotManager(),
-                newSnapshotDeletion());
+                newSnapshotDeletion(),
+                new TagFileKeeper(
+                        manifestListFactory().create(),
+                        manifestFileFactory().create(),
+                        new TagManager(fileIO, options.path())));
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
index 2ad6942e1..c321523d6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
@@ -28,6 +28,7 @@ import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +39,7 @@ import java.util.Map;
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
 
 /**
  * Default implementation of {@link FileStoreExpire}. It retains a certain 
number or period of
@@ -61,6 +63,8 @@ public class FileStoreExpireImpl implements FileStoreExpire {
     private final ConsumerManager consumerManager;
     private final SnapshotDeletion snapshotDeletion;
 
+    private final TagFileKeeper tagFileKeeper;
+
     private Lock lock;
 
     public FileStoreExpireImpl(
@@ -81,7 +85,11 @@ public class FileStoreExpireImpl implements FileStoreExpire {
                         fileIO,
                         pathFactory,
                         manifestFileFactory.create(),
-                        manifestListFactory.create()));
+                        manifestListFactory.create()),
+                new TagFileKeeper(
+                        manifestListFactory.create(),
+                        manifestFileFactory.create(),
+                        new TagManager(fileIO, snapshotManager.tablePath())));
     }
 
     public FileStoreExpireImpl(
@@ -89,7 +97,8 @@ public class FileStoreExpireImpl implements FileStoreExpire {
             int numRetainedMax,
             long millisRetained,
             SnapshotManager snapshotManager,
-            SnapshotDeletion snapshotDeletion) {
+            SnapshotDeletion snapshotDeletion,
+            TagFileKeeper tagFileKeeper) {
         this.numRetainedMin = numRetainedMin;
         this.numRetainedMax = numRetainedMax;
         this.millisRetained = millisRetained;
@@ -97,6 +106,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
         this.consumerManager =
                 new ConsumerManager(snapshotManager.fileIO(), 
snapshotManager.tablePath());
         this.snapshotDeletion = snapshotDeletion;
+        this.tagFileKeeper = tagFileKeeper;
     }
 
     @Override
@@ -171,6 +181,8 @@ public class FileStoreExpireImpl implements FileStoreExpire 
{
                     "Snapshot expire range is [" + beginInclusiveId + ", " + 
endExclusiveId + ")");
         }
 
+        tagFileKeeper.reloadTags();
+
         // delete merge tree files
         // deleted merge tree files in a snapshot are not used by the next 
snapshot, so the range of
         // id should be (beginInclusiveId, endExclusiveId]
@@ -181,7 +193,10 @@ public class FileStoreExpireImpl implements 
FileStoreExpire {
             }
             Snapshot snapshot = snapshotManager.snapshot(id);
             // expire merge tree files and collect changed buckets
-            
snapshotDeletion.deleteExpiredDataFiles(snapshot.deltaManifestList(), 
deletionBuckets);
+            snapshotDeletion.deleteExpiredDataFiles(
+                    snapshot.deltaManifestList(),
+                    deletionBuckets,
+                    tagFileKeeper.tagDataFileSkipper(id));
         }
 
         // delete changelog files
@@ -201,11 +216,13 @@ public class FileStoreExpireImpl implements 
FileStoreExpire {
         snapshotDeletion.tryDeleteDirectories(deletionBuckets);
 
         // delete manifests
-        Set<ManifestFileMeta> skipDeletion =
-                new HashSet<>(
-                        snapshotManager
-                                .snapshot(endExclusiveId)
-                                
.dataManifests(snapshotDeletion.manifestList()));
+        Set<String> skipDeletion =
+                snapshotManager.snapshot(endExclusiveId)
+                        
.dataManifests(snapshotDeletion().manifestList()).stream()
+                        .map(ManifestFileMeta::fileName)
+                        .collect(Collectors.toCollection(HashSet::new));
+        skipDeletion.addAll(
+                tagFileKeeper.collectManifestSkippingSet(beginInclusiveId, 
endExclusiveId));
         for (long id = beginInclusiveId; id < endExclusiveId; id++) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Ready to delete manifests in snapshot #" + id);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
index b13c4f4f7..8b0a4c48f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
@@ -29,7 +29,7 @@ import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.Triple;
+import org.apache.paimon.utils.Pair;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Iterables;
 
@@ -47,6 +47,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 /** Delete snapshot files. */
@@ -77,11 +78,17 @@ public class SnapshotDeletion {
      *
      * @param manifestListName name of manifest list
      * @param deletionBuckets partition-buckets of which some data files have 
been deleted
+     * @param dataFileSkipper data files for which this returns true are kept, together with
+     *     their extra files, and their buckets are not recorded in {@code deletionBuckets}
      */
     public void deleteExpiredDataFiles(
-            String manifestListName, Map<BinaryRow, Set<Integer>> 
deletionBuckets) {
+            String manifestListName,
+            Map<BinaryRow, Set<Integer>> deletionBuckets,
+            Predicate<TagFileKeeper.DataFileInfo> dataFileSkipper) {
         doDeleteExpiredDataFiles(
-                getManifestEntriesFromManifestList(manifestListName), 
deletionBuckets);
+                getManifestEntriesFromManifestList(manifestListName),
+                deletionBuckets,
+                dataFileSkipper);
     }
 
     /**
@@ -148,9 +155,9 @@ public class SnapshotDeletion {
      * file.
      *
      * @param skipped manifest file deletion skipping set, deleted manifest 
file will be added to
-     *     this set too.
+     *     this set too. NOTE: changelog manifests and the changelog manifest list are not checked.
      */
-    public void deleteManifestFiles(Set<ManifestFileMeta> skipped, Snapshot 
snapshot) {
+    public void deleteManifestFiles(Set<String> skipped, Snapshot snapshot) {
         // cannot call `toExpire.dataManifests` directly, it is possible that 
a job is
         // killed during expiration, so some manifest files may have been 
deleted
         List<ManifestFileMeta> toExpireManifests = new ArrayList<>();
@@ -159,9 +166,10 @@ public class SnapshotDeletion {
 
         // delete manifest
         for (ManifestFileMeta manifest : toExpireManifests) {
-            if (!skipped.contains(manifest)) {
-                manifestFile.delete(manifest.fileName());
-                skipped.add(manifest);
+            String fileName = manifest.fileName();
+            if (!skipped.contains(fileName)) {
+                manifestFile.delete(fileName);
+                skipped.add(fileName);
             }
         }
         if (snapshot.changelogManifestList() != null) {
@@ -172,8 +180,12 @@ public class SnapshotDeletion {
         }
 
         // delete manifest lists
-        manifestList.delete(snapshot.baseManifestList());
-        manifestList.delete(snapshot.deltaManifestList());
+        if (!skipped.contains(snapshot.baseManifestList())) {
+            manifestList.delete(snapshot.baseManifestList());
+        }
+        if (!skipped.contains(snapshot.deltaManifestList())) {
+            manifestList.delete(snapshot.deltaManifestList());
+        }
         if (snapshot.changelogManifestList() != null) {
             manifestList.delete(snapshot.changelogManifestList());
         }
@@ -192,11 +204,13 @@ public class SnapshotDeletion {
 
     @VisibleForTesting
     void doDeleteExpiredDataFiles(
-            Iterable<ManifestEntry> dataFileLog, Map<BinaryRow, Set<Integer>> 
deletionBuckets) {
+            Iterable<ManifestEntry> dataFileLog,
+            Map<BinaryRow, Set<Integer>> deletionBuckets,
+            Predicate<TagFileKeeper.DataFileInfo> dataFileSkipper) {
         // we cannot delete a data file directly when we meet a DELETE entry, 
because that
         // file might be upgraded
-        // data file path -> (partition, bucket, extra file paths)
-        Map<Path, Triple<BinaryRow, Integer, List<Path>>> dataFileToDelete = 
new HashMap<>();
+        // data file path -> (data file info, extra file paths)
+        Map<Path, Pair<TagFileKeeper.DataFileInfo, List<Path>>> 
dataFileToDelete = new HashMap<>();
         for (ManifestEntry entry : dataFileLog) {
             Path bucketPath = pathFactory.bucketPath(entry.partition(), 
entry.bucket());
             Path dataFilePath = new Path(bucketPath, entry.file().fileName());
@@ -210,7 +224,8 @@ public class SnapshotDeletion {
                         extraFiles.add(new Path(bucketPath, file));
                     }
                     dataFileToDelete.put(
-                            dataFilePath, Triple.of(entry.partition(), 
entry.bucket(), extraFiles));
+                            dataFilePath,
+                            Pair.of(TagFileKeeper.DataFileInfo.of(entry), 
extraFiles));
                     break;
                 default:
                     throw new UnsupportedOperationException(
@@ -219,12 +234,18 @@ public class SnapshotDeletion {
         }
 
         dataFileToDelete.forEach(
-                (path, triple) -> {
-                    // delete data files
-                    fileIO.deleteQuietly(path);
-                    triple.f2.forEach(fileIO::deleteQuietly);
-                    // record changed buckets
-                    deletionBuckets.computeIfAbsent(triple.f0, p -> new 
HashSet<>()).add(triple.f1);
+                (path, pair) -> {
+                    TagFileKeeper.DataFileInfo info = pair.getLeft();
+                    // check whether we should skip the data file
+                    if (!dataFileSkipper.test(info)) {
+                        // delete data files
+                        fileIO.deleteQuietly(path);
+                        pair.getRight().forEach(fileIO::deleteQuietly);
+                        // record changed buckets
+                        deletionBuckets
+                                .computeIfAbsent(info.partition, p -> new 
HashSet<>())
+                                .add(info.bucket);
+                    }
                 });
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/TagFileKeeper.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/TagFileKeeper.java
new file mode 100644
index 000000000..20e2d32aa
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagFileKeeper.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.utils.ParallellyExecuteUtils;
+import org.apache.paimon.utils.TagManager;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/** Util class that prevents files used by tags from being deleted when expiring snapshots. */
+public class TagFileKeeper {
+
+    private final ManifestList manifestList;
+    private final ManifestFile manifestFile;
+    private final TagManager tagManager;
+
+    private long cachedTag = -1;
+    private final Map<BinaryRow, Map<Integer, Set<String>>> cachedTagDataFiles;
+
+    private List<Snapshot> taggedSnapshots;
+
+    public TagFileKeeper(
+            ManifestList manifestList, ManifestFile manifestFile, TagManager 
tagManager) {
+        this.manifestList = manifestList;
+        this.manifestFile = manifestFile;
+        this.tagManager = tagManager;
+        this.cachedTagDataFiles = new HashMap<>();
+    }
+
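+    /**
+     * Reloads the tagged snapshots from the tag directory. It must be called before the other
+     * public methods; the caller decides when a reload is needed.
+     */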
+    public void reloadTags() {
+        taggedSnapshots = tagManager.taggedSnapshots();
+    }
+
+    public Predicate<DataFileInfo> tagDataFileSkipper(long expiringSnapshotId) 
{
+        int index = findPreviousTag(expiringSnapshotId, taggedSnapshots);
+        if (index >= 0) {
+            tryRefresh(taggedSnapshots.get(index));
+        }
+        return dataFileInfo -> index >= 0 && contains(dataFileInfo);
+    }
+
+    public Set<String> collectManifestSkippingSet(long beginInclusive, long 
endExclusive) {
+        Set<String> manifests = new HashSet<>();
+        int right = findPreviousTag(endExclusive, taggedSnapshots);
+        if (right >= 0) {
+            int left = Math.max(findPreviousOrEqualTag(beginInclusive, 
taggedSnapshots), 0);
+            for (int i = left; i <= right; i++) {
+                Snapshot snapshot = taggedSnapshots.get(i);
+
+                for (ManifestFileMeta file : 
snapshot.dataManifests(manifestList)) {
+                    manifests.add(file.fileName());
+                }
+
+                manifests.add(snapshot.baseManifestList());
+                manifests.add(snapshot.deltaManifestList());
+            }
+        }
+        return manifests;
+    }
+
+    private void tryRefresh(Snapshot taggedSnapshot) {
+        if (cachedTag != taggedSnapshot.id()) {
+            refresh(taggedSnapshot);
+            cachedTag = taggedSnapshot.id();
+        }
+    }
+
+    private void refresh(Snapshot taggedSnapshot) {
+        cachedTagDataFiles.clear();
+
+        Iterable<ManifestEntry> entries =
+                ParallellyExecuteUtils.parallelismBatchIterable(
+                        files ->
+                                files.parallelStream()
+                                        .flatMap(m -> 
manifestFile.read(m.fileName()).stream())
+                                        .collect(Collectors.toList()),
+                        taggedSnapshot.dataManifests(manifestList),
+                        null);
+
+        for (ManifestEntry entry : ManifestEntry.mergeEntries(entries)) {
+            cachedTagDataFiles
+                    .computeIfAbsent(entry.partition(), p -> new HashMap<>())
+                    .computeIfAbsent(entry.bucket(), b -> new HashSet<>())
+                    .add(entry.file().fileName());
+        }
+    }
+
+    private boolean contains(DataFileInfo dataFileInfo) {
+        Map<Integer, Set<String>> buckets = 
cachedTagDataFiles.get(dataFileInfo.partition);
+        if (buckets != null) {
+            Set<String> fileNames = buckets.get(dataFileInfo.bucket);
+            if (fileNames != null) {
+                return fileNames.contains(dataFileInfo.fileName);
+            }
+        }
+        return false;
+    }
+
+    private int findPreviousTag(long targetSnapshotId, List<Snapshot> 
taggedSnapshots) {
+        for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {
+            if (taggedSnapshots.get(i).id() < targetSnapshotId) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    private int findPreviousOrEqualTag(long targetSnapshotId, List<Snapshot> 
taggedSnapshots) {
+        for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {
+            if (taggedSnapshots.get(i).id() <= targetSnapshotId) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    /** Identifies a data file by its partition, bucket and file name. */
+    static class DataFileInfo {
+
+        public final BinaryRow partition;
+        public final int bucket;
+        public final String fileName;
+
+        DataFileInfo(BinaryRow partition, int bucket, String fileName) {
+            this.partition = partition;
+            this.bucket = bucket;
+            this.fileName = fileName;
+        }
+
+        static DataFileInfo of(ManifestEntry manifestEntry) {
+            return new DataFileInfo(
+                    manifestEntry.partition(),
+                    manifestEntry.bucket(),
+                    manifestEntry.file().fileName());
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 3ab7e6bf6..f6cea1dfc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -43,6 +43,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.BinaryOperator;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
 
@@ -369,8 +370,10 @@ public class SnapshotManager implements Serializable {
         deletion.tryDeleteDirectories(deletionBuckets);
 
         // delete manifest files.
-        Set<ManifestFileMeta> manifestSkipped =
-                new 
HashSet<>(snapshot(snapshotId).dataManifests(deletion.manifestList()));
+        Set<String> manifestSkipped =
+                
snapshot(snapshotId).dataManifests(deletion.manifestList()).stream()
+                        .map(ManifestFileMeta::fileName)
+                        .collect(Collectors.toCollection(HashSet::new));
         for (Snapshot snapshot : snapshots) {
             deletion.deleteManifestFiles(manifestSkipped, snapshot);
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index b2a14c247..60133d456 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -20,9 +20,15 @@ package org.apache.paimon.utils;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -37,6 +43,11 @@ public class TagManager {
         this.tablePath = tablePath;
     }
 
+    /** Return the root directory of tags. */
+    public Path tagDirectory() {
+        return new Path(tablePath + "/tag");
+    }
+
     /** Return the path of a tag. */
     public Path tagPath(String tagName) {
         return new Path(tablePath + "/tag/" + tagName);
@@ -82,4 +93,34 @@ public class TagManager {
         checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName);
         return Snapshot.fromPath(fileIO, tagPath(tagName));
     }
+
+    /** Get all tagged snapshots sorted by snapshot id. */
+    public List<Snapshot> taggedSnapshots() {
+        Path tagDirectory = tagDirectory();
+        try {
+            if (!fileIO.exists(tagDirectory)) {
+                return Collections.emptyList();
+            }
+
+            FileStatus[] statuses = fileIO.listStatus(tagDirectory);
+
+            if (statuses == null) {
+                throw new RuntimeException(
+                        String.format(
+                                "The return value is null of the listStatus 
for the '%s' directory.",
+                                tagDirectory));
+            }
+
+            return Arrays.stream(statuses)
+                    .map(FileStatus::getPath)
+                    .map(path -> Snapshot.fromPath(fileIO, path))
+                    .sorted(Comparator.comparingLong(Snapshot::id))
+                    .collect(Collectors.toList());
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    String.format(
+                            "Failed to get tagged snapshots in tag directory 
'%s'.", tagDirectory),
+                    e);
+        }
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
index f1f5eb173..20612d344 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
@@ -90,7 +90,7 @@ public class CleanedFileStoreExpireTest extends 
FileStoreExpireTestBase {
 
         // expire
         expire.snapshotDeletion()
-                .doDeleteExpiredDataFiles(Arrays.asList(add, delete), new 
HashMap<>());
+                .doDeleteExpiredDataFiles(Arrays.asList(add, delete), new 
HashMap<>(), f -> false);
 
         // check
         assertThat(fileIO.exists(myDataFile)).isFalse();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireDeleteDirTest.java
 b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
similarity index 58%
rename from 
paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireDeleteDirTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index f9a48e069..c7469c735 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireDeleteDirTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -30,12 +30,16 @@ import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.RecordWriter;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -54,13 +58,14 @@ import static 
org.apache.paimon.operation.FileStoreTestUtils.assertPathExists;
 import static 
org.apache.paimon.operation.FileStoreTestUtils.assertPathNotExists;
 import static org.apache.paimon.operation.FileStoreTestUtils.commitData;
 import static org.apache.paimon.operation.FileStoreTestUtils.partitionedData;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
- * Tests for {@link FileStoreExpireImpl}. After expiration, empty data file 
directories (buckets and
- * partitions) are deleted. It didn't extend {@link FileStoreExpireTestBase} 
because there are not
- * too many codes can be reused.
+ * Tests for file deletion when expiring snapshots and deleting tags. It also tests that after
+ * expiration, empty data file directories (buckets and partitions) are deleted. It doesn't extend
+ * {@link FileStoreExpireTestBase} because little of its code can be reused.
  */
-public class FileStoreExpireDeleteDirTest {
+public class FileDeletionTest {
 
     @TempDir java.nio.file.Path tempDir;
 
@@ -180,7 +185,174 @@ public class FileStoreExpireDeleteDirTest {
         assertPathExists(fileIO, pathFactory.bucketPath(partition, 1));
     }
 
+    /**
+     * This test checks FileStoreExpire won't delete data files and manifests 
that are used by tags.
+     * Test process:
+     *
+     * <ul>
+     *   <li>1. Generate snapshot 1 with +A, +B.
+     *   <li>2. Generate snapshot 2 with -A, then create tag1.
+     *   <li>3. Generate snapshot 3 with +C.
+     *   <li>4. Generate snapshot 4 with -B, then create tag2.
+     *   <li>5. Generate snapshot 5 with +D.
+     *   <li>6. Generate snapshot 6 with -D.
+     *   <li>7. Expire snapshots 1 to 5.
+     * </ul>
+     *
+     * <p>To identify different data files, this test uses 4 buckets to store A, B, C and D, so we
+     * can check the bucket paths to assert whether a data file is retained. The expiration result
+     * should be:
+     *
+     * <ul>
+     *   <li>Expiring snapshot 1 will delete file A because snapshot 2 has 
committed -A and A is not
+     *       used by tag1.
+     *   <li>Expiring snapshot 2 won't delete any file because snapshot 3 
hasn't committed DELETE
+     *       files.
+     *   <li>Expiring snapshot 3 won't delete B: although snapshot 4 has committed -B and B is not
+     *       used by tag2, B is still used by tag1.
+     *   <li>Expiring snapshot 4 won't delete any file like snapshot 2.
+     *   <li>Expiring snapshot 5 will delete file D because snapshot 6 has 
committed -D and D is not
+     *       used by tag2.
+     * </ul>
+     */
+    @Test
+    public void testExpireWithExistingTags() throws Exception {
+        TestFileStore store = 
createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED, 4);
+        TagManager tagManager = new TagManager(fileIO, store.options().path());
+        SnapshotManager snapshotManager = store.snapshotManager();
+        TestKeyValueGenerator gen =
+                new 
TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
+        BinaryRow partition = gen.getPartition(gen.next());
+
+        // step 1: commit A to bucket 0 and B to bucket 1
+        Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers = new 
HashMap<>();
+        for (int bucket : Arrays.asList(0, 1)) {
+            List<KeyValue> kvs = partitionedData(5, gen);
+            writeData(store, kvs, partition, bucket, writers);
+        }
+        commitData(store, commitIdentifier++, writers);
+
+        // step 2: commit -A (by clean bucket 0) and create tag1
+        cleanBucket(store, gen.getPartition(gen.next()), 0);
+        tagManager.createTag(snapshotManager.snapshot(2), "tag1");
+        assertThat(tagManager.tagExists("tag1")).isTrue();
+
+        // step 3: commit C to bucket 2
+        writers.clear();
+        List<KeyValue> kvs = partitionedData(5, gen);
+        writeData(store, kvs, partition, 2, writers);
+        commitData(store, commitIdentifier++, writers);
+
+        // step 4: commit -B (by clean bucket 1) and create tag2
+        cleanBucket(store, partition, 1);
+        tagManager.createTag(snapshotManager.snapshot(4), "tag2");
+        assertThat(tagManager.tagExists("tag2")).isTrue();
+
+        // step 5: commit D to bucket 3
+        writers.clear();
+        kvs = partitionedData(5, gen);
+        writeData(store, kvs, partition, 3, writers);
+        commitData(store, commitIdentifier++, writers);
+
+        // step 6: commit -D (by clean bucket 3)
+        cleanBucket(store, partition, 3);
+
+        // check before expiring
+        FileStorePathFactory pathFactory = store.pathFactory();
+        for (int i = 0; i < 4; i++) {
+            assertPathExists(fileIO, pathFactory.bucketPath(partition, i));
+        }
+
+        // check expiring results
+        store.newExpire(1, 1, Long.MAX_VALUE).expire();
+
+        // expiring snapshot 1 will delete file A
+        assertPathNotExists(fileIO, pathFactory.bucketPath(partition, 0));
+        // expiring snapshot 2 & 3 won't delete file B
+        assertPathExists(fileIO, pathFactory.bucketPath(partition, 1));
+        // expiring snapshot 4 & 5 will delete file D
+        assertPathNotExists(fileIO, pathFactory.bucketPath(partition, 3));
+        // file C survives
+        assertPathExists(fileIO, pathFactory.bucketPath(partition, 2));
+
+        // check manifests
+        ManifestList manifestList = store.manifestListFactory().create();
+        for (String tagName : Arrays.asList("tag1", "tag2")) {
+            Snapshot snapshot = tagManager.taggedSnapshot(tagName);
+            List<Path> manifestFilePaths =
+                    snapshot.dataManifests(manifestList).stream()
+                            .map(ManifestFileMeta::fileName)
+                            .map(pathFactory::toManifestFilePath)
+                            .collect(Collectors.toList());
+            for (Path path : manifestFilePaths) {
+                assertPathExists(fileIO, path);
+            }
+
+            assertPathExists(fileIO, 
pathFactory.toManifestListPath(snapshot.baseManifestList()));
+            assertPathExists(fileIO, 
pathFactory.toManifestListPath(snapshot.deltaManifestList()));
+        }
+    }
+
+    @Test
+    public void testExpireWithUpgradeAndTags() throws Exception {
+        TestFileStore store = 
createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
+        TagManager tagManager = new TagManager(fileIO, store.options().path());
+        SnapshotManager snapshotManager = store.snapshotManager();
+        TestKeyValueGenerator gen =
+                new 
TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
+        BinaryRow partition = gen.getPartition(gen.next());
+
+        // snapshot 1: commit A to bucket 0
+        Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers = new 
HashMap<>();
+        List<KeyValue> kvs = partitionedData(5, gen);
+        writeData(store, kvs, partition, 0, writers);
+        commitData(store, commitIdentifier++, writers);
+
+        // snapshot 2: compact
+        writers.values().stream()
+                .flatMap(m -> m.values().stream())
+                .forEach(
+                        writer -> {
+                            try {
+                                writer.compact(true);
+                                writer.sync();
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+        FileStoreTestUtils.commitData(store, commitIdentifier++, writers);
+
+        // snapshot 3: commit -A (by clean bucket 0)
+        cleanBucket(store, gen.getPartition(gen.next()), 0);
+
+        tagManager.createTag(snapshotManager.snapshot(1), "tag1");
+        store.newExpire(1, 1, Long.MAX_VALUE).expire();
+
+        // check data file and manifests
+        FileStorePathFactory pathFactory = store.pathFactory();
+        assertPathExists(fileIO, pathFactory.bucketPath(partition, 0));
+
+        Snapshot tag1 = tagManager.taggedSnapshot("tag1");
+        ManifestList manifestList = store.manifestListFactory().create();
+        List<Path> manifestFilePaths =
+                tag1.dataManifests(manifestList).stream()
+                        .map(ManifestFileMeta::fileName)
+                        .map(pathFactory::toManifestFilePath)
+                        .collect(Collectors.toList());
+        for (Path path : manifestFilePaths) {
+            assertPathExists(fileIO, path);
+        }
+
+        assertPathExists(fileIO, 
pathFactory.toManifestListPath(tag1.baseManifestList()));
+        assertPathExists(fileIO, 
pathFactory.toManifestListPath(tag1.deltaManifestList()));
+    }
+
     private TestFileStore createStore(TestKeyValueGenerator.GeneratorMode 
mode) throws Exception {
+        return createStore(mode, 2);
+    }
+
+    private TestFileStore createStore(TestKeyValueGenerator.GeneratorMode 
mode, int buckets)
+            throws Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
 
         CoreOptions.ChangelogProducer changelogProducer;
@@ -220,7 +392,7 @@ public class FileStoreExpireDeleteDirTest {
         return new TestFileStore.Builder(
                         "avro",
                         root,
-                        2,
+                        buckets,
                         partitionType,
                         TestKeyValueGenerator.KEY_TYPE,
                         rowType,


Reply via email to