This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new db117e108c [python] Add rollback_to_timestamp to FileStoreTable (#7941)
db117e108c is described below
commit db117e108c6a3bdf0af4f7672b4db311c6b94da2
Author: Junrui Lee <[email protected]>
AuthorDate: Sun May 24 09:50:47 2026 +0800
[python] Add rollback_to_timestamp to FileStoreTable (#7941)
---
paimon-python/pypaimon/table/file_store_table.py | 17 +++++
.../pypaimon/tests/table/simple_table_test.py | 73 ++++++++++++++++++++++
2 files changed, 90 insertions(+)
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index af52bbe843..67be2587b7 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -298,6 +298,23 @@ class FileStoreTable(Table):
return RollbackHelper(
self.snapshot_manager(), self.tag_manager(), self.file_io)
+ def rollback_to_timestamp(self, timestamp_millis: int) -> None:
+ """Rollback table to the latest snapshot with commit time <= the given timestamp.
+
+ Args:
+ timestamp_millis: The timestamp in milliseconds to rollback to.
+
+ Raises:
+ ValueError: If no snapshot exists at or before the given timestamp.
+ """
+ snapshot_mgr = self.snapshot_manager()
+ snapshot = snapshot_mgr.earlier_or_equal_time_mills(timestamp_millis)
+ if snapshot is None:
+ raise ValueError(
+ f"No snapshot found with timestamp earlier than or equal to {timestamp_millis}ms."
+ )
+ self.rollback_to(snapshot.id)
+
def rename_tag(self, old_name: str, new_name: str) -> None:
"""
Rename a tag.
diff --git a/paimon-python/pypaimon/tests/table/simple_table_test.py b/paimon-python/pypaimon/tests/table/simple_table_test.py
index dac7361103..5e13579c33 100644
--- a/paimon-python/pypaimon/tests/table/simple_table_test.py
+++ b/paimon-python/pypaimon/tests/table/simple_table_test.py
@@ -686,3 +686,76 @@ class SimpleTableTest(unittest.TestCase):
table.rollback_to("no-such-tag")
self.assertIn("no-such-tag", str(context.exception))
self.assertIn("doesn't exist", str(context.exception))
+
+ def test_table_rollback_to_timestamp(self):
+ """Test table-level rollback to a timestamp."""
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema,
+ primary_keys=['pt', 'k'],
+ partition_keys=['pt'],
+ options={'bucket': '3'}
+ )
+ self.catalog.create_table('default.test_rollback_ts', schema, False)
+ table = self.catalog.get_table('default.test_rollback_ts')
+
+ write_builder = table.new_batch_write_builder()
+
+ # Write 5 commits
+ for i in range(5):
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data = pa.Table.from_pydict({
+ 'pt': [1],
+ 'k': [i],
+ 'v': [i * 100]
+ }, schema=self.pk_pa_schema)
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ snapshot_mgr = table.snapshot_manager()
+ self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 5)
+
+ # Get the timestamp of snapshot 3 and rollback to it
+ snap3 = snapshot_mgr.get_snapshot_by_id(3)
+ table.rollback_to_timestamp(snap3.time_millis)
+
+ self.assertEqual(snapshot_mgr.get_latest_snapshot().id, 3)
+ self.assertIsNone(snapshot_mgr.get_snapshot_by_id(4))
+ self.assertIsNone(snapshot_mgr.get_snapshot_by_id(5))
+
+ def test_table_rollback_to_timestamp_no_match(self):
+ """Test rollback_to_timestamp raises ValueError when no snapshot exists."""
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema,
+ primary_keys=['pt', 'k'],
+ partition_keys=['pt'],
+ options={'bucket': '3'}
+ )
+ self.catalog.create_table('default.test_rollback_ts_nomatch', schema, False)
+ table = self.catalog.get_table('default.test_rollback_ts_nomatch')
+
+ write_builder = table.new_batch_write_builder()
+
+ # Write 1 commit
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data = pa.Table.from_pydict({
+ 'pt': [1],
+ 'k': [0],
+ 'v': [100]
+ }, schema=self.pk_pa_schema)
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # Use a timestamp before the first snapshot
+ snapshot_mgr = table.snapshot_manager()
+ earliest = snapshot_mgr.try_get_earliest_snapshot()
+ before_earliest = earliest.time_millis - 1
+
+ with self.assertRaises(ValueError) as context:
+ table.rollback_to_timestamp(before_earliest)
+ self.assertIn("No snapshot found", str(context.exception))