This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 809fe70673 [python] Support rename tag API for pypaimon (#7322)
809fe70673 is described below

commit 809fe706733ad5c6ceda1b8f2ad99af3e9a4928d
Author: xuzifu666 <[email protected]>
AuthorDate: Sat Feb 28 17:09:30 2026 +0800

    [python] Support rename tag API for pypaimon (#7322)
---
 paimon-python/pypaimon/table/file_store_table.py   |  15 ++
 paimon-python/pypaimon/tag/tag_manager.py          |  37 ++++-
 .../pypaimon/tests/table/simple_table_test.py      | 180 +++++++++++++++++++++
 3 files changed, 230 insertions(+), 2 deletions(-)

diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index f3e61cc930..4f7e3c81dd 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -149,6 +149,21 @@ class FileStoreTable(Table):
         tag_mgr = self.tag_manager()
         return tag_mgr.list_tags()
 
+    def rename_tag(self, old_name: str, new_name: str) -> None:
+        """
+        Rename a tag.
+
+        Args:
+            old_name: Current name of the tag
+            new_name: New name for the tag
+
+        Raises:
+            ValueError: If old_name or new_name is blank, old_name doesn't exist,
+                       or new_name already exists
+        """
+        tag_mgr = self.tag_manager()
+        tag_mgr.rename_tag(old_name, new_name)
+
     def path_factory(self) -> 'FileStorePathFactory':
         from pypaimon.utils.file_store_path_factory import FileStorePathFactory
 
diff --git a/paimon-python/pypaimon/tag/tag_manager.py b/paimon-python/pypaimon/tag/tag_manager.py
index e342147a16..4d3f902bbd 100644
--- a/paimon-python/pypaimon/tag/tag_manager.py
+++ b/paimon-python/pypaimon/tag/tag_manager.py
@@ -180,10 +180,10 @@ class TagManager:
     def delete_tag(self, tag_name: str) -> bool:
         """
         Delete a tag.
-        
+
         Args:
             tag_name: Name of the tag to delete
-            
+
         Returns:
             True if tag was deleted, False if tag didn't exist
         """
@@ -198,3 +198,36 @@ class TagManager:
         path = self.tag_path(tag_name)
         self.file_io.delete_quietly(path)
         return True
+
+    def rename_tag(self, old_name: str, new_name: str) -> None:
+        """
+        Rename a tag.
+
+        Args:
+            old_name: Current name of the tag
+            new_name: New name for the tag
+
+        Raises:
+            ValueError: If old_name or new_name is blank, old_name doesn't exist,
+                       or new_name already exists
+        """
+        if not old_name or old_name.isspace():
+            raise ValueError("Old tag name shouldn't be blank.")
+
+        if not new_name or new_name.isspace():
+            raise ValueError("New tag name shouldn't be blank.")
+
+        # Check if old tag exists
+        if not self.tag_exists(old_name):
+            raise ValueError(f"Tag '{old_name}' doesn't exist.")
+
+        # Check if new tag already exists
+        if self.tag_exists(new_name):
+            raise ValueError(f"Tag '{new_name}' already exists.")
+
+        # Rename the tag file from old name to new name
+        old_path = self.tag_path(old_name)
+        new_path = self.tag_path(new_name)
+        self.file_io.rename(old_path, new_path)
+
+        logger.info(f"Tag renamed from '{old_name}' to '{new_name}'.")
diff --git a/paimon-python/pypaimon/tests/table/simple_table_test.py b/paimon-python/pypaimon/tests/table/simple_table_test.py
index 66579d3e85..683f72c811 100644
--- a/paimon-python/pypaimon/tests/table/simple_table_test.py
+++ b/paimon-python/pypaimon/tests/table/simple_table_test.py
@@ -254,6 +254,186 @@ class SimpleTableTest(unittest.TestCase):
         # Create the same tag with ignore_if_exists=True - should not raise error
         table.create_tag("duplicate_tag", ignore_if_exists=True)
 
+    def test_tag_rename(self):
+        """Test renaming a tag."""
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            primary_keys=['pt', 'k'],
+            partition_keys=['pt'],
+            options={'bucket': '3'}
+        )
+        self.catalog.create_table('default.test_tag_rename', schema, False)
+        table = self.catalog.get_table('default.test_tag_rename')
+
+        write_builder = table.new_batch_write_builder()
+
+        # Write some data - snapshot 1
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data = pa.Table.from_pydict({
+            'pt': [1],
+            'k': [10],
+            'v': [100]
+        }, schema=self.pa_schema)
+        table_write.write_arrow(data)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        # Create a tag
+        table.create_tag("old_tag")
+
+        # Rename the tag
+        table.rename_tag("old_tag", "new_tag")
+
+        # Verify old tag no longer exists
+        tags = table.list_tags()
+        self.assertNotIn("old_tag", tags)
+        self.assertIn("new_tag", tags)
+
+        # Verify tag content is preserved
+        tag_manager = table.tag_manager()
+        tag = tag_manager.get("new_tag")
+        self.assertIsNotNone(tag)
+        self.assertEqual(tag.id, 1)
+
+    def test_tag_rename_nonexistent(self):
+        """Test renaming a nonexistent tag."""
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            primary_keys=['pt', 'k'],
+            partition_keys=['pt'],
+            options={'bucket': '3'}
+        )
+        self.catalog.create_table('default.test_tag_rename_nonexistent', schema, False)
+        table = self.catalog.get_table('default.test_tag_rename_nonexistent')
+
+        write_builder = table.new_batch_write_builder()
+
+        # Write some data
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data = pa.Table.from_pydict({
+            'pt': [1],
+            'k': [10],
+            'v': [100]
+        }, schema=self.pa_schema)
+        table_write.write_arrow(data)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        # Try to rename nonexistent tag
+        with self.assertRaises(ValueError) as context:
+            table.rename_tag("nonexistent", "new_tag")
+        self.assertIn("doesn't exist", str(context.exception))
+
+    def test_tag_rename_to_existing(self):
+        """Test renaming a tag to a name that already exists."""
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            primary_keys=['pt', 'k'],
+            partition_keys=['pt'],
+            options={'bucket': '3'}
+        )
+        self.catalog.create_table('default.test_tag_rename_to_existing', schema, False)
+        table = self.catalog.get_table('default.test_tag_rename_to_existing')
+
+        write_builder = table.new_batch_write_builder()
+
+        # Write some data - snapshot 1
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data = pa.Table.from_pydict({
+            'pt': [1],
+            'k': [10],
+            'v': [100]
+        }, schema=self.pa_schema)
+        table_write.write_arrow(data)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        # Create two tags
+        table.create_tag("tag1")
+        table.create_tag("tag2")
+
+        # Try to rename tag1 to tag2 (which already exists)
+        with self.assertRaises(ValueError) as context:
+            table.rename_tag("tag1", "tag2")
+        self.assertIn("already exists", str(context.exception))
+
+        # Verify both original tags still exist
+        tags = table.list_tags()
+        self.assertIn("tag1", tags)
+        self.assertIn("tag2", tags)
+
+    def test_tag_rename_and_scan(self):
+        """Test that renaming a tag and then scanning by new name works correctly."""
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            primary_keys=['pt', 'k'],
+            partition_keys=['pt'],
+            options={'bucket': '3'}
+        )
+        self.catalog.create_table('default.test_tag_rename_scan', schema, False)
+        table = self.catalog.get_table('default.test_tag_rename_scan')
+
+        write_builder = table.new_batch_write_builder()
+
+        # First commit
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data1 = pa.Table.from_pydict({
+            'pt': [1, 1],
+            'k': [10, 20],
+            'v': [100, 200]
+        }, schema=self.pa_schema)
+        table_write.write_arrow(data1)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        # Second commit
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data2 = pa.Table.from_pydict({
+            'pt': [2, 2],
+            'k': [30, 40],
+            'v': [101, 201]
+        }, schema=self.pa_schema)
+        table_write.write_arrow(data2)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        # Create tag at snapshot 1
+        table.create_tag("old_tag", snapshot_id=1)
+
+        # Rename the tag
+        table.rename_tag("old_tag", "new_tag")
+
+        # Read from the renamed tag
+        table_with_tag = table.copy({CoreOptions.SCAN_TAG_NAME.key(): "new_tag"})
+        read_builder = table_with_tag.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        result = table_read.to_arrow(table_scan.plan().splits())
+
+        # Verify: should only contain data from snapshot 1
+        result_sorted = result.sort_by([('pt', 'ascending'), ('k', 'ascending')])
+
+        expected = pa.Table.from_pydict({
+            'pt': [1, 1],
+            'k': [10, 20],
+            'v': [100, 200]
+        }, schema=self.pa_schema)
+
+        self.assertEqual(result_sorted.num_rows, 2)
+        self.assertEqual(result_sorted.column('pt').to_pylist(), expected.column('pt').to_pylist())
+        self.assertEqual(result_sorted.column('k').to_pylist(), expected.column('k').to_pylist())
+        self.assertEqual(result_sorted.column('v').to_pylist(), expected.column('v').to_pylist())
+
     def test_schema_evolution_tag_read(self):
         # schema 0
         pa_schema = pa.schema([

Reply via email to