This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f457c6e68c [python] Support rollback API for RESTCatalog and 
FileStoreTable (#7325)
f457c6e68c is described below

commit f457c6e68ca99d26490d5b37b79198f1f06a09cb
Author: umi <[email protected]>
AuthorDate: Mon Mar 2 17:51:42 2026 +0800

    [python] Support rollback API for RESTCatalog and FileStoreTable (#7325)
---
 docs/content/pypaimon/python-api.md                |  75 ++++--
 paimon-python/pypaimon/api/api_request.py          |  10 +
 paimon-python/pypaimon/api/resource_paths.py       |   4 +
 paimon-python/pypaimon/api/rest_api.py             |  24 +-
 paimon-python/pypaimon/catalog/catalog.py          |  17 ++
 .../pypaimon/catalog/catalog_environment.py        |  16 ++
 .../pypaimon/catalog/rest/rest_catalog.py          |  29 ++-
 paimon-python/pypaimon/catalog/table_rollback.py   |  43 ++++
 paimon-python/pypaimon/table/file_store_table.py   |  92 +++++++
 paimon-python/pypaimon/table/instant.py            | 135 ++++++++++
 paimon-python/pypaimon/table/rollback_helper.py    |  93 +++++++
 .../pypaimon/tests/rest/rest_catalog_test.py       | 277 +++++++++++++++++++++
 paimon-python/pypaimon/tests/rest/rest_server.py   | 137 +++++++++-
 .../pypaimon/tests/table/simple_table_test.py      | 143 +++++++++++
 14 files changed, 1064 insertions(+), 31 deletions(-)

diff --git a/docs/content/pypaimon/python-api.md 
b/docs/content/pypaimon/python-api.md
index 9da4a07b66..3e9041ed5a 100644
--- a/docs/content/pypaimon/python-api.md
+++ b/docs/content/pypaimon/python-api.md
@@ -521,6 +521,36 @@ Key points about shard read:
 - **Parallel Processing**: Each shard can be processed independently for 
better performance
 - **Consistency**: Combining all shards should produce the complete table data
 
+## Rollback
+
+Paimon supports rolling back a table to a previous snapshot or tag. This is 
useful for undoing unwanted changes or
+restoring the table to a known good state.
+
+### Rollback to Snapshot
+
+You can rollback a table to a specific snapshot by its ID:
+
+```python
+table = catalog.get_table('database_name.table_name')
+
+# Rollback to snapshot 3
+table.rollback_to(3)  # snapshot id
+```
+
+### Rollback to Tag
+
+You can also rollback a table to a previously created tag:
+
+```python
+table = catalog.get_table('database_name.table_name')
+
+# Rollback to tag 'v3'
+table.rollback_to('v3')  # tag name
+```
+
+The `rollback_to` method accepts either an `int` (snapshot ID) or a `str` (tag 
name) and automatically dispatches
+to the appropriate rollback logic.
+
 ## Data Types
 
 | Python Native Type  | PyArrow Type                                     | 
Paimon Type                       |
@@ -566,28 +596,31 @@ Key points about shard read:
 The following shows the supported features of Python Paimon compared to Java 
Paimon:
 
 **Catalog Level**
-  - FileSystemCatalog
-  - RestCatalog
+
+- FileSystemCatalog
+- RestCatalog
 
 **Table Level**
-  - Append Tables
+
+- Append Tables
     - `bucket = -1` (unaware)
     - `bucket > 0` (fixed)
-  - Primary Key Tables
-      - only support deduplicate
-      - `bucket = -2` (postpone)
-      - `bucket > 0` (fixed)
-      - read with deletion vectors enabled 
-  - Format Tables
-      - PARQUET
-      - CSV
-      - JSON
-      - ORC
-      - TEXT
-  - Read/Write Operations
-      - Batch read and write for append tables and primary key tables
-      - Predicate filtering
-      - Overwrite semantics
-      - Incremental reading of Delta data
-      - Reading and writing blob data
-      - `with_shard` feature
+- Primary Key Tables
+    - only support deduplicate
+    - `bucket = -2` (postpone)
+    - `bucket > 0` (fixed)
+    - read with deletion vectors enabled
+- Format Tables
+    - PARQUET
+    - CSV
+    - JSON
+    - ORC
+    - TEXT
+- Read/Write Operations
+    - Batch read and write for append tables and primary key tables
+    - Predicate filtering
+    - Overwrite semantics
+    - Incremental reading of Delta data
+    - Reading and writing blob data
+    - `with_shard` feature
+    - Rollback feature
diff --git a/paimon-python/pypaimon/api/api_request.py 
b/paimon-python/pypaimon/api/api_request.py
index f2d062f8d0..f191fb03d3 100644
--- a/paimon-python/pypaimon/api/api_request.py
+++ b/paimon-python/pypaimon/api/api_request.py
@@ -26,6 +26,7 @@ from pypaimon.schema.schema import Schema
 from pypaimon.schema.schema_change import SchemaChange
 from pypaimon.snapshot.snapshot import Snapshot
 from pypaimon.snapshot.snapshot_commit import PartitionStatistics
+from pypaimon.table.instant import Instant
 
 
 class RESTRequest(ABC):
@@ -84,3 +85,12 @@ class AlterTableRequest(RESTRequest):
     FIELD_CHANGES = "changes"
 
     changes: List[SchemaChange] = json_field(FIELD_CHANGES)
+
+
+@dataclass
+class RollbackTableRequest(RESTRequest):
+    FIELD_INSTANT = "instant"
+    FIELD_FROM_SNAPSHOT = "fromSnapshot"
+
+    instant: Instant = json_field(FIELD_INSTANT)
+    from_snapshot: Optional[int] = json_field(FIELD_FROM_SNAPSHOT)
diff --git a/paimon-python/pypaimon/api/resource_paths.py 
b/paimon-python/pypaimon/api/resource_paths.py
index 79adc0ade3..16967d16e2 100644
--- a/paimon-python/pypaimon/api/resource_paths.py
+++ b/paimon-python/pypaimon/api/resource_paths.py
@@ -70,3 +70,7 @@ class ResourcePaths:
     def commit_table(self, database_name: str, table_name: str) -> str:
         return ("{}/{}/{}/{}/{}/commit".format(self.base_path, self.DATABASES, 
RESTUtil.encode_string(database_name),
                 self.TABLES, RESTUtil.encode_string(table_name)))
+
+    def rollback_table(self, database_name: str, table_name: str) -> str:
+        return ("{}/{}/{}/{}/{}/rollback".format(self.base_path, 
self.DATABASES, RESTUtil.encode_string(database_name),
+                                                 self.TABLES, 
RESTUtil.encode_string(table_name)))
diff --git a/paimon-python/pypaimon/api/rest_api.py 
b/paimon-python/pypaimon/api/rest_api.py
index dc2c02104b..069220d687 100755
--- a/paimon-python/pypaimon/api/rest_api.py
+++ b/paimon-python/pypaimon/api/rest_api.py
@@ -20,7 +20,8 @@ from typing import Callable, Dict, List, Optional, Union
 
 from pypaimon.api.api_request import (AlterDatabaseRequest, AlterTableRequest, 
CommitTableRequest,
                                       CreateDatabaseRequest,
-                                      CreateTableRequest, RenameTableRequest)
+                                      CreateTableRequest, RenameTableRequest,
+                                      RollbackTableRequest)
 from pypaimon.api.api_response import (CommitTableResponse, ConfigResponse,
                                        GetDatabaseResponse, GetTableResponse,
                                        GetTableTokenResponse,
@@ -358,6 +359,27 @@ class RESTApi:
         )
         return response.is_success()
 
+    def rollback_to(self, identifier, instant, from_snapshot=None):
+        """Rollback table to the given instant.
+
+        Args:
+            identifier: The table identifier.
+            instant: The Instant (SnapshotInstant or TagInstant) to rollback 
to.
+            from_snapshot: Optional snapshot ID. Success only occurs when the
+                latest snapshot is this snapshot.
+
+        Raises:
+            NoSuchResourceException: If the table, snapshot or tag does not 
exist.
+            ForbiddenException: If no permission to access this table.
+        """
+        database_name, table_name = self.__validate_identifier(identifier)
+        request = RollbackTableRequest(instant=instant, 
from_snapshot=from_snapshot)
+        self.client.post(
+            self.resource_paths.rollback_table(database_name, table_name),
+            request,
+            self.rest_auth_function
+        )
+
     @staticmethod
     def __validate_identifier(identifier: Identifier):
         if not identifier:
diff --git a/paimon-python/pypaimon/catalog/catalog.py 
b/paimon-python/pypaimon/catalog/catalog.py
index 729522450b..aa914d2978 100644
--- a/paimon-python/pypaimon/catalog/catalog.py
+++ b/paimon-python/pypaimon/catalog/catalog.py
@@ -95,6 +95,23 @@ class Catalog(ABC):
 
         """
 
+    def rollback_to(self, identifier, instant, from_snapshot=None):
+        """Rollback table by the given identifier and instant.
+
+        Args:
+            identifier: Path of the table (Identifier instance).
+            instant: The Instant (SnapshotInstant or TagInstant) to rollback 
to.
+            from_snapshot: Optional snapshot ID. Success only occurs when the
+                latest snapshot is this snapshot.
+
+        Raises:
+            TableNotExistException: If the table does not exist.
+            UnsupportedOperationError: If the catalog does not support version 
management.
+        """
+        raise NotImplementedError(
+            "rollback_to is not supported by this catalog."
+        )
+
     def drop_partitions(
         self,
         identifier: Union[str, Identifier],
diff --git a/paimon-python/pypaimon/catalog/catalog_environment.py 
b/paimon-python/pypaimon/catalog/catalog_environment.py
index 762a42dd6a..2d95365508 100644
--- a/paimon-python/pypaimon/catalog/catalog_environment.py
+++ b/paimon-python/pypaimon/catalog/catalog_environment.py
@@ -60,6 +60,22 @@ class CatalogEnvironment:
             # to create locks based on the catalog lock context
             return RenamingSnapshotCommit(snapshot_manager)
 
+    def catalog_table_rollback(self):
+        """Create a TableRollback instance based on the catalog environment.
+
+        Returns a TableRollback that delegates to catalog.rollback_to
+        when catalog_loader is available and version management is supported.
+        Returns None otherwise (fallback to local file cleanup).
+
+        Returns:
+            A TableRollback instance, or None.
+        """
+        if self.catalog_loader is not None and 
self.supports_version_management:
+            from pypaimon.catalog.table_rollback import TableRollback
+            catalog = self.catalog_loader.load()
+            return TableRollback(catalog, self.identifier)
+        return None
+
     def copy(self, identifier: Identifier) -> 'CatalogEnvironment':
         """
         Create a copy of this CatalogEnvironment with a different identifier.
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py 
b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index 9cf138c254..3e3a4916d4 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -45,7 +45,6 @@ from pypaimon.table.file_store_table import FileStoreTable
 from pypaimon.table.format.format_table import FormatTable, Format
 from pypaimon.table.iceberg.iceberg_table import IcebergTable
 
-
 FORMAT_TABLE_TYPE = "format-table"
 ICEBERG_TABLE_TYPE = "iceberg-table"
 
@@ -256,6 +255,34 @@ class RESTCatalog(Catalog):
         except ForbiddenException as e:
             raise TableNoPermissionException(identifier) from e
 
+    def rollback_to(self, identifier, instant, from_snapshot=None):
+        """Rollback table by the given identifier and instant.
+
+        Args:
+            identifier: Path of the table (Identifier or string).
+            instant: The Instant (SnapshotInstant or TagInstant) to rollback 
to.
+            from_snapshot: Optional snapshot ID. Success only occurs when the
+                latest snapshot is this snapshot.
+
+        Raises:
+            TableNotExistException: If the table does not exist.
+            TableNoPermissionException: If no permission to access this table.
+        """
+        if not isinstance(identifier, Identifier):
+            identifier = Identifier.from_string(identifier)
+        try:
+            self.rest_api.rollback_to(identifier, instant, from_snapshot)
+        except NoSuchResourceException as e:
+            if e.resource_type == "snapshot":
+                raise ValueError(
+                    "Rollback snapshot '{}' doesn't 
exist.".format(e.resource_name)) from e
+            elif e.resource_type == "tag":
+                raise ValueError(
+                    "Rollback tag '{}' doesn't 
exist.".format(e.resource_name)) from e
+            raise TableNotExistException(identifier) from e
+        except ForbiddenException as e:
+            raise TableNoPermissionException(identifier) from e
+
     def load_table_metadata(self, identifier: Identifier) -> TableMetadata:
         try:
             response = self.rest_api.get_table(identifier)
diff --git a/paimon-python/pypaimon/catalog/table_rollback.py 
b/paimon-python/pypaimon/catalog/table_rollback.py
new file mode 100644
index 0000000000..dbff309581
--- /dev/null
+++ b/paimon-python/pypaimon/catalog/table_rollback.py
@@ -0,0 +1,43 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+
+class TableRollback:
+    """Rollback table to instant by delegating to catalog.rollback_to."""
+
+    def __init__(self, catalog, identifier):
+        self._catalog = catalog
+        self._identifier = identifier
+
+    def rollback_to(self, instant, from_snapshot=None):
+        """Rollback table to the given instant via catalog.
+
+        Args:
+            instant: The Instant (SnapshotInstant or TagInstant) to rollback 
to.
+            from_snapshot: Optional snapshot ID. Success only occurs when the
+                latest snapshot is this snapshot.
+
+        Raises:
+            RuntimeError: If the rollback fails.
+        """
+        try:
+            self._catalog.rollback_to(self._identifier, instant, from_snapshot)
+        except Exception as e:
+            raise RuntimeError(
+                "Failed to rollback table {}: {}".format(
+                    self._identifier, e)) from e
diff --git a/paimon-python/pypaimon/table/file_store_table.py 
b/paimon-python/pypaimon/table/file_store_table.py
index 4f7e3c81dd..5fd46248d0 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -149,6 +149,98 @@ class FileStoreTable(Table):
         tag_mgr = self.tag_manager()
         return tag_mgr.list_tags()
 
+    def rollback_to(self, target):
+        """Rollback table to the given snapshot ID (int) or tag name (str).
+
+        Dispatches based on argument type:
+        - int: rollback to the snapshot with that ID
+        - str: rollback to the tag with that name
+
+        First tries catalog-based rollback (e.g., REST). If catalog rollback
+        is not available, falls back to local file cleanup.
+
+        Args:
+            target: Snapshot ID (int) or tag name (str).
+
+        Raises:
+            ValueError: If the target doesn't exist or type is unsupported.
+        """
+        if isinstance(target, int):
+            self._rollback_to_snapshot(target)
+        elif isinstance(target, str):
+            self._rollback_to_tag(target)
+        else:
+            raise ValueError("Unsupported rollback target type: 
{}".format(type(target)))
+
+    def _rollback_to_snapshot(self, snapshot_id):
+        """Rollback to a snapshot by ID.
+
+        First tries catalog-based rollback. If not available, falls back to
+        local file cleanup. If the snapshot file has been expired, tries to
+        recover from a tag with the same snapshot ID.
+
+        Args:
+            snapshot_id: The snapshot ID to rollback to.
+
+        Raises:
+            ValueError: If the snapshot doesn't exist.
+        """
+        from pypaimon.table.instant import Instant
+
+        table_rollback = self.catalog_environment.catalog_table_rollback()
+        if table_rollback is not None:
+            table_rollback.rollback_to(Instant.snapshot(snapshot_id))
+            return
+
+        snapshot_mgr = self.snapshot_manager()
+        snapshot = snapshot_mgr.get_snapshot_by_id(snapshot_id)
+        if snapshot is not None:
+            self.rollback_helper().clean_larger_than(snapshot)
+            return
+
+        tag_mgr = self.tag_manager()
+        for tag_name in tag_mgr.list_tags():
+            tag = tag_mgr.get(tag_name)
+            if tag is not None:
+                tag_snapshot = tag.trim_to_snapshot()
+                if tag_snapshot.id == snapshot_id:
+                    self.rollback_to(tag_name)
+                    return
+
+        raise ValueError(
+            "Rollback snapshot '{}' doesn't exist.".format(snapshot_id))
+
+    def _rollback_to_tag(self, tag_name):
+        """Rollback to a tag by name.
+
+        First tries catalog-based rollback. If not available, falls back to
+        local file cleanup and recreates the snapshot file if needed.
+
+        Args:
+            tag_name: The tag name to rollback to.
+
+        Raises:
+            ValueError: If the tag doesn't exist.
+        """
+        from pypaimon.table.instant import Instant
+
+        table_rollback = self.catalog_environment.catalog_table_rollback()
+        if table_rollback is not None:
+            table_rollback.rollback_to(Instant.tag(tag_name))
+            return
+
+        tag_mgr = self.tag_manager()
+        tagged_snapshot = tag_mgr.get_or_throw(tag_name).trim_to_snapshot()
+        helper = self.rollback_helper()
+        helper.clean_larger_than(tagged_snapshot)
+        helper.create_snapshot_file_if_needed(tagged_snapshot)
+
+    def rollback_helper(self):
+        """Create a RollbackHelper instance for this table."""
+        from pypaimon.table.rollback_helper import RollbackHelper
+        return RollbackHelper(
+            self.snapshot_manager(), self.tag_manager(), self.file_io)
+
     def rename_tag(self, old_name: str, new_name: str) -> None:
         """
         Rename a tag.
diff --git a/paimon-python/pypaimon/table/instant.py 
b/paimon-python/pypaimon/table/instant.py
new file mode 100644
index 0000000000..3a5fef72e9
--- /dev/null
+++ b/paimon-python/pypaimon/table/instant.py
@@ -0,0 +1,135 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from abc import ABC, abstractmethod
+
+
+class Instant(ABC):
+    """Table rollback instant.
+
+    Supports polymorphic JSON serialization via to_dict/from_dict
+
+    Serialization format:
+        SnapshotInstant: {"type": "snapshot", "snapshotId": 123}
+        TagInstant:      {"type": "tag", "tagName": "test_tag"}
+    """
+
+    FIELD_TYPE = "type"
+    TYPE_SNAPSHOT = "snapshot"
+    TYPE_TAG = "tag"
+
+    @staticmethod
+    def snapshot(snapshot_id):
+        """Create a SnapshotInstant.
+
+        Args:
+            snapshot_id: The snapshot ID to rollback to.
+
+        Returns:
+            A SnapshotInstant instance.
+        """
+        return SnapshotInstant(snapshot_id)
+
+    @staticmethod
+    def tag(tag_name):
+        """Create a TagInstant.
+
+        Args:
+            tag_name: The tag name to rollback to.
+
+        Returns:
+            A TagInstant instance.
+        """
+        return TagInstant(tag_name)
+
+    @abstractmethod
+    def to_dict(self):
+        """Serialize this Instant to a dictionary for JSON output."""
+
+    @staticmethod
+    def from_dict(data):
+        """Deserialize an Instant from a dictionary.
+
+        Args:
+            data: A dictionary with a 'type' field indicating the instant type.
+
+        Returns:
+            A SnapshotInstant or TagInstant instance.
+
+        Raises:
+            ValueError: If the type field is missing or unknown.
+        """
+        instant_type = data.get(Instant.FIELD_TYPE)
+        if instant_type == Instant.TYPE_SNAPSHOT:
+            return SnapshotInstant(data[SnapshotInstant.FIELD_SNAPSHOT_ID])
+        elif instant_type == Instant.TYPE_TAG:
+            return TagInstant(data[TagInstant.FIELD_TAG_NAME])
+        else:
+            raise ValueError("Unknown instant type: {}".format(instant_type))
+
+
+class SnapshotInstant(Instant):
+    """Snapshot instant for table rollback."""
+
+    FIELD_SNAPSHOT_ID = "snapshotId"
+
+    def __init__(self, snapshot_id):
+        self.snapshot_id = snapshot_id
+
+    def to_dict(self):
+        return {
+            Instant.FIELD_TYPE: Instant.TYPE_SNAPSHOT,
+            self.FIELD_SNAPSHOT_ID: self.snapshot_id,
+        }
+
+    def __eq__(self, other):
+        if not isinstance(other, SnapshotInstant):
+            return False
+        return self.snapshot_id == other.snapshot_id
+
+    def __hash__(self):
+        return hash(self.snapshot_id)
+
+    def __repr__(self):
+        return "SnapshotInstant(snapshot_id={})".format(self.snapshot_id)
+
+
+class TagInstant(Instant):
+    """Tag instant for table rollback."""
+
+    FIELD_TAG_NAME = "tagName"
+
+    def __init__(self, tag_name):
+        self.tag_name = tag_name
+
+    def to_dict(self):
+        return {
+            Instant.FIELD_TYPE: Instant.TYPE_TAG,
+            self.FIELD_TAG_NAME: self.tag_name,
+        }
+
+    def __eq__(self, other):
+        if not isinstance(other, TagInstant):
+            return False
+        return self.tag_name == other.tag_name
+
+    def __hash__(self):
+        return hash(self.tag_name)
+
+    def __repr__(self):
+        return "TagInstant(tag_name={})".format(self.tag_name)
diff --git a/paimon-python/pypaimon/table/rollback_helper.py 
b/paimon-python/pypaimon/table/rollback_helper.py
new file mode 100644
index 0000000000..f7fe2c45bc
--- /dev/null
+++ b/paimon-python/pypaimon/table/rollback_helper.py
@@ -0,0 +1,93 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import logging
+
+from pypaimon.common.json_util import JSON
+
+logger = logging.getLogger(__name__)
+
+
+class RollbackHelper:
+    """Helper class for table rollback including utils to clean snapshots.
+    """
+
+    def __init__(self, snapshot_manager, tag_manager, file_io):
+        """Initialize RollbackHelper.
+
+        Args:
+            snapshot_manager: The SnapshotManager instance.
+            tag_manager: The TagManager instance.
+            file_io: The FileIO instance for file operations.
+        """
+        self._snapshot_manager = snapshot_manager
+        self._tag_manager = tag_manager
+        self._file_io = file_io
+
+    def clean_larger_than(self, retained_snapshot):
+        """Clean snapshots and tags whose id is larger than the retained 
snapshot.
+
+        Updates the LATEST hint and removes snapshot files and tag files
+        for snapshots newer than the retained one.
+
+        Args:
+            retained_snapshot: The snapshot to retain; everything newer is 
removed.
+        """
+        latest = self._snapshot_manager.get_latest_snapshot()
+        if latest is None:
+            return
+
+        latest_id = latest.id
+        retained_id = retained_snapshot.id
+
+        # Update LATEST hint
+        try:
+            # Try atomic write first
+            success = 
self._file_io.try_to_write_atomic(self._snapshot_manager.latest_file, 
str(retained_id))
+            if not success:
+                # Fallback to regular write
+                self._file_io.write_file(self._snapshot_manager.latest_file, 
str(retained_id), overwrite=True)
+        except Exception as e:
+            logger.warning("Failed to update LATEST hint: %s", e)
+
+        # Delete snapshot files larger than retained
+        for snapshot_id in range(retained_id + 1, latest_id + 1):
+            snapshot_path = 
self._snapshot_manager.get_snapshot_path(snapshot_id)
+            if self._file_io.exists(snapshot_path):
+                self._file_io.delete(snapshot_path)
+
+        # Clean tags whose snapshot id is larger than retained
+        for tag_name in self._tag_manager.list_tags():
+            tag = self._tag_manager.get(tag_name)
+            if tag is not None:
+                tag_snapshot = tag.trim_to_snapshot()
+                if tag_snapshot.id > retained_id:
+                    self._tag_manager.delete_tag(tag_name)
+
+    def create_snapshot_file_if_needed(self, tagged_snapshot):
+        """Create a snapshot file from a tag if the snapshot file doesn't 
exist.
+
+        When rolling back to a tag, the snapshot file may have been expired.
+        This method recreates it from the tag data and updates the earliest 
hint.
+
+        Args:
+            tagged_snapshot: The snapshot from the tag to potentially write.
+        """
+        snapshot_path = 
self._snapshot_manager.get_snapshot_path(tagged_snapshot.id)
+        if not self._file_io.exists(snapshot_path):
+            self._file_io.write_file(
+                snapshot_path, JSON.to_json(tagged_snapshot), overwrite=False)
diff --git a/paimon-python/pypaimon/tests/rest/rest_catalog_test.py 
b/paimon-python/pypaimon/tests/rest/rest_catalog_test.py
new file mode 100644
index 0000000000..153c22ba8c
--- /dev/null
+++ b/paimon-python/pypaimon/tests/rest/rest_catalog_test.py
@@ -0,0 +1,277 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import Schema
+from pypaimon.common.identifier import Identifier
+from pypaimon.table.instant import Instant
+from pypaimon.tests.rest.rest_base_test import RESTBaseTest
+
+
+class RESTCatalogTest(RESTBaseTest):
+
+    def test_catalog_rollback(self):
+        """Test table rollback to snapshot and tag."""
+        table_name = "default.table_for_rollback"
+        pa_schema = pa.schema([('col1', pa.int32())])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+        identifier = Identifier.from_string(table_name)
+
+        # Write 10 commits and create a tag for each
+        for i in range(10):
+            write_builder = table.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({'col1': [i]}, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+            table.create_tag("tag-{}".format(i + 1))
+
+        snapshot_mgr = table.snapshot_manager()
+        tag_mgr = table.tag_manager()
+
+        # Verify we have 10 snapshots and 10 tags
+        latest = snapshot_mgr.get_latest_snapshot()
+        self.assertEqual(latest.id, 10)
+
+        # --- Rollback to snapshot 4 ---
+        rollback_to_snapshot_id = 4
+        self.rest_catalog.rollback_to(
+            identifier, Instant.snapshot(rollback_to_snapshot_id))
+
+        # After rollback, latest snapshot should be 4
+        latest_after = snapshot_mgr.get_latest_snapshot()
+        self.assertEqual(latest_after.id, rollback_to_snapshot_id)
+
+        # Tags with snapshot > 4 should be cleaned (tag-6 and above)
+        
self.assertFalse(tag_mgr.tag_exists("tag-{}".format(rollback_to_snapshot_id + 
2)))
+
+        # Snapshots > 4 should not exist
+        snapshot_5 = snapshot_mgr.get_snapshot_by_id(rollback_to_snapshot_id + 
1)
+        self.assertIsNone(snapshot_5)
+
+        # Rollback to a non-existent snapshot (5) should fail
+        with self.assertRaises(ValueError) as context:
+            self.rest_catalog.rollback_to(
+                identifier, Instant.snapshot(rollback_to_snapshot_id + 1))
+        self.assertIn("Rollback snapshot", str(context.exception))
+        self.assertIn("doesn't exist", str(context.exception))
+
+        # --- Rollback to tag-3 (snapshot 3) ---
+        rollback_to_tag_name = "tag-{}".format(rollback_to_snapshot_id - 1)
+        self.rest_catalog.rollback_to(identifier, 
Instant.tag(rollback_to_tag_name))
+
+        tag_snapshot = 
tag_mgr.get_or_throw(rollback_to_tag_name).trim_to_snapshot()
+        latest_after_tag = snapshot_mgr.get_latest_snapshot()
+        self.assertEqual(latest_after_tag.id, tag_snapshot.id)
+
+        # --- Rollback to snapshot 2 with from_snapshot check ---
+        # from_snapshot=4 should fail because latest is 3
+        with self.assertRaises(Exception) as context:
+            self.rest_catalog.rollback_to(
+                identifier, Instant.snapshot(2), from_snapshot=4)
+        self.assertIn("Latest snapshot 3 is not 4", str(context.exception))
+
+        # from_snapshot=3 should succeed
+        self.rest_catalog.rollback_to(
+            identifier, Instant.snapshot(2), from_snapshot=3)
+        latest_final = snapshot_mgr.get_latest_snapshot()
+        self.assertEqual(latest_final.id, 2)
+
+    def test_catalog_rollback_to_nonexistent_tag(self):
+        """Test that rollback to a non-existent tag raises an error."""
+        table_name = "default.table_rollback_no_tag"
+        pa_schema = pa.schema([('col1', pa.int32())])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+        identifier = Identifier.from_string(table_name)
+
+        # Write one commit so the table has a snapshot
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data = pa.Table.from_pydict({'col1': [1]}, schema=pa_schema)
+        table_write.write_arrow(data)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        with self.assertRaises(ValueError) as context:
+            self.rest_catalog.rollback_to(
+                identifier, Instant.tag("nonexistent-tag"))
+        self.assertIn("Rollback tag", str(context.exception))
+        self.assertIn("nonexistent-tag", str(context.exception))
+        self.assertIn("doesn't exist", str(context.exception))
+
+    def test_catalog_rollback_with_string_identifier(self):
+        """Test rollback using a string identifier instead of Identifier 
object."""
+        table_name = "default.table_rollback_str_id"
+        pa_schema = pa.schema([('col1', pa.int32())])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+
+        for i in range(3):
+            write_builder = table.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({'col1': [i]}, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        snapshot_mgr = table.snapshot_manager()
+        self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 3)
+
+        # Use string identifier directly (not Identifier object)
+        self.rest_catalog.rollback_to(table_name, Instant.snapshot(2))
+        self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 2)
+
+    def test_catalog_rollback_on_nonexistent_table(self):
+        """Test that rollback on a non-existent table raises an error."""
+        from pypaimon.catalog.catalog_exception import TableNotExistException
+        identifier = Identifier.from_string("default.no_such_table")
+        with self.assertRaises(TableNotExistException) as context:
+            self.rest_catalog.rollback_to(identifier, Instant.snapshot(1))
+        self.assertIn("default.no_such_table", str(context.exception))
+        self.assertIn("does not exist", str(context.exception))
+
+    def test_table_rollback_to_snapshot(self):
+        """Test table-level rollback_to_snapshot via FileStoreTable."""
+        table_name = "default.table_level_rollback_snapshot"
+        pa_schema = pa.schema([('col1', pa.int32())])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+
+        # Write 5 commits
+        for i in range(5):
+            write_builder = table.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({'col1': [i]}, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        snapshot_mgr = table.snapshot_manager()
+        self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 5)
+
+        # Rollback to snapshot 3 via table method (singledispatch on int)
+        table.rollback_to(3)
+
+        self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 3)
+        self.assertIsNone(snapshot_mgr.get_snapshot_by_id(4))
+        self.assertIsNone(snapshot_mgr.get_snapshot_by_id(5))
+
+    def test_table_rollback_to_tag(self):
+        """Test table-level rollback_to_tag via FileStoreTable."""
+        table_name = "default.table_level_rollback_tag"
+        pa_schema = pa.schema([('col1', pa.int32())])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+
+        # Write 5 commits and create tags
+        for i in range(5):
+            write_builder = table.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({'col1': [i]}, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+            table.create_tag("v{}".format(i + 1))
+
+        snapshot_mgr = table.snapshot_manager()
+        tag_mgr = table.tag_manager()
+        self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 5)
+
+        # Rollback to tag v3 (singledispatch on str)
+        table.rollback_to("v3")
+
+        self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 3)
+        # Tags with snapshot > 3 should be cleaned
+        self.assertFalse(tag_mgr.tag_exists("v4"))
+        self.assertFalse(tag_mgr.tag_exists("v5"))
+        # Tag v3 should still exist
+        self.assertTrue(tag_mgr.tag_exists("v3"))
+
+    def test_table_rollback_to_nonexistent_snapshot(self):
+        """Test that table-level rollback to non-existent snapshot raises 
ValueError."""
+        table_name = "default.table_level_rollback_no_snap"
+        pa_schema = pa.schema([('col1', pa.int32())])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+
+        # Write 2 commits
+        for i in range(2):
+            write_builder = table.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({'col1': [i]}, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        # Rollback to snapshot 99 should fail
+        with self.assertRaises(Exception) as context:
+            table.rollback_to(99)
+        self.assertIn("Rollback snapshot", str(context.exception))
+        self.assertIn("99", str(context.exception))
+        self.assertIn("doesn't exist", str(context.exception))
+
+    def test_table_rollback_to_nonexistent_tag(self):
+        """Test that table-level rollback to non-existent tag raises 
ValueError."""
+        table_name = "default.table_level_rollback_no_tag"
+        pa_schema = pa.schema([('col1', pa.int32())])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+
+        # Write 1 commit
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data = pa.Table.from_pydict({'col1': [1]}, schema=pa_schema)
+        table_write.write_arrow(data)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        with self.assertRaises(Exception) as context:
+            table.rollback_to("no-such-tag")
+        self.assertIn("Rollback tag", str(context.exception))
+        self.assertIn("no-such-tag", str(context.exception))
+        self.assertIn("doesn't exist", str(context.exception))
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/tests/rest/rest_server.py 
b/paimon-python/pypaimon/tests/rest/rest_server.py
index fbfa5a487a..24929dce48 100755
--- a/paimon-python/pypaimon/tests/rest/rest_server.py
+++ b/paimon-python/pypaimon/tests/rest/rest_server.py
@@ -45,7 +45,7 @@ from pypaimon.catalog.catalog_exception import 
(DatabaseNoPermissionException,
                                                 TableAlreadyExistException)
 from pypaimon.catalog.rest.table_metadata import TableMetadata
 from pypaimon.common.identifier import Identifier
-from pypaimon.common.json_util import JSON
+from pypaimon.common.json_util import JSON, json_field
 from pypaimon import Schema
 from pypaimon.schema.schema_change import Actions, SchemaChange
 from pypaimon.schema.schema_manager import SchemaManager
@@ -66,10 +66,22 @@ class ErrorResponse(RESTResponse):
     RESOURCE_TYPE_DEFINITION = "definition"
     RESOURCE_TYPE_DIALECT = "dialect"
 
-    resource_type: Optional[str]
-    resource_name: Optional[str]
-    message: str
-    code: int
+    resource_type: Optional[str] = json_field("resourceType", default=None)
+    resource_name: Optional[str] = json_field("resourceName", default=None)
+    message: Optional[str] = json_field("message", default=None)
+    code: Optional[int] = json_field("code", default=None)
+
+    def __init__(
+        self,
+        resource_type: Optional[str] = None,
+        resource_name: Optional[str] = None,
+        message: Optional[str] = None,
+        code: Optional[int] = None,
+    ):
+        self.resource_type = resource_type
+        self.resource_name = resource_name
+        self.message = message
+        self.code = code
 
 
 # Constants
@@ -480,6 +492,8 @@ class RESTCatalogServer:
                 return self._table_commit_handle(method, data, 
lookup_identifier, branch_part)
             elif operation == "token":
                 return self._table_token_handle(method, lookup_identifier)
+            elif operation == "rollback":
+                return self._table_rollback_handle(method, data, 
lookup_identifier)
             else:
                 return self._mock_response(ErrorResponse(None, None, "Not 
Found", 404), 404)
         return self._mock_response(ErrorResponse(None, None, "Not Found", 
404), 404)
@@ -662,6 +676,113 @@ class RESTCatalogServer:
                 ErrorResponse(None, None, f"Commit failed: {str(e)}", 500), 500
             )
 
+    def _table_rollback_handle(self, method: str, data: str,
+                               identifier: Identifier) -> Tuple[str, int]:
+        """Handle table rollback operations"""
+        if method != "POST":
+            return self._mock_response(ErrorResponse(None, None, "Method Not 
Allowed", 405), 405)
+
+        if identifier.get_full_name() not in self.table_metadata_store:
+            raise TableNotExistException(identifier)
+
+        try:
+            import json as json_module
+            from pypaimon.table.instant import Instant, SnapshotInstant, 
TagInstant
+
+            request_dict = json_module.loads(data)
+            instant_dict = request_dict.get("instant")
+            from_snapshot = request_dict.get("fromSnapshot")
+
+            instant = Instant.from_dict(instant_dict)
+
+            if isinstance(instant, SnapshotInstant):
+                return self._rollback_table_by_snapshot(
+                    identifier, instant.snapshot_id, from_snapshot)
+            elif isinstance(instant, TagInstant):
+                return self._rollback_table_by_tag(identifier, 
instant.tag_name)
+            else:
+                return self._mock_response(
+                    ErrorResponse(None, None, "Unknown instant type", 400), 
400)
+
+        except Exception as e:
+            self.logger.error(f"Error in rollback operation: {e}")
+            import traceback
+            self.logger.error(f"Traceback: {traceback.format_exc()}")
+            return self._mock_response(
+                ErrorResponse(None, None, f"Rollback failed: {str(e)}", 500), 
500)
+
+    def _rollback_table_by_snapshot(self, identifier: Identifier, snapshot_id: 
int,
+                                    from_snapshot: Optional[int]) -> 
Tuple[str, int]:
+        """Rollback table to a specific snapshot ID by delegating to 
table.rollback_to()."""
+        table = self._get_file_table(identifier)
+
+        snapshot_mgr = table.snapshot_manager()
+        snapshot = snapshot_mgr.get_snapshot_by_id(snapshot_id)
+        if snapshot is None:
+            return self._mock_response(
+                ErrorResponse(ErrorResponse.RESOURCE_TYPE_SNAPSHOT,
+                              str(snapshot_id), "", 404), 404)
+
+        latest = snapshot_mgr.get_latest_snapshot()
+        if latest is None:
+            return self._mock_response(
+                ErrorResponse(None, None, "No latest snapshot found", 500), 
500)
+
+        if from_snapshot is not None and from_snapshot != latest.id:
+            return self._mock_response(
+                ErrorResponse(None, None,
+                              f"Latest snapshot {latest.id} is not 
{from_snapshot}",
+                              500), 500)
+
+        table.rollback_to(snapshot_id)
+        return self._mock_response("", 200)
+
+    def _rollback_table_by_tag(self, identifier: Identifier,
+                               tag_name: str) -> Tuple[str, int]:
+        """Rollback table to a specific tag by delegating to 
table.rollback_to()."""
+        table = self._get_file_table(identifier)
+
+        tag_mgr = table.tag_manager()
+        if not tag_mgr.tag_exists(tag_name):
+            return self._mock_response(
+                ErrorResponse(ErrorResponse.RESOURCE_TYPE_TAG,
+                              tag_name, "", 404), 404)
+
+        table.rollback_to(tag_name)
+        return self._mock_response("", 200)
+
+    def _get_file_table(self, identifier: Identifier):
+        """Construct a FileStoreTable from the metadata store.
+
+        Mirrors Java RESTCatalogServer.getFileTable(): loads the schema from
+        the metadata store, builds a CatalogEnvironment (without catalog
+        loader so rollback goes through local file cleanup), and returns a
+        FileStoreTable.
+        """
+        from pypaimon.catalog.catalog_environment import CatalogEnvironment
+        from pypaimon.common.file_io import FileIO
+        from pypaimon.common.options.options import Options
+        from pypaimon.table.file_store_table import FileStoreTable
+
+        table_metadata = 
self.table_metadata_store.get(identifier.get_full_name())
+        if table_metadata is None:
+            raise TableNotExistException(identifier)
+
+        table_schema = table_metadata.schema
+        table_path = (
+            f'file://{self.data_path}/{self.warehouse}/'
+            f'{identifier.get_database_name()}/{identifier.get_object_name()}')
+
+        catalog_env = CatalogEnvironment(
+            identifier=identifier,
+            uuid=table_metadata.uuid,
+            catalog_loader=None,
+            supports_version_management=False
+        )
+
+        file_io = FileIO.get(table_path, Options({}))
+        return FileStoreTable(file_io, identifier, table_path, table_schema, 
catalog_env)
+
     def _write_snapshot_files(self, identifier: Identifier, snapshot, 
statistics):
         """Write snapshot and related files to the file system"""
         import json
@@ -887,9 +1008,9 @@ class RESTCatalogServer:
                 or metadata_table_type == table_type
             )
             if (identifier.get_database_name() == database_name and
-                    table_type_matches and
-                    (not table_name_pattern or 
self._match_name_pattern(identifier.get_table_name(),
-                                                                        
table_name_pattern))):
+                table_type_matches and
+                (not table_name_pattern or 
self._match_name_pattern(identifier.get_table_name(),
+                                                                    
table_name_pattern))):
                 tables.append(identifier.get_table_name())
 
         return tables
diff --git a/paimon-python/pypaimon/tests/table/simple_table_test.py 
b/paimon-python/pypaimon/tests/table/simple_table_test.py
index 683f72c811..01f48d0031 100644
--- a/paimon-python/pypaimon/tests/table/simple_table_test.py
+++ b/paimon-python/pypaimon/tests/table/simple_table_test.py
@@ -537,3 +537,146 @@ class SimpleTableTest(unittest.TestCase):
         }, schema=expected_schema)
 
         self.assertEqual(expected, result.sort_by('user_id'))
+
+    def test_table_rollback_to_snapshot(self):
+        """Test table-level rollback to a specific snapshot."""
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            primary_keys=['pt', 'k'],
+            partition_keys=['pt'],
+            options={'bucket': '3'}
+        )
+        self.catalog.create_table('default.test_rollback_snapshot', schema, 
False)
+        table = self.catalog.get_table('default.test_rollback_snapshot')
+
+        write_builder = table.new_batch_write_builder()
+
+        # Write 5 commits
+        for i in range(5):
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({
+                'pt': [1],
+                'k': [i],
+                'v': [i * 100]
+            }, schema=self.pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        snapshot_mgr = table.snapshot_manager()
+        self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 5)
+
+        # Rollback to snapshot 3
+        table.rollback_to(3)
+
+        self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 3)
+        self.assertIsNone(snapshot_mgr.get_snapshot_by_id(4))
+        self.assertIsNone(snapshot_mgr.get_snapshot_by_id(5))
+
+    def test_table_rollback_to_tag(self):
+        """Test table-level rollback to a specific tag."""
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            primary_keys=['pt', 'k'],
+            partition_keys=['pt'],
+            options={'bucket': '3'}
+        )
+        self.catalog.create_table('default.test_rollback_tag', schema, False)
+        table = self.catalog.get_table('default.test_rollback_tag')
+
+        write_builder = table.new_batch_write_builder()
+
+        # Write 5 commits and create tags
+        for i in range(5):
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({
+                'pt': [1],
+                'k': [i],
+                'v': [i * 100]
+            }, schema=self.pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+            table.create_tag("v{}".format(i + 1))
+
+        snapshot_mgr = table.snapshot_manager()
+        tag_mgr = table.tag_manager()
+        self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 5)
+
+        # Rollback to tag v3
+        table.rollback_to("v3")
+
+        self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 3)
+        # Tags with snapshot > 3 should be cleaned
+        self.assertFalse(tag_mgr.tag_exists("v4"))
+        self.assertFalse(tag_mgr.tag_exists("v5"))
+        # Tag v3 should still exist
+        self.assertTrue(tag_mgr.tag_exists("v3"))
+
+    def test_table_rollback_to_nonexistent_snapshot(self):
+        """Test that rollback to a non-existent snapshot raises ValueError."""
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            primary_keys=['pt', 'k'],
+            partition_keys=['pt'],
+            options={'bucket': '3'}
+        )
+        self.catalog.create_table('default.test_rollback_no_snap', schema, 
False)
+        table = self.catalog.get_table('default.test_rollback_no_snap')
+
+        write_builder = table.new_batch_write_builder()
+
+        # Write 2 commits
+        for i in range(2):
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({
+                'pt': [1],
+                'k': [i],
+                'v': [i * 100]
+            }, schema=self.pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        with self.assertRaises(ValueError) as context:
+            table.rollback_to(99)
+        self.assertIn("Rollback snapshot", str(context.exception))
+        self.assertIn("99", str(context.exception))
+        self.assertIn("doesn't exist", str(context.exception))
+
+    def test_table_rollback_to_nonexistent_tag(self):
+        """Test that rollback to a non-existent tag raises ValueError."""
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            primary_keys=['pt', 'k'],
+            partition_keys=['pt'],
+            options={'bucket': '3'}
+        )
+        self.catalog.create_table('default.test_rollback_no_tag', schema, 
False)
+        table = self.catalog.get_table('default.test_rollback_no_tag')
+
+        write_builder = table.new_batch_write_builder()
+
+        # Write 1 commit
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data = pa.Table.from_pydict({
+            'pt': [1],
+            'k': [10],
+            'v': [100]
+        }, schema=self.pa_schema)
+        table_write.write_arrow(data)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        with self.assertRaises(ValueError) as context:
+            table.rollback_to("no-such-tag")
+        self.assertIn("no-such-tag", str(context.exception))
+        self.assertIn("doesn't exist", str(context.exception))

Reply via email to