This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a7486b0dbc [rest] Introduce snapshot loading to REST Catalog (#5147)
a7486b0dbc is described below

commit a7486b0dbc93e6ecd515e728569dff3428dbbef5
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Feb 25 15:41:47 2025 +0800

    [rest] Introduce snapshot loading to REST Catalog (#5147)
---
 .../java/org/apache/paimon/AbstractFileStore.java  |  7 ++-
 .../org/apache/paimon/catalog/CatalogUtils.java    |  6 ++-
 .../SupportsSnapshots.java}                        | 23 ++++++---
 .../java/org/apache/paimon/rest/RESTCatalog.java   | 48 +++++++++++++++---
 .../org/apache/paimon/rest/RESTTokenFileIO.java    |  7 ++-
 .../java/org/apache/paimon/rest/ResourcePaths.java |  4 ++
 .../rest/responses/ErrorResponseResourceType.java  |  3 +-
 .../rest/responses/GetTableSnapshotResponse.java   | 47 ++++++++++++++++++
 .../org/apache/paimon/schema/SchemaManager.java    |  3 +-
 .../apache/paimon/table/CatalogEnvironment.java    | 17 ++++++-
 .../org/apache/paimon/tag/SnapshotLoaderImpl.java  | 57 ++++++++++++++++++++++
 .../SnapshotLoader.java}                           | 20 +++++---
 .../org/apache/paimon/utils/SnapshotManager.java   | 30 +++++++-----
 .../test/java/org/apache/paimon/SnapshotTest.java  | 17 ++++++-
 .../append/AppendOnlyTableCompactionTest.java      |  3 +-
 .../paimon/operation/PartitionExpireTest.java      |  2 +-
 .../org/apache/paimon/rest/RESTCatalogServer.java  | 24 +++++++++
 .../org/apache/paimon/rest/RESTCatalogTest.java    | 22 +++++++++
 .../paimon/table/FileStoreTableTestBase.java       |  9 ++--
 .../table/IncrementalTimeStampTableTest.java       |  9 ++--
 .../apache/paimon/table/system/FilesTableTest.java |  3 +-
 .../paimon/table/system/ManifestsTableTest.java    |  3 +-
 .../paimon/table/system/SnapshotsTableTest.java    |  3 +-
 .../apache/paimon/utils/SnapshotManagerTest.java   | 18 ++++---
 .../paimon/flink/ContinuousFileStoreITCase.java    |  3 +-
 .../org/apache/paimon/flink/CatalogITCaseBase.java |  6 ++-
 .../paimon/flink/ContinuousFileStoreITCase.java    |  3 +-
 .../apache/paimon/flink/RescaleBucketITCase.java   |  3 +-
 .../paimon/flink/sink/CommitterOperatorTest.java   |  3 +-
 .../paimon/flink/sink/StoreMultiCommitterTest.java |  6 +--
 .../flink/source/TestChangelogDataReadWrite.java   |  3 +-
 paimon-open-api/rest-catalog-open-api.yaml         | 42 ++++++++++++++++
 .../paimon/open/api/RESTCatalogController.java     | 26 ++++++++++
 33 files changed, 410 insertions(+), 70 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index b3398035b5..b06c542629 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -168,7 +168,12 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
 
     @Override
     public SnapshotManager snapshotManager() {
-        return new SnapshotManager(fileIO, options.path(), options.branch(), 
snapshotCache);
+        return new SnapshotManager(
+                fileIO,
+                options.path(),
+                options.branch(),
+                catalogEnvironment.snapshotLoader(),
+                snapshotCache);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index 14b49521bf..de7ec83755 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -193,7 +193,11 @@ public class CatalogUtils {
 
         CatalogEnvironment catalogEnv =
                 new CatalogEnvironment(
-                        identifier, metadata.uuid(), catalog.catalogLoader(), 
commitFactory);
+                        identifier,
+                        metadata.uuid(),
+                        catalog.catalogLoader(),
+                        commitFactory,
+                        catalog instanceof SupportsSnapshots);
         Path path = new Path(schema.options().get(PATH.key()));
         FileStoreTable table =
                 FileStoreTableFactory.create(dataFileIO.apply(path), path, 
schema, catalogEnv);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
 b/paimon-core/src/main/java/org/apache/paimon/catalog/SupportsSnapshots.java
similarity index 58%
copy from 
paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
copy to 
paimon-core/src/main/java/org/apache/paimon/catalog/SupportsSnapshots.java
index 5dc6cffade..1df80c131d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/SupportsSnapshots.java
@@ -16,12 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.rest.responses;
+package org.apache.paimon.catalog;
 
-/** The type of resource that caused the error. */
-public enum ErrorResponseResourceType {
-    DATABASE,
-    TABLE,
-    COLUMN,
-    VIEW
+import org.apache.paimon.Snapshot;
+
+import java.util.Optional;
+
+/** A {@link Catalog} supports loading table snapshots. */
+public interface SupportsSnapshots {
+
+    /**
+     * Return the snapshot of table identified by the given {@link Identifier}.
+     *
+     * @param identifier Path of the table
+     * @return The requested snapshot of the table
+     * @throws Catalog.TableNotExistException if the target does not exist
+     */
+    Optional<Snapshot> loadSnapshot(Identifier identifier) throws 
Catalog.TableNotExistException;
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index fedaa7251d..341379e523 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -25,6 +25,7 @@ import org.apache.paimon.catalog.CatalogUtils;
 import org.apache.paimon.catalog.Database;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.PropertyChange;
+import org.apache.paimon.catalog.SupportsSnapshots;
 import org.apache.paimon.catalog.TableMetadata;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -58,6 +59,7 @@ import 
org.apache.paimon.rest.responses.CreateDatabaseResponse;
 import org.apache.paimon.rest.responses.ErrorResponseResourceType;
 import org.apache.paimon.rest.responses.GetDatabaseResponse;
 import org.apache.paimon.rest.responses.GetTableResponse;
+import org.apache.paimon.rest.responses.GetTableSnapshotResponse;
 import org.apache.paimon.rest.responses.GetTableTokenResponse;
 import org.apache.paimon.rest.responses.GetViewResponse;
 import org.apache.paimon.rest.responses.ListDatabasesResponse;
@@ -83,6 +85,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -100,7 +103,7 @@ import static 
org.apache.paimon.rest.auth.AuthSession.createAuthSession;
 import static 
org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool;
 
 /** A catalog implementation for REST. */
-public class RESTCatalog implements Catalog {
+public class RESTCatalog implements Catalog, SupportsSnapshots {
 
     public static final String HEADER_PREFIX = "header.";
 
@@ -297,11 +300,44 @@ public class RESTCatalog implements Catalog {
         }
     }
 
-    protected GetTableTokenResponse loadTableToken(Identifier identifier) {
-        return client.get(
-                resourcePaths.tableToken(identifier.getDatabaseName(), 
identifier.getObjectName()),
-                GetTableTokenResponse.class,
-                restAuthFunction);
+    protected GetTableTokenResponse loadTableToken(Identifier identifier)
+            throws TableNotExistException {
+        GetTableTokenResponse response;
+        try {
+            response =
+                    client.get(
+                            resourcePaths.tableToken(
+                                    identifier.getDatabaseName(), 
identifier.getObjectName()),
+                            GetTableTokenResponse.class,
+                            restAuthFunction);
+        } catch (NoSuchResourceException e) {
+            throw new TableNotExistException(identifier);
+        } catch (ForbiddenException e) {
+            throw new TableNoPermissionException(identifier, e);
+        }
+        return response;
+    }
+
+    @Override
+    public Optional<Snapshot> loadSnapshot(Identifier identifier) throws 
TableNotExistException {
+        GetTableSnapshotResponse response;
+        try {
+            response =
+                    client.get(
+                            resourcePaths.tableSnapshot(
+                                    identifier.getDatabaseName(), 
identifier.getObjectName()),
+                            GetTableSnapshotResponse.class,
+                            restAuthFunction);
+        } catch (NoSuchResourceException e) {
+            if (e.resourceType() == ErrorResponseResourceType.SNAPSHOT) {
+                return Optional.empty();
+            }
+            throw new TableNotExistException(identifier);
+        } catch (ForbiddenException e) {
+            throw new TableNoPermissionException(identifier, e);
+        }
+
+        return Optional.of(response.getSnapshot());
     }
 
     public boolean commitSnapshot(Identifier identifier, Snapshot snapshot) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
index 42dddf3411..dfb4c6546d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.rest;
 
+import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.FileIO;
@@ -182,7 +183,11 @@ public class RESTTokenFileIO implements FileIO {
     private void refreshToken() {
         GetTableTokenResponse response;
         if (catalogInstance != null) {
-            response = catalogInstance.loadTableToken(identifier);
+            try {
+                response = catalogInstance.loadTableToken(identifier);
+            } catch (Catalog.TableNotExistException e) {
+                throw new RuntimeException(e);
+            }
         } else {
             try (RESTCatalog catalog = catalogLoader.load()) {
                 response = catalog.loadTableToken(identifier);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
index 69560f36e3..de6a35d010 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
@@ -74,6 +74,10 @@ public class ResourcePaths {
         return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, 
tableName, "token");
     }
 
+    public String tableSnapshot(String databaseName, String tableName) {
+        return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, 
tableName, "snapshot");
+    }
+
     public String partitions(String databaseName, String tableName) {
         return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, 
tableName, "partitions");
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
index 5dc6cffade..cb05c6c6c5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
@@ -23,5 +23,6 @@ public enum ErrorResponseResourceType {
     DATABASE,
     TABLE,
     COLUMN,
-    VIEW
+    VIEW,
+    SNAPSHOT
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableSnapshotResponse.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableSnapshotResponse.java
new file mode 100644
index 0000000000..c1d25d0b40
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableSnapshotResponse.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.responses;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.rest.RESTResponse;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/** Response for table snapshot. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class GetTableSnapshotResponse implements RESTResponse {
+
+    private static final String FIELD_SNAPSHOT = "snapshot";
+
+    @JsonProperty(FIELD_SNAPSHOT)
+    private final Snapshot snapshot;
+
+    @JsonCreator
+    public GetTableSnapshotResponse(@JsonProperty(FIELD_SNAPSHOT) Snapshot 
snapshot) {
+        this.snapshot = snapshot;
+    }
+
+    @JsonGetter(FIELD_SNAPSHOT)
+    public Snapshot getSnapshot() {
+        return snapshot;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 98e3b48fe4..6213503a47 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -270,7 +270,8 @@ public class SchemaManager implements Serializable {
     public TableSchema commitChanges(List<SchemaChange> changes)
             throws Catalog.TableNotExistException, 
Catalog.ColumnAlreadyExistException,
                     Catalog.ColumnNotExistException {
-        SnapshotManager snapshotManager = new SnapshotManager(fileIO, 
tableRoot, branch);
+        SnapshotManager snapshotManager =
+                new SnapshotManager(fileIO, tableRoot, branch, null, null);
         boolean hasSnapshots = (snapshotManager.latestSnapshotId() != null);
 
         while (true) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java 
b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
index 9ae7a8b028..8490dfb562 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
@@ -22,6 +22,8 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.SnapshotCommit;
+import org.apache.paimon.tag.SnapshotLoaderImpl;
+import org.apache.paimon.utils.SnapshotLoader;
 import org.apache.paimon.utils.SnapshotManager;
 
 import javax.annotation.Nullable;
@@ -37,20 +39,23 @@ public class CatalogEnvironment implements Serializable {
     @Nullable private final String uuid;
     @Nullable private final CatalogLoader catalogLoader;
     @Nullable private final SnapshotCommit.Factory commitFactory;
+    private final boolean supportsSnapshots;
 
     public CatalogEnvironment(
             @Nullable Identifier identifier,
             @Nullable String uuid,
             @Nullable CatalogLoader catalogLoader,
-            @Nullable SnapshotCommit.Factory commitFactory) {
+            @Nullable SnapshotCommit.Factory commitFactory,
+            boolean supportsSnapshots) {
         this.identifier = identifier;
         this.uuid = uuid;
         this.catalogLoader = catalogLoader;
         this.commitFactory = commitFactory;
+        this.supportsSnapshots = supportsSnapshots;
     }
 
     public static CatalogEnvironment empty() {
-        return new CatalogEnvironment(null, null, null, null);
+        return new CatalogEnvironment(null, null, null, null, false);
     }
 
     @Nullable
@@ -79,6 +84,14 @@ public class CatalogEnvironment implements Serializable {
         return PartitionHandler.create(catalogLoader.load(), identifier);
     }
 
+    @Nullable
+    public SnapshotLoader snapshotLoader() {
+        if (catalogLoader == null || !supportsSnapshots) {
+            return null;
+        }
+        return new SnapshotLoaderImpl(catalogLoader, identifier);
+    }
+
     @VisibleForTesting
     public SnapshotCommit.Factory commitFactory() {
         return commitFactory;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/tag/SnapshotLoaderImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/tag/SnapshotLoaderImpl.java
new file mode 100644
index 0000000000..bd275ee29c
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/SnapshotLoaderImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tag;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.catalog.SupportsSnapshots;
+import org.apache.paimon.utils.SnapshotLoader;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/** Implementation of {@link SnapshotLoader}. */
+public class SnapshotLoaderImpl implements SnapshotLoader {
+
+    private final CatalogLoader catalogLoader;
+    private final Identifier identifier;
+
+    public SnapshotLoaderImpl(CatalogLoader catalogLoader, Identifier 
identifier) {
+        this.catalogLoader = catalogLoader;
+        this.identifier = identifier;
+    }
+
+    @Override
+    public Optional<Snapshot> load() throws IOException {
+        try (Catalog catalog = catalogLoader.load()) {
+            return ((SupportsSnapshots) catalog).loadSnapshot(identifier);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public SnapshotLoader copyWithBranch(String branch) {
+        return new SnapshotLoaderImpl(
+                catalogLoader,
+                new Identifier(identifier.getDatabaseName(), 
identifier.getTableName(), branch));
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
 b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotLoader.java
similarity index 69%
copy from 
paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
copy to paimon-core/src/main/java/org/apache/paimon/utils/SnapshotLoader.java
index 5dc6cffade..b0957ba574 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotLoader.java
@@ -16,12 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.rest.responses;
+package org.apache.paimon.utils;
 
-/** The type of resource that caused the error. */
-public enum ErrorResponseResourceType {
-    DATABASE,
-    TABLE,
-    COLUMN,
-    VIEW
+import org.apache.paimon.Snapshot;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Optional;
+
+/** Loader to load latest snapshot. */
+public interface SnapshotLoader extends Serializable {
+
+    Optional<Snapshot> load() throws IOException;
+
+    SnapshotLoader copyWithBranch(String branch);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 8b7c91d1f8..5147a19838 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -35,6 +35,7 @@ import javax.annotation.Nullable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
+import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -54,7 +55,6 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
-import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 import static org.apache.paimon.utils.BranchManager.branchPath;
 import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
 import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
@@ -78,30 +78,28 @@ public class SnapshotManager implements Serializable {
     private final FileIO fileIO;
     private final Path tablePath;
     private final String branch;
+    @Nullable private final SnapshotLoader snapshotLoader;
     @Nullable private final Cache<Path, Snapshot> cache;
 
-    public SnapshotManager(FileIO fileIO, Path tablePath) {
-        this(fileIO, tablePath, DEFAULT_MAIN_BRANCH);
-    }
-
-    /** Specify the default branch for data writing. */
-    public SnapshotManager(FileIO fileIO, Path tablePath, @Nullable String 
branchName) {
-        this(fileIO, tablePath, branchName, null);
-    }
-
     public SnapshotManager(
             FileIO fileIO,
             Path tablePath,
             @Nullable String branchName,
+            @Nullable SnapshotLoader snapshotLoader,
             @Nullable Cache<Path, Snapshot> cache) {
         this.fileIO = fileIO;
         this.tablePath = tablePath;
         this.branch = BranchManager.normalizeBranch(branchName);
+        this.snapshotLoader = snapshotLoader;
         this.cache = cache;
     }
 
     public SnapshotManager copyWithBranch(String branchName) {
-        return new SnapshotManager(fileIO, tablePath, branchName);
+        SnapshotLoader newSnapshotLoader = null;
+        if (snapshotLoader != null) {
+            newSnapshotLoader = snapshotLoader.copyWithBranch(branchName);
+        }
+        return new SnapshotManager(fileIO, tablePath, branchName, 
newSnapshotLoader, cache);
     }
 
     public FileIO fileIO() {
@@ -209,12 +207,22 @@ public class SnapshotManager implements Serializable {
     }
 
     public @Nullable Snapshot latestSnapshot() {
+        if (snapshotLoader != null) {
+            try {
+                return snapshotLoader.load().orElse(null);
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        }
         Long snapshotId = latestSnapshotId();
         return snapshotId == null ? null : snapshot(snapshotId);
     }
 
     public @Nullable Long latestSnapshotId() {
         try {
+            if (snapshotLoader != null) {
+                return snapshotLoader.load().map(Snapshot::id).orElse(null);
+            }
             return findLatest(snapshotDirectory(), SNAPSHOT_PREFIX, 
this::snapshotPath);
         } catch (IOException e) {
             throw new RuntimeException("Failed to find latest snapshot id", e);
diff --git a/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java 
b/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java
index d8266c1c6d..08a5dba91e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java
@@ -18,9 +18,16 @@
 
 package org.apache.paimon;
 
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.SnapshotManager;
+
 import org.junit.jupiter.api.Test;
 
-class SnapshotTest {
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
+
+/** Test for snapshots. */
+public class SnapshotTest {
 
     @Test
     public void testJsonIgnoreProperties() {
@@ -41,4 +48,12 @@ class SnapshotTest {
                         + "  \"unknownKey\" : 22222\n"
                         + "}");
     }
+
+    public static SnapshotManager newSnapshotManager(FileIO fileIO, Path 
tablePath) {
+        return newSnapshotManager(fileIO, tablePath, DEFAULT_MAIN_BRANCH);
+    }
+
+    public static SnapshotManager newSnapshotManager(FileIO fileIO, Path 
tablePath, String branch) {
+        return new SnapshotManager(fileIO, tablePath, branch, null, null);
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionTest.java
index 20f2382b09..4489fe9a41 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionTest.java
@@ -48,6 +48,7 @@ import java.util.Random;
 import java.util.UUID;
 
 import static java.util.Collections.singletonMap;
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for append table compaction. */
@@ -69,7 +70,7 @@ public class AppendOnlyTableCompactionTest {
         FileIO fileIO = new LocalFileIO();
         path = new org.apache.paimon.fs.Path(tempDir.toString());
         tableSchema = new SchemaManager(fileIO, path).createTable(schema());
-        snapshotManager = new SnapshotManager(fileIO, path);
+        snapshotManager = newSnapshotManager(fileIO, path);
         recreate();
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index 74d79d452a..8f223e4225 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -133,7 +133,7 @@ public class PartitionExpireTest {
                 };
 
         CatalogEnvironment env =
-                new CatalogEnvironment(null, null, null, null) {
+                new CatalogEnvironment(null, null, null, null, false) {
 
                     @Override
                     public PartitionHandler partitionHandler() {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index e9b5b7f0a0..04f514ef2e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -45,6 +45,7 @@ import org.apache.paimon.rest.responses.ErrorResponse;
 import org.apache.paimon.rest.responses.ErrorResponseResourceType;
 import org.apache.paimon.rest.responses.GetDatabaseResponse;
 import org.apache.paimon.rest.responses.GetTableResponse;
+import org.apache.paimon.rest.responses.GetTableSnapshotResponse;
 import org.apache.paimon.rest.responses.GetTableTokenResponse;
 import org.apache.paimon.rest.responses.GetViewResponse;
 import org.apache.paimon.rest.responses.ListDatabasesResponse;
@@ -74,6 +75,7 @@ import java.util.Optional;
 import java.util.UUID;
 
 import static org.apache.paimon.rest.RESTObjectMapper.OBJECT_MAPPER;
+import static 
org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis;
 
 /** Mock REST server for testing. */
 public class RESTCatalogServer {
@@ -159,6 +161,10 @@ public class RESTCatalogServer {
                                 resources.length == 4
                                         && "tables".equals(resources[1])
                                         && "token".equals(resources[3]);
+                        boolean isTableSnapshot =
+                                resources.length == 4
+                                        && "tables".equals(resources[1])
+                                        && "snapshot".equals(resources[3]);
                         boolean isPartitions =
                                 resources.length == 4
                                         && "tables".equals(resources[1])
@@ -243,6 +249,24 @@ public class RESTCatalogServer {
                                     .setBody(
                                             OBJECT_MAPPER.writeValueAsString(
                                                     getTableTokenResponse));
+                        } else if (isTableSnapshot) {
+                            if (!"my_snapshot_table".equals(resources[2])) {
+                                response =
+                                        new ErrorResponse(
+                                                
ErrorResponseResourceType.SNAPSHOT,
+                                                databaseName,
+                                                "No Snapshot",
+                                                404);
+                                return mockResponse(response, 404);
+                            }
+                            GetTableSnapshotResponse getTableSnapshotResponse =
+                                    new GetTableSnapshotResponse(
+                                            createSnapshotWithMillis(10086, 
100));
+                            return new MockResponse()
+                                    .setResponseCode(200)
+                                    .setBody(
+                                            OBJECT_MAPPER.writeValueAsString(
+                                                    getTableSnapshotResponse));
                         } else if (isTableRename) {
                             return renameTableApiHandler(catalog, request);
                         } else if (isTableCommit) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index a0d17057a2..20e3127a8a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.rest;
 
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogTestBase;
 import org.apache.paimon.catalog.Identifier;
@@ -45,8 +47,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -147,6 +151,24 @@ class RESTCatalogTest extends CatalogTestBase {
         }
     }
 
+    @Test
+    void testSnapshotFromREST() throws Catalog.TableNotExistException {
+        Options options = new Options();
+        options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
+        options.set(RESTCatalogOptions.TOKEN, initToken);
+        options.set(RESTCatalogOptions.TOKEN_PROVIDER, 
AuthProviderEnum.BEAR.identifier());
+        RESTCatalog catalog = new RESTCatalog(CatalogContext.create(options));
+
+        Optional<Snapshot> snapshot =
+                catalog.loadSnapshot(Identifier.create("test_db_a", 
"my_snapshot_table"));
+        assertThat(snapshot).isPresent();
+        assertThat(snapshot.get().id()).isEqualTo(10086);
+        assertThat(snapshot.get().timeMillis()).isEqualTo(100);
+
+        snapshot = catalog.loadSnapshot(Identifier.create("test_db_a", 
"unknown"));
+        assertThat(snapshot).isEmpty();
+    }
+
     @Override
     protected boolean supportsFormatTable() {
         return true;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index 34e05dd8bd..aa96c3b7df 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -117,6 +117,7 @@ import static 
org.apache.paimon.CoreOptions.SNAPSHOT_EXPIRE_LIMIT;
 import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
 import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
 import static org.apache.paimon.CoreOptions.WRITE_ONLY;
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
 import static 
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -667,7 +668,7 @@ public abstract class FileStoreTableTestBase {
         }
 
         SnapshotManager snapshotManager =
-                new SnapshotManager(FileIOFinder.find(tablePath), 
table.location());
+                newSnapshotManager(FileIOFinder.find(tablePath), 
table.location());
         Long latestSnapshotId = snapshotManager.latestSnapshotId();
         assertThat(latestSnapshotId).isNotNull();
         for (int i = 1; i <= latestSnapshotId; i++) {
@@ -1111,7 +1112,7 @@ public abstract class FileStoreTableTestBase {
             // snapshot 2
             write.write(rowData(2, 20, 200L));
             commit.commit(1, write.prepareCommit(false, 2));
-            SnapshotManager snapshotManager = new SnapshotManager(new 
TraceableFileIO(), tablePath);
+            SnapshotManager snapshotManager = newSnapshotManager(new 
TraceableFileIO(), tablePath);
             // The snapshot 1 is expired.
             assertThat(snapshotManager.snapshotExists(1)).isFalse();
             table.createTag("test-tag-2", 1);
@@ -1186,7 +1187,7 @@ public abstract class FileStoreTableTestBase {
 
         // verify snapshot in test-branch is equal to snapshot 2
         SnapshotManager snapshotManager =
-                new SnapshotManager(new TraceableFileIO(), tablePath, 
"test-branch");
+                newSnapshotManager(new TraceableFileIO(), tablePath, 
"test-branch");
         Snapshot branchSnapshot =
                 Snapshot.fromPath(new TraceableFileIO(), 
snapshotManager.snapshotPath(2));
         assertThat(branchSnapshot.equals(snapshot2)).isTrue();
@@ -1334,7 +1335,7 @@ public abstract class FileStoreTableTestBase {
                         "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
 
         // verify snapshot in branch1 and main branch is same
-        SnapshotManager snapshotManager = new SnapshotManager(new 
TraceableFileIO(), tablePath);
+        SnapshotManager snapshotManager = newSnapshotManager(new 
TraceableFileIO(), tablePath);
         Snapshot branchSnapshot =
                 Snapshot.fromPath(
                         new TraceableFileIO(),
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
index 13c9f87838..1516f5bf2a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
@@ -35,6 +35,7 @@ import org.junit.jupiter.api.Test;
 import java.util.List;
 
 import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
 import static org.apache.paimon.io.DataFileTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -56,7 +57,7 @@ public class IncrementalTimeStampTableTest extends 
TableTestBase {
         catalog.createTable(identifier, schema, true);
         Table table = catalog.getTable(identifier);
         Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, 
database, "T"));
-        SnapshotManager snapshotManager = new 
SnapshotManager(LocalFileIO.create(), tablePath);
+        SnapshotManager snapshotManager = 
newSnapshotManager(LocalFileIO.create(), tablePath);
 
         Long timestampEarliest = System.currentTimeMillis();
         // snapshot 1: append
@@ -145,7 +146,7 @@ public class IncrementalTimeStampTableTest extends 
TableTestBase {
         catalog.createTable(identifier, schema, true);
         Table table = catalog.getTable(identifier);
         Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, 
database, "T"));
-        SnapshotManager snapshotManager = new 
SnapshotManager(LocalFileIO.create(), tablePath);
+        SnapshotManager snapshotManager = 
newSnapshotManager(LocalFileIO.create(), tablePath);
 
         // snapshot 1: append
         write(table, GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 1), 
GenericRow.of(1, 3, 1));
@@ -183,7 +184,7 @@ public class IncrementalTimeStampTableTest extends 
TableTestBase {
         catalog.createTable(identifier, schema, true);
         Table table = catalog.getTable(identifier);
         Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, 
database, "T"));
-        SnapshotManager snapshotManager = new 
SnapshotManager(LocalFileIO.create(), tablePath);
+        SnapshotManager snapshotManager = 
newSnapshotManager(LocalFileIO.create(), tablePath);
 
         // snapshot 1: append
         write(
@@ -240,7 +241,7 @@ public class IncrementalTimeStampTableTest extends 
TableTestBase {
         catalog.createTable(identifier, schema, true);
         Table table = catalog.getTable(identifier);
         Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, 
database, "T"));
-        SnapshotManager snapshotManager = new 
SnapshotManager(LocalFileIO.create(), tablePath);
+        SnapshotManager snapshotManager = 
newSnapshotManager(LocalFileIO.create(), tablePath);
         Long timestampEarliest = System.currentTimeMillis();
         // snapshot 1: append
         write(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
index 82aefcbdd3..f3f55ac197 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
@@ -53,6 +53,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
 import static org.apache.paimon.io.DataFileTestUtils.row;
 import static 
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -91,7 +92,7 @@ public class FilesTableTest extends TableTestBase {
         Identifier filesTableId =
                 identifier(tableName + Catalog.SYSTEM_TABLE_SPLITTER + 
FilesTable.FILES);
         filesTable = (FilesTable) catalog.getTable(filesTableId);
-        snapshotManager = new SnapshotManager(fileIO, tablePath);
+        snapshotManager = newSnapshotManager(fileIO, tablePath);
 
         // snapshot 1: append
         write(table, GenericRow.of(1, 1, 10, 1), GenericRow.of(1, 2, 20, 5));
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
index f375dfd2c8..b16d0f82a8 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
@@ -45,6 +45,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
 import static 
org.apache.paimon.utils.FileStorePathFactoryTest.createNonPartFactory;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertThrows;
@@ -76,7 +77,7 @@ public class ManifestsTableTest extends TableTestBase {
 
         FileIO fileIO = LocalFileIO.create();
         Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, 
database, "T"));
-        snapshotManager = new SnapshotManager(fileIO, tablePath);
+        snapshotManager = newSnapshotManager(fileIO, tablePath);
 
         ManifestList.Factory factory =
                 new ManifestList.Factory(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java
index 44381c124d..a7055c5c5e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java
@@ -48,6 +48,7 @@ import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link SnapshotsTable}. */
@@ -71,7 +72,7 @@ public class SnapshotsTableTest extends TableTestBase {
                         .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
                         .option(CoreOptions.BUCKET.key(), "2")
                         .build();
-        snapshotManager = new SnapshotManager(fileIO, tablePath);
+        snapshotManager = newSnapshotManager(fileIO, tablePath);
         TableSchema tableSchema =
                 SchemaUtils.forceCommit(new SchemaManager(fileIO, tablePath), 
schema);
         FileStoreTable table =
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java 
b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
index 61f6135b16..fe79938211 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
@@ -41,6 +41,8 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -53,7 +55,7 @@ public class SnapshotManagerTest {
     @Test
     public void testSnapshotPath() {
         SnapshotManager snapshotManager =
-                new SnapshotManager(LocalFileIO.create(), new 
Path(tempDir.toString()));
+                newSnapshotManager(LocalFileIO.create(), new 
Path(tempDir.toString()));
         for (int i = 0; i < 20; i++) {
             assertThat(snapshotManager.snapshotPath(i))
                     .isEqualTo(new Path(tempDir.toString() + 
"/snapshot/snapshot-" + i));
@@ -203,7 +205,7 @@ public class SnapshotManagerTest {
         long millis = 1684726826L;
         FileIO localFileIO = LocalFileIO.create();
         SnapshotManager snapshotManager =
-                new SnapshotManager(localFileIO, new Path(tempDir.toString()));
+                newSnapshotManager(localFileIO, new Path(tempDir.toString()));
         // create 10 snapshots
         for (long i = 0; i < 10; i++) {
             Snapshot snapshot = createSnapshotWithMillis(i, millis + i * 1000);
@@ -239,7 +241,7 @@ public class SnapshotManagerTest {
         assertThat(snapshotManager.laterOrEqualWatermark(millis + 
999)).isNull();
     }
 
-    private Snapshot createSnapshotWithMillis(long id, long millis) {
+    public static Snapshot createSnapshotWithMillis(long id, long millis) {
         return new Snapshot(
                 id,
                 0L,
@@ -304,7 +306,7 @@ public class SnapshotManagerTest {
     public void testLatestSnapshotOfUser() throws IOException, 
InterruptedException {
         FileIO localFileIO = LocalFileIO.create();
         SnapshotManager snapshotManager =
-                new SnapshotManager(localFileIO, new Path(tempDir.toString()));
+                newSnapshotManager(localFileIO, new Path(tempDir.toString()));
         // create 100 snapshots using user "lastCommitUser"
         for (long i = 0; i < 100; i++) {
             Snapshot snapshot =
@@ -353,7 +355,7 @@ public class SnapshotManagerTest {
     public void testTraversalSnapshotsFromLatestSafely() throws IOException, 
InterruptedException {
         FileIO localFileIO = LocalFileIO.create();
         Path path = new Path(tempDir.toString());
-        SnapshotManager snapshotManager = new SnapshotManager(localFileIO, 
path);
+        SnapshotManager snapshotManager = newSnapshotManager(localFileIO, 
path);
         // create 10 snapshots
         for (long i = 0; i < 10; i++) {
             Snapshot snapshot =
@@ -449,7 +451,7 @@ public class SnapshotManagerTest {
     public void testLongLivedChangelog() throws Exception {
         FileIO localFileIO = LocalFileIO.create();
         SnapshotManager snapshotManager =
-                new SnapshotManager(localFileIO, new Path(tempDir.toString()));
+                newSnapshotManager(localFileIO, new Path(tempDir.toString()));
         long millis = 1L;
         for (long i = 1; i <= 5; i++) {
             Changelog changelog = createChangelogWithMillis(i, millis + i * 
1000);
@@ -474,7 +476,7 @@ public class SnapshotManagerTest {
     public void testCommitChangelogWhenSameChangelogCommitTwice() throws 
IOException {
         FileIO localFileIO = LocalFileIO.create();
         SnapshotManager snapshotManager =
-                new SnapshotManager(localFileIO, new Path(tempDir.toString()));
+                newSnapshotManager(localFileIO, new Path(tempDir.toString()));
         long id = 1L;
         Changelog changelog = createChangelogWithMillis(id, 1L);
         snapshotManager.commitChangelog(changelog, id);
@@ -492,7 +494,7 @@ public class SnapshotManagerTest {
         private boolean deleteEarliestSnapshot = false;
 
         public TestSnapshotManager(FileIO fileIO, Path tablePath, boolean 
isRaceCondition) {
-            super(fileIO, tablePath);
+            super(fileIO, tablePath, DEFAULT_MAIN_BRANCH, null, null);
             this.isRaceCondition = isRaceCondition;
         }
 
diff --git 
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
 
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index d8fcddd1c8..1d29080bb3 100644
--- 
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -33,6 +33,7 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -121,7 +122,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
         iterator.close();
 
         SnapshotManager snapshotManager =
-                new SnapshotManager(LocalFileIO.create(), 
getTableDirectory("T1"));
+                newSnapshotManager(LocalFileIO.create(), 
getTableDirectory("T1"));
         List<Snapshot> snapshots =
                 new 
ArrayList<>(ImmutableList.copyOf(snapshotManager.snapshots()));
         snapshots.sort(Comparator.comparingLong(Snapshot::timeMillis));
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
index 01d615c3e1..32c09c35fc 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
@@ -58,6 +58,8 @@ import java.util.Map;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
+
 /** ITCase for catalog. */
 public abstract class CatalogITCaseBase extends AbstractTestBase {
 
@@ -204,7 +206,7 @@ public abstract class CatalogITCaseBase extends 
AbstractTestBase {
     @Nullable
     protected Snapshot findLatestSnapshot(String tableName) {
         SnapshotManager snapshotManager =
-                new SnapshotManager(LocalFileIO.create(), 
getTableDirectory(tableName));
+                newSnapshotManager(LocalFileIO.create(), 
getTableDirectory(tableName));
         Long id = snapshotManager.latestSnapshotId();
         return id == null ? null : snapshotManager.snapshot(id);
     }
@@ -212,7 +214,7 @@ public abstract class CatalogITCaseBase extends 
AbstractTestBase {
     @Nullable
     protected Snapshot findSnapshot(String tableName, long snapshotId) {
         SnapshotManager snapshotManager =
-                new SnapshotManager(LocalFileIO.create(), 
getTableDirectory(tableName));
+                newSnapshotManager(LocalFileIO.create(), 
getTableDirectory(tableName));
         Long id = snapshotManager.latestSnapshotId();
         return id == null ? null : id >= snapshotId ? 
snapshotManager.snapshot(snapshotId) : null;
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index b448858328..689afe09fa 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -44,6 +44,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
 import static 
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -264,7 +265,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
         iterator.close();
 
         SnapshotManager snapshotManager =
-                new SnapshotManager(LocalFileIO.create(), 
getTableDirectory("T1"));
+                newSnapshotManager(LocalFileIO.create(), 
getTableDirectory("T1"));
         List<Snapshot> snapshots =
                 new 
ArrayList<>(ImmutableList.copyOf(snapshotManager.snapshots()));
         snapshots.sort(Comparator.comparingLong(Snapshot::timeMillis));
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
index d44ba03f12..9b981102ec 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
@@ -39,6 +39,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -120,7 +121,7 @@ public class RescaleBucketITCase extends CatalogITCaseBase {
         Snapshot lastSnapshot = findLatestSnapshot("T3");
         assertThat(lastSnapshot).isNotNull();
         SnapshotManager snapshotManager =
-                new SnapshotManager(LocalFileIO.create(), 
getTableDirectory("T3"));
+                newSnapshotManager(LocalFileIO.create(), 
getTableDirectory("T3"));
         for (long snapshotId = lastSnapshot.id();
                 snapshotId > snapshotAfterRescale.id();
                 snapshotId--) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 28c93ca79b..1981abd373 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -67,6 +67,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
 
@@ -158,7 +159,7 @@ public class CommitterOperatorTest extends 
CommitterOperatorTestBase {
         testHarness.snapshot(cpId, 1);
         testHarness.notifyOfCompletedCheckpoint(cpId);
 
-        SnapshotManager snapshotManager = new 
SnapshotManager(LocalFileIO.create(), tablePath);
+        SnapshotManager snapshotManager = 
newSnapshotManager(LocalFileIO.create(), tablePath);
 
         // should create 10 snapshots
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(cpId);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
index 53e3a6dcb7..1958d15a3f 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
@@ -72,6 +72,7 @@ import java.util.Objects;
 import java.util.UUID;
 
 import static org.apache.paimon.CoreOptions.COMPACTION_MAX_FILE_NUM;
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
 
@@ -289,10 +290,9 @@ class StoreMultiCommitterTest {
         testHarness.snapshot(cpId, 1);
         testHarness.notifyOfCompletedCheckpoint(cpId);
 
-        SnapshotManager snapshotManager1 =
-                new SnapshotManager(LocalFileIO.create(), firstTablePath);
+        SnapshotManager snapshotManager1 = 
newSnapshotManager(LocalFileIO.create(), firstTablePath);
         SnapshotManager snapshotManager2 =
-                new SnapshotManager(LocalFileIO.create(), secondTablePath);
+                newSnapshotManager(LocalFileIO.create(), secondTablePath);
 
         // should create 10 snapshots for first table
         assertThat(snapshotManager1.latestSnapshotId()).isEqualTo(cpId);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index a9107a78fc..8b4dcf979f 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -62,6 +62,7 @@ import java.util.Map;
 import java.util.UUID;
 
 import static java.util.Collections.singletonList;
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
 
 /** Util class to read and write data for source tests. */
 public class TestChangelogDataReadWrite {
@@ -113,7 +114,7 @@ public class TestChangelogDataReadWrite {
                         CoreOptions.FILE_COMPRESSION.defaultValue(),
                         null,
                         null);
-        this.snapshotManager = new SnapshotManager(LocalFileIO.create(), new 
Path(root));
+        this.snapshotManager = newSnapshotManager(LocalFileIO.create(), new 
Path(root));
         this.commitUser = UUID.randomUUID().toString();
     }
 
diff --git a/paimon-open-api/rest-catalog-open-api.yaml 
b/paimon-open-api/rest-catalog-open-api.yaml
index 014c3ee662..4b36bcb3fd 100644
--- a/paimon-open-api/rest-catalog-open-api.yaml
+++ b/paimon-open-api/rest-catalog-open-api.yaml
@@ -469,6 +469,43 @@ paths:
                 $ref: '#/components/schemas/ErrorResponse'
         "500":
           description: Internal Server Error
+  /v1/{prefix}/databases/{database}/tables/{table}/snapshot:
+    get:
+      tags:
+        - table
+      summary: Get table snapshot
+      operationId: getTableSnapshot
+      parameters:
+        - name: prefix
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: database
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: table
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        "200":
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/GetTableSnapshotResponse'
+        "404":
+          description: Resource not found
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ErrorResponse'
+        "500":
+          description: Internal Server Error
   /v1/{prefix}/databases/{database}/tables/{table}/partitions:
     get:
       tags:
@@ -1215,6 +1252,11 @@ components:
         expiresAt:
           type: integer
           format: int64
+    GetTableSnapshotResponse:
+      type: object
+      properties:
+        snapshot:
+          $ref: '#/components/schemas/Snapshot'
     AlterDatabaseRequest:
       type: object
       properties:
diff --git 
a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
 
b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
index 1ed5d1cca6..059d593c1a 100644
--- 
a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
+++ 
b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
@@ -38,6 +38,7 @@ import 
org.apache.paimon.rest.responses.CreateDatabaseResponse;
 import org.apache.paimon.rest.responses.ErrorResponse;
 import org.apache.paimon.rest.responses.GetDatabaseResponse;
 import org.apache.paimon.rest.responses.GetTableResponse;
+import org.apache.paimon.rest.responses.GetTableSnapshotResponse;
 import org.apache.paimon.rest.responses.GetTableTokenResponse;
 import org.apache.paimon.rest.responses.GetViewResponse;
 import org.apache.paimon.rest.responses.ListDatabasesResponse;
@@ -383,6 +384,31 @@ public class RESTCatalogController {
                 ImmutableMap.of("key", "value"), System.currentTimeMillis());
     }
 
+    @Operation(
+            summary = "Get table snapshot",
+            tags = {"table"})
+    @ApiResponses({
+        @ApiResponse(
+                responseCode = "200",
+                content = {
+                    @Content(schema = @Schema(implementation = 
GetTableSnapshotResponse.class))
+                }),
+        @ApiResponse(
+                responseCode = "404",
+                description = "Resource not found",
+                content = {@Content(schema = @Schema(implementation = 
ErrorResponse.class))}),
+        @ApiResponse(
+                responseCode = "500",
+                content = {@Content(schema = @Schema())})
+    })
+    @GetMapping("/v1/{prefix}/databases/{database}/tables/{table}/snapshot")
+    public GetTableSnapshotResponse getTableSnapshot(
+            @PathVariable String prefix,
+            @PathVariable String database,
+            @PathVariable String table) {
+        return new GetTableSnapshotResponse(null);
+    }
+
     @Operation(
             summary = "List partitions",
             tags = {"partition"})

Reply via email to