This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 154126a68 [core] Introduce Table.snapshot(long snapshotId) (#3982)
154126a68 is described below

commit 154126a687f9d8f0800aaa9faab3837bb326f013
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Aug 16 18:47:55 2024 +0800

    [core] Introduce Table.snapshot(long snapshotId) (#3982)
---
 .../src/main/java/org/apache/paimon/Snapshot.java  | 107 +--------------------
 .../org/apache/paimon/manifest/ManifestEntry.java  |  18 ++++
 .../org/apache/paimon/manifest/ManifestList.java   |  48 +++++++++
 .../paimon/operation/AbstractFileStoreScan.java    |   8 +-
 .../apache/paimon/operation/FileDeletionBase.java  |   4 +-
 .../paimon/operation/FileStoreCommitImpl.java      |  12 ++-
 .../org/apache/paimon/operation/FileStoreScan.java |  13 +++
 .../paimon/table/AbstractFileStoreTable.java       |   5 +
 .../paimon/table/DelegatedFileStoreTable.java      |   6 ++
 .../org/apache/paimon/table/ReadonlyTable.java     |   9 ++
 .../main/java/org/apache/paimon/table/Table.java   |   5 +
 .../apache/paimon/table/system/AuditLogTable.java  |   5 +
 .../apache/paimon/table/system/BucketsTable.java   |   6 ++
 .../paimon/table/system/FileMonitorTable.java      |   6 ++
 .../apache/paimon/table/system/ManifestsTable.java |   2 +-
 .../paimon/table/system/ReadOptimizedTable.java    |   6 ++
 .../org/apache/paimon/utils/SnapshotManager.java   |   6 +-
 .../java/org/apache/paimon/utils/TagManager.java   |   6 +-
 .../test/java/org/apache/paimon/TestFileStore.java |  15 +--
 .../apache/paimon/operation/FileDeletionTest.java  |  14 +--
 .../operation/KeyValueFileStoreScanTest.java       |   2 +-
 .../paimon/operation/OrphanFilesCleanTest.java     |   4 +-
 .../paimon/table/system/ManifestsTableTest.java    |   2 +-
 23 files changed, 173 insertions(+), 136 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java 
b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
index 6102c321d..a16349ce5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
+++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
@@ -18,13 +18,9 @@
 
 package org.apache.paimon;
 
+import org.apache.paimon.annotation.Public;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.manifest.FileKind;
-import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.manifest.ManifestFileMeta;
-import org.apache.paimon.manifest.ManifestList;
-import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.utils.JsonSerdeUtil;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -35,11 +31,7 @@ import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonPro
 
 import javax.annotation.Nullable;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -63,7 +55,10 @@ import java.util.Objects;
  *       commitIdentifier (which is a long value). Json can automatically 
perform type conversion so
  *       there is no compatibility issue.
  * </ul>
+ *
+ * @since 0.9.0
  */
+@Public
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class Snapshot {
 
@@ -351,85 +346,6 @@ public class Snapshot {
         return statistics;
     }
 
-    /**
-     * Return all {@link ManifestFileMeta} instances for either data or 
changelog manifests in this
-     * snapshot.
-     *
-     * @param manifestList a {@link ManifestList} instance used for reading 
files at snapshot.
-     * @return a list of ManifestFileMeta.
-     */
-    public List<ManifestFileMeta> allManifests(ManifestList manifestList) {
-        List<ManifestFileMeta> result = new ArrayList<>();
-        result.addAll(dataManifests(manifestList));
-        result.addAll(changelogManifests(manifestList));
-        return result;
-    }
-
-    /**
-     * Return a {@link ManifestFileMeta} for each data manifest in this 
snapshot.
-     *
-     * @param manifestList a {@link ManifestList} instance used for reading 
files at snapshot.
-     * @return a list of ManifestFileMeta.
-     */
-    public List<ManifestFileMeta> dataManifests(ManifestList manifestList) {
-        List<ManifestFileMeta> result = new ArrayList<>();
-        result.addAll(manifestList.read(baseManifestList));
-        result.addAll(deltaManifests(manifestList));
-        return result;
-    }
-
-    /**
-     * Return a {@link ManifestFileMeta} for each delta manifest in this 
snapshot.
-     *
-     * @param manifestList a {@link ManifestList} instance used for reading 
files at snapshot.
-     * @return a list of ManifestFileMeta.
-     */
-    public List<ManifestFileMeta> deltaManifests(ManifestList manifestList) {
-        return manifestList.read(deltaManifestList);
-    }
-
-    /**
-     * Return a {@link ManifestFileMeta} for each changelog manifest in this 
snapshot.
-     *
-     * @param manifestList a {@link ManifestList} instance used for reading 
files at snapshot.
-     * @return a list of ManifestFileMeta.
-     */
-    public List<ManifestFileMeta> changelogManifests(ManifestList 
manifestList) {
-        return changelogManifestList == null
-                ? Collections.emptyList()
-                : manifestList.read(changelogManifestList);
-    }
-
-    /**
-     * Return record count of all changes occurred in this snapshot given the 
scan.
-     *
-     * @param scan a {@link FileStoreScan} instance used for count of reading 
files at snapshot.
-     * @return total record count of Snapshot.
-     */
-    public Long totalRecordCount(FileStoreScan scan) {
-        return totalRecordCount == null
-                ? recordCount(scan.withSnapshot(id).plan().files())
-                : totalRecordCount;
-    }
-
-    public static long recordCount(List<ManifestEntry> manifestEntries) {
-        return manifestEntries.stream().mapToLong(manifest -> 
manifest.file().rowCount()).sum();
-    }
-
-    public static long recordCountAdd(List<ManifestEntry> manifestEntries) {
-        return manifestEntries.stream()
-                .filter(manifestEntry -> 
FileKind.ADD.equals(manifestEntry.kind()))
-                .mapToLong(manifest -> manifest.file().rowCount())
-                .sum();
-    }
-
-    public static long recordCountDelete(List<ManifestEntry> manifestEntries) {
-        return manifestEntries.stream()
-                .filter(manifestEntry -> 
FileKind.DELETE.equals(manifestEntry.kind()))
-                .mapToLong(manifest -> manifest.file().rowCount())
-                .sum();
-    }
-
     public String toJson() {
         return JsonSerdeUtil.toJson(this);
     }
@@ -440,25 +356,12 @@ public class Snapshot {
 
     public static Snapshot fromPath(FileIO fileIO, Path path) {
         try {
-            return fromPathThrowsException(fileIO, path);
+            return Snapshot.fromJson(fileIO.readFileUtf8(path));
         } catch (IOException e) {
             throw new RuntimeException("Fails to read snapshot from path " + 
path, e);
         }
     }
 
-    @Nullable
-    public static Snapshot safelyFromPath(FileIO fileIO, Path path) throws 
IOException {
-        try {
-            return fromPathThrowsException(fileIO, path);
-        } catch (FileNotFoundException e) {
-            return null;
-        }
-    }
-
-    private static Snapshot fromPathThrowsException(FileIO fileIO, Path path) 
throws IOException {
-        return Snapshot.fromJson(fileIO.readFileUtf8(path));
-    }
-
     @Override
     public int hashCode() {
         return Objects.hash(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index 013f7ebd1..65cef5517 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -197,4 +197,22 @@ public class ManifestEntry implements FileEntry {
             return true;
         };
     }
+
+    public static long recordCount(List<ManifestEntry> manifestEntries) {
+        return manifestEntries.stream().mapToLong(manifest -> 
manifest.file().rowCount()).sum();
+    }
+
+    public static long recordCountAdd(List<ManifestEntry> manifestEntries) {
+        return manifestEntries.stream()
+                .filter(manifestEntry -> 
FileKind.ADD.equals(manifestEntry.kind()))
+                .mapToLong(manifest -> manifest.file().rowCount())
+                .sum();
+    }
+
+    public static long recordCountDelete(List<ManifestEntry> manifestEntries) {
+        return manifestEntries.stream()
+                .filter(manifestEntry -> 
FileKind.DELETE.equals(manifestEntry.kind()))
+                .mapToLong(manifest -> manifest.file().rowCount())
+                .sum();
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
index 304730f30..2980e998b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.manifest;
 
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.format.FormatWriterFactory;
@@ -32,6 +33,8 @@ import org.apache.paimon.utils.VersionedObjectSerializer;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -60,6 +63,51 @@ public class ManifestList extends 
ObjectsFile<ManifestFileMeta> {
                 cache);
     }
 
+    /**
+     * Return all {@link ManifestFileMeta} instances for either data or 
changelog manifests in this
+     * snapshot.
+     *
+     * @return a list of ManifestFileMeta.
+     */
+    public List<ManifestFileMeta> readAllManifests(Snapshot snapshot) {
+        List<ManifestFileMeta> result = new ArrayList<>();
+        result.addAll(readDataManifests(snapshot));
+        result.addAll(readChangelogManifests(snapshot));
+        return result;
+    }
+
+    /**
+     * Return a {@link ManifestFileMeta} for each data manifest in this 
snapshot.
+     *
+     * @return a list of ManifestFileMeta.
+     */
+    public List<ManifestFileMeta> readDataManifests(Snapshot snapshot) {
+        List<ManifestFileMeta> result = new ArrayList<>();
+        result.addAll(read(snapshot.baseManifestList()));
+        result.addAll(readDeltaManifests(snapshot));
+        return result;
+    }
+
+    /**
+     * Return a {@link ManifestFileMeta} for each delta manifest in this 
snapshot.
+     *
+     * @return a list of ManifestFileMeta.
+     */
+    public List<ManifestFileMeta> readDeltaManifests(Snapshot snapshot) {
+        return read(snapshot.deltaManifestList());
+    }
+
+    /**
+     * Return a {@link ManifestFileMeta} for each changelog manifest in this 
snapshot.
+     *
+     * @return a list of ManifestFileMeta.
+     */
+    public List<ManifestFileMeta> readChangelogManifests(Snapshot snapshot) {
+        return snapshot.changelogManifestList() == null
+                ? Collections.emptyList()
+                : read(snapshot.changelogManifestList());
+    }
+
     /**
      * Write several {@link ManifestFileMeta}s into a manifest list.
      *
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 358a2722f..4bf83e520 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -403,18 +403,18 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     private List<ManifestFileMeta> readManifests(Snapshot snapshot) {
         switch (scanMode) {
             case ALL:
-                return snapshot.dataManifests(manifestList);
+                return manifestList.readDataManifests(snapshot);
             case DELTA:
-                return snapshot.deltaManifests(manifestList);
+                return manifestList.readDeltaManifests(snapshot);
             case CHANGELOG:
                 if (snapshot.version() > Snapshot.TABLE_STORE_02_VERSION) {
-                    return snapshot.changelogManifests(manifestList);
+                    return manifestList.readChangelogManifests(snapshot);
                 }
 
                 // compatible with Paimon 0.2, we'll read extraFiles in 
DataFileMeta
                 // see comments on DataFileMeta#extraFiles
                 if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) {
-                    return snapshot.deltaManifests(manifestList);
+                    return manifestList.readDeltaManifests(snapshot);
                 }
                 throw new IllegalStateException(
                         String.format(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index 0ce94bb9b..303a074b0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -347,7 +347,7 @@ public abstract class FileDeletionBase<T extends Snapshot> {
      * It is possible that a job was killed during expiration and some 
manifest files have been
      * deleted, so if the clean methods need to get manifests of a snapshot to 
be cleaned, we should
      * try to read manifests and return empty list if failed instead of 
calling {@link
-     * Snapshot#dataManifests} directly.
+     * ManifestList#readDataManifests} directly.
      */
     protected List<ManifestFileMeta> tryReadManifestList(String 
manifestListName) {
         try {
@@ -424,7 +424,7 @@ public abstract class FileDeletionBase<T extends Snapshot> {
             // data manifests
             skippingSet.add(skippingSnapshot.baseManifestList());
             skippingSet.add(skippingSnapshot.deltaManifestList());
-            skippingSnapshot.dataManifests(manifestList).stream()
+            manifestList.readDataManifests(skippingSnapshot).stream()
                     .map(ManifestFileMeta::fileName)
                     .forEach(skippingSet::add);
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 3a8d7195c..21aeee38d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -78,6 +78,9 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
+import static org.apache.paimon.manifest.ManifestEntry.recordCount;
+import static org.apache.paimon.manifest.ManifestEntry.recordCountAdd;
+import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete;
 import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 import static 
org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString;
@@ -839,9 +842,9 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             Long currentWatermark = watermark;
             String previousIndexManifest = null;
             if (latestSnapshot != null) {
-                previousTotalRecordCount = 
latestSnapshot.totalRecordCount(scan);
+                previousTotalRecordCount = 
scan.totalRecordCount(latestSnapshot);
                 List<ManifestFileMeta> previousManifests =
-                        latestSnapshot.dataManifests(manifestList);
+                        manifestList.readDataManifests(latestSnapshot);
                 // read all previous manifest files
                 oldMetas.addAll(previousManifests);
                 // read the last snapshot to complete the bucket's offsets 
when logOffsets does not
@@ -872,8 +875,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             previousChangesListName = manifestList.write(newMetas);
 
             // the added records subtract the deleted records from
-            long deltaRecordCount =
-                    Snapshot.recordCountAdd(tableFiles) - 
Snapshot.recordCountDelete(tableFiles);
+            long deltaRecordCount = recordCountAdd(tableFiles) - 
recordCountDelete(tableFiles);
             long totalRecordCount = previousTotalRecordCount + 
deltaRecordCount;
 
             // write new changes into manifest files
@@ -928,7 +930,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                             logOffsets,
                             totalRecordCount,
                             deltaRecordCount,
-                            Snapshot.recordCount(changelogFiles),
+                            recordCount(changelogFiles),
                             currentWatermark,
                             statsFileName);
         } catch (Throwable e) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index f5249efa9..744f2fe14 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -41,6 +41,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.manifest.ManifestEntry.recordCount;
+
 /** Scan operation which produces a plan. */
 public interface FileStoreScan {
 
@@ -77,6 +79,17 @@ public interface FileStoreScan {
     /** Produce a {@link Plan}. */
     Plan plan();
 
+    /**
+     * Return record count of all changes occurred in this snapshot given the 
scan.
+     *
+     * @return total record count of Snapshot.
+     */
+    default Long totalRecordCount(Snapshot snapshot) {
+        return snapshot.totalRecordCount() == null
+                ? (Long) 
recordCount(withSnapshot(snapshot.id()).plan().files())
+                : snapshot.totalRecordCount();
+    }
+
     /**
      * Read {@link SimpleFileEntry}s, SimpleFileEntry only retains some 
critical information, so it
      * cannot perform filtering based on statistical information.
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index a3b1735dd..3b61d3edb 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -121,6 +121,11 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
         return snapshot == null ? OptionalLong.empty() : 
OptionalLong.of(snapshot);
     }
 
+    @Override
+    public Snapshot snapshot(long snapshotId) {
+        return store().snapshotManager().snapshot(snapshotId);
+    }
+
     @Override
     public String name() {
         return identifier().getObjectName();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index 1ff64f2c1..a82d18247 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.FileStore;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.ManifestCacheFilter;
@@ -128,6 +129,11 @@ public abstract class DelegatedFileStoreTable implements 
FileStoreTable {
         return wrapped.latestSnapshotId();
     }
 
+    @Override
+    public Snapshot snapshot(long snapshotId) {
+        return wrapped.snapshot(snapshotId);
+    }
+
     @Override
     public void rollbackTo(long snapshotId) {
         wrapped.rollbackTo(snapshotId);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index 4854f983d..6373e9944 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.table;
 
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.stats.Statistics;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.InnerTableCommit;
@@ -112,6 +113,14 @@ public interface ReadonlyTable extends InnerTable {
                         this.getClass().getSimpleName()));
     }
 
+    @Override
+    default Snapshot snapshot(long snapshotId) {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Readonly Table %s does not support snapshot.",
+                        this.getClass().getSimpleName()));
+    }
+
     @Override
     default void rollbackTo(long snapshotId) {
         throw new UnsupportedOperationException(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java 
b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index 55cf25aea..c588a5f0a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.table;
 
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.annotation.Experimental;
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.stats.Statistics;
@@ -78,6 +79,10 @@ public interface Table extends Serializable {
     @Experimental
     OptionalLong latestSnapshotId();
 
+    /** Get the {@link Snapshot} from snapshot id. */
+    @Experimental
+    Snapshot snapshot(long snapshotId);
+
     /** Rollback table's state to a specific snapshot. */
     @Experimental
     void rollbackTo(long snapshotId);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 559649331..526a71d10 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -106,6 +106,11 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
         return wrapped.latestSnapshotId();
     }
 
+    @Override
+    public Snapshot snapshot(long snapshotId) {
+        return wrapped.snapshot(snapshotId);
+    }
+
     @Override
     public String name() {
         return wrapped.name() + SYSTEM_TABLE_SPLITTER + AUDIT_LOG;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
index 7b67f00bf..282839e43 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.system;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -109,6 +110,11 @@ public class BucketsTable implements DataTable, 
ReadonlyTable {
         return wrapped.latestSnapshotId();
     }
 
+    @Override
+    public Snapshot snapshot(long snapshotId) {
+        return wrapped.snapshot(snapshotId);
+    }
+
     @Override
     public Path location() {
         return wrapped.location();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
index bd4efc80e..cc3d7b5c3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.system;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.annotation.Experimental;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
@@ -96,6 +97,11 @@ public class FileMonitorTable implements DataTable, 
ReadonlyTable {
         return wrapped.latestSnapshotId();
     }
 
+    @Override
+    public Snapshot snapshot(long snapshotId) {
+        return wrapped.snapshot(snapshotId);
+    }
+
     @Override
     public Path location() {
         return wrapped.location();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
index 15210792b..6184dbdad 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
@@ -213,6 +213,6 @@ public class ManifestsTable implements ReadonlyTable {
                                 fileStorePathFactory,
                                 null)
                         .create();
-        return snapshot.allManifests(manifestList);
+        return manifestList.readAllManifests(snapshot);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index e7b3bd171..9cb9abe1a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.system;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.operation.DefaultValueAssigner;
@@ -66,6 +67,11 @@ public class ReadOptimizedTable implements DataTable, 
ReadonlyTable {
         return wrapped.latestSnapshotId();
     }
 
+    @Override
+    public Snapshot snapshot(long snapshotId) {
+        return wrapped.snapshot(snapshotId);
+    }
+
     @Override
     public String name() {
         return wrapped.name() + SYSTEM_TABLE_SPLITTER + READ_OPTIMIZED;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 9cce9233f..d64164d53 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -407,9 +407,9 @@ public class SnapshotManager implements Serializable {
 
         List<Snapshot> snapshots = new ArrayList<>();
         for (Path path : paths) {
-            Snapshot snapshot = Snapshot.safelyFromPath(fileIO, path);
-            if (snapshot != null) {
-                snapshots.add(snapshot);
+            try {
+                snapshots.add(Snapshot.fromJson(fileIO.readFileUtf8(path)));
+            } catch (FileNotFoundException ignored) {
             }
         }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 259a5bdbc..56ed8dacb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.time.Duration;
 import java.time.LocalDateTime;
@@ -303,9 +304,10 @@ public class TagManager {
                 }
                 // If the tag file is not found, it might be deleted by
                 // other processes, so just skip this tag
-                Snapshot snapshot = Snapshot.safelyFromPath(fileIO, path);
-                if (snapshot != null) {
+                try {
+                    Snapshot snapshot = 
Snapshot.fromJson(fileIO.readFileUtf8(path));
                     tags.computeIfAbsent(snapshot, s -> new 
ArrayList<>()).add(tagName);
+                } catch (FileNotFoundException ignored) {
                 }
             }
         } catch (IOException e) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index bd6950d77..303879337 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -620,7 +620,7 @@ public class TestFileStore extends KeyValueFileStore {
         }
 
         // manifests
-        List<ManifestFileMeta> manifests = snapshot.allManifests(manifestList);
+        List<ManifestFileMeta> manifests = 
manifestList.readAllManifests(snapshot);
         manifests.forEach(m -> 
result.add(pathFactory.toManifestFilePath(m.fileName())));
 
         // data file
@@ -639,7 +639,8 @@ public class TestFileStore extends KeyValueFileStore {
         // but it can only be cleaned after this snapshot expired, so we 
should add it to the file
         // use list.
         if (changelogDecoupled && !produceChangelog) {
-            entries = 
scan.withManifestList(snapshot.deltaManifests(manifestList)).plan().files();
+            entries =
+                    
scan.withManifestList(manifestList.readDeltaManifests(snapshot)).plan().files();
             for (ManifestEntry entry : entries) {
                 // append delete file are delayed to delete
                 if (entry.kind() == FileKind.DELETE
@@ -686,9 +687,9 @@ public class TestFileStore extends KeyValueFileStore {
 
         // manifests
         List<ManifestFileMeta> manifests =
-                new ArrayList<>(changelog.changelogManifests(manifestList));
+                new 
ArrayList<>(manifestList.readChangelogManifests(changelog));
         if (!produceChangelog) {
-            manifests.addAll(changelog.dataManifests(manifestList));
+            manifests.addAll(manifestList.readDataManifests(changelog));
         }
 
         manifests.forEach(m -> 
result.add(pathFactory.toManifestFilePath(m.fileName())));
@@ -701,7 +702,9 @@ public class TestFileStore extends KeyValueFileStore {
         // delta file
         if (!produceChangelog) {
             for (ManifestEntry entry :
-                    
scan.withManifestList(changelog.deltaManifests(manifestList)).plan().files()) {
+                    
scan.withManifestList(manifestList.readDeltaManifests(changelog))
+                            .plan()
+                            .files()) {
                 if (entry.file().fileSource().orElse(FileSource.APPEND) == 
FileSource.APPEND) {
                     result.add(
                             new Path(
@@ -712,7 +715,7 @@ public class TestFileStore extends KeyValueFileStore {
         } else {
             // changelog
             for (ManifestEntry entry :
-                    
scan.withManifestList(changelog.changelogManifests(manifestList))
+                    
scan.withManifestList(manifestList.readChangelogManifests(changelog))
                             .plan()
                             .files()) {
                 result.add(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 3fb1d36ac..6ae74c669 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -315,7 +315,7 @@ public class FileDeletionTest {
         for (String tagName : Arrays.asList("tag1", "tag2")) {
             Snapshot snapshot = tagManager.taggedSnapshot(tagName);
             List<Path> manifestFilePaths =
-                    snapshot.dataManifests(manifestList).stream()
+                    manifestList.readDataManifests(snapshot).stream()
                             .map(ManifestFileMeta::fileName)
                             .map(pathFactory::toManifestFilePath)
                             .collect(Collectors.toList());
@@ -370,7 +370,7 @@ public class FileDeletionTest {
         Snapshot tag1 = tagManager.taggedSnapshot("tag1");
         ManifestList manifestList = store.manifestListFactory().create();
         List<Path> manifestFilePaths =
-                tag1.dataManifests(manifestList).stream()
+                manifestList.readDataManifests(tag1).stream()
                         .map(ManifestFileMeta::fileName)
                         .map(pathFactory::toManifestFilePath)
                         .collect(Collectors.toList());
@@ -409,9 +409,9 @@ public class FileDeletionTest {
 
         ManifestList manifestList = store.manifestListFactory().create();
         Snapshot snapshot1 = snapshotManager.snapshot(1);
-        List<ManifestFileMeta> snapshot1Data = 
snapshot1.dataManifests(manifestList);
+        List<ManifestFileMeta> snapshot1Data = 
manifestList.readDataManifests(snapshot1);
         Snapshot snapshot3 = snapshotManager.snapshot(3);
-        List<ManifestFileMeta> snapshot3Data = 
snapshot3.dataManifests(manifestList);
+        List<ManifestFileMeta> snapshot3Data = 
manifestList.readDataManifests(snapshot3);
 
         List<String> manifestLists =
                 Arrays.asList(snapshot1.baseManifestList(), 
snapshot1.deltaManifestList());
@@ -486,7 +486,7 @@ public class FileDeletionTest {
 
         ManifestList manifestList = store.manifestListFactory().create();
         Snapshot snapshot2 = snapshotManager.snapshot(2);
-        List<ManifestFileMeta> snapshot2Data = 
snapshot2.dataManifests(manifestList);
+        List<ManifestFileMeta> snapshot2Data = 
manifestList.readDataManifests(snapshot2);
 
         List<String> manifestLists =
                 Arrays.asList(snapshot2.baseManifestList(), 
snapshot2.deltaManifestList());
@@ -521,8 +521,8 @@ public class FileDeletionTest {
         // check manifests
         Snapshot tag1 = tagManager.taggedSnapshot("tag1");
         Snapshot tag3 = tagManager.taggedSnapshot("tag3");
-        List<ManifestFileMeta> existing = tag1.dataManifests(manifestList);
-        existing.addAll(tag3.dataManifests(manifestList));
+        List<ManifestFileMeta> existing = manifestList.readDataManifests(tag1);
+        existing.addAll(manifestList.readDataManifests(tag3));
         for (ManifestFileMeta manifestFileMeta : snapshot2Data) {
             Path path = 
pathFactory.toManifestFilePath(manifestFileMeta.fileName());
             if (existing.contains(manifestFileMeta)) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
index c58136339..e4b74c13c 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
@@ -263,7 +263,7 @@ public class KeyValueFileStoreScanTest {
         ManifestList manifestList = store.manifestListFactory().create();
         long wantedSnapshotId = 
random.nextLong(snapshotManager.latestSnapshotId()) + 1;
         Snapshot wantedSnapshot = snapshotManager.snapshot(wantedSnapshotId);
-        List<ManifestFileMeta> wantedManifests = 
wantedSnapshot.dataManifests(manifestList);
+        List<ManifestFileMeta> wantedManifests = 
manifestList.readDataManifests(wantedSnapshot);
 
         FileStoreScan scan = store.newScan();
         scan.withManifestList(wantedManifests);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
index 7e912a839..efb9fb27a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
@@ -395,8 +395,8 @@ public class OrphanFilesCleanTest {
         List<Path> manifests = new ArrayList<>();
         ManifestList manifestList = 
table.store().manifestListFactory().create();
         FileStorePathFactory pathFactory = table.store().pathFactory();
-        snapshot1
-                .allManifests(manifestList)
+        manifestList
+                .readAllManifests(snapshot1)
                 .forEach(m -> 
manifests.add(pathFactory.toManifestFilePath(m.fileName())));
 
         Path manifest = manifests.get(RANDOM.nextInt(manifests.size()));
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
index ffd4716e7..edca0831d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
@@ -128,7 +128,7 @@ public class ManifestsTableTest extends TableTestBase {
         }
 
         Snapshot snapshot = snapshotManager.snapshot(snapshotId);
-        List<ManifestFileMeta> allManifestMeta = 
snapshot.allManifests(manifestList);
+        List<ManifestFileMeta> allManifestMeta = 
manifestList.readAllManifests(snapshot);
 
         List<InternalRow> expectedRow = new ArrayList<>();
         for (ManifestFileMeta manifestFileMeta : allManifestMeta) {


Reply via email to