This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 764875f08b [python] Add replace_tag support to TagManager and FileStoreTable (#7904)
764875f08b is described below

commit 764875f08bcc4dcc9af53e82ce308478e120fbbb
Author: Junrui Lee <[email protected]>
AuthorDate: Wed May 20 17:27:53 2026 +0800

    [python] Add replace_tag support to TagManager and FileStoreTable (#7904)
    
    Java supports replacing an existing tag with a new snapshot via
    Table.replaceTag() and Spark's ReplaceTagProcedure. pypaimon currently
    lacks this capability.
    Users who want to update a tag (e.g. periodically refreshing a
    latest-prod tag to point to the newest snapshot) must delete and
    re-create it, leaving a window where the tag does not exist. replace_tag
    provides atomic replacement semantics.
---
 paimon-python/pypaimon/table/file_store_table.py   | 22 +++++++++
 paimon-python/pypaimon/tag/tag_manager.py          | 19 ++++++++
 .../pypaimon/tests/filesystem_catalog_tag_test.py  | 56 ++++++++++++++++++++++
 3 files changed, 97 insertions(+)

diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index 94564761fe..af52bbe843 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -313,6 +313,28 @@ class FileStoreTable(Table):
         tag_mgr = self.tag_manager()
         tag_mgr.rename_tag(old_name, new_name)
 
+    def replace_tag(self, tag_name: str, snapshot_id: int = None) -> None:
+        """
+        Replace an existing tag with a new snapshot.
+
+        Args:
+            tag_name: Name of the tag to replace
+            snapshot_id: The snapshot id to associate with the tag.
+                        If None, uses the latest snapshot.
+
+        Raises:
+            ValueError: If tag doesn't exist, or snapshot doesn't exist
+        """
+        if snapshot_id is None:
+            snapshot = self.snapshot_manager().get_latest_snapshot()
+            if snapshot is None:
+                raise ValueError("Cannot replace tag because latest snapshot doesn't exist.")
+        else:
+            snapshot = self.snapshot_manager().get_snapshot_by_id(snapshot_id)
+            if snapshot is None:
+                raise ValueError(f"Snapshot id '{snapshot_id}' doesn't exist.")
+        self.tag_manager().replace_tag(snapshot, tag_name)
+
     def path_factory(self) -> 'FileStorePathFactory':
         from pypaimon.utils.file_store_path_factory import FileStorePathFactory
 
diff --git a/paimon-python/pypaimon/tag/tag_manager.py b/paimon-python/pypaimon/tag/tag_manager.py
index 7dc3d48ae6..45a1810414 100644
--- a/paimon-python/pypaimon/tag/tag_manager.py
+++ b/paimon-python/pypaimon/tag/tag_manager.py
@@ -199,6 +199,25 @@ class TagManager:
         self.file_io.delete_quietly(path)
         return True
 
+    def replace_tag(self, snapshot: Snapshot, tag_name: str) -> None:
+        """
+        Replace an existing tag with a new snapshot.
+
+        Args:
+            snapshot: The new snapshot to associate with the tag
+            tag_name: Name of the tag to replace
+
+        Raises:
+            ValueError: If tag_name is blank or tag doesn't exist
+        """
+        if not tag_name or tag_name.isspace():
+            raise ValueError("Tag name shouldn't be blank.")
+
+        if not self.tag_exists(tag_name):
+            raise ValueError(f"Tag '{tag_name}' doesn't exist.")
+
+        self._create_or_replace_tag(snapshot, tag_name)
+
     def rename_tag(self, old_name: str, new_name: str) -> None:
         """
         Rename a tag.
diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_tag_test.py b/paimon-python/pypaimon/tests/filesystem_catalog_tag_test.py
index b9e992ab25..a721dec3d4 100644
--- a/paimon-python/pypaimon/tests/filesystem_catalog_tag_test.py
+++ b/paimon-python/pypaimon/tests/filesystem_catalog_tag_test.py
@@ -178,6 +178,62 @@ class FileSystemCatalogTagCRUDTest(unittest.TestCase):
         self.assertEqual(result.elements, [])
         self.assertIsNone(result.next_page_token)
 
+    # -- replace_tag -----------------------------------------------------------
+
+    def test_replace_tag_with_snapshot_id(self):
+        table = self.catalog.get_table(self.identifier)
+        # Create a second snapshot
+        wb = table.new_batch_write_builder()
+        w = wb.new_write()
+        w.write_arrow(pa.Table.from_pydict(
+            {"id": [4, 5], "value": ["d", "e"]},
+            schema=self.pa_schema,
+        ))
+        wb.new_commit().commit(w.prepare_commit())
+        w.close()
+
+        # Create tag pointing to snapshot 1
+        table.create_tag("replace_test", snapshot_id=1)
+        tag = table.tag_manager().get("replace_test")
+        self.assertEqual(tag.trim_to_snapshot().id, 1)
+
+        # Replace tag to point to snapshot 2
+        table.replace_tag("replace_test", snapshot_id=2)
+        tag = table.tag_manager().get("replace_test")
+        self.assertEqual(tag.trim_to_snapshot().id, 2)
+
+    def test_replace_tag_with_latest_snapshot(self):
+        table = self.catalog.get_table(self.identifier)
+        # Create a second snapshot
+        wb = table.new_batch_write_builder()
+        w = wb.new_write()
+        w.write_arrow(pa.Table.from_pydict(
+            {"id": [4], "value": ["d"]},
+            schema=self.pa_schema,
+        ))
+        wb.new_commit().commit(w.prepare_commit())
+        w.close()
+
+        # Create tag pointing to snapshot 1
+        table.create_tag("latest_test", snapshot_id=1)
+        # Replace with latest (should be snapshot 2)
+        table.replace_tag("latest_test")
+        tag = table.tag_manager().get("latest_test")
+        self.assertEqual(tag.trim_to_snapshot().id, 2)
+
+    def test_replace_tag_not_exists_raises(self):
+        table = self.catalog.get_table(self.identifier)
+        with self.assertRaises(ValueError) as cm:
+            table.replace_tag("nonexistent", snapshot_id=1)
+        self.assertIn("doesn't exist", str(cm.exception))
+
+    def test_replace_tag_snapshot_not_exists_raises(self):
+        table = self.catalog.get_table(self.identifier)
+        table.create_tag("exists_tag", snapshot_id=1)
+        with self.assertRaises(ValueError) as cm:
+            table.replace_tag("exists_tag", snapshot_id=999)
+        self.assertIn("doesn't exist", str(cm.exception))
+
 
 if __name__ == "__main__":
     unittest.main()

Reply via email to