This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 16915c5522 [python] Support getting snapshot by RESTCatalog (#7412)
16915c5522 is described below
commit 16915c55221419027af3a9a5f31cb7a8118cbfe9
Author: umi <[email protected]>
AuthorDate: Thu Mar 12 17:42:46 2026 +0800
[python] Support getting snapshot by RESTCatalog (#7412)
---
paimon-python/pypaimon/api/api_request.py | 4 +-
paimon-python/pypaimon/api/api_response.py | 28 ++++++
paimon-python/pypaimon/api/resource_paths.py | 4 +
paimon-python/pypaimon/api/rest_api.py | 21 ++++-
paimon-python/pypaimon/catalog/catalog.py | 21 ++++-
.../pypaimon/catalog/catalog_environment.py | 11 +++
.../pypaimon/catalog/filesystem_catalog.py | 11 +++
.../pypaimon/catalog/filesystem_catalog_loader.py | 67 ++++++++++++++
.../pypaimon/catalog/rest/rest_catalog.py | 30 +++++-
paimon-python/pypaimon/snapshot/snapshot_loader.py | 57 ++++++++++++
.../pypaimon/snapshot/snapshot_manager.py | 56 ++++++++---
paimon-python/pypaimon/snapshot/table_snapshot.py | 38 ++++++++
.../rest/rest_catalog_commit_snapshot_test.py | 4 +-
.../pypaimon/tests/rest/rest_catalog_test.py | 96 +++++++++++++++++++
paimon-python/pypaimon/tests/rest/rest_server.py | 102 ++++++++++++---------
.../pypaimon/tests/write/table_write_test.py | 4 +-
paimon-python/pypaimon/write/file_store_commit.py | 14 ---
17 files changed, 487 insertions(+), 81 deletions(-)
diff --git a/paimon-python/pypaimon/api/api_request.py
b/paimon-python/pypaimon/api/api_request.py
index f191fb03d3..6d1af78894 100644
--- a/paimon-python/pypaimon/api/api_request.py
+++ b/paimon-python/pypaimon/api/api_request.py
@@ -71,11 +71,11 @@ class CreateTableRequest(RESTRequest):
@dataclass
class CommitTableRequest(RESTRequest):
- FIELD_TABLE_UUID = "tableUuid"
+ FIELD_TABLE_ID = "tableId"
FIELD_SNAPSHOT = "snapshot"
FIELD_STATISTICS = "statistics"
- table_uuid: Optional[str] = json_field(FIELD_TABLE_UUID)
+ table_id: Optional[str] = json_field(FIELD_TABLE_ID)
snapshot: Snapshot = json_field(FIELD_SNAPSHOT)
statistics: List[PartitionStatistics] = json_field(FIELD_STATISTICS)
diff --git a/paimon-python/pypaimon/api/api_response.py
b/paimon-python/pypaimon/api/api_response.py
index 8e1e498f59..0cfa73c855 100644
--- a/paimon-python/pypaimon/api/api_response.py
+++ b/paimon-python/pypaimon/api/api_response.py
@@ -23,6 +23,7 @@ from typing import Dict, Generic, List, Optional
from pypaimon.common.json_util import T, json_field
from pypaimon.common.options import Options
from pypaimon.schema.schema import Schema
+from pypaimon.snapshot.table_snapshot import TableSnapshot
@dataclass
@@ -37,6 +38,18 @@ class RESTResponse(ABC):
@dataclass
class ErrorResponse(RESTResponse):
+ """Error response"""
+ RESOURCE_TYPE_DATABASE = "DATABASE"
+ RESOURCE_TYPE_TABLE = "TABLE"
+ RESOURCE_TYPE_VIEW = "VIEW"
+ RESOURCE_TYPE_FUNCTION = "FUNCTION"
+ RESOURCE_TYPE_COLUMN = "COLUMN"
+ RESOURCE_TYPE_SNAPSHOT = "SNAPSHOT"
+ RESOURCE_TYPE_TAG = "TAG"
+ RESOURCE_TYPE_BRANCH = "BRANCH"
+ RESOURCE_TYPE_DEFINITION = "DEFINITION"
+ RESOURCE_TYPE_DIALECT = "DIALECT"
+
resource_type: Optional[str] = json_field("resourceType", default=None)
resource_name: Optional[str] = json_field("resourceName", default=None)
message: Optional[str] = json_field("message", default=None)
@@ -267,3 +280,18 @@ class CommitTableResponse(RESTResponse):
def is_success(self) -> bool:
return self.success
+
+
+@dataclass
+class GetTableSnapshotResponse(RESTResponse):
+ """Response for getting table snapshot."""
+
+ FIELD_SNAPSHOT = "snapshot"
+
+ snapshot: Optional[TableSnapshot] = json_field(FIELD_SNAPSHOT,
default=None)
+
+ def __init__(self, snapshot: Optional[TableSnapshot] = None):
+ self.snapshot = snapshot
+
+ def get_snapshot(self) -> Optional[TableSnapshot]:
+ return self.snapshot
diff --git a/paimon-python/pypaimon/api/resource_paths.py
b/paimon-python/pypaimon/api/resource_paths.py
index 16967d16e2..03a7c43fab 100644
--- a/paimon-python/pypaimon/api/resource_paths.py
+++ b/paimon-python/pypaimon/api/resource_paths.py
@@ -74,3 +74,7 @@ class ResourcePaths:
def rollback_table(self, database_name: str, table_name: str) -> str:
return ("{}/{}/{}/{}/{}/rollback".format(self.base_path,
self.DATABASES, RESTUtil.encode_string(database_name),
self.TABLES,
RESTUtil.encode_string(table_name)))
+
+ def table_snapshot(self, database_name: str, table_name: str) -> str:
+ return ("{}/{}/{}/{}/{}/snapshot".format(self.base_path,
self.DATABASES, RESTUtil.encode_string(database_name),
+ self.TABLES,
RESTUtil.encode_string(table_name)))
diff --git a/paimon-python/pypaimon/api/rest_api.py
b/paimon-python/pypaimon/api/rest_api.py
index 069220d687..b7c816c9d9 100755
--- a/paimon-python/pypaimon/api/rest_api.py
+++ b/paimon-python/pypaimon/api/rest_api.py
@@ -27,7 +27,7 @@ from pypaimon.api.api_response import (CommitTableResponse,
ConfigResponse,
GetTableTokenResponse,
ListDatabasesResponse,
ListTablesResponse, PagedList,
- PagedResponse)
+ PagedResponse, GetTableSnapshotResponse)
from pypaimon.api.auth import AuthProviderFactory, RESTAuthFunction
from pypaimon.api.client import HttpClient
from pypaimon.api.resource_paths import ResourcePaths
@@ -380,6 +380,25 @@ class RESTApi:
self.rest_auth_function
)
+ def load_snapshot(self, identifier: Identifier) ->
Optional['TableSnapshot']:
+ """Load latest snapshot for table.
+
+ Args:
+ identifier: Database name and table name.
+
+ Returns:
+ TableSnapshot instance or None if snapshot not found.
+ """
+ database_name, table_name = self.__validate_identifier(identifier)
+ response = self.client.get(
+ self.resource_paths.table_snapshot(database_name, table_name),
+ GetTableSnapshotResponse,
+ self.rest_auth_function
+ )
+ if response is None:
+ return None
+ return response.get_snapshot()
+
@staticmethod
def __validate_identifier(identifier: Identifier):
if not identifier:
diff --git a/paimon-python/pypaimon/catalog/catalog.py
b/paimon-python/pypaimon/catalog/catalog.py
index 2cca595411..d5f6cdee99 100644
--- a/paimon-python/pypaimon/catalog/catalog.py
+++ b/paimon-python/pypaimon/catalog/catalog.py
@@ -138,6 +138,21 @@ class Catalog(ABC):
"""
return False
+ @abstractmethod
+ def load_snapshot(self, identifier: Identifier):
+ """Load the snapshot of table identified by the given Identifier.
+
+ Args:
+ identifier: Path of the table
+
+ Returns:
+ TableSnapshot instance; implementations may return None when the table has no snapshot (RESTCatalog does)
+
+ Raises:
+ NotImplementedError: If the catalog does not support version
management
+ TableNotExistException: If the table does not exist
+ """
+
@abstractmethod
def commit_snapshot(
self,
@@ -178,9 +193,9 @@ class Catalog(ABC):
)
def drop_partitions(
- self,
- identifier: Union[str, Identifier],
- partitions: List[Dict[str, str]],
+ self,
+ identifier: Union[str, Identifier],
+ partitions: List[Dict[str, str]],
) -> None:
raise NotImplementedError(
"drop_partitions is not supported by this catalog. Use REST
catalog for partition drop."
diff --git a/paimon-python/pypaimon/catalog/catalog_environment.py
b/paimon-python/pypaimon/catalog/catalog_environment.py
index 2d95365508..984a007ed4 100644
--- a/paimon-python/pypaimon/catalog/catalog_environment.py
+++ b/paimon-python/pypaimon/catalog/catalog_environment.py
@@ -23,6 +23,7 @@ from pypaimon.common.identifier import Identifier
from pypaimon.snapshot.catalog_snapshot_commit import CatalogSnapshotCommit
from pypaimon.snapshot.renaming_snapshot_commit import RenamingSnapshotCommit
from pypaimon.snapshot.snapshot_commit import SnapshotCommit
+from pypaimon.snapshot.snapshot_loader import SnapshotLoader
class CatalogEnvironment:
@@ -76,6 +77,16 @@ class CatalogEnvironment:
return TableRollback(catalog, self.identifier)
return None
+ def snapshot_loader(self) -> Optional[SnapshotLoader]:
+ """Create a SnapshotLoader instance based on the catalog environment.
+
+ Returns:
+ SnapshotLoader instance if catalog_loader is available, None
otherwise
+ """
+ if self.catalog_loader is not None:
+ return SnapshotLoader(self.catalog_loader, self.identifier)
+ return None
+
def copy(self, identifier: Identifier) -> 'CatalogEnvironment':
"""
Create a copy of this CatalogEnvironment with a different identifier.
diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py
b/paimon-python/pypaimon/catalog/filesystem_catalog.py
index eb8fea5ba0..73a7f6e3cd 100644
--- a/paimon-python/pypaimon/catalog/filesystem_catalog.py
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py
@@ -243,3 +243,14 @@ class FileSystemCatalog(Catalog):
statistics: List[PartitionStatistics]
) -> bool:
raise NotImplementedError("This catalog does not support commit
catalog")
+
+ def load_snapshot(self, identifier: Identifier):
+ """Load the snapshot of table identified by the given Identifier.
+
+ Args:
+ identifier: Path of the table
+
+ Raises:
+ NotImplementedError: FileSystemCatalog does not support version
management
+ """
+ raise NotImplementedError("Filesystem catalog does not support
load_snapshot")
diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog_loader.py
b/paimon-python/pypaimon/catalog/filesystem_catalog_loader.py
new file mode 100644
index 0000000000..664b521d58
--- /dev/null
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog_loader.py
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+FileSystemCatalogLoader implementation for pypaimon.
+
+This module provides the FileSystemCatalogLoader class which implements the
CatalogLoader
+interface to create and load FileSystemCatalog instances.
+"""
+
+from pypaimon.catalog.catalog_context import CatalogContext
+from pypaimon.catalog.catalog_loader import CatalogLoader
+from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
+
+
+class FileSystemCatalogLoader(CatalogLoader):
+ """
+ Loader to create FileSystemCatalog instances.
+
+ This class implements the CatalogLoader interface and is responsible for
+ creating and configuring FileSystemCatalog instances based on the provided
+ CatalogContext.
+ """
+
+ def __init__(self, context: CatalogContext):
+ """
+ Initialize FileSystemCatalogLoader with a CatalogContext.
+
+ Args:
+ context: The CatalogContext containing configuration options
+ """
+ self._context = context
+
+ def context(self) -> CatalogContext:
+ """
+ Get the CatalogContext associated with this loader.
+
+ Returns:
+ The CatalogContext instance
+ """
+ return self._context
+
+ def load(self) -> FileSystemCatalog:
+ """
+ Load and return a new FileSystemCatalog instance.
+
+ This method creates a new FileSystemCatalog instance using the stored
+ CatalogContext.
+
+ Returns:
+ A new FileSystemCatalog instance
+ """
+ return FileSystemCatalog(self._context.options)
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index bfb936d9a7..399d4e1a86 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -17,7 +17,7 @@ limitations under the License.
"""
from typing import Any, Callable, Dict, List, Optional, Union
-from pypaimon.api.api_response import GetTableResponse, PagedList
+from pypaimon.api.api_response import GetTableResponse, PagedList,
ErrorResponse
from pypaimon.api.rest_api import RESTApi
from pypaimon.api.rest_exception import NoSuchResourceException,
AlreadyExistsException, ForbiddenException
from pypaimon.catalog.catalog import Catalog
@@ -301,16 +301,40 @@ class RESTCatalog(Catalog):
try:
self.rest_api.rollback_to(identifier, instant, from_snapshot)
except NoSuchResourceException as e:
- if e.resource_type == "snapshot":
+ if e.resource_type == ErrorResponse.RESOURCE_TYPE_SNAPSHOT:
raise ValueError(
"Rollback snapshot '{}' doesn't
exist.".format(e.resource_name)) from e
- elif e.resource_type == "tag":
+ elif e.resource_type == ErrorResponse.RESOURCE_TYPE_TAG:
raise ValueError(
"Rollback tag '{}' doesn't
exist.".format(e.resource_name)) from e
raise TableNotExistException(identifier) from e
except ForbiddenException as e:
raise TableNoPermissionException(identifier) from e
+ def load_snapshot(self, identifier: Union[str, Identifier]) ->
Optional['TableSnapshot']:
+ """Load the latest snapshot for table.
+
+ Args:
+ identifier: Path of the table (Identifier or string).
+
+ Returns:
+ TableSnapshot instance or None if snapshot not found.
+
+ Raises:
+ TableNotExistException: If the table does not exist.
+ TableNoPermissionException: If no permission to access this table.
+ """
+ if not isinstance(identifier, Identifier):
+ identifier = Identifier.from_string(identifier)
+ try:
+ return self.rest_api.load_snapshot(identifier)
+ except NoSuchResourceException as e:
+ if e.resource_type == ErrorResponse.RESOURCE_TYPE_SNAPSHOT:
+ return None
+ raise TableNotExistException(identifier) from e
+ except ForbiddenException as e:
+ raise TableNoPermissionException(identifier) from e
+
def load_table_metadata(self, identifier: Identifier) -> TableMetadata:
try:
response = self.rest_api.get_table(identifier)
diff --git a/paimon-python/pypaimon/snapshot/snapshot_loader.py
b/paimon-python/pypaimon/snapshot/snapshot_loader.py
new file mode 100644
index 0000000000..aba0a66c58
--- /dev/null
+++ b/paimon-python/pypaimon/snapshot/snapshot_loader.py
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Snapshot loader for loading snapshots from a catalog."""
+
+from typing import Optional
+
+
+class SnapshotLoader:
+ """Loader to load latest snapshot from a catalog.
+
+ This loader uses a CatalogLoader to create catalog instances and
+ load snapshots through the catalog's load_snapshot method.
+ """
+
+ def __init__(self, catalog_loader, identifier):
+ """Initialize the loader with a catalog loader and table identifier.
+
+ Args:
+ catalog_loader: The CatalogLoader instance to create catalog
+ identifier: The table identifier
+ """
+ self.catalog_loader = catalog_loader
+ self.identifier = identifier
+
+ def load(self) -> Optional['Snapshot']:
+ """Load the latest snapshot from the catalog.
+
+ Returns:
+ The ``snapshot`` attribute of the loaded TableSnapshot, or None if not found
+
+ Raises:
+ RuntimeError: If loading fails (NotImplementedError, a RuntimeError subclass, passes through unchanged)
+ """
+ try:
+ catalog = self.catalog_loader.load()
+ table_snapshot = catalog.load_snapshot(self.identifier)
+ if table_snapshot is None:
+ return None
+ return table_snapshot.snapshot
+ except RuntimeError:
+ raise
+ except Exception as e:
+ raise RuntimeError(f"Failed to load snapshot: {e}") from e
diff --git a/paimon-python/pypaimon/snapshot/snapshot_manager.py
b/paimon-python/pypaimon/snapshot/snapshot_manager.py
index 61678e2d96..1ad1364d3a 100644
--- a/paimon-python/pypaimon/snapshot/snapshot_manager.py
+++ b/paimon-python/pypaimon/snapshot/snapshot_manager.py
@@ -1,4 +1,3 @@
-################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -7,14 +6,14 @@
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
import logging
from typing import Optional
@@ -23,6 +22,7 @@ from pypaimon.common.file_io import FileIO
logger = logging.getLogger(__name__)
from pypaimon.common.json_util import JSON
from pypaimon.snapshot.snapshot import Snapshot
+from pypaimon.snapshot.snapshot_loader import SnapshotLoader
class SnapshotManager:
@@ -33,17 +33,45 @@ class SnapshotManager:
self.table: FileStoreTable = table
self.file_io: FileIO = self.table.file_io
+ self.snapshot_loader: Optional[SnapshotLoader] =
self.table.catalog_environment.snapshot_loader()
+
snapshot_path = self.table.table_path.rstrip('/')
self.snapshot_dir = f"{snapshot_path}/snapshot"
self.latest_file = f"{self.snapshot_dir}/LATEST"
def get_latest_snapshot(self) -> Optional[Snapshot]:
- snapshot_json = self.get_latest_snapshot_json()
- if snapshot_json is None:
- return None
- return JSON.from_json(snapshot_json, Snapshot)
+ """
+ Get the latest snapshot with loader priority.
- def get_latest_snapshot_json(self) -> Optional[str]:
+ 1. Try to load from snapshotLoader if available
+ 2. Fallback to filesystem if loader is not available or throws
NotImplementedError
+ 3. Return None if no snapshot found
+
+ Returns:
+ The latest Snapshot, or None if not found
+ """
+ # Try to load from snapshotLoader if available
+ if self.snapshot_loader is not None:
+ try:
+ snapshot = self.snapshot_loader.load()
+ except NotImplementedError:
+ # Loader not supported, fallback to filesystem
+ snapshot = self._get_latest_snapshot_from_filesystem()
+ except IOError as e:
+ # IO error, re-raise as RuntimeError chained to the original error
+ raise RuntimeError(f"Failed to load snapshot from loader: {e}") from e
+ else:
+ # No loader, use filesystem directly
+ snapshot = self._get_latest_snapshot_from_filesystem()
+ return snapshot
+
+ def _get_latest_snapshot_from_filesystem(self) -> Optional[Snapshot]:
+ """
+ Get the latest snapshot from filesystem by reading LATEST file.
+
+ Returns:
+ The latest snapshot, or None if not found
+ """
if not self.file_io.exists(self.latest_file):
return None
@@ -54,7 +82,7 @@ class SnapshotManager:
if not self.file_io.exists(snapshot_file):
return None
- return self.file_io.read_file_utf8(snapshot_file)
+ return JSON.from_json(self.file_io.read_file_utf8(snapshot_file),
Snapshot)
def read_latest_file(self, max_retries: int = 5):
"""
diff --git a/paimon-python/pypaimon/snapshot/table_snapshot.py
b/paimon-python/pypaimon/snapshot/table_snapshot.py
new file mode 100644
index 0000000000..541d756bc4
--- /dev/null
+++ b/paimon-python/pypaimon/snapshot/table_snapshot.py
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""TableSnapshot class with snapshot and table statistics."""
+
+from dataclasses import dataclass
+
+from pypaimon.common.json_util import json_field
+from pypaimon.snapshot.snapshot import Snapshot
+
+
+@dataclass
+class TableSnapshot:
+ """Snapshot of a table, including basic statistics of this table.
+
+ This class wraps a Snapshot and provides additional table-level statistics
+ such as record count, file size, file count, and last file creation time.
+ """
+
+ # Required fields
+ snapshot: Snapshot = json_field("snapshot")
+ record_count: int = json_field("recordCount")
+ file_size_in_bytes: int = json_field("fileSizeInBytes")
+ file_count: int = json_field("fileCount")
+ last_file_creation_time: int = json_field("lastFileCreationTime")
diff --git
a/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py
b/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py
index 998b169084..6e5ea47618 100644
--- a/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py
@@ -169,13 +169,13 @@ class TestRESTCatalogCommitSnapshot(unittest.TestCase):
from pypaimon.api.api_request import CommitTableRequest
request = CommitTableRequest(
- table_uuid="test-uuid",
+ table_id="test-uuid",
snapshot=self.test_snapshot,
statistics=self.test_statistics
)
# Verify request fields
- self.assertEqual(request.table_uuid, "test-uuid")
+ self.assertEqual(request.table_id, "test-uuid")
self.assertEqual(request.snapshot, self.test_snapshot)
self.assertEqual(request.statistics, self.test_statistics)
diff --git a/paimon-python/pypaimon/tests/rest/rest_catalog_test.py
b/paimon-python/pypaimon/tests/rest/rest_catalog_test.py
index 153c22ba8c..cdf313f49e 100644
--- a/paimon-python/pypaimon/tests/rest/rest_catalog_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_catalog_test.py
@@ -272,6 +272,102 @@ class RESTCatalogTest(RESTBaseTest):
self.assertIn("no-such-tag", str(context.exception))
self.assertIn("doesn't exist", str(context.exception))
+ def test_catalog_load_snapshot(self):
+ """Test RESTCatalog.load_snapshot returns the latest snapshot."""
+ table_name = "default.table_for_load_snapshot"
+ pa_schema = pa.schema([('col1', pa.int32())])
+ schema = Schema.from_pyarrow_schema(pa_schema)
+ self.rest_catalog.create_table(table_name, schema, False)
+ table = self.rest_catalog.get_table(table_name)
+ identifier = Identifier.from_string(table_name)
+
+ # Initially, no snapshot exists
+ table_snapshot = self.rest_catalog.load_snapshot(identifier)
+ self.assertIsNone(table_snapshot)
+
+ # Write 3 commits
+ for i in range(3):
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data = pa.Table.from_pydict({'col1': [i]}, schema=pa_schema)
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # Load snapshot should return the latest (snapshot 3)
+ snapshot = self.rest_catalog.load_snapshot(identifier).snapshot
+ self.assertIsNotNone(snapshot)
+ self.assertEqual(snapshot.id, 3)
+ self.assertEqual(snapshot.schema_id, 0)
+
+ def test_catalog_load_snapshot_with_string_identifier(self):
+ """Test load_snapshot using a string identifier instead of Identifier
object."""
+ table_name = "default.table_load_snapshot_str_id"
+ pa_schema = pa.schema([('col1', pa.int32())])
+ schema = Schema.from_pyarrow_schema(pa_schema)
+ self.rest_catalog.create_table(table_name, schema, False)
+ table = self.rest_catalog.get_table(table_name)
+
+ # Write 2 commits
+ for i in range(2):
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data = pa.Table.from_pydict({'col1': [i]}, schema=pa_schema)
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # Use string identifier directly
+ table_snapshot = self.rest_catalog.load_snapshot(table_name)
+ self.assertIsNotNone(table_snapshot)
+ self.assertEqual(table_snapshot.snapshot.id, 2)
+
+ def test_catalog_load_snapshot_nonexistent_table(self):
+ """Test that load_snapshot on a non-existent table raises an error."""
+ from pypaimon.catalog.catalog_exception import TableNotExistException
+ identifier =
Identifier.from_string("default.no_such_table_for_snapshot")
+ with self.assertRaises(TableNotExistException) as context:
+ self.rest_catalog.load_snapshot(identifier)
+ self.assertIn("no_such_table_for_snapshot", str(context.exception))
+ self.assertIn("does not exist", str(context.exception))
+
+ def test_catalog_load_snapshot_after_rollback(self):
+ """Test load_snapshot returns correct snapshot after rollback."""
+ table_name = "default.table_load_snapshot_after_rollback"
+ pa_schema = pa.schema([('col1', pa.int32())])
+ schema = Schema.from_pyarrow_schema(pa_schema)
+ self.rest_catalog.create_table(table_name, schema, False)
+ table = self.rest_catalog.get_table(table_name)
+ identifier = Identifier.from_string(table_name)
+
+ # Write 5 commits
+ for i in range(5):
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data = pa.Table.from_pydict({'col1': [i]}, schema=pa_schema)
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # Verify latest snapshot is 5
+ table_snapshot = self.rest_catalog.load_snapshot(identifier)
+ self.assertEqual(table_snapshot.snapshot.id, 5)
+
+ # Rollback to snapshot 3
+ from pypaimon.table.instant import Instant
+ self.rest_catalog.rollback_to(identifier, Instant.snapshot(3))
+
+ # Load snapshot should now return snapshot 3
+ table_snapshot = self.rest_catalog.load_snapshot(identifier)
+ self.assertIsNotNone(table_snapshot)
+ self.assertEqual(table_snapshot.snapshot.id, 3)
+
if __name__ == '__main__':
unittest.main()
diff --git a/paimon-python/pypaimon/tests/rest/rest_server.py
b/paimon-python/pypaimon/tests/rest/rest_server.py
index dd6801f897..ee84553932 100755
--- a/paimon-python/pypaimon/tests/rest/rest_server.py
+++ b/paimon-python/pypaimon/tests/rest/rest_server.py
@@ -21,7 +21,6 @@ import re
import threading
import time
import uuid
-from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union, TYPE_CHECKING
@@ -35,7 +34,7 @@ from pypaimon.api.api_request import (AlterTableRequest,
CreateDatabaseRequest,
from pypaimon.api.api_response import (ConfigResponse, GetDatabaseResponse,
GetTableResponse, ListDatabasesResponse,
ListTablesResponse, PagedList,
- RESTResponse)
+ RESTResponse, ErrorResponse)
from pypaimon.api.resource_paths import ResourcePaths
from pypaimon.api.rest_util import RESTUtil
from pypaimon.catalog.catalog_exception import (DatabaseNoPermissionException,
@@ -45,45 +44,12 @@ from pypaimon.catalog.catalog_exception import
(DatabaseNoPermissionException,
TableAlreadyExistException)
from pypaimon.catalog.rest.table_metadata import TableMetadata
from pypaimon.common.identifier import Identifier
-from pypaimon.common.json_util import JSON, json_field
+from pypaimon.common.json_util import JSON
from pypaimon import Schema
from pypaimon.schema.schema_change import Actions, SchemaChange
from pypaimon.schema.schema_manager import SchemaManager
from pypaimon.schema.table_schema import TableSchema
-
-@dataclass
-class ErrorResponse(RESTResponse):
- """Error response"""
- RESOURCE_TYPE_DATABASE = "database"
- RESOURCE_TYPE_TABLE = "table"
- RESOURCE_TYPE_VIEW = "view"
- RESOURCE_TYPE_FUNCTION = "function"
- RESOURCE_TYPE_COLUMN = "column"
- RESOURCE_TYPE_SNAPSHOT = "snapshot"
- RESOURCE_TYPE_TAG = "tag"
- RESOURCE_TYPE_BRANCH = "branch"
- RESOURCE_TYPE_DEFINITION = "definition"
- RESOURCE_TYPE_DIALECT = "dialect"
-
- resource_type: Optional[str] = json_field("resourceType", default=None)
- resource_name: Optional[str] = json_field("resourceName", default=None)
- message: Optional[str] = json_field("message", default=None)
- code: Optional[int] = json_field("code", default=None)
-
- def __init__(
- self,
- resource_type: Optional[str] = None,
- resource_name: Optional[str] = None,
- message: Optional[str] = None,
- code: Optional[int] = None,
- ):
- self.resource_type = resource_type
- self.resource_name = resource_name
- self.message = message
- self.code = code
-
-
# Constants
DEFAULT_MAX_RESULTS = 100
AUTHORIZATION_HEADER_KEY = "Authorization"
@@ -486,7 +452,7 @@ class RESTCatalogServer:
# Basic table operations (GET, DELETE, etc.)
return self._table_handle(method, data, lookup_identifier)
elif len(path_parts) == 4:
- # Extended operations (e.g., commit, token)
+ # Extended operations (e.g., commit, token, snapshot)
operation = path_parts[3]
if operation == "commit":
return self._table_commit_handle(method, data,
lookup_identifier, branch_part)
@@ -494,6 +460,8 @@ class RESTCatalogServer:
return self._table_token_handle(method, lookup_identifier)
elif operation == "rollback":
return self._table_rollback_handle(method, data,
lookup_identifier)
+ elif operation == "snapshot":
+ return self._table_snapshot_handle(method, lookup_identifier)
else:
return self._mock_response(ErrorResponse(None, None, "Not
Found", 404), 404)
return self._mock_response(ErrorResponse(None, None, "Not Found",
404), 404)
@@ -751,13 +719,65 @@ class RESTCatalogServer:
table.rollback_to(tag_name)
return self._mock_response("", 200)
+ def _table_snapshot_handle(self, method: str, identifier: Identifier) ->
Tuple[str, int]:
+ """Handle table snapshot operations.
+
+ Args:
+ method: HTTP method
+ identifier: Table identifier
+
+ Returns:
+ Tuple of (response JSON, HTTP status code)
+ """
+ if method != "GET":
+ return self._mock_response(ErrorResponse(None, None, "Method Not
Allowed", 405), 405)
+
+ if identifier.get_full_name() not in self.table_metadata_store:
+ raise TableNotExistException(identifier)
+
+ table_metadata = self.table_metadata_store[identifier.get_full_name()]
+ if table_metadata.is_external:
+ response = ErrorResponse(
+ ErrorResponse.RESOURCE_TYPE_TABLE,
+ identifier.get_full_name(),
+ "external paimon table does not support get table snapshot in
rest server",
+ 501)
+ return self._mock_response(response, 404)
+
+ # Get the table and snapshot manager to retrieve snapshot
+ table = self._get_file_table(identifier)
+ snapshot_manager = table.snapshot_manager()
+
+ # Get latest snapshot
+ snapshot = snapshot_manager.get_latest_snapshot()
+
+ if snapshot is None:
+ response = ErrorResponse(
+ ErrorResponse.RESOURCE_TYPE_SNAPSHOT,
+ identifier.get_database_name(),
+ "No Snapshot",
+ 404)
+ return self._mock_response(response, 404)
+
+ from pypaimon.api.api_response import GetTableSnapshotResponse
+ from pypaimon.snapshot.table_snapshot import TableSnapshot
+
+ table_snapshot = TableSnapshot(
+ snapshot=snapshot,
+ record_count=snapshot.total_record_count,
+ file_size_in_bytes=0,
+ file_count=0,
+ last_file_creation_time=snapshot.time_millis
+ )
+ response = GetTableSnapshotResponse(table_snapshot)
+ return self._mock_response(response, 200)
+
def _get_file_table(self, identifier: Identifier):
"""Construct a FileStoreTable from the metadata store.
- Mirrors Java RESTCatalogServer.getFileTable(): loads the schema from
- the metadata store, builds a CatalogEnvironment (without catalog
- loader so rollback goes through local file cleanup), and returns a
- FileStoreTable.
+ Loads the schema from the metadata store, builds a CatalogEnvironment
+ (without catalog loader so rollback goes through local file cleanup),
+ and returns a FileStoreTable.
"""
from pypaimon.catalog.catalog_environment import CatalogEnvironment
from pypaimon.common.file_io import FileIO
diff --git a/paimon-python/pypaimon/tests/write/table_write_test.py
b/paimon-python/pypaimon/tests/write/table_write_test.py
index 1ff723ca73..60ddcdc0bd 100644
--- a/paimon-python/pypaimon/tests/write/table_write_test.py
+++ b/paimon-python/pypaimon/tests/write/table_write_test.py
@@ -27,6 +27,8 @@ from pypaimon import CatalogFactory, Schema
import pyarrow as pa
from parameterized import parameterized
+from pypaimon.common.json_util import JSON
+
class TableWriteTest(unittest.TestCase):
@classmethod
@@ -89,7 +91,7 @@ class TableWriteTest(unittest.TestCase):
self.assertEqual(self.expected, actual)
# snapshot
- snapshot_json: str =
table.snapshot_manager().get_latest_snapshot_json()
+ snapshot_json: str =
JSON.to_json(table.snapshot_manager().get_latest_snapshot())
self.assertEquals(True, snapshot_json.__contains__("baseManifestList"))
self.assertEquals(False, snapshot_json.__contains__("nextRowId"))
diff --git a/paimon-python/pypaimon/write/file_store_commit.py
b/paimon-python/pypaimon/write/file_store_commit.py
index ee7d7a9694..397a597dc0 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -230,12 +230,10 @@ class FileStoreCommit:
def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan,
detect_conflicts=False, allow_rollback=False):
- import threading
retry_count = 0
retry_result = None
start_time_ms = int(time.time() * 1000)
- thread_id = threading.current_thread().name
while True:
latest_snapshot = self.snapshot_manager.get_latest_snapshot()
commit_entries = commit_entries_plan(latest_snapshot)
@@ -257,12 +255,6 @@ class FileStoreCommit:
if result.is_success():
commit_duration_ms = int(time.time() * 1000) - start_time_ms
- logger.info(
- "Thread %s: commit success %d after %d retries",
- thread_id,
- latest_snapshot.id + 1 if latest_snapshot else 1,
- retry_count,
- )
if commit_kind == "OVERWRITE":
logger.info(
"Finished overwrite to table %s, duration %d ms",
@@ -529,8 +521,6 @@ class FileStoreCommit:
return entries
def _commit_retry_wait(self, retry_count: int):
- import threading
- thread_id = threading.get_ident()
retry_wait_ms = min(
self.commit_min_retry_wait * (2 ** retry_count),
@@ -540,10 +530,6 @@ class FileStoreCommit:
jitter_ms = random.randint(0, max(1, int(retry_wait_ms * 0.2)))
total_wait_ms = retry_wait_ms + jitter_ms
- logger.debug(
- f"Thread {thread_id}: Waiting {total_wait_ms}ms before retry
(base: {retry_wait_ms}ms, "
- f"jitter: {jitter_ms}ms)"
- )
time.sleep(total_wait_ms / 1000.0)
def _cleanup_preparation_failure(self,