This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e6887ae15 [python] Support create tag manually and read data from 
tag. (#7243)
9e6887ae15 is described below

commit 9e6887ae158d6ca67604dce69f5eadf5ce893144
Author: umi <[email protected]>
AuthorDate: Tue Feb 10 12:44:22 2026 +0800

    [python] Support create tag manually and read data from tag. (#7243)
    
    This PR supports creating tags manually and reading data from a tag.
---
 docs/content/pypaimon/manage-tags.md               |  61 ++++
 paimon-python/pypaimon/__init__.py                 |   5 +
 .../pypaimon/common/options/core_options.py        |  10 +
 paimon-python/pypaimon/common/options/options.py   |   4 +
 paimon-python/pypaimon/read/table_scan.py          |  16 +
 .../pypaimon/snapshot/time_travel_util.py          |  75 +++++
 paimon-python/pypaimon/table/file_store_table.py   |  76 +++++
 .../pypaimon/{ => table/source}/__init__.py        |  17 --
 paimon-python/pypaimon/{ => tag}/__init__.py       |  18 +-
 paimon-python/pypaimon/tag/tag.py                  |  44 +++
 paimon-python/pypaimon/tag/tag_manager.py          | 182 +++++++++++
 .../pypaimon/tests/table/simple_table_test.py      | 340 +++++++++++++++++++++
 12 files changed, 816 insertions(+), 32 deletions(-)

diff --git a/docs/content/pypaimon/manage-tags.md 
b/docs/content/pypaimon/manage-tags.md
new file mode 100644
index 0000000000..d3962121e8
--- /dev/null
+++ b/docs/content/pypaimon/manage-tags.md
@@ -0,0 +1,61 @@
+---
+title: "Manage Tags"
+weight: 3
+type: docs
+aliases:
+  - /pypaimon/manage-tags.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Manage Tags
+
+Just like the Java API of Paimon, you can create a [tag]({{< ref 
"maintenance/manage-tags" >}}) based on a snapshot. The tag retains the 
manifests and data files of that snapshot.
+A typical use is to create a tag every day, so that each day's historical 
data stays available for batch reading.
+## Create and Delete Tag
+
+You can create a tag with a given name and snapshot ID, and delete a tag by 
its name.
+
+```python
+
+table = catalog.get_table('database_name.table_name')
+table.create_tag("tag2", snapshot_id=2)  # create tag2 based on snapshot 2
+table.create_tag("tag3")  # create tag3 based on the latest snapshot
+table.delete_tag("tag2")  # delete tag2
+```
+
+If `snapshot_id` is not set, the tag is created from the latest snapshot.
+
+## Read Tag
+You can read data from a specific tag.
+```python
+
+table = catalog.get_table('database_name.table_name')
+table.create_tag("tag2", snapshot_id=2)
+
+# Read from tag2 using scan.tag-name option
+table_with_tag = table.copy({"scan.tag-name": "tag2"})
+read_builder = table_with_tag.new_read_builder()
+table_scan = read_builder.new_scan()
+table_read = read_builder.new_read()
+result = table_read.to_arrow(table_scan.plan().splits())
+```
+
+
diff --git a/paimon-python/pypaimon/__init__.py 
b/paimon-python/pypaimon/__init__.py
index 024f8b732b..77965c3a14 100644
--- a/paimon-python/pypaimon/__init__.py
+++ b/paimon-python/pypaimon/__init__.py
@@ -16,6 +16,7 @@
 #  under the License.
 
 import sys
+
 if sys.version_info[:2] == (3, 6):
     try:
         from pypaimon.manifest import fastavro_py36_compat  # noqa: F401
@@ -25,9 +26,13 @@ if sys.version_info[:2] == (3, 6):
 from pypaimon.catalog.catalog_factory import CatalogFactory
 from pypaimon.filesystem.pvfs import PaimonVirtualFileSystem
 from pypaimon.schema.schema import Schema
+from pypaimon.tag.tag import Tag
+from pypaimon.tag.tag_manager import TagManager
 
 __all__ = [
     "PaimonVirtualFileSystem",
     "CatalogFactory",
     "Schema",
+    "Tag",
+    "TagManager",
 ]
diff --git a/paimon-python/pypaimon/common/options/core_options.py 
b/paimon-python/pypaimon/common/options/core_options.py
index 43c7de8ff3..c83a75c655 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -214,6 +214,13 @@ class CoreOptions:
         .with_description("The timestamp range for incremental reading.")
     )
 
+    SCAN_TAG_NAME: ConfigOption[str] = (
+        ConfigOptions.key("scan.tag-name")
+        .string_type()
+        .no_default_value()
+        .with_description("Optional tag name used in case of 'from-snapshot' 
scan mode.")
+    )
+
     SOURCE_SPLIT_TARGET_SIZE: ConfigOption[MemorySize] = (
         ConfigOptions.key("source.split.target-size")
         .memory_type()
@@ -505,6 +512,9 @@ class CoreOptions:
     def incremental_between_timestamp(self, default=None):
         return self.options.get(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP, 
default)
 
+    def scan_tag_name(self, default=None):
+        return self.options.get(CoreOptions.SCAN_TAG_NAME, default)
+
     def source_split_target_size(self, default=None):
         return self.options.get(CoreOptions.SOURCE_SPLIT_TARGET_SIZE, 
default).get_bytes()
 
diff --git a/paimon-python/pypaimon/common/options/options.py 
b/paimon-python/pypaimon/common/options/options.py
index 22f172cbdb..0718d96a41 100644
--- a/paimon-python/pypaimon/common/options/options.py
+++ b/paimon-python/pypaimon/common/options/options.py
@@ -54,5 +54,9 @@ class Options:
     def contains(self, key: ConfigOption):
         return key.key() in self.data
 
+    def contains_key(self, key: str) -> bool:
+        """Check if the given key string exists in the options."""
+        return key in self.data
+
     def copy(self) -> 'Options':
         return Options(dict(self.data))
diff --git a/paimon-python/pypaimon/read/table_scan.py 
b/paimon-python/pypaimon/read/table_scan.py
index cca81998b2..17da0692e8 100755
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -102,6 +102,22 @@ class TableScan:
                 return manifests
 
             return FileScanner(self.table, incremental_manifest, 
self.predicate, self.limit)
+        elif options.contains(CoreOptions.SCAN_TAG_NAME):  # Handle tag-based 
reading
+            tag_name = options.get(CoreOptions.SCAN_TAG_NAME)
+
+            def tag_manifest_scanner():
+                tag_manager = self.table.tag_manager()
+                tag = tag_manager.get_or_throw(tag_name)
+                snapshot = tag.trim_to_snapshot()
+                return manifest_list_manager.read_all(snapshot)
+
+            return FileScanner(
+                self.table,
+                tag_manifest_scanner,
+                self.predicate,
+                self.limit,
+                vector_search=self.vector_search
+            )
 
         def all_manifests():
             snapshot = snapshot_manager.get_latest_snapshot()
diff --git a/paimon-python/pypaimon/snapshot/time_travel_util.py 
b/paimon-python/pypaimon/snapshot/time_travel_util.py
new file mode 100644
index 0000000000..df3eeb11cb
--- /dev/null
+++ b/paimon-python/pypaimon/snapshot/time_travel_util.py
@@ -0,0 +1,75 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+"""The util class of resolve snapshot from scan params for time travel."""
+
+from typing import Optional
+
+from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.common.options.options import Options
+from pypaimon.snapshot.snapshot import Snapshot
+from pypaimon.tag.tag_manager import TagManager
+
+SCAN_KEYS = [
+    CoreOptions.SCAN_TAG_NAME.key()
+]
+
+
+class TimeTravelUtil:
+    """Resolves the snapshot to read from the time travel scan options."""
+
+    @staticmethod
+    def try_travel_to_snapshot(
+            options: Options,
+            tag_manager: TagManager
+    ) -> Optional[Snapshot]:
+        """
+        Try to travel to a snapshot based on the options.
+
+        Supports the following time travel options:
+        - scan.tag-name: Travel to a specific tag
+
+        Args:
+            options: The options containing time travel parameters
+            tag_manager: The tag manager used to look up the tag
+
+        Returns:
+            The Snapshot to travel to, or None if no time travel option is set.
+
+        Raises:
+            ValueError: If more than one time travel option is set, or the
+                requested tag name is blank or doesn't exist
+        """
+        scan_handle_keys = [
+            key for key in SCAN_KEYS if options.contains_key(key)
+        ]
+        if not scan_handle_keys:
+            return None
+
+        if len(scan_handle_keys) > 1:
+            raise ValueError(
+                "Only one of the following parameters may be "
+                f"set: {SCAN_KEYS}")
+
+        key = scan_handle_keys[0]
+        if key != CoreOptions.SCAN_TAG_NAME.key():
+            raise ValueError(f"Unsupported time travel mode: {key}")
+
+        # get_or_throw raises the ValueError for a missing tag, so the
+        # "doesn't exist" check is not duplicated here.
+        tag_name = CoreOptions(options).scan_tag_name()
+        return tag_manager.get_or_throw(tag_name).trim_to_snapshot()
diff --git a/paimon-python/pypaimon/table/file_store_table.py 
b/paimon-python/pypaimon/table/file_store_table.py
index 21f8c78500..e23d9cec75 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -89,6 +89,56 @@ class FileStoreTable(Table):
         from pypaimon.snapshot.snapshot_manager import SnapshotManager
         return SnapshotManager(self)
 
+    def tag_manager(self):
+        """Get the tag manager for this table."""
+        from pypaimon import TagManager
+        return TagManager(self.file_io, self.table_path, self.current_branch())
+
+    def create_tag(
+            self,
+            tag_name: str,
+            snapshot_id: Optional[int] = None,
+            ignore_if_exists: bool = False
+    ) -> None:
+        """
+        Create a tag for a snapshot.
+        
+        Args:
+            tag_name: Name for the tag
+            snapshot_id: ID of the snapshot to tag. If None, uses the latest 
snapshot.
+            ignore_if_exists: If True, don't raise error if tag already exists
+            
+        Raises:
+            ValueError: If no snapshot exists or tag already exists (when 
ignore_if_exists=False)
+        """
+
+        snapshot_mgr = self.snapshot_manager()
+
+        if snapshot_id is not None:
+            snapshot = snapshot_mgr.get_snapshot_by_id(snapshot_id)
+            if snapshot is None:
+                raise ValueError(f"Snapshot with id {snapshot_id} doesn't 
exist.")
+        else:
+            snapshot = snapshot_mgr.get_latest_snapshot()
+            if snapshot is None:
+                raise ValueError("No snapshot exists in this table.")
+
+        tag_mgr = self.tag_manager()
+        tag_mgr.create_tag(snapshot, tag_name, ignore_if_exists)
+
+    def delete_tag(self, tag_name: str) -> bool:
+        """
+        Delete a tag.
+        
+        Args:
+            tag_name: Name of the tag to delete
+            
+        Returns:
+            True if tag was deleted, False if tag didn't exist
+        """
+        tag_mgr = self.tag_manager()
+        return tag_mgr.delete_tag(tag_name)
+
     def path_factory(self) -> 'FileStorePathFactory':
         from pypaimon.utils.file_store_path_factory import FileStorePathFactory
 
@@ -186,10 +236,36 @@ class FileStoreTable(Table):
                 new_options.pop(k)
             else:
                 new_options[k] = v
+
         new_table_schema = self.table_schema.copy(new_options=new_options)
+
+        time_travel_schema = self._try_time_travel(Options(new_options))
+        if time_travel_schema is not None:
+            new_table_schema = time_travel_schema
+
         return FileStoreTable(self.file_io, self.identifier, self.table_path, 
new_table_schema,
                               self.catalog_environment)
 
+    def _try_time_travel(self, options: Options) -> Optional[TableSchema]:
+        """
+        Try to resolve time travel options and return the corresponding schema.
+
+        Supports the following time travel options:
+        - scan.tag-name: Travel to a specific tag
+
+        Returns:
+            The TableSchema at the time travel point, or None if no option
+            is set or resolving it raised any Exception (e.g. missing tag).
+        """
+        try:
+            from pypaimon.snapshot.time_travel_util import TimeTravelUtil
+            snapshot = TimeTravelUtil.try_travel_to_snapshot(options, 
self.tag_manager())
+            if snapshot is None:
+                return None
+            return 
self.schema_manager.get_schema(snapshot.schema_id).copy(new_options=options.to_map())
+        except Exception:
+            return None
+
     def _create_external_paths(self) -> List[str]:
         from urllib.parse import urlparse
         from pypaimon.common.options.core_options import ExternalPathStrategy
diff --git a/paimon-python/pypaimon/__init__.py 
b/paimon-python/pypaimon/table/source/__init__.py
similarity index 65%
copy from paimon-python/pypaimon/__init__.py
copy to paimon-python/pypaimon/table/source/__init__.py
index 024f8b732b..a67d5ea255 100644
--- a/paimon-python/pypaimon/__init__.py
+++ b/paimon-python/pypaimon/table/source/__init__.py
@@ -14,20 +14,3 @@
 #  KIND, either express or implied.  See the License for the
 #  specific language governing permissions and limitations
 #  under the License.
-
-import sys
-if sys.version_info[:2] == (3, 6):
-    try:
-        from pypaimon.manifest import fastavro_py36_compat  # noqa: F401
-    except ImportError:
-        pass
-
-from pypaimon.catalog.catalog_factory import CatalogFactory
-from pypaimon.filesystem.pvfs import PaimonVirtualFileSystem
-from pypaimon.schema.schema import Schema
-
-__all__ = [
-    "PaimonVirtualFileSystem",
-    "CatalogFactory",
-    "Schema",
-]
diff --git a/paimon-python/pypaimon/__init__.py 
b/paimon-python/pypaimon/tag/__init__.py
similarity index 66%
copy from paimon-python/pypaimon/__init__.py
copy to paimon-python/pypaimon/tag/__init__.py
index 024f8b732b..609e9a995e 100644
--- a/paimon-python/pypaimon/__init__.py
+++ b/paimon-python/pypaimon/tag/__init__.py
@@ -15,19 +15,7 @@
 #  specific language governing permissions and limitations
 #  under the License.
 
-import sys
-if sys.version_info[:2] == (3, 6):
-    try:
-        from pypaimon.manifest import fastavro_py36_compat  # noqa: F401
-    except ImportError:
-        pass
+from pypaimon.tag.tag import Tag
+from pypaimon.tag.tag_manager import TagManager
 
-from pypaimon.catalog.catalog_factory import CatalogFactory
-from pypaimon.filesystem.pvfs import PaimonVirtualFileSystem
-from pypaimon.schema.schema import Schema
-
-__all__ = [
-    "PaimonVirtualFileSystem",
-    "CatalogFactory",
-    "Schema",
-]
+__all__ = ["Tag", "TagManager"]
diff --git a/paimon-python/pypaimon/tag/tag.py 
b/paimon-python/pypaimon/tag/tag.py
new file mode 100644
index 0000000000..c8089861b8
--- /dev/null
+++ b/paimon-python/pypaimon/tag/tag.py
@@ -0,0 +1,44 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+
+from pypaimon.snapshot.snapshot import Snapshot
+
+
+class Tag(Snapshot):
+
+    def trim_to_snapshot(self) -> Snapshot:
+        """Convert this Tag to a Snapshot"""
+        return Snapshot(
+            version=self.version,
+            id=self.id,
+            schema_id=self.schema_id,
+            base_manifest_list=self.base_manifest_list,
+            delta_manifest_list=self.delta_manifest_list,
+            total_record_count=self.total_record_count,
+            delta_record_count=self.delta_record_count,
+            commit_user=self.commit_user,
+            commit_identifier=self.commit_identifier,
+            commit_kind=self.commit_kind,
+            time_millis=self.time_millis,
+            changelog_manifest_list=self.changelog_manifest_list,
+            index_manifest=self.index_manifest,
+            changelog_record_count=self.changelog_record_count,
+            watermark=self.watermark,
+            statistics=self.statistics,
+            next_row_id=self.next_row_id
+        )
diff --git a/paimon-python/pypaimon/tag/tag_manager.py 
b/paimon-python/pypaimon/tag/tag_manager.py
new file mode 100644
index 0000000000..7f63f4ac06
--- /dev/null
+++ b/paimon-python/pypaimon/tag/tag_manager.py
@@ -0,0 +1,182 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+import logging
+from typing import Optional
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.json_util import JSON
+from pypaimon.snapshot.snapshot import Snapshot
+from pypaimon.tag.tag import Tag
+
+logger = logging.getLogger(__name__)
+
+TAG_PREFIX = "tag-"
+
+
+class TagManager:
+    """
+    Manager for Tag files.
+    
+    This class manages tag files stored in the table's tag directory.
+    Tags are essentially named snapshots that can be used for time travel 
queries.
+    """
+
+    def __init__(self, file_io: FileIO, table_path: str, branch: str = "main"):
+        """
+        Initialize TagManager.
+        
+        Args:
+            file_io: FileIO instance for file operations
+            table_path: Path to the table root directory
+            branch: Branch name, defaults to "main"
+        """
+        self.file_io = file_io
+        self.table_path = table_path.rstrip('/')
+        self.branch = self._normalize_branch(branch)
+
+    @staticmethod
+    def _normalize_branch(branch: str) -> str:
+        """Normalize branch name."""
+        if not branch or branch == "main":
+            return "main"
+        return branch
+
+    def _branch_path(self) -> str:
+        """Get the branch path."""
+        if self.branch == "main":
+            return self.table_path
+        return f"{self.table_path}/branch/branch-{self.branch}"
+
+    def tag_directory(self) -> str:
+        """Return the root directory of tags."""
+        return f"{self._branch_path()}/tag"
+
+    def tag_path(self, tag_name: str) -> str:
+        """Return the path of a tag file."""
+        return f"{self.tag_directory()}/{TAG_PREFIX}{tag_name}"
+
+    def tag_exists(self, tag_name: str) -> bool:
+        """Check if a tag exists."""
+        path = self.tag_path(tag_name)
+        return self.file_io.exists(path)
+
+    def get(self, tag_name: str) -> Optional[Tag]:
+        """
+        Return the tag or None if the tag file not found.
+        
+        Args:
+            tag_name: Name of the tag
+            
+        Returns:
+            Tag instance or None if not found
+        """
+        if not tag_name or tag_name.isspace():
+            raise ValueError("Tag name shouldn't be blank.")
+
+        path = self.tag_path(tag_name)
+        if not self.file_io.exists(path):
+            return None
+
+        content = self.file_io.read_file_utf8(path)
+        return JSON.from_json(content, Tag)
+
+    def get_or_throw(self, tag_name: str) -> Tag:
+        """
+        Return the tag or throw exception indicating the tag not found.
+        
+        Args:
+            tag_name: Name of the tag
+            
+        Returns:
+            Tag instance
+            
+        Raises:
+            ValueError: If tag doesn't exist
+        """
+        tag = self.get(tag_name)
+        if tag is None:
+            raise ValueError(f"Tag '{tag_name}' doesn't exist.")
+        return tag
+
+    def create_tag(
+            self,
+            snapshot: Snapshot,
+            tag_name: str,
+            ignore_if_exists: bool = False
+    ) -> None:
+        """
+        Create a tag from given snapshot and save it in the storage.
+        
+        Args:
+            snapshot: The snapshot to tag
+            tag_name: Name for the tag
+            ignore_if_exists: If True, don't raise error if tag already exists
+            
+        Raises:
+            ValueError: If tag_name is blank or tag already exists (when 
ignore_if_exists=False)
+        """
+        if not tag_name or tag_name.isspace():
+            raise ValueError("Tag name shouldn't be blank.")
+
+        if self.tag_exists(tag_name):
+            if ignore_if_exists:
+                return
+            raise ValueError(f"Tag '{tag_name}' already exists.")
+
+        self._create_or_replace_tag(snapshot, tag_name)
+
+    def _create_or_replace_tag(
+            self,
+            snapshot: Snapshot,
+            tag_name: str
+    ) -> None:
+        """
+        Internal method to create or replace a tag.
+        """
+        tag_path = self.tag_path(tag_name)
+
+        # Ensure tag directory exists
+        tag_dir = self.tag_directory()
+        if not self.file_io.exists(tag_dir):
+            self.file_io.mkdirs(tag_dir)
+
+        content = JSON.to_json(snapshot)
+
+        self.file_io.overwrite_file_utf8(tag_path, content)
+
+    def delete_tag(self, tag_name: str) -> bool:
+        """
+        Delete the file of a tag; nothing else is cleaned up here.
+
+        Args:
+            tag_name: Name of the tag to delete (ValueError if blank)
+
+        Returns:
+            True if the tag existed (its file is then deleted), False otherwise
+        """
+        if not tag_name or tag_name.isspace():
+            raise ValueError("Tag name shouldn't be blank.")
+
+        # Check existence only: parsing the tag JSON is not needed to delete
+        # it, and a tag file that fails to parse must stay deletable.
+        path = self.tag_path(tag_name)
+        if not self.file_io.exists(path):
+            logger.warning("Tag '%s' doesn't exist.", tag_name)
+            return False
+        self.file_io.delete_quietly(path)
+        return True
diff --git a/paimon-python/pypaimon/tests/table/simple_table_test.py 
b/paimon-python/pypaimon/tests/table/simple_table_test.py
new file mode 100644
index 0000000000..4a4d024f39
--- /dev/null
+++ b/paimon-python/pypaimon/tests/table/simple_table_test.py
@@ -0,0 +1,340 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.schema.schema_manager import SchemaManager
+
+
+class SimpleTableTest(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({
+            'warehouse': cls.warehouse
+        })
+        cls.catalog.create_database('default', True)
+
+        cls.pa_schema = pa.schema([
+            ('pt', pa.int32()),
+            ('k', pa.int32()),
+            ('v', pa.int64())
+        ])
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    def test_tag_scan(self):
+        """
+        Test reading from a specific tag.
+
+        1. Write data in 3 commits
+        2. Create a tag at snapshot 2
+        3. Read from the tag and verify only data from snapshots 1 and 2 is 
returned
+        """
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            primary_keys=['pt', 'k'],
+            partition_keys=['pt'],
+            options={'bucket': '3'}
+        )
+        self.catalog.create_table('default.test_tag_scan', schema, False)
+        table = self.catalog.get_table('default.test_tag_scan')
+
+        write_builder = table.new_batch_write_builder()
+
+        # First commit: pt=1, k=10, v=100 and pt=1, k=20, v=200
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data1 = pa.Table.from_pydict({
+            'pt': [1, 1],
+            'k': [10, 20],
+            'v': [100, 200]
+        }, schema=self.pa_schema)
+        table_write.write_arrow(data1)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        # Second commit: pt=2, k=30, v=101 and pt=2, k=40, v=201
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data2 = pa.Table.from_pydict({
+            'pt': [2, 2],
+            'k': [30, 40],
+            'v': [101, 201]
+        }, schema=self.pa_schema)
+        table_write.write_arrow(data2)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        # Third commit: pt=3, k=50, v=500 and pt=3, k=60, v=600
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data3 = pa.Table.from_pydict({
+            'pt': [3, 3],
+            'k': [50, 60],
+            'v': [500, 600]
+        }, schema=self.pa_schema)
+        table_write.write_arrow(data3)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        # Create tag at snapshot 2
+        table.create_tag("tag2", snapshot_id=2)
+
+        # Read from tag2 using scan.tag-name option
+        table_with_tag = table.copy({CoreOptions.SCAN_TAG_NAME.key(): "tag2"})
+        read_builder = table_with_tag.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        result = table_read.to_arrow(table_scan.plan().splits())
+
+        # Verify: should only contain data from snapshot 1 and 2
+        # (pt=1, k=10, v=100), (pt=1, k=20, v=200), (pt=2, k=30, v=101), 
(pt=2, k=40, v=201)
+        result_sorted = result.sort_by([('pt', 'ascending'), ('k', 
'ascending')])
+
+        expected = pa.Table.from_pydict({
+            'pt': [1, 1, 2, 2],
+            'k': [10, 20, 30, 40],
+            'v': [100, 200, 101, 201]
+        }, schema=self.pa_schema)
+
+        self.assertEqual(result_sorted.num_rows, 4)
+        self.assertEqual(result_sorted.column('pt').to_pylist(), 
expected.column('pt').to_pylist())
+        self.assertEqual(result_sorted.column('k').to_pylist(), 
expected.column('k').to_pylist())
+        self.assertEqual(result_sorted.column('v').to_pylist(), 
expected.column('v').to_pylist())
+
+    def test_non_existing_tag(self):
+        """
+        Test that reading from a non-existing tag raises an error.
+        """
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            primary_keys=['pt', 'k'],
+            partition_keys=['pt'],
+            options={'bucket': '3'}
+        )
+        self.catalog.create_table('default.test_non_existing_tag', schema, 
False)
+        table = self.catalog.get_table('default.test_non_existing_tag')
+
+        # Try to read from a non-existing tag
+        table_with_tag = table.copy({CoreOptions.SCAN_TAG_NAME.key(): 
"non-existing"})
+        read_builder = table_with_tag.new_read_builder()
+        table_scan = read_builder.new_scan()
+
+        with self.assertRaises(ValueError) as context:
+            table_scan.plan()
+
+        self.assertIn("non-existing", str(context.exception))
+        self.assertIn("doesn't exist", str(context.exception))
+
+    def test_tag_create_and_delete(self):
+        """Create a tag on a freshly written table, look it up, then delete it.
+
+        Asserts that the tag exists after ``create_tag``, that
+        ``tag_manager.get`` returns a tag whose ``id`` is 1, that
+        ``delete_tag`` returns a truthy result, and that the tag is no
+        longer reported as existing afterwards.
+        """
+        table_name = 'default.test_tag_create_delete'
+        table_schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            primary_keys=['pt', 'k'],
+            partition_keys=['pt'],
+            options={'bucket': '3'}
+        )
+        self.catalog.create_table(table_name, table_schema, False)
+        table = self.catalog.get_table(table_name)
+
+        # Commit a single two-row batch before tagging.
+        builder = table.new_batch_write_builder()
+        writer = builder.new_write()
+        committer = builder.new_commit()
+        rows = pa.Table.from_pydict(
+            {'pt': [1, 1], 'k': [10, 20], 'v': [100, 200]},
+            schema=self.pa_schema
+        )
+        writer.write_arrow(rows)
+        commit_messages = writer.prepare_commit()
+        committer.commit(commit_messages)
+        writer.close()
+        committer.close()
+
+        table.create_tag("test_tag")
+
+        # The tag must be visible through the table's tag manager.
+        tag_manager = table.tag_manager()
+        self.assertTrue(tag_manager.tag_exists("test_tag"))
+
+        fetched = tag_manager.get("test_tag")
+        self.assertIsNotNone(fetched)
+        self.assertEqual(fetched.id, 1)
+
+        # Deleting reports success, and the same manager no longer sees it.
+        deleted = table.delete_tag("test_tag")
+        self.assertTrue(deleted)
+        self.assertFalse(tag_manager.tag_exists("test_tag"))
+
+    def test_tag_ignore_if_exists(self):
+        """Re-creating an existing tag fails unless ``ignore_if_exists=True``.
+
+        A second plain ``create_tag`` with the same name must raise
+        ``ValueError`` mentioning "already exists"; the same call with
+        ``ignore_if_exists=True`` must complete without raising.
+        """
+        table_name = 'default.test_tag_ignore_exists'
+        table_schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            primary_keys=['pt', 'k'],
+            partition_keys=['pt'],
+            options={'bucket': '3'}
+        )
+        self.catalog.create_table(table_name, table_schema, False)
+        table = self.catalog.get_table(table_name)
+
+        # Commit a single one-row batch before tagging.
+        builder = table.new_batch_write_builder()
+        writer = builder.new_write()
+        committer = builder.new_commit()
+        rows = pa.Table.from_pydict(
+            {'pt': [1], 'k': [10], 'v': [100]},
+            schema=self.pa_schema
+        )
+        writer.write_arrow(rows)
+        commit_messages = writer.prepare_commit()
+        committer.commit(commit_messages)
+        writer.close()
+        committer.close()
+
+        # First creation succeeds.
+        table.create_tag("duplicate_tag")
+
+        # Same name again, default arguments: must raise.
+        with self.assertRaises(ValueError) as raised:
+            table.create_tag("duplicate_tag")
+        self.assertIn("already exists", str(raised.exception))
+
+        # Same name again with ignore_if_exists=True: must not raise.
+        table.create_tag("duplicate_tag", ignore_if_exists=True)
+
+    def test_schema_evolution_tag_read(self):
+        """Read a tag from one table through another table's schema manager.
+
+        Two tables receive two commits each: ``default.test_0`` with three
+        columns and ``default.test_1`` with the same columns plus
+        ``behavior``. Both get a tag ``tag2`` at snapshot 2. The second
+        table's ``schema_manager`` is then replaced by one built on the
+        first table's path, and reading ``tag2`` is expected to return the
+        second table's eight rows with only the three original columns.
+        """
+        def commit_batch(builder, columns, arrow_schema):
+            # One write/commit cycle; both handles are closed afterwards.
+            writer = builder.new_write()
+            committer = builder.new_commit()
+            batch = pa.Table.from_pydict(columns, schema=arrow_schema)
+            writer.write_arrow(batch)
+            committer.commit(writer.prepare_commit())
+            writer.close()
+            committer.close()
+
+        # schema 0
+        schema_v0 = pa.schema([
+            ('user_id', pa.int64()),
+            ('item_id', pa.int64()),
+            ('dt', pa.string())
+        ])
+        self.catalog.create_table(
+            'default.test_0',
+            Schema.from_pyarrow_schema(schema_v0, partition_keys=['dt']),
+            False
+        )
+        table1 = self.catalog.get_table('default.test_0')
+        builder_v0 = table1.new_batch_write_builder()
+        # write 1
+        commit_batch(builder_v0, {
+            'user_id': [1, 2, 3, 4],
+            'item_id': [1001, 1002, 1003, 1004],
+            'dt': ['p1', 'p1', 'p2', 'p1'],
+        }, schema_v0)
+        # write 2
+        commit_batch(builder_v0, {
+            'user_id': [11, 22, 33, 44],
+            'item_id': [1001, 1002, 1003, 1004],
+            'dt': ['p1', 'p1', 'p2', 'p1'],
+        }, schema_v0)
+
+        # schema 1: same columns plus 'behavior'
+        schema_v1 = pa.schema([
+            ('user_id', pa.int64()),
+            ('item_id', pa.int64()),
+            ('dt', pa.string()),
+            ('behavior', pa.string())
+        ])
+        self.catalog.create_table(
+            'default.test_1',
+            Schema.from_pyarrow_schema(schema_v1, partition_keys=['dt']),
+            False
+        )
+        table2 = self.catalog.get_table('default.test_1')
+        builder_v1 = table2.new_batch_write_builder()
+        # write 1
+        commit_batch(builder_v1, {
+            'user_id': [5, 6, 7, 8],
+            'item_id': [1005, 1006, 1007, 1008],
+            'dt': ['p2', 'p1', 'p2', 'p2'],
+            'behavior': ['e', 'f', 'g', 'h'],
+        }, schema_v1)
+        # write 2
+        commit_batch(builder_v1, {
+            'user_id': [55, 66, 77, 88],
+            'item_id': [1005, 1006, 1007, 1008],
+            'dt': ['p2', 'p1', 'p2', 'p2'],
+            'behavior': ['e', 'f', 'g', 'h'],
+        }, schema_v1)
+
+        # Create tag at snapshot 2 on both tables
+        table1.create_tag("tag2", snapshot_id=2)
+        table2.create_tag("tag2", snapshot_id=2)
+        # Point table2's schema manager at table1's path so that reading
+        # tag2 from table2 goes through table1's schema.
+        table2.schema_manager = SchemaManager(table2.file_io, table1.table_path)
+        tagged_table = table2.copy({CoreOptions.SCAN_TAG_NAME.key(): "tag2"})
+        read_builder = tagged_table.new_read_builder()
+        scan = read_builder.new_scan()
+        reader = read_builder.new_read()
+
+        result = reader.to_arrow(scan.plan().splits())
+
+        self.assertEqual(result.num_rows, 8)
+
+        # table2's rows, restricted to the three columns of schema 0.
+        expected_schema = pa.schema([
+            ('user_id', pa.int64()),
+            ('item_id', pa.int64()),
+            ('dt', pa.string())
+        ])
+        expected = pa.Table.from_pydict({
+            'user_id': [5, 6, 7, 8, 55, 66, 77, 88],
+            'item_id': [1005, 1006, 1007, 1008, 1005, 1006, 1007, 1008],
+            'dt': ["p2", "p1", "p2", "p2", "p2", "p1", "p2", "p2"]
+        }, schema=expected_schema)
+
+        self.assertEqual(expected, result.sort_by('user_id'))


Reply via email to