This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new db117e108c [python] Add rollback_to_timestamp to FileStoreTable (#7941)
db117e108c is described below

commit db117e108c6a3bdf0af4f7672b4db311c6b94da2
Author: Junrui Lee <[email protected]>
AuthorDate: Sun May 24 09:50:47 2026 +0800

    [python] Add rollback_to_timestamp to FileStoreTable (#7941)
---
 paimon-python/pypaimon/table/file_store_table.py   | 17 +++++
 .../pypaimon/tests/table/simple_table_test.py      | 73 ++++++++++++++++++++++
 2 files changed, 90 insertions(+)

diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index af52bbe843..67be2587b7 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -298,6 +298,23 @@ class FileStoreTable(Table):
         return RollbackHelper(
             self.snapshot_manager(), self.tag_manager(), self.file_io)
 
+    def rollback_to_timestamp(self, timestamp_millis: int) -> None:
+        """Rollback table to the latest snapshot with commit time <= the given timestamp.
+
+        Args:
+            timestamp_millis: The timestamp in milliseconds to rollback to.
+
+        Raises:
+            ValueError: If no snapshot exists at or before the given timestamp.
+        """
+        snapshot_mgr = self.snapshot_manager()
+        snapshot = snapshot_mgr.earlier_or_equal_time_mills(timestamp_millis)
+        if snapshot is None:
+            raise ValueError(
+                f"No snapshot found with timestamp earlier than or equal to {timestamp_millis}ms."
+            )
+        self.rollback_to(snapshot.id)
+
     def rename_tag(self, old_name: str, new_name: str) -> None:
         """
         Rename a tag.
diff --git a/paimon-python/pypaimon/tests/table/simple_table_test.py b/paimon-python/pypaimon/tests/table/simple_table_test.py
index dac7361103..5e13579c33 100644
--- a/paimon-python/pypaimon/tests/table/simple_table_test.py
+++ b/paimon-python/pypaimon/tests/table/simple_table_test.py
@@ -686,3 +686,76 @@ class SimpleTableTest(unittest.TestCase):
             table.rollback_to("no-such-tag")
         self.assertIn("no-such-tag", str(context.exception))
         self.assertIn("doesn't exist", str(context.exception))
+
+    def test_table_rollback_to_timestamp(self):
+        """Test table-level rollback to a timestamp."""
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            primary_keys=['pt', 'k'],
+            partition_keys=['pt'],
+            options={'bucket': '3'}
+        )
+        self.catalog.create_table('default.test_rollback_ts', schema, False)
+        table = self.catalog.get_table('default.test_rollback_ts')
+
+        write_builder = table.new_batch_write_builder()
+
+        # Write 5 commits
+        for i in range(5):
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({
+                'pt': [1],
+                'k': [i],
+                'v': [i * 100]
+            }, schema=self.pk_pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        snapshot_mgr = table.snapshot_manager()
+        self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 5)
+
+        # Get the timestamp of snapshot 3 and rollback to it
+        snap3 = snapshot_mgr.get_snapshot_by_id(3)
+        table.rollback_to_timestamp(snap3.time_millis)
+
+        self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 3)
+        self.assertIsNone(snapshot_mgr.get_snapshot_by_id(4))
+        self.assertIsNone(snapshot_mgr.get_snapshot_by_id(5))
+
+    def test_table_rollback_to_timestamp_no_match(self):
+        """Test rollback_to_timestamp raises ValueError when no snapshot exists."""
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            primary_keys=['pt', 'k'],
+            partition_keys=['pt'],
+            options={'bucket': '3'}
+        )
+        self.catalog.create_table('default.test_rollback_ts_nomatch', schema, False)
+        table = self.catalog.get_table('default.test_rollback_ts_nomatch')
+
+        write_builder = table.new_batch_write_builder()
+
+        # Write 1 commit
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data = pa.Table.from_pydict({
+            'pt': [1],
+            'k': [0],
+            'v': [100]
+        }, schema=self.pk_pa_schema)
+        table_write.write_arrow(data)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        # Use a timestamp before the first snapshot
+        snapshot_mgr = table.snapshot_manager()
+        earliest = snapshot_mgr.try_get_earliest_snapshot()
+        before_earliest = earliest.time_millis - 1
+
+        with self.assertRaises(ValueError) as context:
+            table.rollback_to_timestamp(before_earliest)
+        self.assertIn("No snapshot found", str(context.exception))

Reply via email to