This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 14ab97b363 [python][ray] Support dynamic_table_options in read_paimon (#8195)
14ab97b363 is described below

commit 14ab97b363739ec78af7a9e3c1650dfd09fe61dc
Author: XiaoHongbo <[email protected]>
AuthorDate: Thu Jun 11 15:58:27 2026 +0800

    [python][ray] Support dynamic_table_options in read_paimon (#8195)
---
 paimon-python/pypaimon/ray/ray_paimon.py           |  3 +
 .../pypaimon/read/datasource/split_provider.py     | 32 +++++++--
 .../tests/ray_data_evolution_merge_into_test.py    | 81 +++++++++++++++++++++-
 .../pypaimon/tests/split_provider_test.py          | 38 ++++++++++
 4 files changed, 148 insertions(+), 6 deletions(-)

diff --git a/paimon-python/pypaimon/ray/ray_paimon.py b/paimon-python/pypaimon/ray/ray_paimon.py
index 0e796dd7c2..d0e7706c83 100644
--- a/paimon-python/pypaimon/ray/ray_paimon.py
+++ b/paimon-python/pypaimon/ray/ray_paimon.py
@@ -56,6 +56,7 @@ def read_paimon(
     limit: Optional[int] = None,
     snapshot_id: Optional[int] = None,
     tag_name: Optional[str] = None,
+    dynamic_options: Optional[Dict[str, str]] = None,
     ray_remote_args: Optional[Dict[str, Any]] = None,
     concurrency: Optional[int] = None,
     override_num_blocks: Optional[int] = None,
@@ -74,6 +75,7 @@ def read_paimon(
             exclusive with ``tag_name``.
         tag_name: Optional tag name to time-travel to. Mutually
             exclusive with ``snapshot_id``.
+        dynamic_options: Optional dynamic options to override at read time.
         ray_remote_args: Optional kwargs passed to ``ray.remote`` in read tasks.
         concurrency: Optional max number of Ray read tasks to run concurrently.
         override_num_blocks: Optional override for the number of output blocks.
@@ -106,6 +108,7 @@ def read_paimon(
         limit=limit,
         snapshot_id=snapshot_id,
         tag_name=tag_name,
+        dynamic_options=dynamic_options,
     )
 
     if not split_provider.splits():
diff --git a/paimon-python/pypaimon/read/datasource/split_provider.py b/paimon-python/pypaimon/read/datasource/split_provider.py
index e981eebb96..430eeb7cac 100644
--- a/paimon-python/pypaimon/read/datasource/split_provider.py
+++ b/paimon-python/pypaimon/read/datasource/split_provider.py
@@ -85,15 +85,34 @@ class CatalogSplitProvider(SplitProvider):
         limit: Optional[int] = None,
         snapshot_id: Optional[int] = None,
         tag_name: Optional[str] = None,
+        dynamic_options: Optional[Dict[str, str]] = None,
     ):
         if not table_identifier:
             raise ValueError("table_identifier is required")
         if catalog_options is None:
             raise ValueError("catalog_options is required")
+        from pypaimon.snapshot.time_travel_util import SCAN_KEYS
+        scan_keys = set(SCAN_KEYS)
+
         if snapshot_id is not None and tag_name is not None:
             raise ValueError(
                 "snapshot_id and tag_name cannot be set at the same time"
             )
+
+        if dynamic_options:
+            dynamic_tt_keys = scan_keys & dynamic_options.keys()
+            if (snapshot_id is not None or tag_name is not None) and dynamic_tt_keys:
+                raise ValueError(
+                    "snapshot_id/tag_name and dynamic_options "
+                    "time-travel keys cannot be set at the same time, "
+                    "got: {}".format(", ".join(sorted(dynamic_tt_keys)))
+                )
+            if len(dynamic_tt_keys) > 1:
+                raise ValueError(
+                    "dynamic_options contains multiple time-travel "
+                    "keys which are mutually exclusive: {}".format(
+                        ", ".join(sorted(dynamic_tt_keys)))
+                )
         self._table_identifier = table_identifier
         self._catalog_options = catalog_options
         self._predicate = predicate
@@ -101,6 +120,7 @@ class CatalogSplitProvider(SplitProvider):
         self._limit = limit
         self._snapshot_id = snapshot_id
         self._tag_name = tag_name
+        self._dynamic_options = dynamic_options
         self._table_cached = None
         self._splits_cached = None
         self._read_type_cached = None
@@ -110,13 +130,15 @@ class CatalogSplitProvider(SplitProvider):
             from pypaimon.catalog.catalog_factory import CatalogFactory
             catalog = CatalogFactory.create(self._catalog_options)
             table = catalog.get_table(self._table_identifier)
-            travel_options = {}
+            dynamic_options = {}
             if self._snapshot_id is not None:
-                travel_options["scan.snapshot-id"] = str(self._snapshot_id)
+                dynamic_options["scan.snapshot-id"] = str(self._snapshot_id)
             if self._tag_name is not None:
-                travel_options["scan.tag-name"] = self._tag_name
-            if travel_options:
-                table = table.copy(travel_options)
+                dynamic_options["scan.tag-name"] = self._tag_name
+            if self._dynamic_options:
+                dynamic_options.update(self._dynamic_options)
+            if dynamic_options:
+                table = table.copy(dynamic_options)
             self._table_cached = table
         return self._table_cached
 
diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
index b70ef40a58..4782b1dc93 100644
--- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
+++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
@@ -523,7 +523,8 @@ class RayDataEvolutionMergeIntoTest(unittest.TestCase):
             ('feature', pa.int32()),
         ])
         name = f'default.tbl_{uuid.uuid4().hex[:8]}'
-        schema = Schema.from_pyarrow_schema(blob_schema, options=self.de_options)
+        schema = Schema.from_pyarrow_schema(
+            blob_schema, options=self.de_options)
         self.catalog.create_table(name, schema, False)
         self._write(
             name,
@@ -586,6 +587,84 @@ class RayDataEvolutionMergeIntoTest(unittest.TestCase):
             'num_matched': 2, 'num_inserted': 0, 'num_unchanged': 0,
         })
 
