This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 809fe70673 [python] Support rename tag API for pypaimon (#7322)
809fe70673 is described below
commit 809fe706733ad5c6ceda1b8f2ad99af3e9a4928d
Author: xuzifu666 <[email protected]>
AuthorDate: Sat Feb 28 17:09:30 2026 +0800
[python] Support rename tag API for pypaimon (#7322)
---
paimon-python/pypaimon/table/file_store_table.py | 15 ++
paimon-python/pypaimon/tag/tag_manager.py | 37 ++++-
.../pypaimon/tests/table/simple_table_test.py | 180 +++++++++++++++++++++
3 files changed, 230 insertions(+), 2 deletions(-)
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index f3e61cc930..4f7e3c81dd 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -149,6 +149,21 @@ class FileStoreTable(Table):
tag_mgr = self.tag_manager()
return tag_mgr.list_tags()
+ def rename_tag(self, old_name: str, new_name: str) -> None:
+ """
+ Rename a tag.
+
+ Args:
+ old_name: Current name of the tag
+ new_name: New name for the tag
+
+ Raises:
+ ValueError: If old_name or new_name is blank, old_name doesn't exist,
+ or new_name already exists
+ """
+ tag_mgr = self.tag_manager()
+ tag_mgr.rename_tag(old_name, new_name)
+
def path_factory(self) -> 'FileStorePathFactory':
from pypaimon.utils.file_store_path_factory import FileStorePathFactory
diff --git a/paimon-python/pypaimon/tag/tag_manager.py b/paimon-python/pypaimon/tag/tag_manager.py
index e342147a16..4d3f902bbd 100644
--- a/paimon-python/pypaimon/tag/tag_manager.py
+++ b/paimon-python/pypaimon/tag/tag_manager.py
@@ -180,10 +180,10 @@ class TagManager:
def delete_tag(self, tag_name: str) -> bool:
"""
Delete a tag.
-
+
Args:
tag_name: Name of the tag to delete
-
+
Returns:
True if tag was deleted, False if tag didn't exist
"""
@@ -198,3 +198,36 @@ class TagManager:
path = self.tag_path(tag_name)
self.file_io.delete_quietly(path)
return True
+
+ def rename_tag(self, old_name: str, new_name: str) -> None:
+ """
+ Rename a tag.
+
+ Args:
+ old_name: Current name of the tag
+ new_name: New name for the tag
+
+ Raises:
+ ValueError: If old_name or new_name is blank, old_name doesn't exist,
+ or new_name already exists
+ """
+ if not old_name or old_name.isspace():
+ raise ValueError("Old tag name shouldn't be blank.")
+
+ if not new_name or new_name.isspace():
+ raise ValueError("New tag name shouldn't be blank.")
+
+ # Check if old tag exists
+ if not self.tag_exists(old_name):
+ raise ValueError(f"Tag '{old_name}' doesn't exist.")
+
+ # Check if new tag already exists
+ if self.tag_exists(new_name):
+ raise ValueError(f"Tag '{new_name}' already exists.")
+
+ # Rename the tag file from old name to new name
+ old_path = self.tag_path(old_name)
+ new_path = self.tag_path(new_name)
+ self.file_io.rename(old_path, new_path)
+
+ logger.info(f"Tag renamed from '{old_name}' to '{new_name}'.")
diff --git a/paimon-python/pypaimon/tests/table/simple_table_test.py b/paimon-python/pypaimon/tests/table/simple_table_test.py
index 66579d3e85..683f72c811 100644
--- a/paimon-python/pypaimon/tests/table/simple_table_test.py
+++ b/paimon-python/pypaimon/tests/table/simple_table_test.py
@@ -254,6 +254,186 @@ class SimpleTableTest(unittest.TestCase):
# Create the same tag with ignore_if_exists=True - should not raise error
table.create_tag("duplicate_tag", ignore_if_exists=True)
+ def test_tag_rename(self):
+ """Test renaming a tag."""
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema,
+ primary_keys=['pt', 'k'],
+ partition_keys=['pt'],
+ options={'bucket': '3'}
+ )
+ self.catalog.create_table('default.test_tag_rename', schema, False)
+ table = self.catalog.get_table('default.test_tag_rename')
+
+ write_builder = table.new_batch_write_builder()
+
+ # Write some data - snapshot 1
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data = pa.Table.from_pydict({
+ 'pt': [1],
+ 'k': [10],
+ 'v': [100]
+ }, schema=self.pa_schema)
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # Create a tag
+ table.create_tag("old_tag")
+
+ # Rename the tag
+ table.rename_tag("old_tag", "new_tag")
+
+ # Verify old tag no longer exists
+ tags = table.list_tags()
+ self.assertNotIn("old_tag", tags)
+ self.assertIn("new_tag", tags)
+
+ # Verify tag content is preserved
+ tag_manager = table.tag_manager()
+ tag = tag_manager.get("new_tag")
+ self.assertIsNotNone(tag)
+ self.assertEqual(tag.id, 1)
+
+ def test_tag_rename_nonexistent(self):
+ """Test renaming a nonexistent tag."""
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema,
+ primary_keys=['pt', 'k'],
+ partition_keys=['pt'],
+ options={'bucket': '3'}
+ )
+ self.catalog.create_table('default.test_tag_rename_nonexistent', schema, False)
+ table = self.catalog.get_table('default.test_tag_rename_nonexistent')
+
+ write_builder = table.new_batch_write_builder()
+
+ # Write some data
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data = pa.Table.from_pydict({
+ 'pt': [1],
+ 'k': [10],
+ 'v': [100]
+ }, schema=self.pa_schema)
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # Try to rename nonexistent tag
+ with self.assertRaises(ValueError) as context:
+ table.rename_tag("nonexistent", "new_tag")
+ self.assertIn("doesn't exist", str(context.exception))
+
+ def test_tag_rename_to_existing(self):
+ """Test renaming a tag to a name that already exists."""
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema,
+ primary_keys=['pt', 'k'],
+ partition_keys=['pt'],
+ options={'bucket': '3'}
+ )
+ self.catalog.create_table('default.test_tag_rename_to_existing', schema, False)
+ table = self.catalog.get_table('default.test_tag_rename_to_existing')
+
+ write_builder = table.new_batch_write_builder()
+
+ # Write some data - snapshot 1
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data = pa.Table.from_pydict({
+ 'pt': [1],
+ 'k': [10],
+ 'v': [100]
+ }, schema=self.pa_schema)
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # Create two tags
+ table.create_tag("tag1")
+ table.create_tag("tag2")
+
+ # Try to rename tag1 to tag2 (which already exists)
+ with self.assertRaises(ValueError) as context:
+ table.rename_tag("tag1", "tag2")
+ self.assertIn("already exists", str(context.exception))
+
+ # Verify both original tags still exist
+ tags = table.list_tags()
+ self.assertIn("tag1", tags)
+ self.assertIn("tag2", tags)
+
+ def test_tag_rename_and_scan(self):
+ """Test that renaming a tag and then scanning by new name works
correctly."""
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema,
+ primary_keys=['pt', 'k'],
+ partition_keys=['pt'],
+ options={'bucket': '3'}
+ )
+ self.catalog.create_table('default.test_tag_rename_scan', schema, False)
+ table = self.catalog.get_table('default.test_tag_rename_scan')
+
+ write_builder = table.new_batch_write_builder()
+
+ # First commit
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data1 = pa.Table.from_pydict({
+ 'pt': [1, 1],
+ 'k': [10, 20],
+ 'v': [100, 200]
+ }, schema=self.pa_schema)
+ table_write.write_arrow(data1)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # Second commit
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data2 = pa.Table.from_pydict({
+ 'pt': [2, 2],
+ 'k': [30, 40],
+ 'v': [101, 201]
+ }, schema=self.pa_schema)
+ table_write.write_arrow(data2)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # Create tag at snapshot 1
+ table.create_tag("old_tag", snapshot_id=1)
+
+ # Rename the tag
+ table.rename_tag("old_tag", "new_tag")
+
+ # Read from the renamed tag
+ table_with_tag = table.copy({CoreOptions.SCAN_TAG_NAME.key(): "new_tag"})
+ read_builder = table_with_tag.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ result = table_read.to_arrow(table_scan.plan().splits())
+
+ # Verify: should only contain data from snapshot 1
+ result_sorted = result.sort_by([('pt', 'ascending'), ('k', 'ascending')])
+
+ expected = pa.Table.from_pydict({
+ 'pt': [1, 1],
+ 'k': [10, 20],
+ 'v': [100, 200]
+ }, schema=self.pa_schema)
+
+ self.assertEqual(result_sorted.num_rows, 2)
+ self.assertEqual(result_sorted.column('pt').to_pylist(), expected.column('pt').to_pylist())
+ self.assertEqual(result_sorted.column('k').to_pylist(), expected.column('k').to_pylist())
+ self.assertEqual(result_sorted.column('v').to_pylist(), expected.column('v').to_pylist())
+
def test_schema_evolution_tag_read(self):
# schema 0
pa_schema = pa.schema([