This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9191e2e1f5 [core] Optimize memory usage for expiring snapshots and tags (#4655)
9191e2e1f5 is described below

commit 9191e2e1f556bc91af7941046869a516c15d5fe8
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Dec 9 13:55:11 2024 +0800

    [core] Optimize memory usage for expiring snapshots and tags (#4655)
    
    This closes #4655.
---
 .../apache/paimon/manifest/ExpireFileEntry.java    | 86 +++++++++++++++++++
 .../java/org/apache/paimon/manifest/FileEntry.java |  2 +
 .../org/apache/paimon/manifest/ManifestEntry.java  |  5 ++
 .../org/apache/paimon/manifest/ManifestFile.java   | 10 +++
 .../apache/paimon/manifest/SimpleFileEntry.java    |  5 ++
 .../apache/paimon/operation/ChangelogDeletion.java |  4 +-
 .../apache/paimon/operation/FileDeletionBase.java  | 99 ++++++++++------------
 .../apache/paimon/operation/SnapshotDeletion.java  | 12 +--
 .../org/apache/paimon/operation/TagDeletion.java   | 17 ++--
 .../apache/paimon/table/ExpireChangelogImpl.java   |  4 +-
 .../apache/paimon/table/ExpireSnapshotsImpl.java   |  4 +-
 .../org/apache/paimon/table/RollbackHelper.java    |  4 +-
 .../java/org/apache/paimon/utils/TagManager.java   |  4 +-
 .../paimon/operation/ExpireSnapshotsTest.java      |  5 +-
 14 files changed, 183 insertions(+), 78 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java
new file mode 100644
index 0000000000..060360623c
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import org.apache.paimon.data.BinaryRow;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/** A {@link SimpleFileEntry} with {@link #fileSource}. */
+public class ExpireFileEntry extends SimpleFileEntry {
+
+    @Nullable private final FileSource fileSource;
+
+    public ExpireFileEntry(
+            FileKind kind,
+            BinaryRow partition,
+            int bucket,
+            int level,
+            String fileName,
+            List<String> extraFiles,
+            @Nullable byte[] embeddedIndex,
+            BinaryRow minKey,
+            BinaryRow maxKey,
+            @Nullable FileSource fileSource) {
+        super(kind, partition, bucket, level, fileName, extraFiles, 
embeddedIndex, minKey, maxKey);
+        this.fileSource = fileSource;
+    }
+
+    public Optional<FileSource> fileSource() {
+        return Optional.ofNullable(fileSource);
+    }
+
+    public static ExpireFileEntry from(ManifestEntry entry) {
+        return new ExpireFileEntry(
+                entry.kind(),
+                entry.partition(),
+                entry.bucket(),
+                entry.level(),
+                entry.fileName(),
+                entry.file().extraFiles(),
+                entry.file().embeddedIndex(),
+                entry.minKey(),
+                entry.maxKey(),
+                entry.file().fileSource().orElse(null));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        ExpireFileEntry that = (ExpireFileEntry) o;
+        return fileSource == that.fileSource;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), fileSource);
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index 91e07a369d..a2569beac6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -60,6 +60,8 @@ public interface FileEntry {
 
     BinaryRow maxKey();
 
+    List<String> extraFiles();
+
     /**
      * The same {@link Identifier} indicates that the {@link ManifestEntry} 
refers to the same data
      * file.
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index ee5dc2c344..626e0a5d46 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -102,6 +102,11 @@ public class ManifestEntry implements FileEntry {
         return file.maxKey();
     }
 
+    @Override
+    public List<String> extraFiles() {
+        return file.extraFiles();
+    }
+
     public int totalBuckets() {
         return totalBuckets;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index 128f5262a5..1aba2ef195 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -39,6 +39,7 @@ import org.apache.paimon.utils.VersionedObjectSerializer;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -84,6 +85,15 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> 
{
         return suggestedFileSize;
     }
 
+    public List<ExpireFileEntry> readExpireFileEntries(String fileName, 
@Nullable Long fileSize) {
+        List<ManifestEntry> entries = read(fileName, fileSize);
+        List<ExpireFileEntry> result = new ArrayList<>(entries.size());
+        for (ManifestEntry entry : entries) {
+            result.add(ExpireFileEntry.from(entry));
+        }
+        return result;
+    }
+
     /**
      * Write several {@link ManifestEntry}s into manifest files.
      *
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
index 8d33ede0c4..fdaed2b85a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
@@ -117,6 +117,11 @@ public class SimpleFileEntry implements FileEntry {
         return maxKey;
     }
 
+    @Override
+    public List<String> extraFiles() {
+        return extraFiles;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java
index c20405ff26..069e57bb3d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java
@@ -23,8 +23,8 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.ExpireFileEntry;
 import org.apache.paimon.manifest.IndexManifestEntry;
-import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
@@ -60,7 +60,7 @@ public class ChangelogDeletion extends 
FileDeletionBase<Changelog> {
     }
 
     @Override
-    public void cleanUnusedDataFiles(Changelog changelog, 
Predicate<ManifestEntry> skipper) {
+    public void cleanUnusedDataFiles(Changelog changelog, 
Predicate<ExpireFileEntry> skipper) {
         if (changelog.changelogManifestList() != null) {
             deleteAddedDataFiles(changelog.changelogManifestList());
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index 303a074b0c..cfecd767b6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -24,10 +24,11 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.ExpireFileEntry;
 import org.apache.paimon.manifest.FileEntry;
+import org.apache.paimon.manifest.FileEntry.Identifier;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.IndexManifestEntry;
-import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
@@ -46,7 +47,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -54,7 +54,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 
 /**
  * Base class for file deletion including methods for clean data files, 
manifest files and empty
@@ -110,7 +109,7 @@ public abstract class FileDeletionBase<T extends Snapshot> {
      * @param skipper if the test result of a data file is true, it will be 
skipped when deleting;
      *     else it will be deleted
      */
-    public abstract void cleanUnusedDataFiles(T snapshot, 
Predicate<ManifestEntry> skipper);
+    public abstract void cleanUnusedDataFiles(T snapshot, 
Predicate<ExpireFileEntry> skipper);
 
     /**
      * Clean metadata files that will not be used anymore of a snapshot, 
including data manifests,
@@ -164,21 +163,23 @@ public abstract class FileDeletionBase<T extends 
Snapshot> {
         deletionBuckets.clear();
     }
 
-    protected void recordDeletionBuckets(ManifestEntry entry) {
+    protected void recordDeletionBuckets(ExpireFileEntry entry) {
         deletionBuckets
                 .computeIfAbsent(entry.partition(), p -> new HashSet<>())
                 .add(entry.bucket());
     }
 
-    public void cleanUnusedDataFiles(String manifestList, 
Predicate<ManifestEntry> skipper) {
+    public void cleanUnusedDataFiles(String manifestList, 
Predicate<ExpireFileEntry> skipper) {
         // try read manifests
-        List<String> manifestFileNames = 
readManifestFileNames(tryReadManifestList(manifestList));
-        List<ManifestEntry> manifestEntries;
+        List<ManifestFileMeta> manifests = tryReadManifestList(manifestList);
+        List<ExpireFileEntry> manifestEntries;
         // data file path -> (original manifest entry, extra file paths)
-        Map<Path, Pair<ManifestEntry, List<Path>>> dataFileToDelete = new 
HashMap<>();
-        for (String manifest : manifestFileNames) {
+        Map<Path, Pair<ExpireFileEntry, List<Path>>> dataFileToDelete = new 
HashMap<>();
+        for (ManifestFileMeta manifest : manifests) {
             try {
-                manifestEntries = manifestFile.read(manifest);
+                manifestEntries =
+                        manifestFile.readExpireFileEntries(
+                                manifest.fileName(), manifest.fileSize());
             } catch (Exception e) {
                 // cancel deletion if any exception occurs
                 LOG.warn("Failed to read some manifest files. Cancel 
deletion.", e);
@@ -192,12 +193,12 @@ public abstract class FileDeletionBase<T extends 
Snapshot> {
     }
 
     protected void doCleanUnusedDataFile(
-            Map<Path, Pair<ManifestEntry, List<Path>>> dataFileToDelete,
-            Predicate<ManifestEntry> skipper) {
+            Map<Path, Pair<ExpireFileEntry, List<Path>>> dataFileToDelete,
+            Predicate<ExpireFileEntry> skipper) {
         List<Path> actualDataFileToDelete = new ArrayList<>();
         dataFileToDelete.forEach(
                 (path, pair) -> {
-                    ManifestEntry entry = pair.getLeft();
+                    ExpireFileEntry entry = pair.getLeft();
                     // check whether we should skip the data file
                     if (!skipper.test(entry)) {
                         // delete data files
@@ -211,20 +212,20 @@ public abstract class FileDeletionBase<T extends 
Snapshot> {
     }
 
     protected void getDataFileToDelete(
-            Map<Path, Pair<ManifestEntry, List<Path>>> dataFileToDelete,
-            List<ManifestEntry> dataFileEntries) {
+            Map<Path, Pair<ExpireFileEntry, List<Path>>> dataFileToDelete,
+            List<ExpireFileEntry> dataFileEntries) {
         // we cannot delete a data file directly when we meet a DELETE entry, 
because that
         // file might be upgraded
-        for (ManifestEntry entry : dataFileEntries) {
+        for (ExpireFileEntry entry : dataFileEntries) {
             Path bucketPath = pathFactory.bucketPath(entry.partition(), 
entry.bucket());
-            Path dataFilePath = new Path(bucketPath, entry.file().fileName());
+            Path dataFilePath = new Path(bucketPath, entry.fileName());
             switch (entry.kind()) {
                 case ADD:
                     dataFileToDelete.remove(dataFilePath);
                     break;
                 case DELETE:
-                    List<Path> extraFiles = new 
ArrayList<>(entry.file().extraFiles().size());
-                    for (String file : entry.file().extraFiles()) {
+                    List<Path> extraFiles = new 
ArrayList<>(entry.extraFiles().size());
+                    for (String file : entry.extraFiles()) {
                         extraFiles.add(new Path(bucketPath, file));
                     }
                     dataFileToDelete.put(dataFilePath, Pair.of(entry, 
extraFiles));
@@ -242,27 +243,28 @@ public abstract class FileDeletionBase<T extends 
Snapshot> {
      * @param manifestListName name of manifest list
      */
     public void deleteAddedDataFiles(String manifestListName) {
-        List<String> manifestFileNames =
-                readManifestFileNames(tryReadManifestList(manifestListName));
-        for (String file : manifestFileNames) {
+        List<ManifestFileMeta> manifests = 
tryReadManifestList(manifestListName);
+        for (ManifestFileMeta manifest : manifests) {
             try {
-                List<ManifestEntry> manifestEntries = manifestFile.read(file);
+                List<ExpireFileEntry> manifestEntries =
+                        manifestFile.readExpireFileEntries(
+                                manifest.fileName(), manifest.fileSize());
                 deleteAddedDataFiles(manifestEntries);
             } catch (Exception e) {
                 // We want to delete the data file, so just ignore the 
unavailable files
-                LOG.info("Failed to read manifest " + file + ". Ignore it.", 
e);
+                LOG.info("Failed to read manifest " + manifest.fileName() + ". 
Ignore it.", e);
             }
         }
     }
 
-    private void deleteAddedDataFiles(List<ManifestEntry> manifestEntries) {
+    private void deleteAddedDataFiles(List<ExpireFileEntry> manifestEntries) {
         List<Path> dataFileToDelete = new ArrayList<>();
-        for (ManifestEntry entry : manifestEntries) {
+        for (ExpireFileEntry entry : manifestEntries) {
             if (entry.kind() == FileKind.ADD) {
                 dataFileToDelete.add(
                         new Path(
                                 pathFactory.bucketPath(entry.partition(), 
entry.bucket()),
-                                entry.file().fileName()));
+                                entry.fileName()));
                 recordDeletionBuckets(entry);
             }
         }
@@ -327,7 +329,7 @@ public abstract class FileDeletionBase<T extends Snapshot> {
         cleanUnusedStatisticsManifests(snapshot, skippingSet);
     }
 
-    public Predicate<ManifestEntry> createDataFileSkipperForTags(
+    public Predicate<ExpireFileEntry> createDataFileSkipperForTags(
             List<Snapshot> taggedSnapshots, long expiringSnapshotId) throws 
Exception {
         int index = SnapshotManager.findPreviousSnapshot(taggedSnapshots, 
expiringSnapshotId);
         // refresh tag data files
@@ -358,18 +360,6 @@ public abstract class FileDeletionBase<T extends Snapshot> 
{
         }
     }
 
-    protected List<String> tryReadDataManifests(Snapshot snapshot) {
-        List<ManifestFileMeta> manifestFileMetas = 
tryReadManifestList(snapshot.baseManifestList());
-        
manifestFileMetas.addAll(tryReadManifestList(snapshot.deltaManifestList()));
-        return readManifestFileNames(manifestFileMetas);
-    }
-
-    protected List<String> readManifestFileNames(List<ManifestFileMeta> 
manifestFileMetas) {
-        return manifestFileMetas.stream()
-                .map(ManifestFileMeta::fileName)
-                .collect(Collectors.toCollection(LinkedList::new));
-    }
-
     /**
      * NOTE: This method is used for building data file skipping set. If 
failed to read some
      * manifests, it will throw exception which callers must handle.
@@ -377,23 +367,26 @@ public abstract class FileDeletionBase<T extends 
Snapshot> {
     protected void addMergedDataFiles(
             Map<BinaryRow, Map<Integer, Set<String>>> dataFiles, Snapshot 
snapshot)
             throws IOException {
-        for (ManifestEntry entry : readMergedDataFiles(snapshot)) {
+        for (ExpireFileEntry entry : readMergedDataFiles(snapshot)) {
             dataFiles
                     .computeIfAbsent(entry.partition(), p -> new HashMap<>())
                     .computeIfAbsent(entry.bucket(), b -> new HashSet<>())
-                    .add(entry.file().fileName());
+                    .add(entry.fileName());
         }
     }
 
-    protected Collection<ManifestEntry> readMergedDataFiles(Snapshot snapshot) 
throws IOException {
+    protected Collection<ExpireFileEntry> readMergedDataFiles(Snapshot 
snapshot)
+            throws IOException {
         // read data manifests
-        List<String> files = tryReadDataManifests(snapshot);
+
+        List<ManifestFileMeta> manifests = 
tryReadManifestList(snapshot.baseManifestList());
+        manifests.addAll(tryReadManifestList(snapshot.deltaManifestList()));
 
         // read and merge manifest entries
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new HashMap<>();
-        for (String manifest : files) {
-            List<ManifestEntry> entries;
-            entries = manifestFile.readWithIOException(manifest);
+        Map<Identifier, ExpireFileEntry> map = new HashMap<>();
+        for (ManifestFileMeta manifest : manifests) {
+            List<ExpireFileEntry> entries =
+                    manifestFile.readExpireFileEntries(manifest.fileName(), 
manifest.fileSize());
             FileEntry.mergeEntries(entries, map);
         }
 
@@ -401,12 +394,12 @@ public abstract class FileDeletionBase<T extends 
Snapshot> {
     }
 
     protected boolean containsDataFile(
-            Map<BinaryRow, Map<Integer, Set<String>>> dataFiles, ManifestEntry 
testee) {
-        Map<Integer, Set<String>> buckets = dataFiles.get(testee.partition());
+            Map<BinaryRow, Map<Integer, Set<String>>> dataFiles, 
ExpireFileEntry entry) {
+        Map<Integer, Set<String>> buckets = dataFiles.get(entry.partition());
         if (buckets != null) {
-            Set<String> fileNames = buckets.get(testee.bucket());
+            Set<String> fileNames = buckets.get(entry.bucket());
             if (fileNames != null) {
-                return fileNames.contains(testee.file().fileName());
+                return fileNames.contains(entry.fileName());
             }
         }
         return false;
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
index d86907ecea..7d55b64c8e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
@@ -23,8 +23,8 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.manifest.ExpireFileEntry;
 import org.apache.paimon.manifest.FileSource;
-import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.stats.StatsFileHandler;
@@ -65,15 +65,15 @@ public class SnapshotDeletion extends 
FileDeletionBase<Snapshot> {
     }
 
     @Override
-    public void cleanUnusedDataFiles(Snapshot snapshot, 
Predicate<ManifestEntry> skipper) {
+    public void cleanUnusedDataFiles(Snapshot snapshot, 
Predicate<ExpireFileEntry> skipper) {
         if (changelogDecoupled && !produceChangelog) {
             // Skip clean the 'APPEND' data files.If we do not have the file 
source information
             // eg: the old version table file, we just skip clean this here, 
let it done by
             // ExpireChangelogImpl
-            Predicate<ManifestEntry> enriched =
+            Predicate<ExpireFileEntry> enriched =
                     manifestEntry ->
                             skipper.test(manifestEntry)
-                                    || 
(manifestEntry.file().fileSource().orElse(FileSource.APPEND)
+                                    || 
(manifestEntry.fileSource().orElse(FileSource.APPEND)
                                             == FileSource.APPEND);
             cleanUnusedDataFiles(snapshot.deltaManifestList(), enriched);
         } else {
@@ -92,8 +92,8 @@ public class SnapshotDeletion extends 
FileDeletionBase<Snapshot> {
     }
 
     @VisibleForTesting
-    void cleanUnusedDataFile(List<ManifestEntry> dataFileLog) {
-        Map<Path, Pair<ManifestEntry, List<Path>>> dataFileToDelete = new 
HashMap<>();
+    void cleanUnusedDataFile(List<ExpireFileEntry> dataFileLog) {
+        Map<Path, Pair<ExpireFileEntry, List<Path>>> dataFileToDelete = new 
HashMap<>();
         getDataFileToDelete(dataFileToDelete, dataFileLog);
         doCleanUnusedDataFile(dataFileToDelete, f -> false);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
index a6cd338d58..2722ed0c7e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
@@ -23,7 +23,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileHandler;
-import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ExpireFileEntry;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.stats.StatsFileHandler;
@@ -68,8 +68,8 @@ public class TagDeletion extends FileDeletionBase<Snapshot> {
     }
 
     @Override
-    public void cleanUnusedDataFiles(Snapshot taggedSnapshot, 
Predicate<ManifestEntry> skipper) {
-        Collection<ManifestEntry> manifestEntries;
+    public void cleanUnusedDataFiles(Snapshot taggedSnapshot, 
Predicate<ExpireFileEntry> skipper) {
+        Collection<ExpireFileEntry> manifestEntries;
         try {
             manifestEntries = readMergedDataFiles(taggedSnapshot);
         } catch (IOException e) {
@@ -78,11 +78,11 @@ public class TagDeletion extends FileDeletionBase<Snapshot> 
{
         }
 
         Set<Path> dataFileToDelete = new HashSet<>();
-        for (ManifestEntry entry : manifestEntries) {
+        for (ExpireFileEntry entry : manifestEntries) {
             if (!skipper.test(entry)) {
                 Path bucketPath = pathFactory.bucketPath(entry.partition(), 
entry.bucket());
-                dataFileToDelete.add(new Path(bucketPath, 
entry.file().fileName()));
-                for (String file : entry.file().extraFiles()) {
+                dataFileToDelete.add(new Path(bucketPath, entry.fileName()));
+                for (String file : entry.extraFiles()) {
                     dataFileToDelete.add(new Path(bucketPath, file));
                 }
 
@@ -98,11 +98,12 @@ public class TagDeletion extends FileDeletionBase<Snapshot> 
{
         cleanUnusedManifests(taggedSnapshot, skippingSet, true, false);
     }
 
-    public Predicate<ManifestEntry> dataFileSkipper(Snapshot fromSnapshot) 
throws Exception {
+    public Predicate<ExpireFileEntry> dataFileSkipper(Snapshot fromSnapshot) 
throws Exception {
         return dataFileSkipper(Collections.singletonList(fromSnapshot));
     }
 
-    public Predicate<ManifestEntry> dataFileSkipper(List<Snapshot> 
fromSnapshots) throws Exception {
+    public Predicate<ExpireFileEntry> dataFileSkipper(List<Snapshot> 
fromSnapshots)
+            throws Exception {
         Map<BinaryRow, Map<Integer, Set<String>>> skipped = new HashMap<>();
         for (Snapshot snapshot : fromSnapshots) {
             addMergedDataFiles(skipped, snapshot);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
index ce54975450..1ffa7485ae 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
@@ -21,7 +21,7 @@ package org.apache.paimon.table;
 import org.apache.paimon.Changelog;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.consumer.ConsumerManager;
-import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ExpireFileEntry;
 import org.apache.paimon.operation.ChangelogDeletion;
 import org.apache.paimon.options.ExpireConfig;
 import org.apache.paimon.utils.Preconditions;
@@ -147,7 +147,7 @@ public class ExpireChangelogImpl implements ExpireSnapshots 
{
                 LOG.debug("Ready to delete changelog files from changelog #" + 
id);
             }
             Changelog changelog = snapshotManager.longLivedChangelog(id);
-            Predicate<ManifestEntry> skipper;
+            Predicate<ExpireFileEntry> skipper;
             try {
                 skipper = 
changelogDeletion.createDataFileSkipperForTags(taggedSnapshots, id);
             } catch (Exception e) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
index 2c83b63c97..dc1c2d6bdb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
@@ -22,7 +22,7 @@ import org.apache.paimon.Changelog;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.consumer.ConsumerManager;
-import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ExpireFileEntry;
 import org.apache.paimon.operation.SnapshotDeletion;
 import org.apache.paimon.options.ExpireConfig;
 import org.apache.paimon.utils.Preconditions;
@@ -176,7 +176,7 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots 
{
                 continue;
             }
             // expire merge tree files and collect changed buckets
-            Predicate<ManifestEntry> skipper;
+            Predicate<ExpireFileEntry> skipper;
             try {
                 skipper = 
snapshotDeletion.createDataFileSkipperForTags(taggedSnapshots, id);
             } catch (Exception e) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
index 1eb4ccf001..29fecec113 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
@@ -21,7 +21,7 @@ package org.apache.paimon.table;
 import org.apache.paimon.Changelog;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ExpireFileEntry;
 import org.apache.paimon.operation.ChangelogDeletion;
 import org.apache.paimon.operation.SnapshotDeletion;
 import org.apache.paimon.operation.TagDeletion;
@@ -205,7 +205,7 @@ public class RollbackHelper {
         }
 
         // delete data files
-        Predicate<ManifestEntry> dataFileSkipper = null;
+        Predicate<ExpireFileEntry> dataFileSkipper = null;
         boolean success = true;
         try {
             dataFileSkipper = tagDeletion.dataFileSkipper(retainedSnapshot);
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 1e05a100d7..4019395d8d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -23,7 +23,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ExpireFileEntry;
 import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.Tag;
@@ -255,7 +255,7 @@ public class TagManager {
         skippedSnapshots.add(right);
 
         // delete data files and empty directories
-        Predicate<ManifestEntry> dataFileSkipper = null;
+        Predicate<ExpireFileEntry> dataFileSkipper = null;
         boolean success = true;
         try {
             dataFileSkipper = tagDeletion.dataFileSkipper(skippedSnapshots);
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
index 96dce3d784..9dc9834373 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
@@ -29,6 +29,7 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ExpireFileEntry;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.manifest.ManifestEntry;
@@ -218,7 +219,9 @@ public class ExpireSnapshotsTest {
         ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition, 
0, 1, dataFile);
 
         // expire
-        expire.snapshotDeletion().cleanUnusedDataFile(Arrays.asList(add, 
delete));
+        expire.snapshotDeletion()
+                .cleanUnusedDataFile(
+                        Arrays.asList(ExpireFileEntry.from(add), 
ExpireFileEntry.from(delete)));
 
         // check
         assertThat(fileIO.exists(myDataFile)).isFalse();

Reply via email to