+    def test_blob_descriptor_resolve_and_merge(self):
+        from pypaimon.table.row.blob import BlobDescriptor, Blob
+        from pypaimon.common.uri_reader import UriReaderFactory
+
+        blob_schema = pa.schema([
+            ('id', pa.int32()),
+            ('payload', pa.large_binary()),
+            ('feature', pa.int32()),
+        ])
+        name = f'default.tbl_{uuid.uuid4().hex[:8]}'
+        schema = Schema.from_pyarrow_schema(
+            blob_schema, options=self.de_options)
+        self.catalog.create_table(name, schema, False)
+        self._write(
+            name,
+            pa.Table.from_pydict(
+                {
+                    'id': pa.array([1, 2, 3], type=pa.int32()),
+                    'payload': [b'aa', b'bbb', b'cccc'],
+                    'feature': pa.array([10, 20, 30], type=pa.int32()),
+                },
+                schema=blob_schema,
+            ),
+        )
+
+        num_partitions = _TEST_NUM_PARTITIONS
+        input_ids = ray.data.from_arrow(pa.Table.from_pydict({
+            'id': pa.array([1, 3], type=pa.int32()),
+        }))
+
+        target_rows = read_paimon(
+            name,
+            self.catalog_options,
+            projection=['id', 'payload'],
+            dynamic_options={'blob-as-descriptor': 'true'},
+        )
+
+        matched = input_ids.join(
+            target_rows, join_type='inner',
+            num_partitions=num_partitions, on=['id'],
+        )
+
+        uri_factory = UriReaderFactory(self.catalog_options)
+
+        def resolve_and_compute(batch):
+            features = []
+            for desc_bytes in batch['payload'].to_pylist():
+                desc = BlobDescriptor.deserialize(desc_bytes)
+                reader = uri_factory.create(desc.uri)
+                data = Blob.from_descriptor(reader, desc).to_data()
+                features.append(len(data) * 100)
+            return pa.Table.from_pydict({
+                'id': batch['id'],
+                'new_feature': pa.array(features, type=pa.int32()),
+            })
+
+        updates = matched.map_batches(
+            resolve_and_compute, batch_format='pyarrow')
+        metrics = merge_into(
+            target=name,
+            source=updates,
+            catalog_options=self.catalog_options,
+            on=['id'],
+            when_matched=[
+                WhenMatched(update={'feature': source_col('new_feature')})
+            ],
+            num_partitions=num_partitions,
+        )
+
+        table = self.catalog.get_table(name)
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        out = rb.new_read().to_arrow(splits).sort_by('id').to_pydict()
+        self.assertEqual(out['id'], [1, 2, 3])
+        self.assertEqual(out['feature'], [200, 20, 400])
+        self.assertEqual(out['payload'], [b'aa', b'bbb', b'cccc'])
+        self.assertEqual(metrics['num_matched'], 2)
+
     def test_combined_writes_single_snapshot(self):
         target = self._create_table()
         self._write(
diff --git a/paimon-python/pypaimon/tests/split_provider_test.py b/paimon-python/pypaimon/tests/split_provider_test.py
index 19525f8e5e..61e14005f5 100644
--- a/paimon-python/pypaimon/tests/split_provider_test.py
+++ b/paimon-python/pypaimon/tests/split_provider_test.py
@@ -227,6 +227,44 @@ class SplitProviderTest(unittest.TestCase):
         rows = tr.to_arrow(provider.splits()).to_pylist()
         self.assertEqual([r['id'] for r in rows], [11])
 
+    def test_dynamic_options_blob_as_descriptor(self):
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('picture', pa.large_binary()),
+        ])
+        identifier = 'default.split_provider_blob_desc'
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'blob-as-descriptor': 'false',
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+        })
+        catalog = CatalogFactory.create(self.catalog_options)
+        catalog.create_table(identifier, schema, False)
+
+        provider = CatalogSplitProvider(
+            table_identifier=identifier,
+            catalog_options=self.catalog_options,
+            dynamic_options={'blob-as-descriptor': 'true'},
+        )
+        table = provider.table()
+        self.assertTrue(table.options.blob_as_descriptor())
+
+    def test_dynamic_options_rejects_tt_conflict(self):
+        args = dict(table_identifier=self.identifier,
+                    catalog_options=self.catalog_options)
+        with self.assertRaises(ValueError):
+            CatalogSplitProvider(
+                **args, snapshot_id=1,
+                dynamic_options={'scan.tag-name': 'v1'})
+        with self.assertRaises(ValueError):
+            CatalogSplitProvider(
+                **args, tag_name='v1',
+                dynamic_options={'scan.snapshot-id': '1'})
+        with self.assertRaises(ValueError):
+            CatalogSplitProvider(
+                **args, dynamic_options={
+                    'scan.snapshot-id': '1', 'scan.tag-name': 'v1'})
+
     def test_pre_resolved_provider_returns_inputs(self):
         """PreResolvedSplitProvider just hands back what it was given."""
         catalog = CatalogFactory.create(self.catalog_options)

Reply via email to