This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3271b21440 [python] Support snapshot_id and tag_name in Ray read_paimon API (#7802)
3271b21440 is described below

commit 3271b214403b70152eb22a0ec96c7d42198de5db
Author: chaoyang <[email protected]>
AuthorDate: Mon May 11 15:39:55 2026 +0800

    [python] Support snapshot_id and tag_name in Ray read_paimon API (#7802)
---
 docs/content/pypaimon/ray-data.md                  | 24 ++++++
 .../pypaimon/common/options/core_options.py        | 13 +++
 paimon-python/pypaimon/ray/ray_paimon.py           | 13 +++
 .../pypaimon/read/datasource/split_provider.py     | 18 +++-
 paimon-python/pypaimon/read/table_scan.py          | 17 ++++
 .../pypaimon/snapshot/time_travel_util.py          | 27 ++++--
 paimon-python/pypaimon/table/file_store_table.py   |  5 +-
 .../pypaimon/table/source/full_text_scan.py        |  3 +-
 .../pypaimon/table/source/vector_search_scan.py    |  3 +-
 .../pypaimon/tests/ray_integration_test.py         | 59 +++++++++++++
 .../pypaimon/tests/split_provider_test.py          | 72 ++++++++++++++++
 .../pypaimon/tests/time_travel_util_test.py        | 98 ++++++++++++++++++++++
 12 files changed, 342 insertions(+), 10 deletions(-)

diff --git a/docs/content/pypaimon/ray-data.md b/docs/content/pypaimon/ray-data.md
index 4e049248de..b0ea958849 100644
--- a/docs/content/pypaimon/ray-data.md
+++ b/docs/content/pypaimon/ray-data.md
@@ -90,6 +90,26 @@ ray_dataset = read_paimon(
 )
 ```
 
+**Time travel:**
+
+```python
+# Read a specific snapshot.
+ray_dataset = read_paimon(
+    "database_name.table_name",
+    catalog_options={"warehouse": "/path/to/warehouse"},
+    snapshot_id=42,
+)
+
+# Read a tagged snapshot.
+ray_dataset = read_paimon(
+    "database_name.table_name",
+    catalog_options={"warehouse": "/path/to/warehouse"},
+    tag_name="release-2026-04",
+)
+```
+
+`snapshot_id` and `tag_name` are mutually exclusive.
+
 **Parameters:**
 - `table_identifier`: full table name, e.g. `"db_name.table_name"`.
 - `catalog_options`: kwargs forwarded to `CatalogFactory.create()`,
@@ -97,6 +117,10 @@ ray_dataset = read_paimon(
 - `filter`: optional `Predicate` to push down into the scan.
 - `projection`: optional list of column names to read.
 - `limit`: optional row limit applied at scan planning time.
+- `snapshot_id`: optional snapshot id to time-travel to. Mutually
+  exclusive with `tag_name`.
+- `tag_name`: optional tag name to time-travel to. Mutually
+  exclusive with `snapshot_id`.
 - `override_num_blocks`: optional override for the number of output blocks.
   Must be `>= 1`.
 - `ray_remote_args`: optional kwargs passed to `ray.remote()` in read tasks
diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py
index 3fe2e79455..1e3f789376 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -261,6 +261,16 @@ class CoreOptions:
         .with_description("Optional tag name used in case of 'from-snapshot' scan mode.")
     )
 
+    SCAN_SNAPSHOT_ID: ConfigOption[int] = (
+        ConfigOptions.key("scan.snapshot-id")
+        .long_type()
+        .no_default_value()
+        .with_description(
+            "Optional snapshot id used in case of 'from-snapshot' or "
+            "'from-snapshot-full' scan mode."
+        )
+    )
+
     SOURCE_SPLIT_TARGET_SIZE: ConfigOption[MemorySize] = (
         ConfigOptions.key("source.split.target-size")
         .memory_type()
@@ -568,6 +578,9 @@ class CoreOptions:
     def scan_tag_name(self, default=None):
         return self.options.get(CoreOptions.SCAN_TAG_NAME, default)
 
+    def scan_snapshot_id(self, default=None):
+        return self.options.get(CoreOptions.SCAN_SNAPSHOT_ID, default)
+
     def source_split_target_size(self, default=None):
         return self.options.get(CoreOptions.SOURCE_SPLIT_TARGET_SIZE, default).get_bytes()
 
diff --git a/paimon-python/pypaimon/ray/ray_paimon.py b/paimon-python/pypaimon/ray/ray_paimon.py
index 5ea2d21096..c2fcd30c41 100644
--- a/paimon-python/pypaimon/ray/ray_paimon.py
+++ b/paimon-python/pypaimon/ray/ray_paimon.py
@@ -40,6 +40,8 @@ def read_paimon(
     filter: Optional[Predicate] = None,
     projection: Optional[List[str]] = None,
     limit: Optional[int] = None,
+    snapshot_id: Optional[int] = None,
+    tag_name: Optional[str] = None,
     ray_remote_args: Optional[Dict[str, Any]] = None,
     concurrency: Optional[int] = None,
     override_num_blocks: Optional[int] = None,
@@ -54,6 +56,10 @@ def read_paimon(
         filter: Optional predicate to push down into the scan.
         projection: Optional list of column names to read.
         limit: Optional row limit for the scan.
+        snapshot_id: Optional snapshot id to time-travel to. Mutually
+            exclusive with ``tag_name``.
+        tag_name: Optional tag name to time-travel to. Mutually
+            exclusive with ``snapshot_id``.
         ray_remote_args: Optional kwargs passed to ``ray.remote`` in read tasks.
         concurrency: Optional max number of Ray read tasks to run concurrently.
         override_num_blocks: Optional override for the number of output blocks.
@@ -65,6 +71,11 @@ def read_paimon(
     from pypaimon.read.datasource.ray_datasource import RayDatasource
     from pypaimon.read.datasource.split_provider import CatalogSplitProvider
 
+    if snapshot_id is not None and tag_name is not None:
+        raise ValueError(
+            "snapshot_id and tag_name cannot be set at the same time"
+        )
+
     if override_num_blocks is not None and override_num_blocks < 1:
         raise ValueError(
             "override_num_blocks must be at least 1, got {}".format(override_num_blocks)
@@ -77,6 +88,8 @@ def read_paimon(
             predicate=filter,
             projection=projection,
             limit=limit,
+            snapshot_id=snapshot_id,
+            tag_name=tag_name,
         )
     )
     return ray.data.read_datasource(
diff --git a/paimon-python/pypaimon/read/datasource/split_provider.py b/paimon-python/pypaimon/read/datasource/split_provider.py
index 491e8127d2..22297cc89a 100644
--- a/paimon-python/pypaimon/read/datasource/split_provider.py
+++ b/paimon-python/pypaimon/read/datasource/split_provider.py
@@ -74,16 +74,24 @@ class CatalogSplitProvider(SplitProvider):
         predicate=None,
         projection: Optional[List[str]] = None,
         limit: Optional[int] = None,
+        snapshot_id: Optional[int] = None,
+        tag_name: Optional[str] = None,
     ):
         if not table_identifier:
             raise ValueError("table_identifier is required")
         if catalog_options is None:
             raise ValueError("catalog_options is required")
+        if snapshot_id is not None and tag_name is not None:
+            raise ValueError(
+                "snapshot_id and tag_name cannot be set at the same time"
+            )
         self._table_identifier = table_identifier
         self._catalog_options = catalog_options
         self._predicate = predicate
         self._projection = projection
         self._limit = limit
+        self._snapshot_id = snapshot_id
+        self._tag_name = tag_name
         self._table_cached = None
         self._splits_cached = None
         self._read_type_cached = None
@@ -92,7 +100,15 @@ class CatalogSplitProvider(SplitProvider):
         if self._table_cached is None:
             from pypaimon.catalog.catalog_factory import CatalogFactory
             catalog = CatalogFactory.create(self._catalog_options)
-            self._table_cached = catalog.get_table(self._table_identifier)
+            table = catalog.get_table(self._table_identifier)
+            travel_options = {}
+            if self._snapshot_id is not None:
+                travel_options["scan.snapshot-id"] = str(self._snapshot_id)
+            if self._tag_name is not None:
+                travel_options["scan.tag-name"] = self._tag_name
+            if travel_options:
+                table = table.copy(travel_options)
+            self._table_cached = table
         return self._table_cached
 
     def _ensure_planned(self):
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index 562bea26f5..9b661b295a 100755
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -113,6 +113,23 @@ class TableScan:
                 self.predicate,
                 self.limit
             )
+        elif options.contains(CoreOptions.SCAN_SNAPSHOT_ID):  # Handle snapshot-id-based reading
+            snapshot_id = int(options.get(CoreOptions.SCAN_SNAPSHOT_ID))
+
+            def snapshot_id_manifest_scanner():
+                snapshot = snapshot_manager.get_snapshot_by_id(snapshot_id)
+                if snapshot is None:
+                    raise ValueError(
+                        "Snapshot id %d does not exist" % snapshot_id
+                    )
+                return manifest_list_manager.read_all(snapshot), snapshot
+
+            return FileScanner(
+                self.table,
+                snapshot_id_manifest_scanner,
+                self.predicate,
+                self.limit
+            )
 
         def all_manifests():
             snapshot = snapshot_manager.get_latest_snapshot()
diff --git a/paimon-python/pypaimon/snapshot/time_travel_util.py b/paimon-python/pypaimon/snapshot/time_travel_util.py
index df3eeb11cb..4f9e4aa065 100644
--- a/paimon-python/pypaimon/snapshot/time_travel_util.py
+++ b/paimon-python/pypaimon/snapshot/time_travel_util.py
@@ -25,7 +25,8 @@ from pypaimon.snapshot.snapshot import Snapshot
 from pypaimon.tag.tag_manager import TagManager
 
 SCAN_KEYS = [
-    CoreOptions.SCAN_TAG_NAME.key()
+    CoreOptions.SCAN_TAG_NAME.key(),
+    CoreOptions.SCAN_SNAPSHOT_ID.key(),
 ]
 
 
@@ -35,21 +36,25 @@ class TimeTravelUtil:
     @staticmethod
     def try_travel_to_snapshot(
             options: Options,
-            tag_manager: TagManager
+            tag_manager: TagManager,
+            snapshot_manager=None,
     ) -> Optional[Snapshot]:
         """
         Try to travel to a snapshot based on the options.
-        
+
         Supports the following time travel options:
         - scan.tag-name: Travel to a specific tag
-        
+        - scan.snapshot-id: Travel to a specific snapshot id
+
         Args:
             options: The options containing time travel parameters
             tag_manager: The tag manager
-            
+            snapshot_manager: The snapshot manager, required when
+                ``scan.snapshot-id`` is set
+
         Returns:
             The Snapshot to travel to, or None if no time travel option is set.
-            
+
         Raises:
             ValueError: If more than one time travel option is set
         """
@@ -71,5 +76,15 @@ class TimeTravelUtil:
             if tag is None:
                 raise ValueError(f"Tag '{tag_name}' doesn't exist.")
             return tag.trim_to_snapshot()
+        elif key == CoreOptions.SCAN_SNAPSHOT_ID.key():
+            if snapshot_manager is None:
+                raise ValueError(
+                    "snapshot_manager is required to resolve scan.snapshot-id"
+                )
+            snapshot_id = int(core_options.scan_snapshot_id())
+            snapshot = snapshot_manager.get_snapshot_by_id(snapshot_id)
+            if snapshot is None:
+                raise ValueError(f"Snapshot id '{snapshot_id}' doesn't exist.")
+            return snapshot
         else:
             raise ValueError(f"Unsupported time travel mode: {key}")
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index 35addad251..d7c80b016d 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -435,6 +435,7 @@ class FileStoreTable(Table):
 
         Supports the following time travel options:
         - scan.tag-name: Travel to a specific tag
+        - scan.snapshot-id: Travel to a specific snapshot id
 
         Returns:
             The TableSchema at the time travel point, or None if no time travel option is set.
@@ -442,7 +443,9 @@ class FileStoreTable(Table):
 
         try:
             from pypaimon.snapshot.time_travel_util import TimeTravelUtil
-            snapshot = TimeTravelUtil.try_travel_to_snapshot(options, self.tag_manager())
+            snapshot = TimeTravelUtil.try_travel_to_snapshot(
+                options, self.tag_manager(), self.snapshot_manager()
+            )
             if snapshot is None:
                 return None
             return self.schema_manager.get_schema(snapshot.schema_id).copy(new_options=options.to_map())
diff --git a/paimon-python/pypaimon/table/source/full_text_scan.py b/paimon-python/pypaimon/table/source/full_text_scan.py
index 50b2381d27..e3cf312bf4 100644
--- a/paimon-python/pypaimon/table/source/full_text_scan.py
+++ b/paimon-python/pypaimon/table/source/full_text_scan.py
@@ -61,7 +61,8 @@ class FullTextScanImpl(FullTextScan):
         from pypaimon.common.options.options import Options
         travel_snapshot = TimeTravelUtil.try_travel_to_snapshot(
             Options(self._table.table_schema.options),
-            self._table.tag_manager()
+            self._table.tag_manager(),
+            self._table.snapshot_manager(),
         )
         if travel_snapshot is not None:
             snapshot = travel_snapshot
diff --git a/paimon-python/pypaimon/table/source/vector_search_scan.py b/paimon-python/pypaimon/table/source/vector_search_scan.py
index 035e51a846..0285e3374e 100644
--- a/paimon-python/pypaimon/table/source/vector_search_scan.py
+++ b/paimon-python/pypaimon/table/source/vector_search_scan.py
@@ -77,7 +77,8 @@ class VectorSearchScanImpl(VectorSearchScan):
 
         snapshot = TimeTravelUtil.try_travel_to_snapshot(
             Options(self._table.table_schema.options),
-            self._table.tag_manager()
+            self._table.tag_manager(),
+            self._table.snapshot_manager(),
         )
         if snapshot is None:
             snapshot = self._table.snapshot_manager().get_latest_snapshot()
diff --git a/paimon-python/pypaimon/tests/ray_integration_test.py b/paimon-python/pypaimon/tests/ray_integration_test.py
index 1b8e2df505..4d9613c499 100644
--- a/paimon-python/pypaimon/tests/ray_integration_test.py
+++ b/paimon-python/pypaimon/tests/ray_integration_test.py
@@ -196,6 +196,65 @@ class RayIntegrationTest(unittest.TestCase):
         ds = read_paimon(identifier, self.catalog_options)
         self.assertEqual(ds.count(), 0)
 
+    def test_read_paimon_with_snapshot_id(self):
+        """read_paimon(snapshot_id=N) time-travels to that snapshot."""
+        from pypaimon.ray import read_paimon
+
+        pa_schema = pa.schema([('id', pa.int32()), ('name', pa.string())])
+        identifier = 'default.test_read_snap_id'
+        catalog = CatalogFactory.create(self.catalog_options)
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        catalog.create_table(identifier, schema, False)
+        table = catalog.get_table(identifier)
+        for batch in [{'id': [1], 'name': ['a']}, {'id': [2], 'name': ['b']}]:
+            wb = table.new_batch_write_builder()
+            writer = wb.new_write()
+            writer.write_arrow(pa.Table.from_pydict(batch, schema=pa_schema))
+            wb.new_commit().commit(writer.prepare_commit())
+            writer.close()
+
+        ds_latest = read_paimon(identifier, self.catalog_options)
+        self.assertEqual(ds_latest.count(), 2)
+
+        ds_snap1 = read_paimon(identifier, self.catalog_options, snapshot_id=1)
+        self.assertEqual(ds_snap1.count(), 1)
+        self.assertEqual(ds_snap1.to_pandas()['id'].tolist(), [1])
+
+    def test_read_paimon_with_tag_name(self):
+        """read_paimon(tag_name=...) time-travels to a tagged snapshot."""
+        from pypaimon.ray import read_paimon
+
+        pa_schema = pa.schema([('id', pa.int32()), ('name', pa.string())])
+        identifier = 'default.test_read_tag_name'
+        catalog = CatalogFactory.create(self.catalog_options)
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        catalog.create_table(identifier, schema, False)
+        table = catalog.get_table(identifier)
+        wb = table.new_batch_write_builder()
+        writer = wb.new_write()
+        writer.write_arrow(pa.Table.from_pydict({'id': [1], 'name': ['a']}, schema=pa_schema))
+        wb.new_commit().commit(writer.prepare_commit())
+        writer.close()
+        table.create_tag('v1')
+        wb = table.new_batch_write_builder()
+        writer = wb.new_write()
+        writer.write_arrow(pa.Table.from_pydict({'id': [2], 'name': ['b']}, schema=pa_schema))
+        wb.new_commit().commit(writer.prepare_commit())
+        writer.close()
+
+        ds_tag = read_paimon(identifier, self.catalog_options, tag_name='v1')
+        self.assertEqual(ds_tag.count(), 1)
+        self.assertEqual(ds_tag.to_pandas()['id'].tolist(), [1])
+
+    def test_read_paimon_rejects_snapshot_id_and_tag_name_together(self):
+        from pypaimon.ray import read_paimon
+
+        with self.assertRaises(ValueError):
+            read_paimon(
+                'default.dummy', self.catalog_options,
+                snapshot_id=1, tag_name='v1',
+            )
+
     def test_write_paimon_basic(self):
         """write_paimon() writes data that read_paimon() can round-trip."""
         from pypaimon.ray import read_paimon, write_paimon
diff --git a/paimon-python/pypaimon/tests/split_provider_test.py b/paimon-python/pypaimon/tests/split_provider_test.py
index 31152f28a6..05bedd2f41 100644
--- a/paimon-python/pypaimon/tests/split_provider_test.py
+++ b/paimon-python/pypaimon/tests/split_provider_test.py
@@ -156,6 +156,78 @@ class SplitProviderTest(unittest.TestCase):
                 table_identifier=self.identifier, catalog_options=None
             )
 
+    def test_catalog_provider_rejects_snapshot_id_and_tag_name_together(self):
+        with self.assertRaises(ValueError):
+            CatalogSplitProvider(
+                table_identifier=self.identifier,
+                catalog_options=self.catalog_options,
+                snapshot_id=1,
+                tag_name='v1',
+            )
+
+    def test_catalog_provider_time_travel_by_snapshot_id(self):
+        """Two commits → snapshot_id=1 sees only the first commit's rows."""
+        pa_schema = pa.schema([('id', pa.int32()), ('name', pa.string())])
+        identifier = 'default.split_provider_snap_id'
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        catalog = CatalogFactory.create(self.catalog_options)
+        catalog.create_table(identifier, schema, False)
+        table = catalog.get_table(identifier)
+        for batch in [{'id': [10], 'name': ['a']}, {'id': [20], 'name': ['b']}]:
+            wb = table.new_batch_write_builder()
+            writer = wb.new_write()
+            writer.write_arrow(pa.Table.from_pydict(batch, schema=pa_schema))
+            wb.new_commit().commit(writer.prepare_commit())
+            writer.close()
+
+        provider = CatalogSplitProvider(
+            table_identifier=identifier,
+            catalog_options=self.catalog_options,
+            snapshot_id=1,
+        )
+        from pypaimon.read.table_read import TableRead
+        tr = TableRead(
+            provider.table(),
+            predicate=None,
+            read_type=provider.read_type(),
+        )
+        rows = tr.to_arrow(provider.splits()).to_pylist()
+        self.assertEqual([r['id'] for r in rows], [10])
+
+    def test_catalog_provider_time_travel_by_tag_name(self):
+        """Tag captures snapshot 1; reading via tag returns only that snapshot's rows."""
+        pa_schema = pa.schema([('id', pa.int32()), ('name', pa.string())])
+        identifier = 'default.split_provider_tag_name'
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        catalog = CatalogFactory.create(self.catalog_options)
+        catalog.create_table(identifier, schema, False)
+        table = catalog.get_table(identifier)
+        wb = table.new_batch_write_builder()
+        writer = wb.new_write()
+        writer.write_arrow(pa.Table.from_pydict({'id': [11], 'name': ['x']}, schema=pa_schema))
+        wb.new_commit().commit(writer.prepare_commit())
+        writer.close()
+        table.create_tag('v1')
+        wb = table.new_batch_write_builder()
+        writer = wb.new_write()
+        writer.write_arrow(pa.Table.from_pydict({'id': [22], 'name': ['y']}, schema=pa_schema))
+        wb.new_commit().commit(writer.prepare_commit())
+        writer.close()
+
+        provider = CatalogSplitProvider(
+            table_identifier=identifier,
+            catalog_options=self.catalog_options,
+            tag_name='v1',
+        )
+        from pypaimon.read.table_read import TableRead
+        tr = TableRead(
+            provider.table(),
+            predicate=None,
+            read_type=provider.read_type(),
+        )
+        rows = tr.to_arrow(provider.splits()).to_pylist()
+        self.assertEqual([r['id'] for r in rows], [11])
+
     def test_pre_resolved_provider_returns_inputs(self):
         """PreResolvedSplitProvider just hands back what it was given."""
         catalog = CatalogFactory.create(self.catalog_options)
diff --git a/paimon-python/pypaimon/tests/time_travel_util_test.py b/paimon-python/pypaimon/tests/time_travel_util_test.py
new file mode 100644
index 0000000000..55bbd18ce5
--- /dev/null
+++ b/paimon-python/pypaimon/tests/time_travel_util_test.py
@@ -0,0 +1,98 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import unittest
+
+from pypaimon.common.options.options import Options
+from pypaimon.snapshot.time_travel_util import SCAN_KEYS, TimeTravelUtil
+
+
+class _StubSnapshot:
+    def __init__(self, snapshot_id, schema_id=0):
+        self.id = snapshot_id
+        self.schema_id = schema_id
+
+
+class _StubSnapshotManager:
+    def __init__(self, snapshots):
+        self._snapshots = {s.id: s for s in snapshots}
+
+    def get_snapshot_by_id(self, snapshot_id):
+        return self._snapshots.get(snapshot_id)
+
+
+class _StubTagManager:
+    def __init__(self, tags):
+        self._tags = tags
+
+    def get(self, name):
+        return self._tags.get(name)
+
+
+class TimeTravelUtilTest(unittest.TestCase):
+
+    def test_returns_none_when_no_scan_option_set(self):
+        result = TimeTravelUtil.try_travel_to_snapshot(
+            Options({}), _StubTagManager({}), _StubSnapshotManager([])
+        )
+        self.assertIsNone(result)
+
+    def test_resolves_snapshot_id(self):
+        snap2 = _StubSnapshot(2)
+        result = TimeTravelUtil.try_travel_to_snapshot(
+            Options({'scan.snapshot-id': '2'}),
+            _StubTagManager({}),
+            _StubSnapshotManager([_StubSnapshot(1), snap2]),
+        )
+        self.assertIs(result, snap2)
+
+    def test_unknown_snapshot_id_raises(self):
+        with self.assertRaises(ValueError):
+            TimeTravelUtil.try_travel_to_snapshot(
+                Options({'scan.snapshot-id': '99'}),
+                _StubTagManager({}),
+                _StubSnapshotManager([_StubSnapshot(1)]),
+            )
+
+    def test_snapshot_id_without_snapshot_manager_raises(self):
+        with self.assertRaises(ValueError):
+            TimeTravelUtil.try_travel_to_snapshot(
+                Options({'scan.snapshot-id': '1'}),
+                _StubTagManager({}),
+                None,
+            )
+
+    def test_rejects_setting_snapshot_id_and_tag_name_together(self):
+        # Defence-in-depth: even if a caller bypasses read_paimon's check,
+        # the util-level guard still complains.
+        with self.assertRaises(ValueError):
+            TimeTravelUtil.try_travel_to_snapshot(
+                Options({'scan.snapshot-id': '1', 'scan.tag-name': 'v1'}),
+                _StubTagManager({}),
+                _StubSnapshotManager([_StubSnapshot(1)]),
+            )
+
+    def test_scan_keys_contains_both_options(self):
+        # Sanity check: SCAN_KEYS must enumerate both time-travel modes,
+        # otherwise the mutual-exclusion guard above would not trigger.
+        self.assertIn('scan.snapshot-id', SCAN_KEYS)
+        self.assertIn('scan.tag-name', SCAN_KEYS)
+
+
+if __name__ == '__main__':
+    unittest.main()

Reply via email to