This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 764875f08b [python] Add replace_tag support to TagManager and FileStoreTable (#7904)
764875f08b is described below
commit 764875f08bcc4dcc9af53e82ce308478e120fbbb
Author: Junrui Lee <[email protected]>
AuthorDate: Wed May 20 17:27:53 2026 +0800
[python] Add replace_tag support to TagManager and FileStoreTable (#7904)
Java supports replacing an existing tag with a new snapshot via
Table.replaceTag() and Spark's ReplaceTagProcedure. pypaimon currently
lacks this capability.
Users who want to update a tag (e.g. periodically refreshing a
latest-prod tag to point to the newest snapshot) must delete and
re-create it, leaving a window where the tag does not exist. replace_tag
provides atomic replacement semantics.
---
paimon-python/pypaimon/table/file_store_table.py | 22 +++++++++
paimon-python/pypaimon/tag/tag_manager.py | 19 ++++++++
.../pypaimon/tests/filesystem_catalog_tag_test.py | 56 ++++++++++++++++++++++
3 files changed, 97 insertions(+)
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index 94564761fe..af52bbe843 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -313,6 +313,28 @@ class FileStoreTable(Table):
tag_mgr = self.tag_manager()
tag_mgr.rename_tag(old_name, new_name)
+ def replace_tag(self, tag_name: str, snapshot_id: int = None) -> None:
+ """
+ Replace an existing tag with a new snapshot.
+
+ Args:
+ tag_name: Name of the tag to replace
+ snapshot_id: The snapshot id to associate with the tag.
+ If None, uses the latest snapshot.
+
+ Raises:
+ ValueError: If tag doesn't exist, or snapshot doesn't exist
+ """
+ if snapshot_id is None:
+ snapshot = self.snapshot_manager().get_latest_snapshot()
+ if snapshot is None:
+ raise ValueError("Cannot replace tag because latest snapshot doesn't exist.")
+ else:
+ snapshot = self.snapshot_manager().get_snapshot_by_id(snapshot_id)
+ if snapshot is None:
+ raise ValueError(f"Snapshot id '{snapshot_id}' doesn't exist.")
+ self.tag_manager().replace_tag(snapshot, tag_name)
+
def path_factory(self) -> 'FileStorePathFactory':
from pypaimon.utils.file_store_path_factory import FileStorePathFactory
diff --git a/paimon-python/pypaimon/tag/tag_manager.py b/paimon-python/pypaimon/tag/tag_manager.py
index 7dc3d48ae6..45a1810414 100644
--- a/paimon-python/pypaimon/tag/tag_manager.py
+++ b/paimon-python/pypaimon/tag/tag_manager.py
@@ -199,6 +199,25 @@ class TagManager:
self.file_io.delete_quietly(path)
return True
+ def replace_tag(self, snapshot: Snapshot, tag_name: str) -> None:
+ """
+ Replace an existing tag with a new snapshot.
+
+ Args:
+ snapshot: The new snapshot to associate with the tag
+ tag_name: Name of the tag to replace
+
+ Raises:
+ ValueError: If tag_name is blank or tag doesn't exist
+ """
+ if not tag_name or tag_name.isspace():
+ raise ValueError("Tag name shouldn't be blank.")
+
+ if not self.tag_exists(tag_name):
+ raise ValueError(f"Tag '{tag_name}' doesn't exist.")
+
+ self._create_or_replace_tag(snapshot, tag_name)
+
def rename_tag(self, old_name: str, new_name: str) -> None:
"""
Rename a tag.
diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_tag_test.py b/paimon-python/pypaimon/tests/filesystem_catalog_tag_test.py
index b9e992ab25..a721dec3d4 100644
--- a/paimon-python/pypaimon/tests/filesystem_catalog_tag_test.py
+++ b/paimon-python/pypaimon/tests/filesystem_catalog_tag_test.py
@@ -178,6 +178,62 @@ class FileSystemCatalogTagCRUDTest(unittest.TestCase):
self.assertEqual(result.elements, [])
self.assertIsNone(result.next_page_token)
+ # -- replace_tag -----------------------------------------------------------
+
+ def test_replace_tag_with_snapshot_id(self):
+ table = self.catalog.get_table(self.identifier)
+ # Create a second snapshot
+ wb = table.new_batch_write_builder()
+ w = wb.new_write()
+ w.write_arrow(pa.Table.from_pydict(
+ {"id": [4, 5], "value": ["d", "e"]},
+ schema=self.pa_schema,
+ ))
+ wb.new_commit().commit(w.prepare_commit())
+ w.close()
+
+ # Create tag pointing to snapshot 1
+ table.create_tag("replace_test", snapshot_id=1)
+ tag = table.tag_manager().get("replace_test")
+ self.assertEqual(tag.trim_to_snapshot().id, 1)
+
+ # Replace tag to point to snapshot 2
+ table.replace_tag("replace_test", snapshot_id=2)
+ tag = table.tag_manager().get("replace_test")
+ self.assertEqual(tag.trim_to_snapshot().id, 2)
+
+ def test_replace_tag_with_latest_snapshot(self):
+ table = self.catalog.get_table(self.identifier)
+ # Create a second snapshot
+ wb = table.new_batch_write_builder()
+ w = wb.new_write()
+ w.write_arrow(pa.Table.from_pydict(
+ {"id": [4], "value": ["d"]},
+ schema=self.pa_schema,
+ ))
+ wb.new_commit().commit(w.prepare_commit())
+ w.close()
+
+ # Create tag pointing to snapshot 1
+ table.create_tag("latest_test", snapshot_id=1)
+ # Replace with latest (should be snapshot 2)
+ table.replace_tag("latest_test")
+ tag = table.tag_manager().get("latest_test")
+ self.assertEqual(tag.trim_to_snapshot().id, 2)
+
+ def test_replace_tag_not_exists_raises(self):
+ table = self.catalog.get_table(self.identifier)
+ with self.assertRaises(ValueError) as cm:
+ table.replace_tag("nonexistent", snapshot_id=1)
+ self.assertIn("doesn't exist", str(cm.exception))
+
+ def test_replace_tag_snapshot_not_exists_raises(self):
+ table = self.catalog.get_table(self.identifier)
+ table.create_tag("exists_tag", snapshot_id=1)
+ with self.assertRaises(ValueError) as cm:
+ table.replace_tag("exists_tag", snapshot_id=999)
+ self.assertIn("doesn't exist", str(cm.exception))
+
if __name__ == "__main__":
unittest.main()