This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 16915c5522 [python] Support getting snapshot by RESTCatalog (#7412)
16915c5522 is described below

commit 16915c55221419027af3a9a5f31cb7a8118cbfe9
Author: umi <[email protected]>
AuthorDate: Thu Mar 12 17:42:46 2026 +0800

    [python] Support getting snapshot by RESTCatalog (#7412)
---
 paimon-python/pypaimon/api/api_request.py          |   4 +-
 paimon-python/pypaimon/api/api_response.py         |  28 ++++++
 paimon-python/pypaimon/api/resource_paths.py       |   4 +
 paimon-python/pypaimon/api/rest_api.py             |  21 ++++-
 paimon-python/pypaimon/catalog/catalog.py          |  21 ++++-
 .../pypaimon/catalog/catalog_environment.py        |  11 +++
 .../pypaimon/catalog/filesystem_catalog.py         |  11 +++
 .../pypaimon/catalog/filesystem_catalog_loader.py  |  67 ++++++++++++++
 .../pypaimon/catalog/rest/rest_catalog.py          |  30 +++++-
 paimon-python/pypaimon/snapshot/snapshot_loader.py |  57 ++++++++++++
 .../pypaimon/snapshot/snapshot_manager.py          |  56 ++++++++---
 paimon-python/pypaimon/snapshot/table_snapshot.py  |  38 ++++++++
 .../rest/rest_catalog_commit_snapshot_test.py      |   4 +-
 .../pypaimon/tests/rest/rest_catalog_test.py       |  96 +++++++++++++++++++
 paimon-python/pypaimon/tests/rest/rest_server.py   | 102 ++++++++++++---------
 .../pypaimon/tests/write/table_write_test.py       |   4 +-
 paimon-python/pypaimon/write/file_store_commit.py  |  14 ---
 17 files changed, 487 insertions(+), 81 deletions(-)

diff --git a/paimon-python/pypaimon/api/api_request.py 
b/paimon-python/pypaimon/api/api_request.py
index f191fb03d3..6d1af78894 100644
--- a/paimon-python/pypaimon/api/api_request.py
+++ b/paimon-python/pypaimon/api/api_request.py
@@ -71,11 +71,11 @@ class CreateTableRequest(RESTRequest):
 
 @dataclass
 class CommitTableRequest(RESTRequest):
-    FIELD_TABLE_UUID = "tableUuid"
+    FIELD_TABLE_ID = "tableId"
     FIELD_SNAPSHOT = "snapshot"
     FIELD_STATISTICS = "statistics"
 
-    table_uuid: Optional[str] = json_field(FIELD_TABLE_UUID)
+    table_id: Optional[str] = json_field(FIELD_TABLE_ID)
     snapshot: Snapshot = json_field(FIELD_SNAPSHOT)
     statistics: List[PartitionStatistics] = json_field(FIELD_STATISTICS)
 
diff --git a/paimon-python/pypaimon/api/api_response.py 
b/paimon-python/pypaimon/api/api_response.py
index 8e1e498f59..0cfa73c855 100644
--- a/paimon-python/pypaimon/api/api_response.py
+++ b/paimon-python/pypaimon/api/api_response.py
@@ -23,6 +23,7 @@ from typing import Dict, Generic, List, Optional
 from pypaimon.common.json_util import T, json_field
 from pypaimon.common.options import Options
 from pypaimon.schema.schema import Schema
+from pypaimon.snapshot.table_snapshot import TableSnapshot
 
 
 @dataclass
@@ -37,6 +38,18 @@ class RESTResponse(ABC):
 
 @dataclass
 class ErrorResponse(RESTResponse):
+    """Error response"""
+    RESOURCE_TYPE_DATABASE = "DATABASE"
+    RESOURCE_TYPE_TABLE = "TABLE"
+    RESOURCE_TYPE_VIEW = "VIEW"
+    RESOURCE_TYPE_FUNCTION = "FUNCTION"
+    RESOURCE_TYPE_COLUMN = "COLUMN"
+    RESOURCE_TYPE_SNAPSHOT = "SNAPSHOT"
+    RESOURCE_TYPE_TAG = "TAG"
+    RESOURCE_TYPE_BRANCH = "BRANCH"
+    RESOURCE_TYPE_DEFINITION = "DEFINITION"
+    RESOURCE_TYPE_DIALECT = "DIALECT"
+
     resource_type: Optional[str] = json_field("resourceType", default=None)
     resource_name: Optional[str] = json_field("resourceName", default=None)
     message: Optional[str] = json_field("message", default=None)
@@ -267,3 +280,18 @@ class CommitTableResponse(RESTResponse):
 
     def is_success(self) -> bool:
         return self.success
+
+
+@dataclass
+class GetTableSnapshotResponse(RESTResponse):
+    """Response for getting table snapshot."""
+
+    FIELD_SNAPSHOT = "snapshot"
+
+    snapshot: Optional[TableSnapshot] = json_field(FIELD_SNAPSHOT, 
default=None)
+
+    def __init__(self, snapshot: Optional[TableSnapshot] = None):
+        self.snapshot = snapshot
+
+    def get_snapshot(self) -> Optional[TableSnapshot]:
+        return self.snapshot
diff --git a/paimon-python/pypaimon/api/resource_paths.py 
b/paimon-python/pypaimon/api/resource_paths.py
index 16967d16e2..03a7c43fab 100644
--- a/paimon-python/pypaimon/api/resource_paths.py
+++ b/paimon-python/pypaimon/api/resource_paths.py
@@ -74,3 +74,7 @@ class ResourcePaths:
     def rollback_table(self, database_name: str, table_name: str) -> str:
         return ("{}/{}/{}/{}/{}/rollback".format(self.base_path, 
self.DATABASES, RESTUtil.encode_string(database_name),
                                                  self.TABLES, 
RESTUtil.encode_string(table_name)))
+
+    def table_snapshot(self, database_name: str, table_name: str) -> str:
+        return ("{}/{}/{}/{}/{}/snapshot".format(self.base_path, 
self.DATABASES, RESTUtil.encode_string(database_name),
+                                                 self.TABLES, 
RESTUtil.encode_string(table_name)))
diff --git a/paimon-python/pypaimon/api/rest_api.py 
b/paimon-python/pypaimon/api/rest_api.py
index 069220d687..b7c816c9d9 100755
--- a/paimon-python/pypaimon/api/rest_api.py
+++ b/paimon-python/pypaimon/api/rest_api.py
@@ -27,7 +27,7 @@ from pypaimon.api.api_response import (CommitTableResponse, 
ConfigResponse,
                                        GetTableTokenResponse,
                                        ListDatabasesResponse,
                                        ListTablesResponse, PagedList,
-                                       PagedResponse)
+                                       PagedResponse, GetTableSnapshotResponse)
 from pypaimon.api.auth import AuthProviderFactory, RESTAuthFunction
 from pypaimon.api.client import HttpClient
 from pypaimon.api.resource_paths import ResourcePaths
@@ -380,6 +380,25 @@ class RESTApi:
             self.rest_auth_function
         )
 
