This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 14ab97b363 [python][ray] Support dynamic_table_options in read_paimon (#8195)
14ab97b363 is described below
commit 14ab97b363739ec78af7a9e3c1650dfd09fe61dc
Author: XiaoHongbo <[email protected]>
AuthorDate: Thu Jun 11 15:58:27 2026 +0800
[python][ray] Support dynamic_table_options in read_paimon (#8195)
---
paimon-python/pypaimon/ray/ray_paimon.py | 3 +
.../pypaimon/read/datasource/split_provider.py | 32 +++++++--
.../tests/ray_data_evolution_merge_into_test.py | 81 +++++++++++++++++++++-
.../pypaimon/tests/split_provider_test.py | 38 ++++++++++
4 files changed, 148 insertions(+), 6 deletions(-)
diff --git a/paimon-python/pypaimon/ray/ray_paimon.py b/paimon-python/pypaimon/ray/ray_paimon.py
index 0e796dd7c2..d0e7706c83 100644
--- a/paimon-python/pypaimon/ray/ray_paimon.py
+++ b/paimon-python/pypaimon/ray/ray_paimon.py
@@ -56,6 +56,7 @@ def read_paimon(
limit: Optional[int] = None,
snapshot_id: Optional[int] = None,
tag_name: Optional[str] = None,
+ dynamic_options: Optional[Dict[str, str]] = None,
ray_remote_args: Optional[Dict[str, Any]] = None,
concurrency: Optional[int] = None,
override_num_blocks: Optional[int] = None,
@@ -74,6 +75,7 @@ def read_paimon(
exclusive with ``tag_name``.
tag_name: Optional tag name to time-travel to. Mutually
exclusive with ``snapshot_id``.
+ dynamic_options: Optional dynamic options to override at read time.
ray_remote_args: Optional kwargs passed to ``ray.remote`` in read
tasks.
concurrency: Optional max number of Ray read tasks to run concurrently.
override_num_blocks: Optional override for the number of output blocks.
@@ -106,6 +108,7 @@ def read_paimon(
limit=limit,
snapshot_id=snapshot_id,
tag_name=tag_name,
+ dynamic_options=dynamic_options,
)
if not split_provider.splits():
diff --git a/paimon-python/pypaimon/read/datasource/split_provider.py b/paimon-python/pypaimon/read/datasource/split_provider.py
index e981eebb96..430eeb7cac 100644
--- a/paimon-python/pypaimon/read/datasource/split_provider.py
+++ b/paimon-python/pypaimon/read/datasource/split_provider.py
@@ -85,15 +85,34 @@ class CatalogSplitProvider(SplitProvider):
limit: Optional[int] = None,
snapshot_id: Optional[int] = None,
tag_name: Optional[str] = None,
+ dynamic_options: Optional[Dict[str, str]] = None,
):
if not table_identifier:
raise ValueError("table_identifier is required")
if catalog_options is None:
raise ValueError("catalog_options is required")
+ from pypaimon.snapshot.time_travel_util import SCAN_KEYS
+ scan_keys = set(SCAN_KEYS)
+
if snapshot_id is not None and tag_name is not None:
raise ValueError(
"snapshot_id and tag_name cannot be set at the same time"
)
+
+ if dynamic_options:
+ dynamic_tt_keys = scan_keys & dynamic_options.keys()
+ if (snapshot_id is not None or tag_name is not None) and dynamic_tt_keys:
+ raise ValueError(
+ "snapshot_id/tag_name and dynamic_options "
+ "time-travel keys cannot be set at the same time, "
+ "got: {}".format(", ".join(sorted(dynamic_tt_keys)))
+ )
+ if len(dynamic_tt_keys) > 1:
+ raise ValueError(
+ "dynamic_options contains multiple time-travel "
+ "keys which are mutually exclusive: {}".format(
+ ", ".join(sorted(dynamic_tt_keys)))
+ )
self._table_identifier = table_identifier
self._catalog_options = catalog_options
self._predicate = predicate
@@ -101,6 +120,7 @@ class CatalogSplitProvider(SplitProvider):
self._limit = limit
self._snapshot_id = snapshot_id
self._tag_name = tag_name
+ self._dynamic_options = dynamic_options
self._table_cached = None
self._splits_cached = None
self._read_type_cached = None
@@ -110,13 +130,15 @@ class CatalogSplitProvider(SplitProvider):
from pypaimon.catalog.catalog_factory import CatalogFactory
catalog = CatalogFactory.create(self._catalog_options)
table = catalog.get_table(self._table_identifier)
- travel_options = {}
+ dynamic_options = {}
if self._snapshot_id is not None:
- travel_options["scan.snapshot-id"] = str(self._snapshot_id)
+ dynamic_options["scan.snapshot-id"] = str(self._snapshot_id)
if self._tag_name is not None:
- travel_options["scan.tag-name"] = self._tag_name
- if travel_options:
- table = table.copy(travel_options)
+ dynamic_options["scan.tag-name"] = self._tag_name
+ if self._dynamic_options:
+ dynamic_options.update(self._dynamic_options)
+ if dynamic_options:
+ table = table.copy(dynamic_options)
self._table_cached = table
return self._table_cached
diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
index b70ef40a58..4782b1dc93 100644
--- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
+++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
@@ -523,7 +523,8 @@ class RayDataEvolutionMergeIntoTest(unittest.TestCase):
('feature', pa.int32()),
])
name = f'default.tbl_{uuid.uuid4().hex[:8]}'
- schema = Schema.from_pyarrow_schema(blob_schema, options=self.de_options)
+ schema = Schema.from_pyarrow_schema(
+ blob_schema, options=self.de_options)
self.catalog.create_table(name, schema, False)
self._write(
name,
@@ -586,6 +587,84 @@ class RayDataEvolutionMergeIntoTest(unittest.TestCase):
'num_matched': 2, 'num_inserted': 0, 'num_unchanged': 0,
})
+ def test_blob_descriptor_resolve_and_merge(self):
+ from pypaimon.table.row.blob import BlobDescriptor, Blob
+ from pypaimon.common.uri_reader import UriReaderFactory
+
+ blob_schema = pa.schema([
+ ('id', pa.int32()),
+ ('payload', pa.large_binary()),
+ ('feature', pa.int32()),
+ ])
+ name = f'default.tbl_{uuid.uuid4().hex[:8]}'
+ schema = Schema.from_pyarrow_schema(
+ blob_schema, options=self.de_options)
+ self.catalog.create_table(name, schema, False)
+ self._write(
+ name,
+ pa.Table.from_pydict(
+ {
+ 'id': pa.array([1, 2, 3], type=pa.int32()),
+ 'payload': [b'aa', b'bbb', b'cccc'],
+ 'feature': pa.array([10, 20, 30], type=pa.int32()),
+ },
+ schema=blob_schema,
+ ),
+ )
+
+ num_partitions = _TEST_NUM_PARTITIONS
+ input_ids = ray.data.from_arrow(pa.Table.from_pydict({
+ 'id': pa.array([1, 3], type=pa.int32()),
+ }))
+
+ target_rows = read_paimon(
+ name,
+ self.catalog_options,
+ projection=['id', 'payload'],
+ dynamic_options={'blob-as-descriptor': 'true'},
+ )
+
+ matched = input_ids.join(
+ target_rows, join_type='inner',
+ num_partitions=num_partitions, on=['id'],
+ )
+
+ uri_factory = UriReaderFactory(self.catalog_options)
+
+ def resolve_and_compute(batch):
+ features = []
+ for desc_bytes in batch['payload'].to_pylist():
+ desc = BlobDescriptor.deserialize(desc_bytes)
+ reader = uri_factory.create(desc.uri)
+ data = Blob.from_descriptor(reader, desc).to_data()
+ features.append(len(data) * 100)
+ return pa.Table.from_pydict({
+ 'id': batch['id'],
+ 'new_feature': pa.array(features, type=pa.int32()),
+ })
+
+ updates = matched.map_batches(
+ resolve_and_compute, batch_format='pyarrow')
+ metrics = merge_into(
+ target=name,
+ source=updates,
+ catalog_options=self.catalog_options,
+ on=['id'],
+ when_matched=[
+ WhenMatched(update={'feature': source_col('new_feature')})
+ ],
+ num_partitions=num_partitions,
+ )
+
+ table = self.catalog.get_table(name)
+ rb = table.new_read_builder()
+ splits = rb.new_scan().plan().splits()
+ out = rb.new_read().to_arrow(splits).sort_by('id').to_pydict()
+ self.assertEqual(out['id'], [1, 2, 3])
+ self.assertEqual(out['feature'], [200, 20, 400])
+ self.assertEqual(out['payload'], [b'aa', b'bbb', b'cccc'])
+ self.assertEqual(metrics['num_matched'], 2)
+
def test_combined_writes_single_snapshot(self):
target = self._create_table()
self._write(
diff --git a/paimon-python/pypaimon/tests/split_provider_test.py b/paimon-python/pypaimon/tests/split_provider_test.py
index 19525f8e5e..61e14005f5 100644
--- a/paimon-python/pypaimon/tests/split_provider_test.py
+++ b/paimon-python/pypaimon/tests/split_provider_test.py
@@ -227,6 +227,44 @@ class SplitProviderTest(unittest.TestCase):
rows = tr.to_arrow(provider.splits()).to_pylist()
self.assertEqual([r['id'] for r in rows], [11])
+ def test_dynamic_options_blob_as_descriptor(self):
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('picture', pa.large_binary()),
+ ])
+ identifier = 'default.split_provider_blob_desc'
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'blob-as-descriptor': 'false',
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ })
+ catalog = CatalogFactory.create(self.catalog_options)
+ catalog.create_table(identifier, schema, False)
+
+ provider = CatalogSplitProvider(
+ table_identifier=identifier,
+ catalog_options=self.catalog_options,
+ dynamic_options={'blob-as-descriptor': 'true'},
+ )
+ table = provider.table()
+ self.assertTrue(table.options.blob_as_descriptor())
+
+ def test_dynamic_options_rejects_tt_conflict(self):
+ args = dict(table_identifier=self.identifier,
+ catalog_options=self.catalog_options)
+ with self.assertRaises(ValueError):
+ CatalogSplitProvider(
+ **args, snapshot_id=1,
+ dynamic_options={'scan.tag-name': 'v1'})
+ with self.assertRaises(ValueError):
+ CatalogSplitProvider(
+ **args, tag_name='v1',
+ dynamic_options={'scan.snapshot-id': '1'})
+ with self.assertRaises(ValueError):
+ CatalogSplitProvider(
+ **args, dynamic_options={
+ 'scan.snapshot-id': '1', 'scan.tag-name': 'v1'})
+
def test_pre_resolved_provider_returns_inputs(self):
"""PreResolvedSplitProvider just hands back what it was given."""
catalog = CatalogFactory.create(self.catalog_options)