This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f457c6e68c [python] Support rollback API for RESTCatalog and
FileStoreTable (#7325)
f457c6e68c is described below
commit f457c6e68ca99d26490d5b37b79198f1f06a09cb
Author: umi <[email protected]>
AuthorDate: Mon Mar 2 17:51:42 2026 +0800
[python] Support rollback API for RESTCatalog and FileStoreTable (#7325)
---
docs/content/pypaimon/python-api.md | 75 ++++--
paimon-python/pypaimon/api/api_request.py | 10 +
paimon-python/pypaimon/api/resource_paths.py | 4 +
paimon-python/pypaimon/api/rest_api.py | 24 +-
paimon-python/pypaimon/catalog/catalog.py | 17 ++
.../pypaimon/catalog/catalog_environment.py | 16 ++
.../pypaimon/catalog/rest/rest_catalog.py | 29 ++-
paimon-python/pypaimon/catalog/table_rollback.py | 43 ++++
paimon-python/pypaimon/table/file_store_table.py | 92 +++++++
paimon-python/pypaimon/table/instant.py | 135 ++++++++++
paimon-python/pypaimon/table/rollback_helper.py | 93 +++++++
.../pypaimon/tests/rest/rest_catalog_test.py | 277 +++++++++++++++++++++
paimon-python/pypaimon/tests/rest/rest_server.py | 137 +++++++++-
.../pypaimon/tests/table/simple_table_test.py | 143 +++++++++++
14 files changed, 1064 insertions(+), 31 deletions(-)
diff --git a/docs/content/pypaimon/python-api.md
b/docs/content/pypaimon/python-api.md
index 9da4a07b66..3e9041ed5a 100644
--- a/docs/content/pypaimon/python-api.md
+++ b/docs/content/pypaimon/python-api.md
@@ -521,6 +521,36 @@ Key points about shard read:
- **Parallel Processing**: Each shard can be processed independently for
better performance
- **Consistency**: Combining all shards should produce the complete table data
+## Rollback
+
+Paimon supports rolling back a table to a previous snapshot or tag. This is
useful for undoing unwanted changes or
+restoring the table to a known good state.
+
+### Rollback to Snapshot
+
+You can roll back a table to a specific snapshot by its ID:
+
+```python
+table = catalog.get_table('database_name.table_name')
+
+# Rollback to snapshot 3
+table.rollback_to(3) # snapshot id
+```
+
+### Rollback to Tag
+
+You can also roll back a table to a previously created tag:
+
+```python
+table = catalog.get_table('database_name.table_name')
+
+# Rollback to tag 'v3'
+table.rollback_to('v3') # tag name
+```
+
+The `rollback_to` method accepts either an `int` (snapshot ID) or a `str` (tag
name) and automatically dispatches
+to the appropriate rollback logic.
+
## Data Types
| Python Native Type | PyArrow Type |
Paimon Type |
@@ -566,28 +596,31 @@ Key points about shard read:
The following shows the supported features of Python Paimon compared to Java
Paimon:
**Catalog Level**
- - FileSystemCatalog
- - RestCatalog
+
+- FileSystemCatalog
+- RestCatalog
**Table Level**
- - Append Tables
+
+- Append Tables
- `bucket = -1` (unaware)
- `bucket > 0` (fixed)
- - Primary Key Tables
- - only support deduplicate
- - `bucket = -2` (postpone)
- - `bucket > 0` (fixed)
- - read with deletion vectors enabled
- - Format Tables
- - PARQUET
- - CSV
- - JSON
- - ORC
- - TEXT
- - Read/Write Operations
- - Batch read and write for append tables and primary key tables
- - Predicate filtering
- - Overwrite semantics
- - Incremental reading of Delta data
- - Reading and writing blob data
- - `with_shard` feature
+- Primary Key Tables
+ - only support deduplicate
+ - `bucket = -2` (postpone)
+ - `bucket > 0` (fixed)
+ - read with deletion vectors enabled
+- Format Tables
+ - PARQUET
+ - CSV
+ - JSON
+ - ORC
+ - TEXT
+- Read/Write Operations
+ - Batch read and write for append tables and primary key tables
+ - Predicate filtering
+ - Overwrite semantics
+ - Incremental reading of Delta data
+ - Reading and writing blob data
+ - `with_shard` feature
+ - Rollback feature
diff --git a/paimon-python/pypaimon/api/api_request.py
b/paimon-python/pypaimon/api/api_request.py
index f2d062f8d0..f191fb03d3 100644
--- a/paimon-python/pypaimon/api/api_request.py
+++ b/paimon-python/pypaimon/api/api_request.py
@@ -26,6 +26,7 @@ from pypaimon.schema.schema import Schema
from pypaimon.schema.schema_change import SchemaChange
from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.snapshot.snapshot_commit import PartitionStatistics
+from pypaimon.table.instant import Instant
class RESTRequest(ABC):
@@ -84,3 +85,12 @@ class AlterTableRequest(RESTRequest):
FIELD_CHANGES = "changes"
changes: List[SchemaChange] = json_field(FIELD_CHANGES)
+
+
+@dataclass
+class RollbackTableRequest(RESTRequest):
+    """Request body posted to the table rollback REST endpoint.
+
+    JSON keys: ``instant`` (rollback target) and ``fromSnapshot``
+    (optional guard snapshot ID).
+    """
+    FIELD_INSTANT = "instant"
+    FIELD_FROM_SNAPSHOT = "fromSnapshot"
+
+    # Rollback target: a SnapshotInstant or TagInstant.
+    instant: Instant = json_field(FIELD_INSTANT)
+    # Optional guard: success only occurs when the latest snapshot has this ID.
+    from_snapshot: Optional[int] = json_field(FIELD_FROM_SNAPSHOT)
diff --git a/paimon-python/pypaimon/api/resource_paths.py
b/paimon-python/pypaimon/api/resource_paths.py
index 79adc0ade3..16967d16e2 100644
--- a/paimon-python/pypaimon/api/resource_paths.py
+++ b/paimon-python/pypaimon/api/resource_paths.py
@@ -70,3 +70,7 @@ class ResourcePaths:
def commit_table(self, database_name: str, table_name: str) -> str:
return ("{}/{}/{}/{}/{}/commit".format(self.base_path, self.DATABASES,
RESTUtil.encode_string(database_name),
self.TABLES, RESTUtil.encode_string(table_name)))
+
+    def rollback_table(self, database_name: str, table_name: str) -> str:
+        """Return the REST path of the rollback endpoint for one table.
+
+        Database and table names are URL-encoded via RESTUtil.encode_string.
+        """
+        segments = (
+            self.base_path,
+            self.DATABASES,
+            RESTUtil.encode_string(database_name),
+            self.TABLES,
+            RESTUtil.encode_string(table_name),
+            "rollback",
+        )
+        return "/".join(map(str, segments))
diff --git a/paimon-python/pypaimon/api/rest_api.py
b/paimon-python/pypaimon/api/rest_api.py
index dc2c02104b..069220d687 100755
--- a/paimon-python/pypaimon/api/rest_api.py
+++ b/paimon-python/pypaimon/api/rest_api.py
@@ -20,7 +20,8 @@ from typing import Callable, Dict, List, Optional, Union
from pypaimon.api.api_request import (AlterDatabaseRequest, AlterTableRequest,
CommitTableRequest,
CreateDatabaseRequest,
- CreateTableRequest, RenameTableRequest)
+ CreateTableRequest, RenameTableRequest,
+ RollbackTableRequest)
from pypaimon.api.api_response import (CommitTableResponse, ConfigResponse,
GetDatabaseResponse, GetTableResponse,
GetTableTokenResponse,
@@ -358,6 +359,27 @@ class RESTApi:
)
return response.is_success()
+    def rollback_to(self, identifier, instant, from_snapshot=None):
+        """POST a rollback request for the table named by ``identifier``.
+
+        Args:
+            identifier: The table identifier.
+            instant: The Instant (SnapshotInstant or TagInstant) to roll back
+                to.
+            from_snapshot: Optional snapshot ID. Success only occurs when the
+                latest snapshot is this snapshot.
+
+        Raises:
+            NoSuchResourceException: If the table, snapshot or tag does not
+                exist.
+            ForbiddenException: If no permission to access this table.
+        """
+        database_name, table_name = self.__validate_identifier(identifier)
+        body = RollbackTableRequest(instant=instant, from_snapshot=from_snapshot)
+        path = self.resource_paths.rollback_table(database_name, table_name)
+        self.client.post(path, body, self.rest_auth_function)
+
@staticmethod
def __validate_identifier(identifier: Identifier):
if not identifier:
diff --git a/paimon-python/pypaimon/catalog/catalog.py
b/paimon-python/pypaimon/catalog/catalog.py
index 729522450b..aa914d2978 100644
--- a/paimon-python/pypaimon/catalog/catalog.py
+++ b/paimon-python/pypaimon/catalog/catalog.py
@@ -95,6 +95,23 @@ class Catalog(ABC):
"""
+    def rollback_to(self, identifier, instant, from_snapshot=None):
+        """Rollback table by the given identifier and instant.
+
+        The base implementation always raises; catalogs that support
+        rollback (e.g. RESTCatalog) override it.
+
+        Args:
+            identifier: Path of the table (Identifier instance).
+            instant: The Instant (SnapshotInstant or TagInstant) to rollback
+                to.
+            from_snapshot: Optional snapshot ID. Success only occurs when the
+                latest snapshot is this snapshot.
+
+        Raises:
+            NotImplementedError: Always, in this base implementation.
+                Overriding catalogs may instead raise TableNotExistException
+                when the table does not exist.
+        """
+        raise NotImplementedError(
+            "rollback_to is not supported by this catalog."
+        )
+
def drop_partitions(
self,
identifier: Union[str, Identifier],
diff --git a/paimon-python/pypaimon/catalog/catalog_environment.py
b/paimon-python/pypaimon/catalog/catalog_environment.py
index 762a42dd6a..2d95365508 100644
--- a/paimon-python/pypaimon/catalog/catalog_environment.py
+++ b/paimon-python/pypaimon/catalog/catalog_environment.py
@@ -60,6 +60,22 @@ class CatalogEnvironment:
# to create locks based on the catalog lock context
return RenamingSnapshotCommit(snapshot_manager)
+    def catalog_table_rollback(self):
+        """Return a catalog-backed TableRollback for this table, or None.
+
+        A TableRollback (which forwards to ``catalog.rollback_to``) is only
+        built when a catalog loader is configured and version management is
+        supported. None tells the caller to fall back to local file cleanup.
+
+        Returns:
+            A TableRollback instance, or None.
+        """
+        if self.catalog_loader is None or not self.supports_version_management:
+            return None
+        from pypaimon.catalog.table_rollback import TableRollback
+        return TableRollback(self.catalog_loader.load(), self.identifier)
+
def copy(self, identifier: Identifier) -> 'CatalogEnvironment':
"""
Create a copy of this CatalogEnvironment with a different identifier.
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index 9cf138c254..3e3a4916d4 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -45,7 +45,6 @@ from pypaimon.table.file_store_table import FileStoreTable
from pypaimon.table.format.format_table import FormatTable, Format
from pypaimon.table.iceberg.iceberg_table import IcebergTable
-
FORMAT_TABLE_TYPE = "format-table"
ICEBERG_TABLE_TYPE = "iceberg-table"
@@ -256,6 +255,34 @@ class RESTCatalog(Catalog):
except ForbiddenException as e:
raise TableNoPermissionException(identifier) from e
+    def rollback_to(self, identifier, instant, from_snapshot=None):
+        """Roll the table back to ``instant`` through the REST API.
+
+        Args:
+            identifier: Identifier of the table, or a string accepted by
+                Identifier.from_string (e.g. "database.table").
+            instant: The Instant (SnapshotInstant or TagInstant) to roll back
+                to.
+            from_snapshot: Optional snapshot ID. Success only occurs when the
+                latest snapshot is this snapshot.
+
+        Raises:
+            ValueError: If the target snapshot or tag does not exist.
+            TableNotExistException: If the table does not exist.
+            TableNoPermissionException: If no permission to access this table.
+        """
+        table_id = (identifier if isinstance(identifier, Identifier)
+                    else Identifier.from_string(identifier))
+        # Missing-resource types that mean "bad rollback target" rather than
+        # "missing table", paired with the message reported for each.
+        missing_target_messages = (
+            ("snapshot", "Rollback snapshot '{}' doesn't exist."),
+            ("tag", "Rollback tag '{}' doesn't exist."),
+        )
+        try:
+            self.rest_api.rollback_to(table_id, instant, from_snapshot)
+        except NoSuchResourceException as e:
+            for resource_type, template in missing_target_messages:
+                if e.resource_type == resource_type:
+                    raise ValueError(template.format(e.resource_name)) from e
+            raise TableNotExistException(table_id) from e
+        except ForbiddenException as e:
+            raise TableNoPermissionException(table_id) from e
+
def load_table_metadata(self, identifier: Identifier) -> TableMetadata:
try:
response = self.rest_api.get_table(identifier)
diff --git a/paimon-python/pypaimon/catalog/table_rollback.py
b/paimon-python/pypaimon/catalog/table_rollback.py
new file mode 100644
index 0000000000..dbff309581
--- /dev/null
+++ b/paimon-python/pypaimon/catalog/table_rollback.py
@@ -0,0 +1,43 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+
+class TableRollback:
+    """Catalog-backed rollback bound to a single table.
+
+    Forwards each request to ``catalog.rollback_to`` for the stored
+    identifier and reports any failure as a RuntimeError.
+    """
+
+    def __init__(self, catalog, identifier):
+        self._catalog = catalog
+        self._identifier = identifier
+
+    def rollback_to(self, instant, from_snapshot=None):
+        """Ask the catalog to roll this table back to ``instant``.
+
+        Args:
+            instant: The Instant (SnapshotInstant or TagInstant) to roll back
+                to.
+            from_snapshot: Optional snapshot ID. Success only occurs when the
+                latest snapshot is this snapshot.
+
+        Raises:
+            RuntimeError: Wrapping any exception raised by the catalog; the
+                original exception is chained as ``__cause__``.
+        """
+        try:
+            catalog = self._catalog
+            catalog.rollback_to(self._identifier, instant, from_snapshot)
+        except Exception as cause:
+            message = "Failed to rollback table {}: {}".format(
+                self._identifier, cause)
+            raise RuntimeError(message) from cause
diff --git a/paimon-python/pypaimon/table/file_store_table.py
b/paimon-python/pypaimon/table/file_store_table.py
index 4f7e3c81dd..5fd46248d0 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -149,6 +149,98 @@ class FileStoreTable(Table):
tag_mgr = self.tag_manager()
return tag_mgr.list_tags()
+    def rollback_to(self, target):
+        """Rollback table to the given snapshot ID (int) or tag name (str).
+
+        Dispatches based on argument type:
+        - int: rollback to the snapshot with that ID
+        - str: rollback to the tag with that name
+
+        First tries catalog-based rollback (e.g., REST). If catalog rollback
+        is not available, falls back to local file cleanup.
+
+        Args:
+            target: Snapshot ID (int) or tag name (str). ``bool`` is rejected
+                even though it is a subclass of ``int``.
+
+        Raises:
+            ValueError: If the target type is unsupported, or (local fallback)
+                the snapshot doesn't exist.
+            RuntimeError: If a catalog-based rollback fails; the catalog's
+                exception is chained as the cause.
+        """
+        # bool is a subclass of int: without this guard rollback_to(True)
+        # would silently roll the table back to snapshot 1.
+        if isinstance(target, bool):
+            raise ValueError(
+                "Unsupported rollback target type: {}".format(type(target)))
+        if isinstance(target, int):
+            self._rollback_to_snapshot(target)
+        elif isinstance(target, str):
+            self._rollback_to_tag(target)
+        else:
+            raise ValueError(
+                "Unsupported rollback target type: {}".format(type(target)))
+
+    def _rollback_to_snapshot(self, snapshot_id):
+        """Roll back to the snapshot with ID ``snapshot_id``.
+
+        Resolution order:
+        1. catalog-based rollback, when the catalog environment provides one;
+        2. local cleanup, when the snapshot can still be loaded;
+        3. otherwise (e.g. the snapshot file has been expired), a tag whose
+           snapshot has the same ID, rolled back to via rollback_to(tag).
+
+        Args:
+            snapshot_id: The snapshot ID to rollback to.
+
+        Raises:
+            ValueError: If neither a snapshot nor a tag with that ID exists.
+        """
+        from pypaimon.table.instant import Instant
+
+        catalog_rollback = self.catalog_environment.catalog_table_rollback()
+        if catalog_rollback is not None:
+            catalog_rollback.rollback_to(Instant.snapshot(snapshot_id))
+            return
+
+        retained = self.snapshot_manager().get_snapshot_by_id(snapshot_id)
+        if retained is not None:
+            self.rollback_helper().clean_larger_than(retained)
+            return
+
+        tags = self.tag_manager()
+        for name in tags.list_tags():
+            candidate = tags.get(name)
+            if candidate is None:
+                continue
+            if candidate.trim_to_snapshot().id != snapshot_id:
+                continue
+            self.rollback_to(name)
+            return
+
+        raise ValueError(
+            "Rollback snapshot '{}' doesn't exist.".format(snapshot_id))
+
+    def _rollback_to_tag(self, tag_name):
+        """Roll back to the snapshot referenced by tag ``tag_name``.
+
+        Uses the catalog when it provides a rollback; otherwise cleans local
+        files newer than the tagged snapshot and rewrites that snapshot's file
+        if it is missing.
+
+        Args:
+            tag_name: The tag name to rollback to.
+
+        Raises:
+            Whatever ``TagManager.get_or_throw`` raises for a missing tag
+            (local path), or RuntimeError from a catalog-based rollback.
+        """
+        from pypaimon.table.instant import Instant
+
+        catalog_rollback = self.catalog_environment.catalog_table_rollback()
+        if catalog_rollback is None:
+            tagged = self.tag_manager().get_or_throw(tag_name).trim_to_snapshot()
+            rollback_helper = self.rollback_helper()
+            rollback_helper.clean_larger_than(tagged)
+            # The tag may outlive its snapshot file; restore the file if so.
+            rollback_helper.create_snapshot_file_if_needed(tagged)
+        else:
+            catalog_rollback.rollback_to(Instant.tag(tag_name))
+
+    def rollback_helper(self):
+        """Build a RollbackHelper bound to this table's managers and file IO."""
+        from pypaimon.table.rollback_helper import RollbackHelper
+
+        snapshot_mgr = self.snapshot_manager()
+        tag_mgr = self.tag_manager()
+        return RollbackHelper(snapshot_mgr, tag_mgr, self.file_io)
+
def rename_tag(self, old_name: str, new_name: str) -> None:
"""
Rename a tag.
diff --git a/paimon-python/pypaimon/table/instant.py
b/paimon-python/pypaimon/table/instant.py
new file mode 100644
index 0000000000..3a5fef72e9
--- /dev/null
+++ b/paimon-python/pypaimon/table/instant.py
@@ -0,0 +1,135 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from abc import ABC, abstractmethod
+
+
+class Instant(ABC):
+    """A point in a table's history that a rollback can target.
+
+    Instances serialize to tagged dictionaries (``to_dict``) and are rebuilt
+    with ``from_dict``:
+        SnapshotInstant: {"type": "snapshot", "snapshotId": 123}
+        TagInstant:      {"type": "tag", "tagName": "test_tag"}
+    """
+
+    FIELD_TYPE = "type"
+    TYPE_SNAPSHOT = "snapshot"
+    TYPE_TAG = "tag"
+
+    @staticmethod
+    def snapshot(snapshot_id):
+        """Return a SnapshotInstant targeting ``snapshot_id``."""
+        return SnapshotInstant(snapshot_id)
+
+    @staticmethod
+    def tag(tag_name):
+        """Return a TagInstant targeting ``tag_name``."""
+        return TagInstant(tag_name)
+
+    @abstractmethod
+    def to_dict(self):
+        """Return the JSON-ready dictionary form of this instant."""
+
+    @staticmethod
+    def from_dict(data):
+        """Rebuild an Instant from the dictionary produced by ``to_dict``.
+
+        Args:
+            data: Mapping whose ``type`` entry selects the subclass.
+
+        Returns:
+            A SnapshotInstant or TagInstant.
+
+        Raises:
+            ValueError: If the ``type`` entry is absent or not recognised.
+            KeyError: If the type-specific payload key is missing.
+        """
+        kind = data.get(Instant.FIELD_TYPE)
+        if kind == Instant.TYPE_SNAPSHOT:
+            snapshot_id = data[SnapshotInstant.FIELD_SNAPSHOT_ID]
+            return SnapshotInstant(snapshot_id)
+        if kind == Instant.TYPE_TAG:
+            tag_name = data[TagInstant.FIELD_TAG_NAME]
+            return TagInstant(tag_name)
+        raise ValueError("Unknown instant type: {}".format(kind))
+
+
+class SnapshotInstant(Instant):
+    """Rollback target identified by a snapshot ID."""
+
+    FIELD_SNAPSHOT_ID = "snapshotId"
+
+    def __init__(self, snapshot_id):
+        self.snapshot_id = snapshot_id
+
+    def to_dict(self):
+        """Return {"type": "snapshot", "snapshotId": <id>}."""
+        return {
+            Instant.FIELD_TYPE: Instant.TYPE_SNAPSHOT,
+            self.FIELD_SNAPSHOT_ID: self.snapshot_id,
+        }
+
+    def __eq__(self, other):
+        # NotImplemented (not False) for foreign types lets Python try the
+        # reflected comparison before falling back to identity.
+        if not isinstance(other, SnapshotInstant):
+            return NotImplemented
+        return self.snapshot_id == other.snapshot_id
+
+    def __hash__(self):
+        return hash(self.snapshot_id)
+
+    def __repr__(self):
+        return "SnapshotInstant(snapshot_id={!r})".format(self.snapshot_id)
+
+
+class TagInstant(Instant):
+    """Rollback target identified by a tag name."""
+
+    FIELD_TAG_NAME = "tagName"
+
+    def __init__(self, tag_name):
+        self.tag_name = tag_name
+
+    def to_dict(self):
+        """Return {"type": "tag", "tagName": <name>}."""
+        return {
+            Instant.FIELD_TYPE: Instant.TYPE_TAG,
+            self.FIELD_TAG_NAME: self.tag_name,
+        }
+
+    def __eq__(self, other):
+        # NotImplemented (not False) for foreign types lets Python try the
+        # reflected comparison before falling back to identity.
+        if not isinstance(other, TagInstant):
+            return NotImplemented
+        return self.tag_name == other.tag_name
+
+    def __hash__(self):
+        return hash(self.tag_name)
+
+    def __repr__(self):
+        # !r quotes the name so e.g. TagInstant(tag_name='v3') is unambiguous.
+        return "TagInstant(tag_name={!r})".format(self.tag_name)
diff --git a/paimon-python/pypaimon/table/rollback_helper.py
b/paimon-python/pypaimon/table/rollback_helper.py
new file mode 100644
index 0000000000..f7fe2c45bc
--- /dev/null
+++ b/paimon-python/pypaimon/table/rollback_helper.py
@@ -0,0 +1,93 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import logging
+
+from pypaimon.common.json_util import JSON
+
+logger = logging.getLogger(__name__)
+
+
+class RollbackHelper:
+    """Helper for local (file-based) table rollback.
+
+    Removes snapshot and tag files newer than a retained snapshot and can
+    restore a snapshot file from a tag.
+    """
+
+    def __init__(self, snapshot_manager, tag_manager, file_io):
+        """Initialize RollbackHelper.
+
+        Args:
+            snapshot_manager: The SnapshotManager instance.
+            tag_manager: The TagManager instance.
+            file_io: The FileIO instance for file operations.
+        """
+        self._snapshot_manager = snapshot_manager
+        self._tag_manager = tag_manager
+        self._file_io = file_io
+
+    def clean_larger_than(self, retained_snapshot):
+        """Clean snapshots and tags whose id is larger than the retained snapshot.
+
+        Points the LATEST hint at the retained snapshot, then removes the
+        newer snapshot files and the tags that reference newer snapshots.
+        Does nothing when the snapshot manager reports no latest snapshot.
+
+        Args:
+            retained_snapshot: The snapshot to retain; everything newer is
+                removed.
+        """
+        latest = self._snapshot_manager.get_latest_snapshot()
+        if latest is None:
+            return
+
+        latest_id = latest.id
+        retained_id = retained_snapshot.id
+
+        # Update LATEST hint. Best-effort: a failure is logged and cleanup
+        # continues.
+        try:
+            latest_file = self._snapshot_manager.latest_file
+            # Try atomic write first, then fall back to a plain overwrite.
+            if not self._file_io.try_to_write_atomic(latest_file, str(retained_id)):
+                self._file_io.write_file(latest_file, str(retained_id), overwrite=True)
+        except Exception as e:
+            logger.warning("Failed to update LATEST hint: %s", e)
+
+        # Delete newer snapshot files from the newest down, so an interrupted
+        # cleanup leaves a contiguous run of snapshots instead of a gap right
+        # after the retained one. Already-missing (expired) files are skipped.
+        for snapshot_id in range(latest_id, retained_id, -1):
+            snapshot_path = self._snapshot_manager.get_snapshot_path(snapshot_id)
+            if self._file_io.exists(snapshot_path):
+                self._file_io.delete(snapshot_path)
+
+        # Clean tags whose snapshot id is larger than retained
+        for tag_name in self._tag_manager.list_tags():
+            tag = self._tag_manager.get(tag_name)
+            if tag is not None and tag.trim_to_snapshot().id > retained_id:
+                self._tag_manager.delete_tag(tag_name)
+
+    def create_snapshot_file_if_needed(self, tagged_snapshot):
+        """Write the tagged snapshot to the snapshot directory if it is missing.
+
+        When rolling back to a tag, the snapshot file may already have been
+        expired; this recreates it from the tag's snapshot data (as JSON,
+        with ``overwrite=False``).
+
+        NOTE(review): no EARLIEST hint is updated here. If the snapshot
+        manager relies on one, it may still point at a snapshot deleted by
+        clean_larger_than — TODO confirm.
+
+        Args:
+            tagged_snapshot: The snapshot from the tag to potentially write.
+        """
+        snapshot_path = self._snapshot_manager.get_snapshot_path(tagged_snapshot.id)
+        if not self._file_io.exists(snapshot_path):
+            self._file_io.write_file(
+                snapshot_path, JSON.to_json(tagged_snapshot), overwrite=False)
diff --git a/paimon-python/pypaimon/tests/rest/rest_catalog_test.py
b/paimon-python/pypaimon/tests/rest/rest_catalog_test.py
new file mode 100644
index 0000000000..153c22ba8c
--- /dev/null
+++ b/paimon-python/pypaimon/tests/rest/rest_catalog_test.py
@@ -0,0 +1,277 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import Schema
+from pypaimon.common.identifier import Identifier
+from pypaimon.table.instant import Instant
+from pypaimon.tests.rest.rest_base_test import RESTBaseTest
+
+
+class RESTCatalogTest(RESTBaseTest):
+    """Rollback tests driven through ``self.rest_catalog`` from RESTBaseTest."""
+
+    def test_catalog_rollback(self):
+        """Test table rollback to snapshot and tag."""
+        table_name = "default.table_for_rollback"
+        pa_schema = pa.schema([('col1', pa.int32())])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+        identifier = Identifier.from_string(table_name)
+
+        # Write 10 commits and create a tag for each
+        for i in range(10):
+            write_builder = table.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({'col1': [i]}, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+            table.create_tag("tag-{}".format(i + 1))
+
+        snapshot_mgr = table.snapshot_manager()
+        tag_mgr = table.tag_manager()
+
+        # Verify the latest snapshot id is 10 (tags are not checked here)
+        latest = snapshot_mgr.get_latest_snapshot()
+        self.assertEqual(latest.id, 10)
+
+        # --- Rollback to snapshot 4 ---
+        rollback_to_snapshot_id = 4
+        self.rest_catalog.rollback_to(
+            identifier, Instant.snapshot(rollback_to_snapshot_id))
+
+        # After rollback, latest snapshot should be 4
+        latest_after = snapshot_mgr.get_latest_snapshot()
+        self.assertEqual(latest_after.id, rollback_to_snapshot_id)
+
+        # Tags with snapshot > 4 (tag-5 and above) should be cleaned; tag-6 is spot-checked
+
self.assertFalse(tag_mgr.tag_exists("tag-{}".format(rollback_to_snapshot_id +
2)))
+
+        # Snapshots > 4 should not exist
+        snapshot_5 = snapshot_mgr.get_snapshot_by_id(rollback_to_snapshot_id +
1)
+        self.assertIsNone(snapshot_5)
+
+        # Rollback to a non-existent snapshot (5) should fail
+        with self.assertRaises(ValueError) as context:
+            self.rest_catalog.rollback_to(
+                identifier, Instant.snapshot(rollback_to_snapshot_id + 1))
+        self.assertIn("Rollback snapshot", str(context.exception))
+        self.assertIn("doesn't exist", str(context.exception))
+
+        # --- Rollback to tag-3 (snapshot 3) ---
+        rollback_to_tag_name = "tag-{}".format(rollback_to_snapshot_id - 1)
+        self.rest_catalog.rollback_to(identifier,
Instant.tag(rollback_to_tag_name))
+
+        tag_snapshot =
tag_mgr.get_or_throw(rollback_to_tag_name).trim_to_snapshot()
+        latest_after_tag = snapshot_mgr.get_latest_snapshot()
+        self.assertEqual(latest_after_tag.id, tag_snapshot.id)
+
+        # --- Rollback to snapshot 2 with from_snapshot check ---
+        # from_snapshot=4 should fail because latest is 3
+        with self.assertRaises(Exception) as context:
+            self.rest_catalog.rollback_to(
+                identifier, Instant.snapshot(2), from_snapshot=4)
+        self.assertIn("Latest snapshot 3 is not 4", str(context.exception))
+
+        # from_snapshot=3 should succeed
+        self.rest_catalog.rollback_to(
+            identifier, Instant.snapshot(2), from_snapshot=3)
+        latest_final = snapshot_mgr.get_latest_snapshot()
+        self.assertEqual(latest_final.id, 2)
+
+    def test_catalog_rollback_to_nonexistent_tag(self):
+        """Test that rollback to a non-existent tag raises an error."""
+        table_name = "default.table_rollback_no_tag"
+        pa_schema = pa.schema([('col1', pa.int32())])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+        identifier = Identifier.from_string(table_name)
+
+        # Write one commit so the table has a snapshot
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data = pa.Table.from_pydict({'col1': [1]}, schema=pa_schema)
+        table_write.write_arrow(data)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        with self.assertRaises(ValueError) as context:
+            self.rest_catalog.rollback_to(
+                identifier, Instant.tag("nonexistent-tag"))
+        self.assertIn("Rollback tag", str(context.exception))
+        self.assertIn("nonexistent-tag", str(context.exception))
+        self.assertIn("doesn't exist", str(context.exception))
+
+    def test_catalog_rollback_with_string_identifier(self):
+        """Test rollback using a string identifier instead of Identifier
object."""
+        table_name = "default.table_rollback_str_id"
+        pa_schema = pa.schema([('col1', pa.int32())])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+
+        for i in range(3):
+            write_builder = table.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({'col1': [i]}, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        snapshot_mgr = table.snapshot_manager()
+        self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 3)
+
+        # Use string identifier directly (not Identifier object)
+        self.rest_catalog.rollback_to(table_name, Instant.snapshot(2))
+        self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 2)
+
+    def test_catalog_rollback_on_nonexistent_table(self):
+        """Test that rollback on a non-existent table raises an error."""
+        from pypaimon.catalog.catalog_exception import TableNotExistException
+        identifier = Identifier.from_string("default.no_such_table")
+        with self.assertRaises(TableNotExistException) as context:
+            self.rest_catalog.rollback_to(identifier, Instant.snapshot(1))
+        self.assertIn("default.no_such_table", str(context.exception))
+        self.assertIn("does not exist", str(context.exception))
+
+    def test_table_rollback_to_snapshot(self):
+        """Test table-level rollback_to(<int snapshot id>) via FileStoreTable."""
+        table_name = "default.table_level_rollback_snapshot"
+        pa_schema = pa.schema([('col1', pa.int32())])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+
+        # Write 5 commits
+        for i in range(5):
+            write_builder = table.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({'col1': [i]}, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        snapshot_mgr = table.snapshot_manager()
+        self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 5)
+
+        # Rollback to snapshot 3 via table method (isinstance dispatch on int)
+        table.rollback_to(3)
+
+        self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 3)
+        self.assertIsNone(snapshot_mgr.get_snapshot_by_id(4))
+        self.assertIsNone(snapshot_mgr.get_snapshot_by_id(5))
+
+    def test_table_rollback_to_tag(self):
+        """Test table-level rollback_to(<str tag name>) via FileStoreTable."""
+        table_name = "default.table_level_rollback_tag"
+        pa_schema = pa.schema([('col1', pa.int32())])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+
+        # Write 5 commits and create tags
+        for i in range(5):
+            write_builder = table.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({'col1': [i]}, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+            table.create_tag("v{}".format(i + 1))
+
+        snapshot_mgr = table.snapshot_manager()
+        tag_mgr = table.tag_manager()
+        self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 5)
+
+        # Rollback to tag v3 (isinstance dispatch on str)
+        table.rollback_to("v3")
+
+        self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 3)
+        # Tags with snapshot > 3 should be cleaned
+        self.assertFalse(tag_mgr.tag_exists("v4"))
+        self.assertFalse(tag_mgr.tag_exists("v5"))
+        # Tag v3 should still exist
+        self.assertTrue(tag_mgr.tag_exists("v3"))
+
+    def test_table_rollback_to_nonexistent_snapshot(self):
+        """Test that table-level rollback to a non-existent snapshot fails.
+
+        The exception type may differ (a catalog-based rollback wraps errors
+        in RuntimeError), so only the message is checked.
+        """
+        table_name = "default.table_level_rollback_no_snap"
+        pa_schema = pa.schema([('col1', pa.int32())])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+
+        # Write 2 commits
+        for i in range(2):
+            write_builder = table.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({'col1': [i]}, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        # Rollback to snapshot 99 should fail
+        with self.assertRaises(Exception) as context:
+            table.rollback_to(99)
+        self.assertIn("Rollback snapshot", str(context.exception))
+        self.assertIn("99", str(context.exception))
+        self.assertIn("doesn't exist", str(context.exception))
+
+    def test_table_rollback_to_nonexistent_tag(self):
+        """Test that table-level rollback to a non-existent tag fails.
+
+        The exception type may differ (a catalog-based rollback wraps errors
+        in RuntimeError), so only the message is checked.
+        """
+        table_name = "default.table_level_rollback_no_tag"
+        pa_schema = pa.schema([('col1', pa.int32())])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+
+        # Write 1 commit
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data = pa.Table.from_pydict({'col1': [1]}, schema=pa_schema)
+        table_write.write_arrow(data)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        with self.assertRaises(Exception) as context:
+            table.rollback_to("no-such-tag")
+        self.assertIn("Rollback tag", str(context.exception))
+        self.assertIn("no-such-tag", str(context.exception))
+        self.assertIn("doesn't exist", str(context.exception))
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/tests/rest/rest_server.py
b/paimon-python/pypaimon/tests/rest/rest_server.py
index fbfa5a487a..24929dce48 100755
--- a/paimon-python/pypaimon/tests/rest/rest_server.py
+++ b/paimon-python/pypaimon/tests/rest/rest_server.py
@@ -45,7 +45,7 @@ from pypaimon.catalog.catalog_exception import
(DatabaseNoPermissionException,
TableAlreadyExistException)
from pypaimon.catalog.rest.table_metadata import TableMetadata
from pypaimon.common.identifier import Identifier
-from pypaimon.common.json_util import JSON
+from pypaimon.common.json_util import JSON, json_field
from pypaimon import Schema
from pypaimon.schema.schema_change import Actions, SchemaChange
from pypaimon.schema.schema_manager import SchemaManager
@@ -66,10 +66,22 @@ class ErrorResponse(RESTResponse):
RESOURCE_TYPE_DEFINITION = "definition"
RESOURCE_TYPE_DIALECT = "dialect"
- resource_type: Optional[str]
- resource_name: Optional[str]
- message: str
- code: int
+ resource_type: Optional[str] = json_field("resourceType", default=None)
+ resource_name: Optional[str] = json_field("resourceName", default=None)
+ message: Optional[str] = json_field("message", default=None)
+ code: Optional[int] = json_field("code", default=None)
+
+ def __init__(
+ self,
+ resource_type: Optional[str] = None,
+ resource_name: Optional[str] = None,
+ message: Optional[str] = None,
+ code: Optional[int] = None,
+ ):
+ self.resource_type = resource_type
+ self.resource_name = resource_name
+ self.message = message
+ self.code = code
# Constants
@@ -480,6 +492,8 @@ class RESTCatalogServer:
return self._table_commit_handle(method, data,
lookup_identifier, branch_part)
elif operation == "token":
return self._table_token_handle(method, lookup_identifier)
+ elif operation == "rollback":
+ return self._table_rollback_handle(method, data,
lookup_identifier)
else:
return self._mock_response(ErrorResponse(None, None, "Not
Found", 404), 404)
return self._mock_response(ErrorResponse(None, None, "Not Found",
404), 404)
@@ -662,6 +676,113 @@ class RESTCatalogServer:
ErrorResponse(None, None, f"Commit failed: {str(e)}", 500), 500
)
+ def _table_rollback_handle(self, method: str, data: str,
+ identifier: Identifier) -> Tuple[str, int]:
+ """Handle table rollback operations"""
+ if method != "POST":
+ return self._mock_response(ErrorResponse(None, None, "Method Not
Allowed", 405), 405)
+
+ if identifier.get_full_name() not in self.table_metadata_store:
+ raise TableNotExistException(identifier)
+
+ try:
+ import json as json_module
+ from pypaimon.table.instant import Instant, SnapshotInstant,
TagInstant
+
+ request_dict = json_module.loads(data)
+ instant_dict = request_dict.get("instant")
+ from_snapshot = request_dict.get("fromSnapshot")
+
+ instant = Instant.from_dict(instant_dict)
+
+ if isinstance(instant, SnapshotInstant):
+ return self._rollback_table_by_snapshot(
+ identifier, instant.snapshot_id, from_snapshot)
+ elif isinstance(instant, TagInstant):
+ return self._rollback_table_by_tag(identifier,
instant.tag_name)
+ else:
+ return self._mock_response(
+ ErrorResponse(None, None, "Unknown instant type", 400),
400)
+
+ except Exception as e:
+ self.logger.error(f"Error in rollback operation: {e}")
+ import traceback
+ self.logger.error(f"Traceback: {traceback.format_exc()}")
+ return self._mock_response(
+ ErrorResponse(None, None, f"Rollback failed: {str(e)}", 500),
500)
+
+ def _rollback_table_by_snapshot(self, identifier: Identifier, snapshot_id:
int,
+ from_snapshot: Optional[int]) ->
Tuple[str, int]:
+ """Rollback table to a specific snapshot ID by delegating to
table.rollback_to()."""
+ table = self._get_file_table(identifier)
+
+ snapshot_mgr = table.snapshot_manager()
+ snapshot = snapshot_mgr.get_snapshot_by_id(snapshot_id)
+ if snapshot is None:
+ return self._mock_response(
+ ErrorResponse(ErrorResponse.RESOURCE_TYPE_SNAPSHOT,
+ str(snapshot_id), "", 404), 404)
+
+ latest = snapshot_mgr.get_latest_snapshot()
+ if latest is None:
+ return self._mock_response(
+ ErrorResponse(None, None, "No latest snapshot found", 500),
500)
+
+ if from_snapshot is not None and from_snapshot != latest.id:
+ return self._mock_response(
+ ErrorResponse(None, None,
+ f"Latest snapshot {latest.id} is not
{from_snapshot}",
+ 500), 500)
+
+ table.rollback_to(snapshot_id)
+ return self._mock_response("", 200)
+
+ def _rollback_table_by_tag(self, identifier: Identifier,
+ tag_name: str) -> Tuple[str, int]:
+ """Rollback table to a specific tag by delegating to
table.rollback_to()."""
+ table = self._get_file_table(identifier)
+
+ tag_mgr = table.tag_manager()
+ if not tag_mgr.tag_exists(tag_name):
+ return self._mock_response(
+ ErrorResponse(ErrorResponse.RESOURCE_TYPE_TAG,
+ tag_name, "", 404), 404)
+
+ table.rollback_to(tag_name)
+ return self._mock_response("", 200)
+
+ def _get_file_table(self, identifier: Identifier):
+ """Construct a FileStoreTable from the metadata store.
+
+ Mirrors Java RESTCatalogServer.getFileTable(): loads the schema from
+ the metadata store, builds a CatalogEnvironment (without catalog
+ loader so rollback goes through local file cleanup), and returns a
+ FileStoreTable.
+ """
+ from pypaimon.catalog.catalog_environment import CatalogEnvironment
+ from pypaimon.common.file_io import FileIO
+ from pypaimon.common.options.options import Options
+ from pypaimon.table.file_store_table import FileStoreTable
+
+ table_metadata =
self.table_metadata_store.get(identifier.get_full_name())
+ if table_metadata is None:
+ raise TableNotExistException(identifier)
+
+ table_schema = table_metadata.schema
+ table_path = (
+ f'file://{self.data_path}/{self.warehouse}/'
+ f'{identifier.get_database_name()}/{identifier.get_object_name()}')
+
+ catalog_env = CatalogEnvironment(
+ identifier=identifier,
+ uuid=table_metadata.uuid,
+ catalog_loader=None,
+ supports_version_management=False
+ )
+
+ file_io = FileIO.get(table_path, Options({}))
+ return FileStoreTable(file_io, identifier, table_path, table_schema,
catalog_env)
+
def _write_snapshot_files(self, identifier: Identifier, snapshot,
statistics):
"""Write snapshot and related files to the file system"""
import json
@@ -887,9 +1008,9 @@ class RESTCatalogServer:
or metadata_table_type == table_type
)
if (identifier.get_database_name() == database_name and
- table_type_matches and
- (not table_name_pattern or
self._match_name_pattern(identifier.get_table_name(),
-
table_name_pattern))):
+ table_type_matches and
+ (not table_name_pattern or
self._match_name_pattern(identifier.get_table_name(),
+
table_name_pattern))):
tables.append(identifier.get_table_name())
return tables
diff --git a/paimon-python/pypaimon/tests/table/simple_table_test.py b/paimon-python/pypaimon/tests/table/simple_table_test.py
index 683f72c811..01f48d0031 100644
--- a/paimon-python/pypaimon/tests/table/simple_table_test.py
+++ b/paimon-python/pypaimon/tests/table/simple_table_test.py
@@ -537,3 +537,146 @@ class SimpleTableTest(unittest.TestCase):
}, schema=expected_schema)
self.assertEqual(expected, result.sort_by('user_id'))
+
+ def test_table_rollback_to_snapshot(self):
+ """Test table-level rollback to a specific snapshot."""
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema,
+ primary_keys=['pt', 'k'],
+ partition_keys=['pt'],
+ options={'bucket': '3'}
+ )
+ self.catalog.create_table('default.test_rollback_snapshot', schema,
False)
+ table = self.catalog.get_table('default.test_rollback_snapshot')
+
+ write_builder = table.new_batch_write_builder()
+
+ # Write 5 commits
+ for i in range(5):
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data = pa.Table.from_pydict({
+ 'pt': [1],
+ 'k': [i],
+ 'v': [i * 100]
+ }, schema=self.pa_schema)
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ snapshot_mgr = table.snapshot_manager()
+ self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 5)
+
+ # Rollback to snapshot 3
+ table.rollback_to(3)
+
+ self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 3)
+ self.assertIsNone(snapshot_mgr.get_snapshot_by_id(4))
+ self.assertIsNone(snapshot_mgr.get_snapshot_by_id(5))
+
+ def test_table_rollback_to_tag(self):
+ """Test table-level rollback to a specific tag."""
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema,
+ primary_keys=['pt', 'k'],
+ partition_keys=['pt'],
+ options={'bucket': '3'}
+ )
+ self.catalog.create_table('default.test_rollback_tag', schema, False)
+ table = self.catalog.get_table('default.test_rollback_tag')
+
+ write_builder = table.new_batch_write_builder()
+
+ # Write 5 commits and create tags
+ for i in range(5):
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data = pa.Table.from_pydict({
+ 'pt': [1],
+ 'k': [i],
+ 'v': [i * 100]
+ }, schema=self.pa_schema)
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+ table.create_tag("v{}".format(i + 1))
+
+ snapshot_mgr = table.snapshot_manager()
+ tag_mgr = table.tag_manager()
+ self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 5)
+
+ # Rollback to tag v3
+ table.rollback_to("v3")
+
+ self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 3)
+ # Tags with snapshot > 3 should be cleaned
+ self.assertFalse(tag_mgr.tag_exists("v4"))
+ self.assertFalse(tag_mgr.tag_exists("v5"))
+ # Tag v3 should still exist
+ self.assertTrue(tag_mgr.tag_exists("v3"))
+
+ def test_table_rollback_to_nonexistent_snapshot(self):
+ """Test that rollback to a non-existent snapshot raises ValueError."""
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema,
+ primary_keys=['pt', 'k'],
+ partition_keys=['pt'],
+ options={'bucket': '3'}
+ )
+ self.catalog.create_table('default.test_rollback_no_snap', schema,
False)
+ table = self.catalog.get_table('default.test_rollback_no_snap')
+
+ write_builder = table.new_batch_write_builder()
+
+ # Write 2 commits
+ for i in range(2):
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data = pa.Table.from_pydict({
+ 'pt': [1],
+ 'k': [i],
+ 'v': [i * 100]
+ }, schema=self.pa_schema)
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ with self.assertRaises(ValueError) as context:
+ table.rollback_to(99)
+ self.assertIn("Rollback snapshot", str(context.exception))
+ self.assertIn("99", str(context.exception))
+ self.assertIn("doesn't exist", str(context.exception))
+
+ def test_table_rollback_to_nonexistent_tag(self):
+ """Test that rollback to a non-existent tag raises ValueError."""
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema,
+ primary_keys=['pt', 'k'],
+ partition_keys=['pt'],
+ options={'bucket': '3'}
+ )
+ self.catalog.create_table('default.test_rollback_no_tag', schema,
False)
+ table = self.catalog.get_table('default.test_rollback_no_tag')
+
+ write_builder = table.new_batch_write_builder()
+
+ # Write 1 commit
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data = pa.Table.from_pydict({
+ 'pt': [1],
+ 'k': [10],
+ 'v': [100]
+ }, schema=self.pa_schema)
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ with self.assertRaises(ValueError) as context:
+ table.rollback_to("no-such-tag")
+ self.assertIn("no-such-tag", str(context.exception))
+ self.assertIn("doesn't exist", str(context.exception))