+    def load_snapshot(self, identifier: Identifier) -> 
Optional['TableSnapshot']:
+        """Load latest snapshot for table.
+
+        Args:
+            identifier: Database name and table name.
+
+        Returns:
+            TableSnapshot instance or None if snapshot not found.
+        """
+        database_name, table_name = self.__validate_identifier(identifier)
+        response = self.client.get(
+            self.resource_paths.table_snapshot(database_name, table_name),
+            GetTableSnapshotResponse,
+            self.rest_auth_function
+        )
+        if response is None:
+            return None
+        return response.get_snapshot()
+
     @staticmethod
     def __validate_identifier(identifier: Identifier):
         if not identifier:
diff --git a/paimon-python/pypaimon/catalog/catalog.py 
b/paimon-python/pypaimon/catalog/catalog.py
index 2cca595411..d5f6cdee99 100644
--- a/paimon-python/pypaimon/catalog/catalog.py
+++ b/paimon-python/pypaimon/catalog/catalog.py
@@ -138,6 +138,21 @@ class Catalog(ABC):
         """
         return False
 
+    @abstractmethod
+    def load_snapshot(self, identifier: Identifier):
+        """Load the snapshot of table identified by the given Identifier.
+
+        Args:
+            identifier: Path of the table
+
+        Returns:
+            TableSnapshot instance
+
+        Raises:
+            NotImplementedError: If the catalog does not support version 
management
+            TableNotExistException: If the table does not exist
+        """
+
     @abstractmethod
     def commit_snapshot(
             self,
@@ -178,9 +193,9 @@ class Catalog(ABC):
         )
 
     def drop_partitions(
-        self,
-        identifier: Union[str, Identifier],
-        partitions: List[Dict[str, str]],
+            self,
+            identifier: Union[str, Identifier],
+            partitions: List[Dict[str, str]],
     ) -> None:
         raise NotImplementedError(
             "drop_partitions is not supported by this catalog. Use REST 
catalog for partition drop."
diff --git a/paimon-python/pypaimon/catalog/catalog_environment.py 
b/paimon-python/pypaimon/catalog/catalog_environment.py
index 2d95365508..984a007ed4 100644
--- a/paimon-python/pypaimon/catalog/catalog_environment.py
+++ b/paimon-python/pypaimon/catalog/catalog_environment.py
@@ -23,6 +23,7 @@ from pypaimon.common.identifier import Identifier
 from pypaimon.snapshot.catalog_snapshot_commit import CatalogSnapshotCommit
 from pypaimon.snapshot.renaming_snapshot_commit import RenamingSnapshotCommit
 from pypaimon.snapshot.snapshot_commit import SnapshotCommit
+from pypaimon.snapshot.snapshot_loader import SnapshotLoader
 
 
 class CatalogEnvironment:
@@ -76,6 +77,16 @@ class CatalogEnvironment:
             return TableRollback(catalog, self.identifier)
         return None
 
+    def snapshot_loader(self) -> Optional[SnapshotLoader]:
+        """Create a SnapshotLoader instance based on the catalog environment.
+
+        Returns:
+            SnapshotLoader instance if catalog_loader is available, None 
otherwise
+        """
+        if self.catalog_loader is not None:
+            return SnapshotLoader(self.catalog_loader, self.identifier)
+        return None
+
     def copy(self, identifier: Identifier) -> 'CatalogEnvironment':
         """
         Create a copy of this CatalogEnvironment with a different identifier.
diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py 
b/paimon-python/pypaimon/catalog/filesystem_catalog.py
index eb8fea5ba0..73a7f6e3cd 100644
--- a/paimon-python/pypaimon/catalog/filesystem_catalog.py
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py
@@ -243,3 +243,14 @@ class FileSystemCatalog(Catalog):
             statistics: List[PartitionStatistics]
     ) -> bool:
         raise NotImplementedError("This catalog does not support commit 
catalog")
+
+    def load_snapshot(self, identifier: Identifier):
+        """Load the snapshot of table identified by the given Identifier.
+
+        Args:
+            identifier: Path of the table
+
+        Raises:
+            NotImplementedError: FileSystemCatalog does not support version 
management
+        """
+        raise NotImplementedError("Filesystem catalog does not support 
load_snapshot")
diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog_loader.py 
b/paimon-python/pypaimon/catalog/filesystem_catalog_loader.py
new file mode 100644
index 0000000000..664b521d58
--- /dev/null
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog_loader.py
@@ -0,0 +1,67 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+"""
+FileSystemCatalogLoader implementation for pypaimon.
+
+This module provides the FileSystemCatalogLoader class which implements the 
CatalogLoader
+interface to create and load FileSystemCatalog instances.
+"""
+
+from pypaimon.catalog.catalog_context import CatalogContext
+from pypaimon.catalog.catalog_loader import CatalogLoader
+from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
+
+
+class FileSystemCatalogLoader(CatalogLoader):
+    """
+    Loader to create FileSystemCatalog instances.
+
+    This class implements the CatalogLoader interface and is responsible for
+    creating and configuring FileSystemCatalog instances based on the provided
+    CatalogContext.
+    """
+
+    def __init__(self, context: CatalogContext):
+        """
+        Initialize FileSystemCatalogLoader with a CatalogContext.
+
+        Args:
+            context: The CatalogContext containing configuration options
+        """
+        self._context = context
+
+    def context(self) -> CatalogContext:
+        """
+        Get the CatalogContext associated with this loader.
+
+        Returns:
+            The CatalogContext instance
+        """
+        return self._context
+
+    def load(self) -> FileSystemCatalog:
+        """
+        Load and return a new FileSystemCatalog instance.
+
+        This method creates a new FileSystemCatalog instance using the stored
+        CatalogContext.
+
+        Returns:
+            A new FileSystemCatalog instance
+        """
+        return FileSystemCatalog(self._context.options)
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py 
b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index bfb936d9a7..399d4e1a86 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -17,7 +17,7 @@ limitations under the License.
 """
 from typing import Any, Callable, Dict, List, Optional, Union
 
-from pypaimon.api.api_response import GetTableResponse, PagedList
+from pypaimon.api.api_response import GetTableResponse, PagedList, 
ErrorResponse
 from pypaimon.api.rest_api import RESTApi
 from pypaimon.api.rest_exception import NoSuchResourceException, 
AlreadyExistsException, ForbiddenException
 from pypaimon.catalog.catalog import Catalog
@@ -301,16 +301,40 @@ class RESTCatalog(Catalog):
         try:
             self.rest_api.rollback_to(identifier, instant, from_snapshot)
         except NoSuchResourceException as e:
-            if e.resource_type == "snapshot":
+            if e.resource_type == ErrorResponse.RESOURCE_TYPE_SNAPSHOT:
                 raise ValueError(
                     "Rollback snapshot '{}' doesn't 
exist.".format(e.resource_name)) from e
-            elif e.resource_type == "tag":
+            elif e.resource_type == ErrorResponse.RESOURCE_TYPE_TAG:
                 raise ValueError(
                     "Rollback tag '{}' doesn't 
exist.".format(e.resource_name)) from e
             raise TableNotExistException(identifier) from e
         except ForbiddenException as e:
             raise TableNoPermissionException(identifier) from e
 
+    def load_snapshot(self, identifier: Union[str, Identifier]) -> 
Optional['TableSnapshot']:
+        """Load the latest snapshot for table.
+
+        Args:
+            identifier: Path of the table (Identifier or string).
+
+        Returns:
+            TableSnapshot instance or None if snapshot not found.
+
+        Raises:
+            TableNotExistException: If the table does not exist.
+            TableNoPermissionException: If no permission to access this table.
+        """
+        if not isinstance(identifier, Identifier):
+            identifier = Identifier.from_string(identifier)
+        try:
+            return self.rest_api.load_snapshot(identifier)
+        except NoSuchResourceException as e:
+            if e.resource_type == ErrorResponse.RESOURCE_TYPE_SNAPSHOT:
+                return None
+            raise TableNotExistException(identifier) from e
+        except ForbiddenException as e:
+            raise TableNoPermissionException(identifier) from e
+
     def load_table_metadata(self, identifier: Identifier) -> TableMetadata:
         try:
             response = self.rest_api.get_table(identifier)
diff --git a/paimon-python/pypaimon/snapshot/snapshot_loader.py 
b/paimon-python/pypaimon/snapshot/snapshot_loader.py
new file mode 100644
index 0000000000..aba0a66c58
--- /dev/null
+++ b/paimon-python/pypaimon/snapshot/snapshot_loader.py
@@ -0,0 +1,57 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+"""Snapshot loader for loading snapshots from a catalog."""
+
+from typing import Optional
+
+
+class SnapshotLoader:
+    """Loader to load latest snapshot from a catalog.
+    
+    This loader uses a CatalogLoader to create catalog instances and
+    load snapshots through the catalog's load_snapshot method.
+    """
+
+    def __init__(self, catalog_loader, identifier):
+        """Initialize the loader with a catalog loader and table identifier.
+        
+        Args:
+            catalog_loader: The CatalogLoader instance to create catalog
+            identifier: The table identifier
+        """
+        self.catalog_loader = catalog_loader
+        self.identifier = identifier
+
+    def load(self) -> Optional[str]:
+        """Load the latest snapshot from the catalog.
+        
+        Returns:
+            The snapshot held by the catalog's latest TableSnapshot, or None if not found
+            
+        Raises:
+            RuntimeError: If there's an error loading the snapshot
+        """
+        try:
+            catalog = self.catalog_loader.load()
+            table_snapshot = catalog.load_snapshot(self.identifier)
+            if table_snapshot is None:
+                return None
+            return table_snapshot.snapshot
+        except RuntimeError as e:
+            raise e
+        except Exception as e:
+            raise RuntimeError(f"Failed to load snapshot: {e}")
diff --git a/paimon-python/pypaimon/snapshot/snapshot_manager.py 
b/paimon-python/pypaimon/snapshot/snapshot_manager.py
index 61678e2d96..1ad1364d3a 100644
--- a/paimon-python/pypaimon/snapshot/snapshot_manager.py
+++ b/paimon-python/pypaimon/snapshot/snapshot_manager.py
@@ -1,4 +1,3 @@
-################################################################################
 #  Licensed to the Apache Software Foundation (ASF) under one
 #  or more contributor license agreements.  See the NOTICE file
 #  distributed with this work for additional information
@@ -7,14 +6,14 @@
 #  "License"); you may not use this file except in compliance
 #  with the License.  You may obtain a copy of the License at
 #
-#      http://www.apache.org/licenses/LICENSE-2.0
+#    http://www.apache.org/licenses/LICENSE-2.0
 #
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
 import logging
 from typing import Optional
 
@@ -23,6 +22,7 @@ from pypaimon.common.file_io import FileIO
 logger = logging.getLogger(__name__)
 from pypaimon.common.json_util import JSON
 from pypaimon.snapshot.snapshot import Snapshot
+from pypaimon.snapshot.snapshot_loader import SnapshotLoader
 
 
 class SnapshotManager:
@@ -33,17 +33,45 @@ class SnapshotManager:
 
         self.table: FileStoreTable = table
         self.file_io: FileIO = self.table.file_io
+        self.snapshot_loader: Optional[SnapshotLoader] = 
self.table.catalog_environment.snapshot_loader()
+
         snapshot_path = self.table.table_path.rstrip('/')
         self.snapshot_dir = f"{snapshot_path}/snapshot"
         self.latest_file = f"{self.snapshot_dir}/LATEST"
 
     def get_latest_snapshot(self) -> Optional[Snapshot]:
-        snapshot_json = self.get_latest_snapshot_json()
-        if snapshot_json is None:
-            return None
-        return JSON.from_json(snapshot_json, Snapshot)
+        """
+        Get the latest snapshot with loader priority.
 
-    def get_latest_snapshot_json(self) -> Optional[str]:
+        1. Try to load from snapshotLoader if available
+        2. Fall back to filesystem if loader is not available or throws 
NotImplementedError
+        3. Return None if no snapshot found
+
+        Returns:
+            The latest Snapshot, or None if not found
+        """
+        # Try to load from snapshotLoader if available
+        if self.snapshot_loader is not None:
+            try:
+                snapshot = self.snapshot_loader.load()
+            except NotImplementedError:
+                # Loader not supported, fallback to filesystem
+                snapshot = self._get_latest_snapshot_from_filesystem()
+            except IOError as e:
+                # IO error, re-raise with context
+                raise RuntimeError(f"Failed to load snapshot from loader: {e}")
+        else:
+            # No loader, use filesystem directly
+            snapshot = self._get_latest_snapshot_from_filesystem()
+        return snapshot
+
+    def _get_latest_snapshot_from_filesystem(self) -> Optional[Snapshot]:
+        """
+        Get the latest snapshot from filesystem by reading LATEST file.
+        
+        Returns:
+            The latest snapshot, or None if not found
+        """
         if not self.file_io.exists(self.latest_file):
             return None
 
@@ -54,7 +82,7 @@ class SnapshotManager:
         if not self.file_io.exists(snapshot_file):
             return None
 
-        return self.file_io.read_file_utf8(snapshot_file)
+        return JSON.from_json(self.file_io.read_file_utf8(snapshot_file), 
Snapshot)
 
     def read_latest_file(self, max_retries: int = 5):
         """
diff --git a/paimon-python/pypaimon/snapshot/table_snapshot.py 
b/paimon-python/pypaimon/snapshot/table_snapshot.py
new file mode 100644
index 0000000000..541d756bc4
--- /dev/null
+++ b/paimon-python/pypaimon/snapshot/table_snapshot.py
@@ -0,0 +1,38 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+"""TableSnapshot class with snapshot and table statistics."""
+
+from dataclasses import dataclass
+
+from pypaimon.common.json_util import json_field
+from pypaimon.snapshot.snapshot import Snapshot
+
+
+@dataclass
+class TableSnapshot:
+    """Snapshot of a table, including basic statistics of this table.
+
+    This class wraps a Snapshot and provides additional table-level statistics
+    such as record count, file size, file count, and last file creation time.
+    """
+
+    # Required fields
+    snapshot: Snapshot = json_field("snapshot")
+    record_count: int = json_field("recordCount")
+    file_size_in_bytes: int = json_field("fileSizeInBytes")
+    file_count: int = json_field("fileCount")
+    last_file_creation_time: int = json_field("lastFileCreationTime")
diff --git 
a/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py 
b/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py
index 998b169084..6e5ea47618 100644
--- a/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py
@@ -169,13 +169,13 @@ class TestRESTCatalogCommitSnapshot(unittest.TestCase):
         from pypaimon.api.api_request import CommitTableRequest
 
         request = CommitTableRequest(
-            table_uuid="test-uuid",
+            table_id="test-uuid",
             snapshot=self.test_snapshot,
             statistics=self.test_statistics
         )
 
         # Verify request fields
-        self.assertEqual(request.table_uuid, "test-uuid")
+        self.assertEqual(request.table_id, "test-uuid")
         self.assertEqual(request.snapshot, self.test_snapshot)
         self.assertEqual(request.statistics, self.test_statistics)
 
diff --git a/paimon-python/pypaimon/tests/rest/rest_catalog_test.py 
b/paimon-python/pypaimon/tests/rest/rest_catalog_test.py
index 153c22ba8c..cdf313f49e 100644
--- a/paimon-python/pypaimon/tests/rest/rest_catalog_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_catalog_test.py
@@ -272,6 +272,102 @@ class RESTCatalogTest(RESTBaseTest):
         self.assertIn("no-such-tag", str(context.exception))
         self.assertIn("doesn't exist", str(context.exception))
 
+    def test_catalog_load_snapshot(self):
+        """Test RESTCatalog.load_snapshot returns the latest snapshot."""
+        table_name = "default.table_for_load_snapshot"
+        pa_schema = pa.schema([('col1', pa.int32())])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+        identifier = Identifier.from_string(table_name)
+
+        # Initially, no snapshot exists
+        table_snapshot = self.rest_catalog.load_snapshot(identifier)
+        self.assertIsNone(table_snapshot)
+
+        # Write 3 commits
+        for i in range(3):
+            write_builder = table.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({'col1': [i]}, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        # Load snapshot should return the latest (snapshot 3)
+        snapshot = self.rest_catalog.load_snapshot(identifier).snapshot
+        self.assertIsNotNone(snapshot)
+        self.assertEqual(snapshot.id, 3)
+        self.assertEqual(snapshot.schema_id, 0)
+
+    def test_catalog_load_snapshot_with_string_identifier(self):
+        """Test load_snapshot using a string identifier instead of Identifier 
object."""
+        table_name = "default.table_load_snapshot_str_id"
+        pa_schema = pa.schema([('col1', pa.int32())])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+
+        # Write 2 commits
+        for i in range(2):
+            write_builder = table.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({'col1': [i]}, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        # Use string identifier directly
+        table_snapshot = self.rest_catalog.load_snapshot(table_name)
+        self.assertIsNotNone(table_snapshot)
+        self.assertEqual(table_snapshot.snapshot.id, 2)
+
+    def test_catalog_load_snapshot_nonexistent_table(self):
+        """Test that load_snapshot on a non-existent table raises an error."""
+        from pypaimon.catalog.catalog_exception import TableNotExistException
+        identifier = 
Identifier.from_string("default.no_such_table_for_snapshot")
+        with self.assertRaises(TableNotExistException) as context:
+            self.rest_catalog.load_snapshot(identifier)
+        self.assertIn("no_such_table_for_snapshot", str(context.exception))
+        self.assertIn("does not exist", str(context.exception))
+
+    def test_catalog_load_snapshot_after_rollback(self):
+        """Test load_snapshot returns correct snapshot after rollback."""
+        table_name = "default.table_load_snapshot_after_rollback"
+        pa_schema = pa.schema([('col1', pa.int32())])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+        identifier = Identifier.from_string(table_name)
+
+        # Write 5 commits
+        for i in range(5):
+            write_builder = table.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({'col1': [i]}, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        # Verify latest snapshot is 5
+        table_snapshot = self.rest_catalog.load_snapshot(identifier)
+        self.assertEqual(table_snapshot.snapshot.id, 5)
+
+        # Rollback to snapshot 3
+        from pypaimon.table.instant import Instant
+        self.rest_catalog.rollback_to(identifier, Instant.snapshot(3))
+
+        # Load snapshot should now return snapshot 3
+        table_snapshot = self.rest_catalog.load_snapshot(identifier)
+        self.assertIsNotNone(table_snapshot)
+        self.assertEqual(table_snapshot.snapshot.id, 3)
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/paimon-python/pypaimon/tests/rest/rest_server.py 
b/paimon-python/pypaimon/tests/rest/rest_server.py
index dd6801f897..ee84553932 100755
--- a/paimon-python/pypaimon/tests/rest/rest_server.py
+++ b/paimon-python/pypaimon/tests/rest/rest_server.py
@@ -21,7 +21,6 @@ import re
 import threading
 import time
 import uuid
-from dataclasses import dataclass
 from http.server import BaseHTTPRequestHandler, HTTPServer
 from pathlib import Path
 from typing import Any, Dict, List, Optional, Tuple, Union, TYPE_CHECKING
@@ -35,7 +34,7 @@ from pypaimon.api.api_request import (AlterTableRequest, 
CreateDatabaseRequest,
 from pypaimon.api.api_response import (ConfigResponse, GetDatabaseResponse,
                                        GetTableResponse, ListDatabasesResponse,
                                        ListTablesResponse, PagedList,
-                                       RESTResponse)
+                                       RESTResponse, ErrorResponse)
 from pypaimon.api.resource_paths import ResourcePaths
 from pypaimon.api.rest_util import RESTUtil
 from pypaimon.catalog.catalog_exception import (DatabaseNoPermissionException,
@@ -45,45 +44,12 @@ from pypaimon.catalog.catalog_exception import 
(DatabaseNoPermissionException,
                                                 TableAlreadyExistException)
 from pypaimon.catalog.rest.table_metadata import TableMetadata
 from pypaimon.common.identifier import Identifier
-from pypaimon.common.json_util import JSON, json_field
+from pypaimon.common.json_util import JSON
 from pypaimon import Schema
 from pypaimon.schema.schema_change import Actions, SchemaChange
 from pypaimon.schema.schema_manager import SchemaManager
 from pypaimon.schema.table_schema import TableSchema
 
-
-@dataclass
-class ErrorResponse(RESTResponse):
-    """Error response"""
-    RESOURCE_TYPE_DATABASE = "database"
-    RESOURCE_TYPE_TABLE = "table"
-    RESOURCE_TYPE_VIEW = "view"
-    RESOURCE_TYPE_FUNCTION = "function"
-    RESOURCE_TYPE_COLUMN = "column"
-    RESOURCE_TYPE_SNAPSHOT = "snapshot"
-    RESOURCE_TYPE_TAG = "tag"
-    RESOURCE_TYPE_BRANCH = "branch"
-    RESOURCE_TYPE_DEFINITION = "definition"
-    RESOURCE_TYPE_DIALECT = "dialect"
-
-    resource_type: Optional[str] = json_field("resourceType", default=None)
-    resource_name: Optional[str] = json_field("resourceName", default=None)
-    message: Optional[str] = json_field("message", default=None)
-    code: Optional[int] = json_field("code", default=None)
-
-    def __init__(
-        self,
-        resource_type: Optional[str] = None,
-        resource_name: Optional[str] = None,
-        message: Optional[str] = None,
-        code: Optional[int] = None,
-    ):
-        self.resource_type = resource_type
-        self.resource_name = resource_name
-        self.message = message
-        self.code = code
-
-
 # Constants
 DEFAULT_MAX_RESULTS = 100
 AUTHORIZATION_HEADER_KEY = "Authorization"
@@ -486,7 +452,7 @@ class RESTCatalogServer:
             # Basic table operations (GET, DELETE, etc.)
             return self._table_handle(method, data, lookup_identifier)
         elif len(path_parts) == 4:
-            # Extended operations (e.g., commit, token)
+            # Extended operations (e.g., commit, token, snapshot)
             operation = path_parts[3]
             if operation == "commit":
                 return self._table_commit_handle(method, data, 
lookup_identifier, branch_part)
@@ -494,6 +460,8 @@ class RESTCatalogServer:
                 return self._table_token_handle(method, lookup_identifier)
             elif operation == "rollback":
                 return self._table_rollback_handle(method, data, 
lookup_identifier)
+            elif operation == "snapshot":
+                return self._table_snapshot_handle(method, lookup_identifier)
             else:
                 return self._mock_response(ErrorResponse(None, None, "Not 
Found", 404), 404)
         return self._mock_response(ErrorResponse(None, None, "Not Found", 
404), 404)
@@ -751,13 +719,65 @@ class RESTCatalogServer:
         table.rollback_to(tag_name)
         return self._mock_response("", 200)
 
+    def _table_snapshot_handle(self, method: str, identifier: Identifier) -> 
Tuple[str, int]:
+        """Handle a GET request for a table's latest snapshot.
+
+        Args:
+            method: HTTP method
+            identifier: Table identifier
+
+        Returns:
+            Tuple of (response JSON, HTTP status code)
+        """
+        if method != "GET":
+            return self._mock_response(ErrorResponse(None, None, "Method Not 
Allowed", 405), 405)
+
+        if identifier.get_full_name() not in self.table_metadata_store:
+            raise TableNotExistException(identifier)
+
+        table_metadata = self.table_metadata_store[identifier.get_full_name()]
+        if table_metadata.is_external:
+            response = ErrorResponse(
+                ErrorResponse.RESOURCE_TYPE_TABLE,
+                identifier.get_full_name(),
+                "external paimon table does not support get table snapshot in 
rest server",
+                501)
+            return self._mock_response(response, 404)
+
+        # Get the table and snapshot manager to retrieve snapshot
+        table = self._get_file_table(identifier)
+        snapshot_manager = table.snapshot_manager()
+
+        # Get latest snapshot
+        snapshot = snapshot_manager.get_latest_snapshot()
+
+        if snapshot is None:
+            response = ErrorResponse(
+                ErrorResponse.RESOURCE_TYPE_SNAPSHOT,
+                identifier.get_database_name(),
+                "No Snapshot",
+                404)
+            return self._mock_response(response, 404)
+
+        from pypaimon.api.api_response import GetTableSnapshotResponse
+        from pypaimon.snapshot.table_snapshot import TableSnapshot
+
+        table_snapshot = TableSnapshot(
+            snapshot=snapshot,
+            record_count=snapshot.total_record_count,
+            file_size_in_bytes=0,
+            file_count=0,
+            last_file_creation_time=snapshot.time_millis
+        )
+        response = GetTableSnapshotResponse(table_snapshot)
+        return self._mock_response(response, 200)
+
     def _get_file_table(self, identifier: Identifier):
         """Construct a FileStoreTable from the metadata store.
 
-        Mirrors Java RESTCatalogServer.getFileTable(): loads the schema from
-        the metadata store, builds a CatalogEnvironment (without catalog
-        loader so rollback goes through local file cleanup), and returns a
-        FileStoreTable.
+        Loads the schema from the metadata store, builds a CatalogEnvironment
+        (without catalog loader so rollback goes through local file cleanup),
+        and returns a FileStoreTable.
         """
         from pypaimon.catalog.catalog_environment import CatalogEnvironment
         from pypaimon.common.file_io import FileIO
diff --git a/paimon-python/pypaimon/tests/write/table_write_test.py b/paimon-python/pypaimon/tests/write/table_write_test.py
index 1ff723ca73..60ddcdc0bd 100644
--- a/paimon-python/pypaimon/tests/write/table_write_test.py
+++ b/paimon-python/pypaimon/tests/write/table_write_test.py
@@ -27,6 +27,8 @@ from pypaimon import CatalogFactory, Schema
 import pyarrow as pa
 from parameterized import parameterized
 
+from pypaimon.common.json_util import JSON
+
 
 class TableWriteTest(unittest.TestCase):
     @classmethod
@@ -89,7 +91,7 @@ class TableWriteTest(unittest.TestCase):
         self.assertEqual(self.expected, actual)
 
         # snapshot
-        snapshot_json: str = 
table.snapshot_manager().get_latest_snapshot_json()
+        snapshot_json: str = 
JSON.to_json(table.snapshot_manager().get_latest_snapshot())
         self.assertEquals(True, snapshot_json.__contains__("baseManifestList"))
         self.assertEquals(False, snapshot_json.__contains__("nextRowId"))
 
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index ee7d7a9694..397a597dc0 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -230,12 +230,10 @@ class FileStoreCommit:
 
     def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan,
                     detect_conflicts=False, allow_rollback=False):
-        import threading
 
         retry_count = 0
         retry_result = None
         start_time_ms = int(time.time() * 1000)
-        thread_id = threading.current_thread().name
         while True:
             latest_snapshot = self.snapshot_manager.get_latest_snapshot()
             commit_entries = commit_entries_plan(latest_snapshot)
@@ -257,12 +255,6 @@ class FileStoreCommit:
 
             if result.is_success():
                 commit_duration_ms = int(time.time() * 1000) - start_time_ms
-                logger.info(
-                    "Thread %s: commit success %d after %d retries",
-                    thread_id,
-                    latest_snapshot.id + 1 if latest_snapshot else 1,
-                    retry_count,
-                )
                 if commit_kind == "OVERWRITE":
                     logger.info(
                         "Finished overwrite to table %s, duration %d ms",
@@ -529,8 +521,6 @@ class FileStoreCommit:
         return entries
 
     def _commit_retry_wait(self, retry_count: int):
-        import threading
-        thread_id = threading.get_ident()
 
         retry_wait_ms = min(
             self.commit_min_retry_wait * (2 ** retry_count),
@@ -540,10 +530,6 @@ class FileStoreCommit:
         jitter_ms = random.randint(0, max(1, int(retry_wait_ms * 0.2)))
         total_wait_ms = retry_wait_ms + jitter_ms
 
-        logger.debug(
-            f"Thread {thread_id}: Waiting {total_wait_ms}ms before retry 
(base: {retry_wait_ms}ms, "
-            f"jitter: {jitter_ms}ms)"
-        )
         time.sleep(total_wait_ms / 1000.0)
 
     def _cleanup_preparation_failure(self,


Reply via email to