This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3271b21440 [python] Support snapshot_id and tag_name in Ray read_paimon API (#7802)
3271b21440 is described below
commit 3271b214403b70152eb22a0ec96c7d42198de5db
Author: chaoyang <[email protected]>
AuthorDate: Mon May 11 15:39:55 2026 +0800
[python] Support snapshot_id and tag_name in Ray read_paimon API (#7802)
---
docs/content/pypaimon/ray-data.md | 24 ++++++
.../pypaimon/common/options/core_options.py | 13 +++
paimon-python/pypaimon/ray/ray_paimon.py | 13 +++
.../pypaimon/read/datasource/split_provider.py | 18 +++-
paimon-python/pypaimon/read/table_scan.py | 17 ++++
.../pypaimon/snapshot/time_travel_util.py | 27 ++++--
paimon-python/pypaimon/table/file_store_table.py | 5 +-
.../pypaimon/table/source/full_text_scan.py | 3 +-
.../pypaimon/table/source/vector_search_scan.py | 3 +-
.../pypaimon/tests/ray_integration_test.py | 59 +++++++++++++
.../pypaimon/tests/split_provider_test.py | 72 ++++++++++++++++
.../pypaimon/tests/time_travel_util_test.py | 98 ++++++++++++++++++++++
12 files changed, 342 insertions(+), 10 deletions(-)
diff --git a/docs/content/pypaimon/ray-data.md b/docs/content/pypaimon/ray-data.md
index 4e049248de..b0ea958849 100644
--- a/docs/content/pypaimon/ray-data.md
+++ b/docs/content/pypaimon/ray-data.md
@@ -90,6 +90,26 @@ ray_dataset = read_paimon(
)
```
+**Time travel:**
+
+```python
+# Read a specific snapshot.
+ray_dataset = read_paimon(
+ "database_name.table_name",
+ catalog_options={"warehouse": "/path/to/warehouse"},
+ snapshot_id=42,
+)
+
+# Read a tagged snapshot.
+ray_dataset = read_paimon(
+ "database_name.table_name",
+ catalog_options={"warehouse": "/path/to/warehouse"},
+ tag_name="release-2026-04",
+)
+```
+
+`snapshot_id` and `tag_name` are mutually exclusive.
+
**Parameters:**
- `table_identifier`: full table name, e.g. `"db_name.table_name"`.
- `catalog_options`: kwargs forwarded to `CatalogFactory.create()`,
@@ -97,6 +117,10 @@ ray_dataset = read_paimon(
- `filter`: optional `Predicate` to push down into the scan.
- `projection`: optional list of column names to read.
- `limit`: optional row limit applied at scan planning time.
+- `snapshot_id`: optional snapshot id to time-travel to. Mutually
+ exclusive with `tag_name`.
+- `tag_name`: optional tag name to time-travel to. Mutually
+ exclusive with `snapshot_id`.
- `override_num_blocks`: optional override for the number of output blocks.
Must be `>= 1`.
- `ray_remote_args`: optional kwargs passed to `ray.remote()` in read tasks
diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py
index 3fe2e79455..1e3f789376 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -261,6 +261,16 @@ class CoreOptions:
.with_description("Optional tag name used in case of 'from-snapshot' scan mode.")
)
+ SCAN_SNAPSHOT_ID: ConfigOption[int] = (
+ ConfigOptions.key("scan.snapshot-id")
+ .long_type()
+ .no_default_value()
+ .with_description(
+ "Optional snapshot id used in case of 'from-snapshot' or "
+ "'from-snapshot-full' scan mode."
+ )
+ )
+
SOURCE_SPLIT_TARGET_SIZE: ConfigOption[MemorySize] = (
ConfigOptions.key("source.split.target-size")
.memory_type()
@@ -568,6 +578,9 @@ class CoreOptions:
def scan_tag_name(self, default=None):
return self.options.get(CoreOptions.SCAN_TAG_NAME, default)
+ def scan_snapshot_id(self, default=None):
+ return self.options.get(CoreOptions.SCAN_SNAPSHOT_ID, default)
+
def source_split_target_size(self, default=None):
return self.options.get(CoreOptions.SOURCE_SPLIT_TARGET_SIZE, default).get_bytes()
diff --git a/paimon-python/pypaimon/ray/ray_paimon.py b/paimon-python/pypaimon/ray/ray_paimon.py
index 5ea2d21096..c2fcd30c41 100644
--- a/paimon-python/pypaimon/ray/ray_paimon.py
+++ b/paimon-python/pypaimon/ray/ray_paimon.py
@@ -40,6 +40,8 @@ def read_paimon(
filter: Optional[Predicate] = None,
projection: Optional[List[str]] = None,
limit: Optional[int] = None,
+ snapshot_id: Optional[int] = None,
+ tag_name: Optional[str] = None,
ray_remote_args: Optional[Dict[str, Any]] = None,
concurrency: Optional[int] = None,
override_num_blocks: Optional[int] = None,
@@ -54,6 +56,10 @@ def read_paimon(
filter: Optional predicate to push down into the scan.
projection: Optional list of column names to read.
limit: Optional row limit for the scan.
+ snapshot_id: Optional snapshot id to time-travel to. Mutually
+ exclusive with ``tag_name``.
+ tag_name: Optional tag name to time-travel to. Mutually
+ exclusive with ``snapshot_id``.
ray_remote_args: Optional kwargs passed to ``ray.remote`` in read
tasks.
concurrency: Optional max number of Ray read tasks to run concurrently.
override_num_blocks: Optional override for the number of output blocks.
@@ -65,6 +71,11 @@ def read_paimon(
from pypaimon.read.datasource.ray_datasource import RayDatasource
from pypaimon.read.datasource.split_provider import CatalogSplitProvider
+ if snapshot_id is not None and tag_name is not None:
+ raise ValueError(
+ "snapshot_id and tag_name cannot be set at the same time"
+ )
+
if override_num_blocks is not None and override_num_blocks < 1:
raise ValueError(
"override_num_blocks must be at least 1, got {}".format(override_num_blocks)
@@ -77,6 +88,8 @@ def read_paimon(
predicate=filter,
projection=projection,
limit=limit,
+ snapshot_id=snapshot_id,
+ tag_name=tag_name,
)
)
return ray.data.read_datasource(
diff --git a/paimon-python/pypaimon/read/datasource/split_provider.py b/paimon-python/pypaimon/read/datasource/split_provider.py
index 491e8127d2..22297cc89a 100644
--- a/paimon-python/pypaimon/read/datasource/split_provider.py
+++ b/paimon-python/pypaimon/read/datasource/split_provider.py
@@ -74,16 +74,24 @@ class CatalogSplitProvider(SplitProvider):
predicate=None,
projection: Optional[List[str]] = None,
limit: Optional[int] = None,
+ snapshot_id: Optional[int] = None,
+ tag_name: Optional[str] = None,
):
if not table_identifier:
raise ValueError("table_identifier is required")
if catalog_options is None:
raise ValueError("catalog_options is required")
+ if snapshot_id is not None and tag_name is not None:
+ raise ValueError(
+ "snapshot_id and tag_name cannot be set at the same time"
+ )
self._table_identifier = table_identifier
self._catalog_options = catalog_options
self._predicate = predicate
self._projection = projection
self._limit = limit
+ self._snapshot_id = snapshot_id
+ self._tag_name = tag_name
self._table_cached = None
self._splits_cached = None
self._read_type_cached = None
@@ -92,7 +100,15 @@ class CatalogSplitProvider(SplitProvider):
if self._table_cached is None:
from pypaimon.catalog.catalog_factory import CatalogFactory
catalog = CatalogFactory.create(self._catalog_options)
- self._table_cached = catalog.get_table(self._table_identifier)
+ table = catalog.get_table(self._table_identifier)
+ travel_options = {}
+ if self._snapshot_id is not None:
+ travel_options["scan.snapshot-id"] = str(self._snapshot_id)
+ if self._tag_name is not None:
+ travel_options["scan.tag-name"] = self._tag_name
+ if travel_options:
+ table = table.copy(travel_options)
+ self._table_cached = table
return self._table_cached
def _ensure_planned(self):
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index 562bea26f5..9b661b295a 100755
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -113,6 +113,23 @@ class TableScan:
self.predicate,
self.limit
)
+ elif options.contains(CoreOptions.SCAN_SNAPSHOT_ID): # Handle snapshot-id-based reading
+ snapshot_id = int(options.get(CoreOptions.SCAN_SNAPSHOT_ID))
+
+ def snapshot_id_manifest_scanner():
+ snapshot = snapshot_manager.get_snapshot_by_id(snapshot_id)
+ if snapshot is None:
+ raise ValueError(
+ "Snapshot id %d does not exist" % snapshot_id
+ )
+ return manifest_list_manager.read_all(snapshot), snapshot
+
+ return FileScanner(
+ self.table,
+ snapshot_id_manifest_scanner,
+ self.predicate,
+ self.limit
+ )
def all_manifests():
snapshot = snapshot_manager.get_latest_snapshot()
diff --git a/paimon-python/pypaimon/snapshot/time_travel_util.py b/paimon-python/pypaimon/snapshot/time_travel_util.py
index df3eeb11cb..4f9e4aa065 100644
--- a/paimon-python/pypaimon/snapshot/time_travel_util.py
+++ b/paimon-python/pypaimon/snapshot/time_travel_util.py
@@ -25,7 +25,8 @@ from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.tag.tag_manager import TagManager
SCAN_KEYS = [
- CoreOptions.SCAN_TAG_NAME.key()
+ CoreOptions.SCAN_TAG_NAME.key(),
+ CoreOptions.SCAN_SNAPSHOT_ID.key(),
]
@@ -35,21 +36,25 @@ class TimeTravelUtil:
@staticmethod
def try_travel_to_snapshot(
options: Options,
- tag_manager: TagManager
+ tag_manager: TagManager,
+ snapshot_manager=None,
) -> Optional[Snapshot]:
"""
Try to travel to a snapshot based on the options.
-
+
Supports the following time travel options:
- scan.tag-name: Travel to a specific tag
-
+ - scan.snapshot-id: Travel to a specific snapshot id
+
Args:
options: The options containing time travel parameters
tag_manager: The tag manager
-
+ snapshot_manager: The snapshot manager, required when
+ ``scan.snapshot-id`` is set
+
Returns:
The Snapshot to travel to, or None if no time travel option is set.
-
+
Raises:
ValueError: If more than one time travel option is set
"""
@@ -71,5 +76,15 @@ class TimeTravelUtil:
if tag is None:
raise ValueError(f"Tag '{tag_name}' doesn't exist.")
return tag.trim_to_snapshot()
+ elif key == CoreOptions.SCAN_SNAPSHOT_ID.key():
+ if snapshot_manager is None:
+ raise ValueError(
+ "snapshot_manager is required to resolve scan.snapshot-id"
+ )
+ snapshot_id = int(core_options.scan_snapshot_id())
+ snapshot = snapshot_manager.get_snapshot_by_id(snapshot_id)
+ if snapshot is None:
+ raise ValueError(f"Snapshot id '{snapshot_id}' doesn't exist.")
+ return snapshot
else:
raise ValueError(f"Unsupported time travel mode: {key}")
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index 35addad251..d7c80b016d 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -435,6 +435,7 @@ class FileStoreTable(Table):
Supports the following time travel options:
- scan.tag-name: Travel to a specific tag
+ - scan.snapshot-id: Travel to a specific snapshot id
Returns:
The TableSchema at the time travel point, or None if no time
travel option is set.
@@ -442,7 +443,9 @@ class FileStoreTable(Table):
try:
from pypaimon.snapshot.time_travel_util import TimeTravelUtil
- snapshot = TimeTravelUtil.try_travel_to_snapshot(options, self.tag_manager())
+ snapshot = TimeTravelUtil.try_travel_to_snapshot(
+ options, self.tag_manager(), self.snapshot_manager()
+ )
if snapshot is None:
return None
return self.schema_manager.get_schema(snapshot.schema_id).copy(new_options=options.to_map())
diff --git a/paimon-python/pypaimon/table/source/full_text_scan.py b/paimon-python/pypaimon/table/source/full_text_scan.py
index 50b2381d27..e3cf312bf4 100644
--- a/paimon-python/pypaimon/table/source/full_text_scan.py
+++ b/paimon-python/pypaimon/table/source/full_text_scan.py
@@ -61,7 +61,8 @@ class FullTextScanImpl(FullTextScan):
from pypaimon.common.options.options import Options
travel_snapshot = TimeTravelUtil.try_travel_to_snapshot(
Options(self._table.table_schema.options),
- self._table.tag_manager()
+ self._table.tag_manager(),
+ self._table.snapshot_manager(),
)
if travel_snapshot is not None:
snapshot = travel_snapshot
diff --git a/paimon-python/pypaimon/table/source/vector_search_scan.py b/paimon-python/pypaimon/table/source/vector_search_scan.py
index 035e51a846..0285e3374e 100644
--- a/paimon-python/pypaimon/table/source/vector_search_scan.py
+++ b/paimon-python/pypaimon/table/source/vector_search_scan.py
@@ -77,7 +77,8 @@ class VectorSearchScanImpl(VectorSearchScan):
snapshot = TimeTravelUtil.try_travel_to_snapshot(
Options(self._table.table_schema.options),
- self._table.tag_manager()
+ self._table.tag_manager(),
+ self._table.snapshot_manager(),
)
if snapshot is None:
snapshot = self._table.snapshot_manager().get_latest_snapshot()
diff --git a/paimon-python/pypaimon/tests/ray_integration_test.py b/paimon-python/pypaimon/tests/ray_integration_test.py
index 1b8e2df505..4d9613c499 100644
--- a/paimon-python/pypaimon/tests/ray_integration_test.py
+++ b/paimon-python/pypaimon/tests/ray_integration_test.py
@@ -196,6 +196,65 @@ class RayIntegrationTest(unittest.TestCase):
ds = read_paimon(identifier, self.catalog_options)
self.assertEqual(ds.count(), 0)
+ def test_read_paimon_with_snapshot_id(self):
+ """read_paimon(snapshot_id=N) time-travels to that snapshot."""
+ from pypaimon.ray import read_paimon
+
+ pa_schema = pa.schema([('id', pa.int32()), ('name', pa.string())])
+ identifier = 'default.test_read_snap_id'
+ catalog = CatalogFactory.create(self.catalog_options)
+ schema = Schema.from_pyarrow_schema(pa_schema)
+ catalog.create_table(identifier, schema, False)
+ table = catalog.get_table(identifier)
+ for batch in [{'id': [1], 'name': ['a']}, {'id': [2], 'name': ['b']}]:
+ wb = table.new_batch_write_builder()
+ writer = wb.new_write()
+ writer.write_arrow(pa.Table.from_pydict(batch, schema=pa_schema))
+ wb.new_commit().commit(writer.prepare_commit())
+ writer.close()
+
+ ds_latest = read_paimon(identifier, self.catalog_options)
+ self.assertEqual(ds_latest.count(), 2)
+
+ ds_snap1 = read_paimon(identifier, self.catalog_options, snapshot_id=1)
+ self.assertEqual(ds_snap1.count(), 1)
+ self.assertEqual(ds_snap1.to_pandas()['id'].tolist(), [1])
+
+ def test_read_paimon_with_tag_name(self):
+ """read_paimon(tag_name=...) time-travels to a tagged snapshot."""
+ from pypaimon.ray import read_paimon
+
+ pa_schema = pa.schema([('id', pa.int32()), ('name', pa.string())])
+ identifier = 'default.test_read_tag_name'
+ catalog = CatalogFactory.create(self.catalog_options)
+ schema = Schema.from_pyarrow_schema(pa_schema)
+ catalog.create_table(identifier, schema, False)
+ table = catalog.get_table(identifier)
+ wb = table.new_batch_write_builder()
+ writer = wb.new_write()
+ writer.write_arrow(pa.Table.from_pydict({'id': [1], 'name': ['a']}, schema=pa_schema))
+ wb.new_commit().commit(writer.prepare_commit())
+ writer.close()
+ table.create_tag('v1')
+ wb = table.new_batch_write_builder()
+ writer = wb.new_write()
+ writer.write_arrow(pa.Table.from_pydict({'id': [2], 'name': ['b']}, schema=pa_schema))
+ wb.new_commit().commit(writer.prepare_commit())
+ writer.close()
+
+ ds_tag = read_paimon(identifier, self.catalog_options, tag_name='v1')
+ self.assertEqual(ds_tag.count(), 1)
+ self.assertEqual(ds_tag.to_pandas()['id'].tolist(), [1])
+
+ def test_read_paimon_rejects_snapshot_id_and_tag_name_together(self):
+ from pypaimon.ray import read_paimon
+
+ with self.assertRaises(ValueError):
+ read_paimon(
+ 'default.dummy', self.catalog_options,
+ snapshot_id=1, tag_name='v1',
+ )
+
def test_write_paimon_basic(self):
"""write_paimon() writes data that read_paimon() can round-trip."""
from pypaimon.ray import read_paimon, write_paimon
diff --git a/paimon-python/pypaimon/tests/split_provider_test.py b/paimon-python/pypaimon/tests/split_provider_test.py
index 31152f28a6..05bedd2f41 100644
--- a/paimon-python/pypaimon/tests/split_provider_test.py
+++ b/paimon-python/pypaimon/tests/split_provider_test.py
@@ -156,6 +156,78 @@ class SplitProviderTest(unittest.TestCase):
table_identifier=self.identifier, catalog_options=None
)
+ def test_catalog_provider_rejects_snapshot_id_and_tag_name_together(self):
+ with self.assertRaises(ValueError):
+ CatalogSplitProvider(
+ table_identifier=self.identifier,
+ catalog_options=self.catalog_options,
+ snapshot_id=1,
+ tag_name='v1',
+ )
+
+ def test_catalog_provider_time_travel_by_snapshot_id(self):
+ """Two commits → snapshot_id=1 sees only the first commit's rows."""
+ pa_schema = pa.schema([('id', pa.int32()), ('name', pa.string())])
+ identifier = 'default.split_provider_snap_id'
+ schema = Schema.from_pyarrow_schema(pa_schema)
+ catalog = CatalogFactory.create(self.catalog_options)
+ catalog.create_table(identifier, schema, False)
+ table = catalog.get_table(identifier)
+ for batch in [{'id': [10], 'name': ['a']}, {'id': [20], 'name': ['b']}]:
+ wb = table.new_batch_write_builder()
+ writer = wb.new_write()
+ writer.write_arrow(pa.Table.from_pydict(batch, schema=pa_schema))
+ wb.new_commit().commit(writer.prepare_commit())
+ writer.close()
+
+ provider = CatalogSplitProvider(
+ table_identifier=identifier,
+ catalog_options=self.catalog_options,
+ snapshot_id=1,
+ )
+ from pypaimon.read.table_read import TableRead
+ tr = TableRead(
+ provider.table(),
+ predicate=None,
+ read_type=provider.read_type(),
+ )
+ rows = tr.to_arrow(provider.splits()).to_pylist()
+ self.assertEqual([r['id'] for r in rows], [10])
+
+ def test_catalog_provider_time_travel_by_tag_name(self):
+ """Tag captures snapshot 1; reading via tag returns only that snapshot's rows."""
+ pa_schema = pa.schema([('id', pa.int32()), ('name', pa.string())])
+ identifier = 'default.split_provider_tag_name'
+ schema = Schema.from_pyarrow_schema(pa_schema)
+ catalog = CatalogFactory.create(self.catalog_options)
+ catalog.create_table(identifier, schema, False)
+ table = catalog.get_table(identifier)
+ wb = table.new_batch_write_builder()
+ writer = wb.new_write()
+ writer.write_arrow(pa.Table.from_pydict({'id': [11], 'name': ['x']}, schema=pa_schema))
+ wb.new_commit().commit(writer.prepare_commit())
+ writer.close()
+ table.create_tag('v1')
+ wb = table.new_batch_write_builder()
+ writer = wb.new_write()
+ writer.write_arrow(pa.Table.from_pydict({'id': [22], 'name': ['y']}, schema=pa_schema))
+ wb.new_commit().commit(writer.prepare_commit())
+ writer.close()
+
+ provider = CatalogSplitProvider(
+ table_identifier=identifier,
+ catalog_options=self.catalog_options,
+ tag_name='v1',
+ )
+ from pypaimon.read.table_read import TableRead
+ tr = TableRead(
+ provider.table(),
+ predicate=None,
+ read_type=provider.read_type(),
+ )
+ rows = tr.to_arrow(provider.splits()).to_pylist()
+ self.assertEqual([r['id'] for r in rows], [11])
+
def test_pre_resolved_provider_returns_inputs(self):
"""PreResolvedSplitProvider just hands back what it was given."""
catalog = CatalogFactory.create(self.catalog_options)
diff --git a/paimon-python/pypaimon/tests/time_travel_util_test.py b/paimon-python/pypaimon/tests/time_travel_util_test.py
new file mode 100644
index 0000000000..55bbd18ce5
--- /dev/null
+++ b/paimon-python/pypaimon/tests/time_travel_util_test.py
@@ -0,0 +1,98 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import unittest
+
+from pypaimon.common.options.options import Options
+from pypaimon.snapshot.time_travel_util import SCAN_KEYS, TimeTravelUtil
+
+
+class _StubSnapshot:
+ def __init__(self, snapshot_id, schema_id=0):
+ self.id = snapshot_id
+ self.schema_id = schema_id
+
+
+class _StubSnapshotManager:
+ def __init__(self, snapshots):
+ self._snapshots = {s.id: s for s in snapshots}
+
+ def get_snapshot_by_id(self, snapshot_id):
+ return self._snapshots.get(snapshot_id)
+
+
+class _StubTagManager:
+ def __init__(self, tags):
+ self._tags = tags
+
+ def get(self, name):
+ return self._tags.get(name)
+
+
+class TimeTravelUtilTest(unittest.TestCase):
+
+ def test_returns_none_when_no_scan_option_set(self):
+ result = TimeTravelUtil.try_travel_to_snapshot(
+ Options({}), _StubTagManager({}), _StubSnapshotManager([])
+ )
+ self.assertIsNone(result)
+
+ def test_resolves_snapshot_id(self):
+ snap2 = _StubSnapshot(2)
+ result = TimeTravelUtil.try_travel_to_snapshot(
+ Options({'scan.snapshot-id': '2'}),
+ _StubTagManager({}),
+ _StubSnapshotManager([_StubSnapshot(1), snap2]),
+ )
+ self.assertIs(result, snap2)
+
+ def test_unknown_snapshot_id_raises(self):
+ with self.assertRaises(ValueError):
+ TimeTravelUtil.try_travel_to_snapshot(
+ Options({'scan.snapshot-id': '99'}),
+ _StubTagManager({}),
+ _StubSnapshotManager([_StubSnapshot(1)]),
+ )
+
+ def test_snapshot_id_without_snapshot_manager_raises(self):
+ with self.assertRaises(ValueError):
+ TimeTravelUtil.try_travel_to_snapshot(
+ Options({'scan.snapshot-id': '1'}),
+ _StubTagManager({}),
+ None,
+ )
+
+ def test_rejects_setting_snapshot_id_and_tag_name_together(self):
+ # Defence-in-depth: even if a caller bypasses read_paimon's check,
+ # the util-level guard still complains.
+ with self.assertRaises(ValueError):
+ TimeTravelUtil.try_travel_to_snapshot(
+ Options({'scan.snapshot-id': '1', 'scan.tag-name': 'v1'}),
+ _StubTagManager({}),
+ _StubSnapshotManager([_StubSnapshot(1)]),
+ )
+
+ def test_scan_keys_contains_both_options(self):
+ # Sanity check: SCAN_KEYS must enumerate both time-travel modes,
+ # otherwise the mutual-exclusion guard above would not trigger.
+ self.assertIn('scan.snapshot-id', SCAN_KEYS)
+ self.assertIn('scan.tag-name', SCAN_KEYS)
+
+
+if __name__ == '__main__':
+ unittest.main()