This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9e6887ae15 [python] Support create tag manually and read data from
tag. (#7243)
9e6887ae15 is described below
commit 9e6887ae158d6ca67604dce69f5eadf5ce893144
Author: umi <[email protected]>
AuthorDate: Tue Feb 10 12:44:22 2026 +0800
[python] Support create tag manually and read data from tag. (#7243)
This pr support creating tag manually and reading data from tag.
---
docs/content/pypaimon/manage-tags.md | 61 ++++
paimon-python/pypaimon/__init__.py | 5 +
.../pypaimon/common/options/core_options.py | 10 +
paimon-python/pypaimon/common/options/options.py | 4 +
paimon-python/pypaimon/read/table_scan.py | 16 +
.../pypaimon/snapshot/time_travel_util.py | 75 +++++
paimon-python/pypaimon/table/file_store_table.py | 76 +++++
.../pypaimon/{ => table/source}/__init__.py | 17 --
paimon-python/pypaimon/{ => tag}/__init__.py | 18 +-
paimon-python/pypaimon/tag/tag.py | 44 +++
paimon-python/pypaimon/tag/tag_manager.py | 182 +++++++++++
.../pypaimon/tests/table/simple_table_test.py | 340 +++++++++++++++++++++
12 files changed, 816 insertions(+), 32 deletions(-)
diff --git a/docs/content/pypaimon/manage-tags.md
b/docs/content/pypaimon/manage-tags.md
new file mode 100644
index 0000000000..d3962121e8
--- /dev/null
+++ b/docs/content/pypaimon/manage-tags.md
@@ -0,0 +1,61 @@
+---
+title: "Manage Tags"
+weight: 3
+type: docs
+aliases:
+ - /pypaimon/manage-tags.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Manage Tags
+
+As with the Paimon Java API, you can create a [tag]({{< ref "maintenance/manage-tags" >}}) based on a snapshot.
+The tag keeps the manifests and data files of that snapshot.
+A typical usage is to create a tag every day, so that the historical data of each day is kept for batch reading.
+
+## Create and Delete Tag
+
+You can create a tag with a given name and snapshot ID, and delete a tag by its name.
+
+```python
+
+table = catalog.get_table('database_name.table_name')
+table.create_tag("tag2", snapshot_id=2)  # create tag2 based on snapshot 2
+table.create_tag("tag3")  # create tag3 based on the latest snapshot
+table.delete_tag("tag2")  # delete tag2
+```
+
+If `snapshot_id` is not set, the tag is created from the latest snapshot. Creating a tag whose name already
+exists raises an error; pass `ignore_if_exists=True` to skip the creation instead.
+
+## Read Tag
+
+You can read data from a specific tag by setting the `scan.tag-name` option on a copy of the table.
+```python
+
+table = catalog.get_table('database_name.table_name')
+table.create_tag("tag2", snapshot_id=2)
+
+# Read from tag2 using scan.tag-name option
+table_with_tag = table.copy({"scan.tag-name": "tag2"})
+read_builder = table_with_tag.new_read_builder()
+table_scan = read_builder.new_scan()
+table_read = read_builder.new_read()
+result = table_read.to_arrow(table_scan.plan().splits())
+```
+
+
diff --git a/paimon-python/pypaimon/__init__.py
b/paimon-python/pypaimon/__init__.py
index 024f8b732b..77965c3a14 100644
--- a/paimon-python/pypaimon/__init__.py
+++ b/paimon-python/pypaimon/__init__.py
@@ -16,6 +16,7 @@
# under the License.
import sys
+
if sys.version_info[:2] == (3, 6):
try:
from pypaimon.manifest import fastavro_py36_compat # noqa: F401
@@ -25,9 +26,13 @@ if sys.version_info[:2] == (3, 6):
from pypaimon.catalog.catalog_factory import CatalogFactory
from pypaimon.filesystem.pvfs import PaimonVirtualFileSystem
from pypaimon.schema.schema import Schema
+from pypaimon.tag.tag import Tag
+from pypaimon.tag.tag_manager import TagManager
__all__ = [
"PaimonVirtualFileSystem",
"CatalogFactory",
"Schema",
+ "Tag",
+ "TagManager",
]
diff --git a/paimon-python/pypaimon/common/options/core_options.py
b/paimon-python/pypaimon/common/options/core_options.py
index 43c7de8ff3..c83a75c655 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -214,6 +214,13 @@ class CoreOptions:
.with_description("The timestamp range for incremental reading.")
)
+ SCAN_TAG_NAME: ConfigOption[str] = (
+ ConfigOptions.key("scan.tag-name")
+ .string_type()
+ .no_default_value()
+ .with_description("Optional tag name used in case of 'from-snapshot'
scan mode.")
+ )
+
SOURCE_SPLIT_TARGET_SIZE: ConfigOption[MemorySize] = (
ConfigOptions.key("source.split.target-size")
.memory_type()
@@ -505,6 +512,9 @@ class CoreOptions:
def incremental_between_timestamp(self, default=None):
return self.options.get(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP,
default)
+ def scan_tag_name(self, default=None):
+ return self.options.get(CoreOptions.SCAN_TAG_NAME, default)
+
def source_split_target_size(self, default=None):
return self.options.get(CoreOptions.SOURCE_SPLIT_TARGET_SIZE,
default).get_bytes()
diff --git a/paimon-python/pypaimon/common/options/options.py
b/paimon-python/pypaimon/common/options/options.py
index 22f172cbdb..0718d96a41 100644
--- a/paimon-python/pypaimon/common/options/options.py
+++ b/paimon-python/pypaimon/common/options/options.py
@@ -54,5 +54,9 @@ class Options:
def contains(self, key: ConfigOption):
return key.key() in self.data
+ def contains_key(self, key: str) -> bool:
+ """Check if the given key string exists in the options."""
+ return key in self.data
+
def copy(self) -> 'Options':
return Options(dict(self.data))
diff --git a/paimon-python/pypaimon/read/table_scan.py
b/paimon-python/pypaimon/read/table_scan.py
index cca81998b2..17da0692e8 100755
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -102,6 +102,22 @@ class TableScan:
return manifests
return FileScanner(self.table, incremental_manifest,
self.predicate, self.limit)
+ elif options.contains(CoreOptions.SCAN_TAG_NAME): # Handle tag-based
reading
+ tag_name = options.get(CoreOptions.SCAN_TAG_NAME)
+
+ def tag_manifest_scanner():
+ tag_manager = self.table.tag_manager()
+ tag = tag_manager.get_or_throw(tag_name)
+ snapshot = tag.trim_to_snapshot()
+ return manifest_list_manager.read_all(snapshot)
+
+ return FileScanner(
+ self.table,
+ tag_manifest_scanner,
+ self.predicate,
+ self.limit,
+ vector_search=self.vector_search
+ )
def all_manifests():
snapshot = snapshot_manager.get_latest_snapshot()
diff --git a/paimon-python/pypaimon/snapshot/time_travel_util.py
b/paimon-python/pypaimon/snapshot/time_travel_util.py
new file mode 100644
index 0000000000..df3eeb11cb
--- /dev/null
+++ b/paimon-python/pypaimon/snapshot/time_travel_util.py
@@ -0,0 +1,75 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""The util class of resolve snapshot from scan params for time travel."""
+
+from typing import Optional
+
+from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.common.options.options import Options
+from pypaimon.snapshot.snapshot import Snapshot
+from pypaimon.tag.tag_manager import TagManager
+
+SCAN_KEYS = [
+ CoreOptions.SCAN_TAG_NAME.key()
+]
+
+
+class TimeTravelUtil:
+ """The util class of resolve snapshot from scan params for time travel."""
+
+ @staticmethod
+ def try_travel_to_snapshot(
+ options: Options,
+ tag_manager: TagManager
+ ) -> Optional[Snapshot]:
+ """
+ Try to travel to a snapshot based on the options.
+
+ Supports the following time travel options:
+ - scan.tag-name: Travel to a specific tag
+
+ Args:
+ options: The options containing time travel parameters
+ tag_manager: The tag manager
+
+ Returns:
+ The Snapshot to travel to, or None if no time travel option is set.
+
+ Raises:
+ ValueError: If more than one time travel option is set
+ """
+
+ scan_handle_keys = [key for key in SCAN_KEYS if
options.contains_key(key)]
+
+ if not scan_handle_keys:
+ return None
+
+ if len(scan_handle_keys) > 1:
+ raise ValueError(f"Only one of the following parameters may be
set: {SCAN_KEYS}")
+
+ key = scan_handle_keys[0]
+ core_options = CoreOptions(options)
+
+ if key == CoreOptions.SCAN_TAG_NAME.key():
+ tag_name = core_options.scan_tag_name()
+ tag = tag_manager.get(tag_name)
+ if tag is None:
+ raise ValueError(f"Tag '{tag_name}' doesn't exist.")
+ return tag.trim_to_snapshot()
+ else:
+ raise ValueError(f"Unsupported time travel mode: {key}")
diff --git a/paimon-python/pypaimon/table/file_store_table.py
b/paimon-python/pypaimon/table/file_store_table.py
index 21f8c78500..e23d9cec75 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -89,6 +89,56 @@ class FileStoreTable(Table):
from pypaimon.snapshot.snapshot_manager import SnapshotManager
return SnapshotManager(self)
+ def tag_manager(self):
+ """Get the tag manager for this table."""
+ from pypaimon import TagManager
+ return TagManager(self.file_io, self.table_path, self.current_branch())
+
+ def create_tag(
+ self,
+ tag_name: str,
+ snapshot_id: Optional[int] = None,
+ ignore_if_exists: bool = False
+ ) -> None:
+ """
+ Create a tag for a snapshot.
+
+ Args:
+ tag_name: Name for the tag
+ snapshot_id: ID of the snapshot to tag. If None, uses the latest
snapshot.
+ ignore_if_exists: If True, don't raise error if tag already exists
+
+ Raises:
+ ValueError: If no snapshot exists or tag already exists (when
ignore_if_exists=False)
+ """
+
+ snapshot_mgr = self.snapshot_manager()
+
+ if snapshot_id is not None:
+ snapshot = snapshot_mgr.get_snapshot_by_id(snapshot_id)
+ if snapshot is None:
+ raise ValueError(f"Snapshot with id {snapshot_id} doesn't
exist.")
+ else:
+ snapshot = snapshot_mgr.get_latest_snapshot()
+ if snapshot is None:
+ raise ValueError("No snapshot exists in this table.")
+
+ tag_mgr = self.tag_manager()
+ tag_mgr.create_tag(snapshot, tag_name, ignore_if_exists)
+
+ def delete_tag(self, tag_name: str) -> bool:
+ """
+ Delete a tag.
+
+ Args:
+ tag_name: Name of the tag to delete
+
+ Returns:
+ True if tag was deleted, False if tag didn't exist
+ """
+ tag_mgr = self.tag_manager()
+ return tag_mgr.delete_tag(tag_name)
+
def path_factory(self) -> 'FileStorePathFactory':
from pypaimon.utils.file_store_path_factory import FileStorePathFactory
@@ -186,10 +236,36 @@ class FileStoreTable(Table):
new_options.pop(k)
else:
new_options[k] = v
+
new_table_schema = self.table_schema.copy(new_options=new_options)
+
+ time_travel_schema = self._try_time_travel(Options(new_options))
+ if time_travel_schema is not None:
+ new_table_schema = time_travel_schema
+
return FileStoreTable(self.file_io, self.identifier, self.table_path,
new_table_schema,
self.catalog_environment)
+ def _try_time_travel(self, options: Options) -> Optional[TableSchema]:
+ """
+ Try to resolve time travel options and return the corresponding schema.
+
+ Supports the following time travel options:
+ - scan.tag-name: Travel to a specific tag
+
+ Returns:
+ The TableSchema at the time travel point, or None if no time
travel option is set.
+ """
+
+ try:
+ from pypaimon.snapshot.time_travel_util import TimeTravelUtil
+ snapshot = TimeTravelUtil.try_travel_to_snapshot(options,
self.tag_manager())
+ if snapshot is None:
+ return None
+ return
self.schema_manager.get_schema(snapshot.schema_id).copy(new_options=options.to_map())
+ except Exception:
+ return None
+
def _create_external_paths(self) -> List[str]:
from urllib.parse import urlparse
from pypaimon.common.options.core_options import ExternalPathStrategy
diff --git a/paimon-python/pypaimon/__init__.py
b/paimon-python/pypaimon/table/source/__init__.py
similarity index 65%
copy from paimon-python/pypaimon/__init__.py
copy to paimon-python/pypaimon/table/source/__init__.py
index 024f8b732b..a67d5ea255 100644
--- a/paimon-python/pypaimon/__init__.py
+++ b/paimon-python/pypaimon/table/source/__init__.py
@@ -14,20 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
-import sys
-if sys.version_info[:2] == (3, 6):
- try:
- from pypaimon.manifest import fastavro_py36_compat # noqa: F401
- except ImportError:
- pass
-
-from pypaimon.catalog.catalog_factory import CatalogFactory
-from pypaimon.filesystem.pvfs import PaimonVirtualFileSystem
-from pypaimon.schema.schema import Schema
-
-__all__ = [
- "PaimonVirtualFileSystem",
- "CatalogFactory",
- "Schema",
-]
diff --git a/paimon-python/pypaimon/__init__.py
b/paimon-python/pypaimon/tag/__init__.py
similarity index 66%
copy from paimon-python/pypaimon/__init__.py
copy to paimon-python/pypaimon/tag/__init__.py
index 024f8b732b..609e9a995e 100644
--- a/paimon-python/pypaimon/__init__.py
+++ b/paimon-python/pypaimon/tag/__init__.py
@@ -15,19 +15,7 @@
# specific language governing permissions and limitations
# under the License.
-import sys
-if sys.version_info[:2] == (3, 6):
- try:
- from pypaimon.manifest import fastavro_py36_compat # noqa: F401
- except ImportError:
- pass
+from pypaimon.tag.tag import Tag
+from pypaimon.tag.tag_manager import TagManager
-from pypaimon.catalog.catalog_factory import CatalogFactory
-from pypaimon.filesystem.pvfs import PaimonVirtualFileSystem
-from pypaimon.schema.schema import Schema
-
-__all__ = [
- "PaimonVirtualFileSystem",
- "CatalogFactory",
- "Schema",
-]
+__all__ = ["Tag", "TagManager"]
diff --git a/paimon-python/pypaimon/tag/tag.py
b/paimon-python/pypaimon/tag/tag.py
new file mode 100644
index 0000000000..c8089861b8
--- /dev/null
+++ b/paimon-python/pypaimon/tag/tag.py
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from pypaimon.snapshot.snapshot import Snapshot
+
+
+class Tag(Snapshot):
+
+ def trim_to_snapshot(self) -> Snapshot:
+ """Convert this Tag to a Snapshot"""
+ return Snapshot(
+ version=self.version,
+ id=self.id,
+ schema_id=self.schema_id,
+ base_manifest_list=self.base_manifest_list,
+ delta_manifest_list=self.delta_manifest_list,
+ total_record_count=self.total_record_count,
+ delta_record_count=self.delta_record_count,
+ commit_user=self.commit_user,
+ commit_identifier=self.commit_identifier,
+ commit_kind=self.commit_kind,
+ time_millis=self.time_millis,
+ changelog_manifest_list=self.changelog_manifest_list,
+ index_manifest=self.index_manifest,
+ changelog_record_count=self.changelog_record_count,
+ watermark=self.watermark,
+ statistics=self.statistics,
+ next_row_id=self.next_row_id
+ )
diff --git a/paimon-python/pypaimon/tag/tag_manager.py
b/paimon-python/pypaimon/tag/tag_manager.py
new file mode 100644
index 0000000000..7f63f4ac06
--- /dev/null
+++ b/paimon-python/pypaimon/tag/tag_manager.py
@@ -0,0 +1,182 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+from typing import Optional
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.json_util import JSON
+from pypaimon.snapshot.snapshot import Snapshot
+from pypaimon.tag.tag import Tag
+
+logger = logging.getLogger(__name__)
+
+TAG_PREFIX = "tag-"
+
+
+class TagManager:
+ """
+ Manager for Tag files.
+
+ This class manages tag files stored in the table's tag directory.
+ Tags are essentially named snapshots that can be used for time travel
queries.
+ """
+
+ def __init__(self, file_io: FileIO, table_path: str, branch: str = "main"):
+ """
+ Initialize TagManager.
+
+ Args:
+ file_io: FileIO instance for file operations
+ table_path: Path to the table root directory
+ branch: Branch name, defaults to "main"
+ """
+ self.file_io = file_io
+ self.table_path = table_path.rstrip('/')
+ self.branch = self._normalize_branch(branch)
+
+ @staticmethod
+ def _normalize_branch(branch: str) -> str:
+ """Normalize branch name."""
+ if not branch or branch == "main":
+ return "main"
+ return branch
+
+ def _branch_path(self) -> str:
+ """Get the branch path."""
+ if self.branch == "main":
+ return self.table_path
+ return f"{self.table_path}/branch/branch-{self.branch}"
+
+ def tag_directory(self) -> str:
+ """Return the root directory of tags."""
+ return f"{self._branch_path()}/tag"
+
+ def tag_path(self, tag_name: str) -> str:
+ """Return the path of a tag file."""
+ return f"{self.tag_directory()}/{TAG_PREFIX}{tag_name}"
+
+ def tag_exists(self, tag_name: str) -> bool:
+ """Check if a tag exists."""
+ path = self.tag_path(tag_name)
+ return self.file_io.exists(path)
+
+ def get(self, tag_name: str) -> Optional[Tag]:
+ """
+ Return the tag or None if the tag file not found.
+
+ Args:
+ tag_name: Name of the tag
+
+ Returns:
+ Tag instance or None if not found
+ """
+ if not tag_name or tag_name.isspace():
+ raise ValueError("Tag name shouldn't be blank.")
+
+ path = self.tag_path(tag_name)
+ if not self.file_io.exists(path):
+ return None
+
+ content = self.file_io.read_file_utf8(path)
+ return JSON.from_json(content, Tag)
+
+ def get_or_throw(self, tag_name: str) -> Tag:
+ """
+ Return the tag or throw exception indicating the tag not found.
+
+ Args:
+ tag_name: Name of the tag
+
+ Returns:
+ Tag instance
+
+ Raises:
+ ValueError: If tag doesn't exist
+ """
+ tag = self.get(tag_name)
+ if tag is None:
+ raise ValueError(f"Tag '{tag_name}' doesn't exist.")
+ return tag
+
+ def create_tag(
+ self,
+ snapshot: Snapshot,
+ tag_name: str,
+ ignore_if_exists: bool = False
+ ) -> None:
+ """
+ Create a tag from given snapshot and save it in the storage.
+
+ Args:
+ snapshot: The snapshot to tag
+ tag_name: Name for the tag
+ ignore_if_exists: If True, don't raise error if tag already exists
+
+ Raises:
+ ValueError: If tag_name is blank or tag already exists (when
ignore_if_exists=False)
+ """
+ if not tag_name or tag_name.isspace():
+ raise ValueError("Tag name shouldn't be blank.")
+
+ if self.tag_exists(tag_name):
+ if ignore_if_exists:
+ return
+ raise ValueError(f"Tag '{tag_name}' already exists.")
+
+ self._create_or_replace_tag(snapshot, tag_name)
+
+ def _create_or_replace_tag(
+ self,
+ snapshot: Snapshot,
+ tag_name: str
+ ) -> None:
+ """
+ Internal method to create or replace a tag.
+ """
+ tag_path = self.tag_path(tag_name)
+
+ # Ensure tag directory exists
+ tag_dir = self.tag_directory()
+ if not self.file_io.exists(tag_dir):
+ self.file_io.mkdirs(tag_dir)
+
+ content = JSON.to_json(snapshot)
+
+ self.file_io.overwrite_file_utf8(tag_path, content)
+
+ def delete_tag(self, tag_name: str) -> bool:
+ """
+ Delete a tag.
+
+ Args:
+ tag_name: Name of the tag to delete
+
+ Returns:
+ True if tag was deleted, False if tag didn't exist
+ """
+ if not tag_name or tag_name.isspace():
+ raise ValueError("Tag name shouldn't be blank.")
+
+ tag = self.get(tag_name)
+ if tag is None:
+ logger.warning(f"Tag '{tag_name}' doesn't exist.")
+ return False
+
+ path = self.tag_path(tag_name)
+ self.file_io.delete_quietly(path)
+ return True
diff --git a/paimon-python/pypaimon/tests/table/simple_table_test.py
b/paimon-python/pypaimon/tests/table/simple_table_test.py
new file mode 100644
index 0000000000..4a4d024f39
--- /dev/null
+++ b/paimon-python/pypaimon/tests/table/simple_table_test.py
@@ -0,0 +1,340 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.schema.schema_manager import SchemaManager
+
+
+class SimpleTableTest(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.tempdir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+ cls.catalog = CatalogFactory.create({
+ 'warehouse': cls.warehouse
+ })
+ cls.catalog.create_database('default', True)
+
+ cls.pa_schema = pa.schema([
+ ('pt', pa.int32()),
+ ('k', pa.int32()),
+ ('v', pa.int64())
+ ])
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+ def test_tag_scan(self):
+ """
+ Test reading from a specific tag.
+
+ 1. Write data in 3 commits
+ 2. Create a tag at snapshot 2
+ 3. Read from the tag and verify only data from snapshots 1 and 2 is
returned
+ """
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema,
+ primary_keys=['pt', 'k'],
+ partition_keys=['pt'],
+ options={'bucket': '3'}
+ )
+ self.catalog.create_table('default.test_tag_scan', schema, False)
+ table = self.catalog.get_table('default.test_tag_scan')
+
+ write_builder = table.new_batch_write_builder()
+
+ # First commit: pt=1, k=10, v=100 and pt=1, k=20, v=200
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data1 = pa.Table.from_pydict({
+ 'pt': [1, 1],
+ 'k': [10, 20],
+ 'v': [100, 200]
+ }, schema=self.pa_schema)
+ table_write.write_arrow(data1)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # Second commit: pt=2, k=30, v=101 and pt=2, k=40, v=201
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data2 = pa.Table.from_pydict({
+ 'pt': [2, 2],
+ 'k': [30, 40],
+ 'v': [101, 201]
+ }, schema=self.pa_schema)
+ table_write.write_arrow(data2)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # Third commit: pt=3, k=50, v=500 and pt=3, k=60, v=600
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data3 = pa.Table.from_pydict({
+ 'pt': [3, 3],
+ 'k': [50, 60],
+ 'v': [500, 600]
+ }, schema=self.pa_schema)
+ table_write.write_arrow(data3)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # Create tag at snapshot 2
+ table.create_tag("tag2", snapshot_id=2)
+
+ # Read from tag2 using scan.tag-name option
+ table_with_tag = table.copy({CoreOptions.SCAN_TAG_NAME.key(): "tag2"})
+ read_builder = table_with_tag.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ result = table_read.to_arrow(table_scan.plan().splits())
+
+ # Verify: should only contain data from snapshot 1 and 2
+ # (pt=1, k=10, v=100), (pt=1, k=20, v=200), (pt=2, k=30, v=101),
(pt=2, k=40, v=201)
+ result_sorted = result.sort_by([('pt', 'ascending'), ('k',
'ascending')])
+
+ expected = pa.Table.from_pydict({
+ 'pt': [1, 1, 2, 2],
+ 'k': [10, 20, 30, 40],
+ 'v': [100, 200, 101, 201]
+ }, schema=self.pa_schema)
+
+ self.assertEqual(result_sorted.num_rows, 4)
+ self.assertEqual(result_sorted.column('pt').to_pylist(),
expected.column('pt').to_pylist())
+ self.assertEqual(result_sorted.column('k').to_pylist(),
expected.column('k').to_pylist())
+ self.assertEqual(result_sorted.column('v').to_pylist(),
expected.column('v').to_pylist())
+
+ def test_non_existing_tag(self):
+ """
+ Test that reading from a non-existing tag raises an error.
+ """
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema,
+ primary_keys=['pt', 'k'],
+ partition_keys=['pt'],
+ options={'bucket': '3'}
+ )
+ self.catalog.create_table('default.test_non_existing_tag', schema,
False)
+ table = self.catalog.get_table('default.test_non_existing_tag')
+
+ # Try to read from a non-existing tag
+ table_with_tag = table.copy({CoreOptions.SCAN_TAG_NAME.key():
"non-existing"})
+ read_builder = table_with_tag.new_read_builder()
+ table_scan = read_builder.new_scan()
+
+ with self.assertRaises(ValueError) as context:
+ table_scan.plan()
+
+ self.assertIn("non-existing", str(context.exception))
+ self.assertIn("doesn't exist", str(context.exception))
+
+ def test_tag_create_and_delete(self):
+ """Test creating and deleting tags."""
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema,
+ primary_keys=['pt', 'k'],
+ partition_keys=['pt'],
+ options={'bucket': '3'}
+ )
+ self.catalog.create_table('default.test_tag_create_delete', schema,
False)
+ table = self.catalog.get_table('default.test_tag_create_delete')
+
+ write_builder = table.new_batch_write_builder()
+
+ # Write some data
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data = pa.Table.from_pydict({
+ 'pt': [1, 1],
+ 'k': [10, 20],
+ 'v': [100, 200]
+ }, schema=self.pa_schema)
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # Create a tag
+ table.create_tag("test_tag")
+
+ # Verify tag exists
+ tag_manager = table.tag_manager()
+ self.assertTrue(tag_manager.tag_exists("test_tag"))
+
+ # Get the tag
+ tag = tag_manager.get("test_tag")
+ self.assertIsNotNone(tag)
+ self.assertEqual(tag.id, 1)
+
+ # Delete the tag
+ result = table.delete_tag("test_tag")
+ self.assertTrue(result)
+
+ # Verify tag no longer exists
+ self.assertFalse(tag_manager.tag_exists("test_tag"))
+
+ def test_tag_ignore_if_exists(self):
+ """Test creating a tag with ignore_if_exists=True."""
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema,
+ primary_keys=['pt', 'k'],
+ partition_keys=['pt'],
+ options={'bucket': '3'}
+ )
+ self.catalog.create_table('default.test_tag_ignore_exists', schema,
False)
+ table = self.catalog.get_table('default.test_tag_ignore_exists')
+
+ write_builder = table.new_batch_write_builder()
+
+ # Write some data
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data = pa.Table.from_pydict({
+ 'pt': [1],
+ 'k': [10],
+ 'v': [100]
+ }, schema=self.pa_schema)
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # Create a tag
+ table.create_tag("duplicate_tag")
+
+ # Try to create the same tag again without ignore_if_exists - should
raise error
+ with self.assertRaises(ValueError) as context:
+ table.create_tag("duplicate_tag")
+ self.assertIn("already exists", str(context.exception))
+
+ # Create the same tag with ignore_if_exists=True - should not raise
error
+ table.create_tag("duplicate_tag", ignore_if_exists=True)
+
+ def test_schema_evolution_tag_read(self):
+ # schema 0
+ pa_schema = pa.schema([
+ ('user_id', pa.int64()),
+ ('item_id', pa.int64()),
+ ('dt', pa.string())
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
+ self.catalog.create_table('default.test_0', schema, False)
+ table1 = self.catalog.get_table('default.test_0')
+ write_builder = table1.new_batch_write_builder()
+ # write 1
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data1 = {
+ 'user_id': [1, 2, 3, 4],
+ 'item_id': [1001, 1002, 1003, 1004],
+ 'dt': ['p1', 'p1', 'p2', 'p1'],
+ }
+ pa_table = pa.Table.from_pydict(data1, schema=pa_schema)
+ table_write.write_arrow(pa_table)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+ # write 2
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data1 = {
+ 'user_id': [11, 22, 33, 44],
+ 'item_id': [1001, 1002, 1003, 1004],
+ 'dt': ['p1', 'p1', 'p2', 'p1'],
+ }
+ pa_table = pa.Table.from_pydict(data1, schema=pa_schema)
+ table_write.write_arrow(pa_table)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+ # schema 1 add behavior column
+ pa_schema = pa.schema([
+ ('user_id', pa.int64()),
+ ('item_id', pa.int64()),
+ ('dt', pa.string()),
+ ('behavior', pa.string())
+ ])
+ schema2 = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
+ self.catalog.create_table('default.test_1', schema2, False)
+ table2 = self.catalog.get_table('default.test_1')
+ write_builder = table2.new_batch_write_builder()
+ # write 1
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data2 = {
+ 'user_id': [5, 6, 7, 8],
+ 'item_id': [1005, 1006, 1007, 1008],
+ 'dt': ['p2', 'p1', 'p2', 'p2'],
+ 'behavior': ['e', 'f', 'g', 'h'],
+ }
+ pa_table = pa.Table.from_pydict(data2, schema=pa_schema)
+ table_write.write_arrow(pa_table)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+ # write 2
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data2 = {
+ 'user_id': [55, 66, 77, 88],
+ 'item_id': [1005, 1006, 1007, 1008],
+ 'dt': ['p2', 'p1', 'p2', 'p2'],
+ 'behavior': ['e', 'f', 'g', 'h'],
+ }
+ pa_table = pa.Table.from_pydict(data2, schema=pa_schema)
+ table_write.write_arrow(pa_table)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # Create tag at snapshot 2
+ table1.create_tag("tag2", snapshot_id=2)
+ table2.create_tag("tag2", snapshot_id=2)
+ # When table2 read tag2, it will access table1's schema
+ table2.schema_manager = SchemaManager(table2.file_io,
table1.table_path)
+ table_with_tag = table2.copy({CoreOptions.SCAN_TAG_NAME.key(): "tag2"})
+ read_builder = table_with_tag.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+
+ result = table_read.to_arrow(table_scan.plan().splits())
+
+ self.assertEqual(result.num_rows, 8)
+
+ expected_schema = pa.schema([
+ ('user_id', pa.int64()),
+ ('item_id', pa.int64()),
+ ('dt', pa.string())
+ ])
+ expected = pa.Table.from_pydict({
+ 'user_id': [5, 6, 7, 8, 55, 66, 77, 88],
+ 'item_id': [1005, 1006, 1007, 1008, 1005, 1006, 1007, 1008],
+ 'dt': ["p2", "p1", "p2", "p2", "p2", "p1", "p2", "p2"]
+ }, schema=expected_schema)
+
+ self.assertEqual(expected, result.sort_by('user_id'))