This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 154126a68 [core] Introduce Table.snapshot(long snapshotId) (#3982)
154126a68 is described below
commit 154126a687f9d8f0800aaa9faab3837bb326f013
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Aug 16 18:47:55 2024 +0800
[core] Introduce Table.snapshot(long snapshotId) (#3982)
---
.../src/main/java/org/apache/paimon/Snapshot.java | 107 +--------------------
.../org/apache/paimon/manifest/ManifestEntry.java | 18 ++++
.../org/apache/paimon/manifest/ManifestList.java | 48 +++++++++
.../paimon/operation/AbstractFileStoreScan.java | 8 +-
.../apache/paimon/operation/FileDeletionBase.java | 4 +-
.../paimon/operation/FileStoreCommitImpl.java | 12 ++-
.../org/apache/paimon/operation/FileStoreScan.java | 13 +++
.../paimon/table/AbstractFileStoreTable.java | 5 +
.../paimon/table/DelegatedFileStoreTable.java | 6 ++
.../org/apache/paimon/table/ReadonlyTable.java | 9 ++
.../main/java/org/apache/paimon/table/Table.java | 5 +
.../apache/paimon/table/system/AuditLogTable.java | 5 +
.../apache/paimon/table/system/BucketsTable.java | 6 ++
.../paimon/table/system/FileMonitorTable.java | 6 ++
.../apache/paimon/table/system/ManifestsTable.java | 2 +-
.../paimon/table/system/ReadOptimizedTable.java | 6 ++
.../org/apache/paimon/utils/SnapshotManager.java | 6 +-
.../java/org/apache/paimon/utils/TagManager.java | 6 +-
.../test/java/org/apache/paimon/TestFileStore.java | 15 +--
.../apache/paimon/operation/FileDeletionTest.java | 14 +--
.../operation/KeyValueFileStoreScanTest.java | 2 +-
.../paimon/operation/OrphanFilesCleanTest.java | 4 +-
.../paimon/table/system/ManifestsTableTest.java | 2 +-
23 files changed, 173 insertions(+), 136 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
index 6102c321d..a16349ce5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
+++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
@@ -18,13 +18,9 @@
package org.apache.paimon;
+import org.apache.paimon.annotation.Public;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.manifest.FileKind;
-import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.manifest.ManifestFileMeta;
-import org.apache.paimon.manifest.ManifestList;
-import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.utils.JsonSerdeUtil;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -35,11 +31,7 @@ import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonPro
import javax.annotation.Nullable;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -63,7 +55,10 @@ import java.util.Objects;
* commitIdentifier (which is a long value). Json can automatically
perform type conversion so
* there is no compatibility issue.
* </ul>
+ *
+ * @since 0.9.0
*/
+@Public
@JsonIgnoreProperties(ignoreUnknown = true)
public class Snapshot {
@@ -351,85 +346,6 @@ public class Snapshot {
return statistics;
}
- /**
- * Return all {@link ManifestFileMeta} instances for either data or
changelog manifests in this
- * snapshot.
- *
- * @param manifestList a {@link ManifestList} instance used for reading
files at snapshot.
- * @return a list of ManifestFileMeta.
- */
- public List<ManifestFileMeta> allManifests(ManifestList manifestList) {
- List<ManifestFileMeta> result = new ArrayList<>();
- result.addAll(dataManifests(manifestList));
- result.addAll(changelogManifests(manifestList));
- return result;
- }
-
- /**
- * Return a {@link ManifestFileMeta} for each data manifest in this
snapshot.
- *
- * @param manifestList a {@link ManifestList} instance used for reading
files at snapshot.
- * @return a list of ManifestFileMeta.
- */
- public List<ManifestFileMeta> dataManifests(ManifestList manifestList) {
- List<ManifestFileMeta> result = new ArrayList<>();
- result.addAll(manifestList.read(baseManifestList));
- result.addAll(deltaManifests(manifestList));
- return result;
- }
-
- /**
- * Return a {@link ManifestFileMeta} for each delta manifest in this
snapshot.
- *
- * @param manifestList a {@link ManifestList} instance used for reading
files at snapshot.
- * @return a list of ManifestFileMeta.
- */
- public List<ManifestFileMeta> deltaManifests(ManifestList manifestList) {
- return manifestList.read(deltaManifestList);
- }
-
- /**
- * Return a {@link ManifestFileMeta} for each changelog manifest in this
snapshot.
- *
- * @param manifestList a {@link ManifestList} instance used for reading
files at snapshot.
- * @return a list of ManifestFileMeta.
- */
- public List<ManifestFileMeta> changelogManifests(ManifestList
manifestList) {
- return changelogManifestList == null
- ? Collections.emptyList()
- : manifestList.read(changelogManifestList);
- }
-
- /**
- * Return record count of all changes occurred in this snapshot given the
scan.
- *
- * @param scan a {@link FileStoreScan} instance used for count of reading
files at snapshot.
- * @return total record count of Snapshot.
- */
- public Long totalRecordCount(FileStoreScan scan) {
- return totalRecordCount == null
- ? recordCount(scan.withSnapshot(id).plan().files())
- : totalRecordCount;
- }
-
- public static long recordCount(List<ManifestEntry> manifestEntries) {
- return manifestEntries.stream().mapToLong(manifest ->
manifest.file().rowCount()).sum();
- }
-
- public static long recordCountAdd(List<ManifestEntry> manifestEntries) {
- return manifestEntries.stream()
- .filter(manifestEntry ->
FileKind.ADD.equals(manifestEntry.kind()))
- .mapToLong(manifest -> manifest.file().rowCount())
- .sum();
- }
-
- public static long recordCountDelete(List<ManifestEntry> manifestEntries) {
- return manifestEntries.stream()
- .filter(manifestEntry ->
FileKind.DELETE.equals(manifestEntry.kind()))
- .mapToLong(manifest -> manifest.file().rowCount())
- .sum();
- }
-
public String toJson() {
return JsonSerdeUtil.toJson(this);
}
@@ -440,25 +356,12 @@ public class Snapshot {
public static Snapshot fromPath(FileIO fileIO, Path path) {
try {
- return fromPathThrowsException(fileIO, path);
+ return Snapshot.fromJson(fileIO.readFileUtf8(path));
} catch (IOException e) {
throw new RuntimeException("Fails to read snapshot from path " +
path, e);
}
}
- @Nullable
- public static Snapshot safelyFromPath(FileIO fileIO, Path path) throws
IOException {
- try {
- return fromPathThrowsException(fileIO, path);
- } catch (FileNotFoundException e) {
- return null;
- }
- }
-
- private static Snapshot fromPathThrowsException(FileIO fileIO, Path path)
throws IOException {
- return Snapshot.fromJson(fileIO.readFileUtf8(path));
- }
-
@Override
public int hashCode() {
return Objects.hash(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index 013f7ebd1..65cef5517 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -197,4 +197,22 @@ public class ManifestEntry implements FileEntry {
return true;
};
}
+
+ public static long recordCount(List<ManifestEntry> manifestEntries) {
+ return manifestEntries.stream().mapToLong(manifest ->
manifest.file().rowCount()).sum();
+ }
+
+ public static long recordCountAdd(List<ManifestEntry> manifestEntries) {
+ return manifestEntries.stream()
+ .filter(manifestEntry ->
FileKind.ADD.equals(manifestEntry.kind()))
+ .mapToLong(manifest -> manifest.file().rowCount())
+ .sum();
+ }
+
+ public static long recordCountDelete(List<ManifestEntry> manifestEntries) {
+ return manifestEntries.stream()
+ .filter(manifestEntry ->
FileKind.DELETE.equals(manifestEntry.kind()))
+ .mapToLong(manifest -> manifest.file().rowCount())
+ .sum();
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
index 304730f30..2980e998b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
@@ -18,6 +18,7 @@
package org.apache.paimon.manifest;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.FormatWriterFactory;
@@ -32,6 +33,8 @@ import org.apache.paimon.utils.VersionedObjectSerializer;
import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
/**
@@ -60,6 +63,51 @@ public class ManifestList extends
ObjectsFile<ManifestFileMeta> {
cache);
}
+ /**
+ * Return all {@link ManifestFileMeta} instances for either data or
changelog manifests in this
+ * snapshot.
+ *
+ * @return a list of ManifestFileMeta.
+ */
+ public List<ManifestFileMeta> readAllManifests(Snapshot snapshot) {
+ List<ManifestFileMeta> result = new ArrayList<>();
+ result.addAll(readDataManifests(snapshot));
+ result.addAll(readChangelogManifests(snapshot));
+ return result;
+ }
+
+ /**
+ * Return a {@link ManifestFileMeta} for each data manifest in this
snapshot.
+ *
+ * @return a list of ManifestFileMeta.
+ */
+ public List<ManifestFileMeta> readDataManifests(Snapshot snapshot) {
+ List<ManifestFileMeta> result = new ArrayList<>();
+ result.addAll(read(snapshot.baseManifestList()));
+ result.addAll(readDeltaManifests(snapshot));
+ return result;
+ }
+
+ /**
+ * Return a {@link ManifestFileMeta} for each delta manifest in this
snapshot.
+ *
+ * @return a list of ManifestFileMeta.
+ */
+ public List<ManifestFileMeta> readDeltaManifests(Snapshot snapshot) {
+ return read(snapshot.deltaManifestList());
+ }
+
+ /**
+ * Return a {@link ManifestFileMeta} for each changelog manifest in this
snapshot.
+ *
+ * @return a list of ManifestFileMeta.
+ */
+ public List<ManifestFileMeta> readChangelogManifests(Snapshot snapshot) {
+ return snapshot.changelogManifestList() == null
+ ? Collections.emptyList()
+ : read(snapshot.changelogManifestList());
+ }
+
/**
* Write several {@link ManifestFileMeta}s into a manifest list.
*
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 358a2722f..4bf83e520 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -403,18 +403,18 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private List<ManifestFileMeta> readManifests(Snapshot snapshot) {
switch (scanMode) {
case ALL:
- return snapshot.dataManifests(manifestList);
+ return manifestList.readDataManifests(snapshot);
case DELTA:
- return snapshot.deltaManifests(manifestList);
+ return manifestList.readDeltaManifests(snapshot);
case CHANGELOG:
if (snapshot.version() > Snapshot.TABLE_STORE_02_VERSION) {
- return snapshot.changelogManifests(manifestList);
+ return manifestList.readChangelogManifests(snapshot);
}
// compatible with Paimon 0.2, we'll read extraFiles in
DataFileMeta
// see comments on DataFileMeta#extraFiles
if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) {
- return snapshot.deltaManifests(manifestList);
+ return manifestList.readDeltaManifests(snapshot);
}
throw new IllegalStateException(
String.format(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index 0ce94bb9b..303a074b0 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -347,7 +347,7 @@ public abstract class FileDeletionBase<T extends Snapshot> {
* It is possible that a job was killed during expiration and some
manifest files have been
* deleted, so if the clean methods need to get manifests of a snapshot to
be cleaned, we should
* try to read manifests and return empty list if failed instead of
calling {@link
- * Snapshot#dataManifests} directly.
+ * ManifestList#readDataManifests} directly.
*/
protected List<ManifestFileMeta> tryReadManifestList(String
manifestListName) {
try {
@@ -424,7 +424,7 @@ public abstract class FileDeletionBase<T extends Snapshot> {
// data manifests
skippingSet.add(skippingSnapshot.baseManifestList());
skippingSet.add(skippingSnapshot.deltaManifestList());
- skippingSnapshot.dataManifests(manifestList).stream()
+ manifestList.readDataManifests(skippingSnapshot).stream()
.map(ManifestFileMeta::fileName)
.forEach(skippingSet::add);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 3a8d7195c..21aeee38d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -78,6 +78,9 @@ import java.util.stream.Collectors;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
+import static org.apache.paimon.manifest.ManifestEntry.recordCount;
+import static org.apache.paimon.manifest.ManifestEntry.recordCountAdd;
+import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete;
import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static
org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString;
@@ -839,9 +842,9 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
Long currentWatermark = watermark;
String previousIndexManifest = null;
if (latestSnapshot != null) {
- previousTotalRecordCount =
latestSnapshot.totalRecordCount(scan);
+ previousTotalRecordCount =
scan.totalRecordCount(latestSnapshot);
List<ManifestFileMeta> previousManifests =
- latestSnapshot.dataManifests(manifestList);
+ manifestList.readDataManifests(latestSnapshot);
// read all previous manifest files
oldMetas.addAll(previousManifests);
// read the last snapshot to complete the bucket's offsets
when logOffsets does not
@@ -872,8 +875,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
previousChangesListName = manifestList.write(newMetas);
// the added records subtract the deleted records from
- long deltaRecordCount =
- Snapshot.recordCountAdd(tableFiles) -
Snapshot.recordCountDelete(tableFiles);
+ long deltaRecordCount = recordCountAdd(tableFiles) -
recordCountDelete(tableFiles);
long totalRecordCount = previousTotalRecordCount +
deltaRecordCount;
// write new changes into manifest files
@@ -928,7 +930,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
logOffsets,
totalRecordCount,
deltaRecordCount,
- Snapshot.recordCount(changelogFiles),
+ recordCount(changelogFiles),
currentWatermark,
statsFileName);
} catch (Throwable e) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index f5249efa9..744f2fe14 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -41,6 +41,8 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.paimon.manifest.ManifestEntry.recordCount;
+
/** Scan operation which produces a plan. */
public interface FileStoreScan {
@@ -77,6 +79,17 @@ public interface FileStoreScan {
/** Produce a {@link Plan}. */
Plan plan();
+ /**
+ * Return record count of all changes occurred in this snapshot given the
scan.
+ *
+ * @return total record count of Snapshot.
+ */
+ default Long totalRecordCount(Snapshot snapshot) {
+ return snapshot.totalRecordCount() == null
+ ? (Long)
recordCount(withSnapshot(snapshot.id()).plan().files())
+ : snapshot.totalRecordCount();
+ }
+
/**
* Read {@link SimpleFileEntry}s, SimpleFileEntry only retains some
critical information, so it
* cannot perform filtering based on statistical information.
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index a3b1735dd..3b61d3edb 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -121,6 +121,11 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
return snapshot == null ? OptionalLong.empty() :
OptionalLong.of(snapshot);
}
+ @Override
+ public Snapshot snapshot(long snapshotId) {
+ return store().snapshotManager().snapshot(snapshotId);
+ }
+
@Override
public String name() {
return identifier().getObjectName();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index 1ff64f2c1..a82d18247 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.FileStore;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestCacheFilter;
@@ -128,6 +129,11 @@ public abstract class DelegatedFileStoreTable implements
FileStoreTable {
return wrapped.latestSnapshotId();
}
+ @Override
+ public Snapshot snapshot(long snapshotId) {
+ return wrapped.snapshot(snapshotId);
+ }
+
@Override
public void rollbackTo(long snapshotId) {
wrapped.rollbackTo(snapshotId);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index 4854f983d..6373e9944 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.InnerTableCommit;
@@ -112,6 +113,14 @@ public interface ReadonlyTable extends InnerTable {
this.getClass().getSimpleName()));
}
+ @Override
+ default Snapshot snapshot(long snapshotId) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Readonly Table %s does not support snapshot.",
+ this.getClass().getSimpleName()));
+ }
+
@Override
default void rollbackTo(long snapshotId) {
throw new UnsupportedOperationException(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index 55cf25aea..c588a5f0a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.Experimental;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.stats.Statistics;
@@ -78,6 +79,10 @@ public interface Table extends Serializable {
@Experimental
OptionalLong latestSnapshotId();
+ /** Get the {@link Snapshot} from snapshot id. */
+ @Experimental
+ Snapshot snapshot(long snapshotId);
+
/** Rollback table's state to a specific snapshot. */
@Experimental
void rollbackTo(long snapshotId);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 559649331..526a71d10 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -106,6 +106,11 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
return wrapped.latestSnapshotId();
}
+ @Override
+ public Snapshot snapshot(long snapshotId) {
+ return wrapped.snapshot(snapshotId);
+ }
+
@Override
public String name() {
return wrapped.name() + SYSTEM_TABLE_SPLITTER + AUDIT_LOG;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
index 7b67f00bf..282839e43 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table.system;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -109,6 +110,11 @@ public class BucketsTable implements DataTable,
ReadonlyTable {
return wrapped.latestSnapshotId();
}
+ @Override
+ public Snapshot snapshot(long snapshotId) {
+ return wrapped.snapshot(snapshotId);
+ }
+
@Override
public Path location() {
return wrapped.location();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
index bd4efc80e..cc3d7b5c3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table.system;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.Experimental;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
@@ -96,6 +97,11 @@ public class FileMonitorTable implements DataTable,
ReadonlyTable {
return wrapped.latestSnapshotId();
}
+ @Override
+ public Snapshot snapshot(long snapshotId) {
+ return wrapped.snapshot(snapshotId);
+ }
+
@Override
public Path location() {
return wrapped.location();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
index 15210792b..6184dbdad 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
@@ -213,6 +213,6 @@ public class ManifestsTable implements ReadonlyTable {
fileStorePathFactory,
null)
.create();
- return snapshot.allManifests(manifestList);
+ return manifestList.readAllManifests(snapshot);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index e7b3bd171..9cb9abe1a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table.system;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.DefaultValueAssigner;
@@ -66,6 +67,11 @@ public class ReadOptimizedTable implements DataTable,
ReadonlyTable {
return wrapped.latestSnapshotId();
}
+ @Override
+ public Snapshot snapshot(long snapshotId) {
+ return wrapped.snapshot(snapshotId);
+ }
+
@Override
public String name() {
return wrapped.name() + SYSTEM_TABLE_SPLITTER + READ_OPTIMIZED;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 9cce9233f..d64164d53 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -407,9 +407,9 @@ public class SnapshotManager implements Serializable {
List<Snapshot> snapshots = new ArrayList<>();
for (Path path : paths) {
- Snapshot snapshot = Snapshot.safelyFromPath(fileIO, path);
- if (snapshot != null) {
- snapshots.add(snapshot);
+ try {
+ snapshots.add(Snapshot.fromJson(fileIO.readFileUtf8(path)));
+ } catch (FileNotFoundException ignored) {
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 259a5bdbc..56ed8dacb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Duration;
import java.time.LocalDateTime;
@@ -303,9 +304,10 @@ public class TagManager {
}
// If the tag file is not found, it might be deleted by
// other processes, so just skip this tag
- Snapshot snapshot = Snapshot.safelyFromPath(fileIO, path);
- if (snapshot != null) {
+ try {
+ Snapshot snapshot =
Snapshot.fromJson(fileIO.readFileUtf8(path));
tags.computeIfAbsent(snapshot, s -> new
ArrayList<>()).add(tagName);
+ } catch (FileNotFoundException ignored) {
}
}
} catch (IOException e) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index bd6950d77..303879337 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -620,7 +620,7 @@ public class TestFileStore extends KeyValueFileStore {
}
// manifests
- List<ManifestFileMeta> manifests = snapshot.allManifests(manifestList);
+ List<ManifestFileMeta> manifests =
manifestList.readAllManifests(snapshot);
manifests.forEach(m ->
result.add(pathFactory.toManifestFilePath(m.fileName())));
// data file
@@ -639,7 +639,8 @@ public class TestFileStore extends KeyValueFileStore {
// but it can only be cleaned after this snapshot expired, so we
should add it to the file
// use list.
if (changelogDecoupled && !produceChangelog) {
- entries =
scan.withManifestList(snapshot.deltaManifests(manifestList)).plan().files();
+ entries =
+
scan.withManifestList(manifestList.readDeltaManifests(snapshot)).plan().files();
for (ManifestEntry entry : entries) {
// append delete file are delayed to delete
if (entry.kind() == FileKind.DELETE
@@ -686,9 +687,9 @@ public class TestFileStore extends KeyValueFileStore {
// manifests
List<ManifestFileMeta> manifests =
- new ArrayList<>(changelog.changelogManifests(manifestList));
+ new
ArrayList<>(manifestList.readChangelogManifests(changelog));
if (!produceChangelog) {
- manifests.addAll(changelog.dataManifests(manifestList));
+ manifests.addAll(manifestList.readDataManifests(changelog));
}
manifests.forEach(m ->
result.add(pathFactory.toManifestFilePath(m.fileName())));
@@ -701,7 +702,9 @@ public class TestFileStore extends KeyValueFileStore {
// delta file
if (!produceChangelog) {
for (ManifestEntry entry :
-
scan.withManifestList(changelog.deltaManifests(manifestList)).plan().files()) {
+
scan.withManifestList(manifestList.readDeltaManifests(changelog))
+ .plan()
+ .files()) {
if (entry.file().fileSource().orElse(FileSource.APPEND) ==
FileSource.APPEND) {
result.add(
new Path(
@@ -712,7 +715,7 @@ public class TestFileStore extends KeyValueFileStore {
} else {
// changelog
for (ManifestEntry entry :
-
scan.withManifestList(changelog.changelogManifests(manifestList))
+
scan.withManifestList(manifestList.readChangelogManifests(changelog))
.plan()
.files()) {
result.add(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 3fb1d36ac..6ae74c669 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -315,7 +315,7 @@ public class FileDeletionTest {
for (String tagName : Arrays.asList("tag1", "tag2")) {
Snapshot snapshot = tagManager.taggedSnapshot(tagName);
List<Path> manifestFilePaths =
- snapshot.dataManifests(manifestList).stream()
+ manifestList.readDataManifests(snapshot).stream()
.map(ManifestFileMeta::fileName)
.map(pathFactory::toManifestFilePath)
.collect(Collectors.toList());
@@ -370,7 +370,7 @@ public class FileDeletionTest {
Snapshot tag1 = tagManager.taggedSnapshot("tag1");
ManifestList manifestList = store.manifestListFactory().create();
List<Path> manifestFilePaths =
- tag1.dataManifests(manifestList).stream()
+ manifestList.readDataManifests(tag1).stream()
.map(ManifestFileMeta::fileName)
.map(pathFactory::toManifestFilePath)
.collect(Collectors.toList());
@@ -409,9 +409,9 @@ public class FileDeletionTest {
ManifestList manifestList = store.manifestListFactory().create();
Snapshot snapshot1 = snapshotManager.snapshot(1);
- List<ManifestFileMeta> snapshot1Data =
snapshot1.dataManifests(manifestList);
+ List<ManifestFileMeta> snapshot1Data =
manifestList.readDataManifests(snapshot1);
Snapshot snapshot3 = snapshotManager.snapshot(3);
- List<ManifestFileMeta> snapshot3Data =
snapshot3.dataManifests(manifestList);
+ List<ManifestFileMeta> snapshot3Data =
manifestList.readDataManifests(snapshot3);
List<String> manifestLists =
Arrays.asList(snapshot1.baseManifestList(),
snapshot1.deltaManifestList());
@@ -486,7 +486,7 @@ public class FileDeletionTest {
ManifestList manifestList = store.manifestListFactory().create();
Snapshot snapshot2 = snapshotManager.snapshot(2);
- List<ManifestFileMeta> snapshot2Data =
snapshot2.dataManifests(manifestList);
+ List<ManifestFileMeta> snapshot2Data =
manifestList.readDataManifests(snapshot2);
List<String> manifestLists =
Arrays.asList(snapshot2.baseManifestList(),
snapshot2.deltaManifestList());
@@ -521,8 +521,8 @@ public class FileDeletionTest {
// check manifests
Snapshot tag1 = tagManager.taggedSnapshot("tag1");
Snapshot tag3 = tagManager.taggedSnapshot("tag3");
- List<ManifestFileMeta> existing = tag1.dataManifests(manifestList);
- existing.addAll(tag3.dataManifests(manifestList));
+ List<ManifestFileMeta> existing = manifestList.readDataManifests(tag1);
+ existing.addAll(manifestList.readDataManifests(tag3));
for (ManifestFileMeta manifestFileMeta : snapshot2Data) {
Path path =
pathFactory.toManifestFilePath(manifestFileMeta.fileName());
if (existing.contains(manifestFileMeta)) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
index c58136339..e4b74c13c 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
@@ -263,7 +263,7 @@ public class KeyValueFileStoreScanTest {
ManifestList manifestList = store.manifestListFactory().create();
long wantedSnapshotId =
random.nextLong(snapshotManager.latestSnapshotId()) + 1;
Snapshot wantedSnapshot = snapshotManager.snapshot(wantedSnapshotId);
- List<ManifestFileMeta> wantedManifests =
wantedSnapshot.dataManifests(manifestList);
+ List<ManifestFileMeta> wantedManifests =
manifestList.readDataManifests(wantedSnapshot);
FileStoreScan scan = store.newScan();
scan.withManifestList(wantedManifests);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
index 7e912a839..efb9fb27a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
@@ -395,8 +395,8 @@ public class OrphanFilesCleanTest {
List<Path> manifests = new ArrayList<>();
ManifestList manifestList =
table.store().manifestListFactory().create();
FileStorePathFactory pathFactory = table.store().pathFactory();
- snapshot1
- .allManifests(manifestList)
+ manifestList
+ .readAllManifests(snapshot1)
.forEach(m ->
manifests.add(pathFactory.toManifestFilePath(m.fileName())));
Path manifest = manifests.get(RANDOM.nextInt(manifests.size()));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
index ffd4716e7..edca0831d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
@@ -128,7 +128,7 @@ public class ManifestsTableTest extends TableTestBase {
}
Snapshot snapshot = snapshotManager.snapshot(snapshotId);
- List<ManifestFileMeta> allManifestMeta =
snapshot.allManifests(manifestList);
+ List<ManifestFileMeta> allManifestMeta =
manifestList.readAllManifests(snapshot);
List<InternalRow> expectedRow = new ArrayList<>();
for (ManifestFileMeta manifestFileMeta : allManifestMeta) {