This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new be172f4d90 [python] sync to_ray method args with ray data api (#6948)
be172f4d90 is described below

commit be172f4d90984c545fca1a94439f602aa159a95c
Author: XiaoHongbo <[email protected]>
AuthorDate: Mon Jan 5 02:28:28 2026 +0800

    [python] sync to_ray method args with ray data api (#6948)
---
 docs/content/program-api/python-api.md             | 42 +++++++++++++++++++---
 paimon-python/pypaimon/read/table_read.py          | 41 +++++++++++++++++----
 .../sample/rest_catalog_ray_data_sample.py         |  8 ++---
 paimon-python/pypaimon/tests/ray_data_test.py      | 41 ++++++++++-----------
 4 files changed, 93 insertions(+), 39 deletions(-)

diff --git a/docs/content/program-api/python-api.md b/docs/content/program-api/python-api.md
index 2d9d359e18..406c8c1ef6 100644
--- a/docs/content/program-api/python-api.md
+++ b/docs/content/program-api/python-api.md
@@ -426,14 +426,21 @@ print(ray_dataset.to_pandas())
 # ...
 ```
 
-The `to_ray()` method supports a `parallelism` parameter to control distributed reading. Use `parallelism=1` for single-task read (default) or `parallelism > 1` for distributed read with multiple Ray workers:
+The `to_ray()` method supports Ray Data API parameters for distributed processing:
 
 ```python
-# Simple mode (single task)
-ray_dataset = table_read.to_ray(splits, parallelism=1)
+# Basic usage
+ray_dataset = table_read.to_ray(splits)
+
+# Specify number of output blocks
+ray_dataset = table_read.to_ray(splits, override_num_blocks=4)
 
-# Distributed mode with 4 parallel tasks
-ray_dataset = table_read.to_ray(splits, parallelism=4)
+# Configure Ray remote arguments
+ray_dataset = table_read.to_ray(
+    splits,
+    override_num_blocks=4,
+    ray_remote_args={"num_cpus": 2, "max_retries": 3}
+)
 
 # Use Ray Data operations
 mapped_dataset = ray_dataset.map(lambda row: {'value': row['value'] * 2})
@@ -441,6 +448,31 @@ filtered_dataset = ray_dataset.filter(lambda row: row['score'] > 80)
 df = ray_dataset.to_pandas()
 ```
 
+**Parameters:**
+- `override_num_blocks`: Optional override for the number of output blocks. By default,
+  Ray automatically determines the optimal number.
+- `ray_remote_args`: Optional kwargs passed to `ray.remote()` in read tasks
+  (e.g., `{"num_cpus": 2, "max_retries": 3}`).
+- `concurrency`: Optional max number of Ray tasks to run concurrently. By default,
+  dynamically decided based on available resources.
+- `**read_args`: Additional kwargs passed to the datasource (e.g., `per_task_row_limit`
+  in Ray 2.52.0+).
+
+**Ray Block Size Configuration:**
+
+If you need to configure Ray's block size (e.g., when Paimon splits exceed Ray's default
+128MB block size), set it before calling `to_ray()`:
+
+```python
+from ray.data import DataContext
+
+ctx = DataContext.get_current()
+ctx.target_max_block_size = 256 * 1024 * 1024  # 256MB (default is 128MB)
+ray_dataset = table_read.to_ray(splits)
+```
+
+See [Ray Data API Documentation](https://docs.ray.io/en/latest/data/api/doc/ray.data.read_datasource.html) for more details.
+
 ### Incremental Read
 
This API allows reading data committed between two snapshot timestamps. The steps are as follows.
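
The updated docs describe `concurrency` but do not show it in use. A minimal sketch combining it with the other parameters above, assuming `table_read` and `splits` were obtained through the usual read-builder flow:

```python
# Sketch: bound both the number of in-flight read tasks and per-task resources.
# Both arguments are forwarded to ray.data.read_datasource(), per the docs above.
ray_dataset = table_read.to_ray(
    splits,
    concurrency=2,                    # at most 2 read tasks run at once
    ray_remote_args={"num_cpus": 1},  # each read task reserves one CPU
)
print(ray_dataset.count())
```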
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index 626cbc2be7..953384cc7d 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -15,7 +15,7 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-from typing import Iterator, List, Optional
+from typing import Any, Dict, Iterator, List, Optional
 
 import pandas
 import pyarrow
@@ -128,8 +128,30 @@ class TableRead:
         con.register(table_name, self.to_arrow(splits))
         return con
 
-    def to_ray(self, splits: List[Split], parallelism: int = 1) -> "ray.data.dataset.Dataset":
-        """Convert Paimon table data to Ray Dataset."""
+    def to_ray(
+        self,
+        splits: List[Split],
+        *,
+        ray_remote_args: Optional[Dict[str, Any]] = None,
+        concurrency: Optional[int] = None,
+        override_num_blocks: Optional[int] = None,
+        **read_args,
+    ) -> "ray.data.dataset.Dataset":
+        """Convert Paimon table data to Ray Dataset.
+        Args:
+            splits: List of splits to read from the Paimon table.
+            ray_remote_args: Optional kwargs passed to :func:`ray.remote` in read tasks.
+                For example, ``{"num_cpus": 2, "max_retries": 3}``.
+            concurrency: Optional max number of Ray tasks to run concurrently.
+                By default, dynamically decided based on available resources.
+            override_num_blocks: Optional override for the number of output blocks.
+                In most cases you don't need to set this manually.
+            **read_args: Additional kwargs passed to the datasource.
+                For example, ``per_task_row_limit`` (Ray 2.52.0+).
+        
+        See `Ray Data API <https://docs.ray.io/en/latest/data/api/doc/ray.data.read_datasource.html>`_
+        for details.
+        """
         import ray
 
         if not splits:
@@ -140,13 +162,18 @@ class TableRead:
             )
             return ray.data.from_arrow(empty_table)
 
-        # Validate parallelism parameter
-        if parallelism < 1:
-            raise ValueError(f"parallelism must be at least 1, got 
{parallelism}")
+        if override_num_blocks is not None and override_num_blocks < 1:
+            raise ValueError(f"override_num_blocks must be at least 1, got 
{override_num_blocks}")
 
         from pypaimon.read.ray_datasource import PaimonDatasource
         datasource = PaimonDatasource(self, splits)
-        return ray.data.read_datasource(datasource, parallelism=parallelism)
+        return ray.data.read_datasource(
+            datasource,
+            ray_remote_args=ray_remote_args,
+            concurrency=concurrency,
+            override_num_blocks=override_num_blocks,
+            **read_args
+        )
 
     def _create_split_read(self, split: Split) -> SplitRead:
         if self.table.is_primary_key_table and not split.raw_convertible:
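
To make the new validation concrete: `override_num_blocks=None` (the default) leaves block counting to Ray, while any value below 1 fails before the datasource is built. A hedged usage sketch, reusing `table_read` and `splits` from the surrounding examples:

```python
# Default: Ray decides how many output blocks to produce.
ds = table_read.to_ray(splits)

# Values < 1 are rejected up front by the check added above.
try:
    table_read.to_ray(splits, override_num_blocks=0)
except ValueError as e:
    print(e)  # override_num_blocks must be at least 1, got 0
```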
diff --git a/paimon-python/pypaimon/sample/rest_catalog_ray_data_sample.py b/paimon-python/pypaimon/sample/rest_catalog_ray_data_sample.py
index 268946b5a2..fafe31b2f3 100644
--- a/paimon-python/pypaimon/sample/rest_catalog_ray_data_sample.py
+++ b/paimon-python/pypaimon/sample/rest_catalog_ray_data_sample.py
@@ -121,7 +121,7 @@ def main():
         print(f"Number of splits: {len(splits)}")
 
         # Convert to Ray Dataset
-        ray_dataset = table_read.to_ray(splits, parallelism=2)
+        ray_dataset = table_read.to_ray(splits, override_num_blocks=2)
         print("✓ Ray Dataset created successfully")
         print(f"  - Total rows: {ray_dataset.count()}")
        # Note: num_blocks() requires materialized dataset, so we skip it for simplicity
@@ -135,7 +135,7 @@ def main():
         print("\n" + "="*60)
         print("Step 3: Comparison with simple read mode")
         print("="*60)
-        ray_dataset_simple = table_read.to_ray(splits, parallelism=1)
+        ray_dataset_simple = table_read.to_ray(splits, override_num_blocks=1)
         df_simple = ray_dataset_simple.to_pandas()
 
         print(f"Distributed mode rows: {ray_dataset.count()}")
@@ -193,7 +193,7 @@ def main():
         table_scan_filtered = read_builder_filtered.new_scan()
         splits_filtered = table_scan_filtered.plan().splits()
 
-        ray_dataset_filtered = table_read_filtered.to_ray(splits_filtered, parallelism=2)
+        ray_dataset_filtered = table_read_filtered.to_ray(splits_filtered, override_num_blocks=2)
         df_filtered_at_read = ray_dataset_filtered.to_pandas()
         print(f"✓ Filtered at read time: {ray_dataset_filtered.count()} rows")
         print(f"  - All categories are 'A': 
{all(df_filtered_at_read['category'] == 'A')}")
@@ -207,7 +207,7 @@ def main():
         table_scan_projected = read_builder_projected.new_scan()
         splits_projected = table_scan_projected.plan().splits()
 
-        ray_dataset_projected = table_read_projected.to_ray(splits_projected, parallelism=2)
+        ray_dataset_projected = table_read_projected.to_ray(splits_projected, override_num_blocks=2)
         df_projected = ray_dataset_projected.to_pandas()
         print(f"✓ Projected columns: {list(df_projected.columns)}")
         print("  - Expected: ['id', 'name', 'value']")
diff --git a/paimon-python/pypaimon/tests/ray_data_test.py b/paimon-python/pypaimon/tests/ray_data_test.py
index fc9e967ddf..42bca3d60b 100644
--- a/paimon-python/pypaimon/tests/ray_data_test.py
+++ b/paimon-python/pypaimon/tests/ray_data_test.py
@@ -109,7 +109,7 @@ class RayDataTest(unittest.TestCase):
         table_scan = read_builder.new_scan()
         splits = table_scan.plan().splits()
 
-        ray_dataset = table_read.to_ray(splits, parallelism=2)
+        ray_dataset = table_read.to_ray(splits, override_num_blocks=2)
 
         # Verify Ray dataset
         self.assertIsNotNone(ray_dataset, "Ray dataset should not be None")
@@ -169,7 +169,7 @@ class RayDataTest(unittest.TestCase):
         table_scan = read_builder.new_scan()
         splits = table_scan.plan().splits()
 
-        ray_dataset = table_read.to_ray(splits, parallelism=2)
+        ray_dataset = table_read.to_ray(splits, override_num_blocks=2)
 
         # Verify filtered results
        self.assertEqual(ray_dataset.count(), 2, "Should have 2 rows after filtering")
@@ -215,7 +215,7 @@ class RayDataTest(unittest.TestCase):
         table_scan = read_builder.new_scan()
         splits = table_scan.plan().splits()
 
-        ray_dataset = table_read.to_ray(splits, parallelism=2)
+        ray_dataset = table_read.to_ray(splits, override_num_blocks=2)
 
         # Verify projection
         self.assertEqual(ray_dataset.count(), 3, "Should have 3 rows")
@@ -256,7 +256,7 @@ class RayDataTest(unittest.TestCase):
         table_scan = read_builder.new_scan()
         splits = table_scan.plan().splits()
 
-        ray_dataset = table_read.to_ray(splits, parallelism=2)
+        ray_dataset = table_read.to_ray(splits, override_num_blocks=2)
 
         # Apply map operation (double the value)
         def double_value(row):
@@ -303,7 +303,7 @@ class RayDataTest(unittest.TestCase):
         table_scan = read_builder.new_scan()
         splits = table_scan.plan().splits()
 
-        ray_dataset = table_read.to_ray(splits, parallelism=2)
+        ray_dataset = table_read.to_ray(splits, override_num_blocks=2)
 
         # Apply filter operation (score >= 80)
         filtered_dataset = ray_dataset.filter(lambda row: row['score'] >= 80)
@@ -346,8 +346,8 @@ class RayDataTest(unittest.TestCase):
         table_scan = read_builder.new_scan()
         splits = table_scan.plan().splits()
 
-        ray_dataset_distributed = table_read.to_ray(splits, parallelism=2)
-        ray_dataset_simple = table_read.to_ray(splits, parallelism=1)
+        ray_dataset_distributed = table_read.to_ray(splits, override_num_blocks=2)
+        ray_dataset_simple = table_read.to_ray(splits, override_num_blocks=1)
 
         # Both should produce the same results
        self.assertEqual(ray_dataset_distributed.count(), 3, "Distributed mode should have 3 rows")
@@ -394,7 +394,7 @@ class RayDataTest(unittest.TestCase):
         table_scan = read_builder.new_scan()
         splits = table_scan.plan().splits()
 
-        ray_dataset = table_read.to_ray(splits, parallelism=1)
+        ray_dataset = table_read.to_ray(splits, override_num_blocks=1)
         self.assertIsNotNone(ray_dataset, "Ray dataset should not be None")
         self.assertEqual(ray_dataset.count(), 5, "Should have 5 rows")
 
@@ -453,7 +453,7 @@ class RayDataTest(unittest.TestCase):
         table_scan = read_builder.new_scan()
         splits = table_scan.plan().splits()
 
-        ray_dataset = table_read.to_ray(splits, parallelism=2)
+        ray_dataset = table_read.to_ray(splits, override_num_blocks=2)
 
         self.assertIsNotNone(ray_dataset, "Ray dataset should not be None")
        self.assertEqual(ray_dataset.count(), 4, "Should have 4 rows after upsert")
@@ -510,7 +510,7 @@ class RayDataTest(unittest.TestCase):
         table_scan = read_builder.new_scan()
         splits = table_scan.plan().splits()
 
-        ray_dataset = table_read.to_ray(splits, parallelism=1)
+        ray_dataset = table_read.to_ray(splits, override_num_blocks=1)
 
         # Verify filtered results
        self.assertEqual(ray_dataset.count(), 2, "Should have 2 rows after filtering")
@@ -528,7 +528,7 @@ class RayDataTest(unittest.TestCase):
         table_scan = read_builder.new_scan()
         splits = table_scan.plan().splits()
 
-        ray_dataset = table_read.to_ray(splits, parallelism=1)
+        ray_dataset = table_read.to_ray(splits, override_num_blocks=1)
 
         # Verify filtered results by partition
        self.assertEqual(ray_dataset.count(), 2, "Should have 2 rows in partition 2024-01-01")
@@ -588,7 +588,7 @@ class RayDataTest(unittest.TestCase):
         table_scan = read_builder.new_scan()
         splits = table_scan.plan().splits()
 
-        ray_dataset = table_read.to_ray(splits, parallelism=2)
+        ray_dataset = table_read.to_ray(splits, override_num_blocks=2)
 
         self.assertIsNotNone(ray_dataset, "Ray dataset should not be None")
        self.assertEqual(ray_dataset.count(), 4, "Should have 4 rows after upsert")
@@ -604,7 +604,6 @@ class RayDataTest(unittest.TestCase):
        self.assertEqual(list(df_sorted['value']), [150, 250, 300, 400], "Value column should reflect updates")
 
     def test_ray_data_invalid_parallelism(self):
-        """Test that invalid parallelism values raise ValueError."""
         pa_schema = pa.schema([
             ('id', pa.int32()),
             ('name', pa.string()),
@@ -633,21 +632,17 @@ class RayDataTest(unittest.TestCase):
         table_scan = read_builder.new_scan()
         splits = table_scan.plan().splits()
 
-        # Test with parallelism = 0
         with self.assertRaises(ValueError) as context:
-            table_read.to_ray(splits, parallelism=0)
-        self.assertIn("parallelism must be at least 1", str(context.exception))
+            table_read.to_ray(splits, override_num_blocks=0)
+        self.assertIn("override_num_blocks must be at least 1", 
str(context.exception))
 
-        # Test with parallelism < 0
         with self.assertRaises(ValueError) as context:
-            table_read.to_ray(splits, parallelism=-1)
-        self.assertIn("parallelism must be at least 1", str(context.exception))
+            table_read.to_ray(splits, override_num_blocks=-1)
+        self.assertIn("override_num_blocks must be at least 1", 
str(context.exception))
 
-        # Test with parallelism = -10
         with self.assertRaises(ValueError) as context:
-            table_read.to_ray(splits, parallelism=-10)
-        self.assertIn("parallelism must be at least 1", str(context.exception))
-
+            table_read.to_ray(splits, override_num_blocks=-10)
+        self.assertIn("override_num_blocks must be at least 1", 
str(context.exception))
 
 if __name__ == '__main__':
     unittest.main()
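
The updated test checks the value range but not that the new keyword arguments actually reach Ray. A hedged sketch of how that could be asserted with a mock; this is illustrative and not part of the commit:

```python
from unittest.mock import patch

# Sketch: assert to_ray() forwards the new kwargs to ray.data.read_datasource.
with patch("ray.data.read_datasource") as mock_read:
    table_read.to_ray(splits, concurrency=2, override_num_blocks=2)
    _, kwargs = mock_read.call_args
    assert kwargs["concurrency"] == 2
    assert kwargs["override_num_blocks"] == 2
```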
