This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3eb5a4da5f [python][ray] Preserve schema for empty reads (#8118)
3eb5a4da5f is described below

commit 3eb5a4da5f631f4dff661dcae500f32efbd4309f
Author: QuakeWang <[email protected]>
AuthorDate: Thu Jun 4 16:36:30 2026 +0800

    [python][ray] Preserve schema for empty reads (#8118)
    
    The top-level Ray `read_paimon` API planned reads through
    `RayDatasource`. When a table scan produced no splits,
    `RayDatasource.get_read_tasks()` returned no read tasks, so Ray could
    create an empty dataset without the Paimon table schema.
    
    This was inconsistent with `TableRead.to_ray()`, which already returns
    an empty Arrow-backed Ray dataset with the planned read schema.
    
    This PR makes `read_paimon` use the planned `read_type` to build an
    empty Arrow table when there are no splits, so empty reads preserve
    schema and projection. It also lazily imports `ray.data` and reports an
    actionable `pypaimon[ray]` install hint when Ray is missing.
---
 paimon-python/pypaimon/ray/ray_paimon.py           | 62 ++++++++++++++++------
 .../pypaimon/tests/ray_integration_test.py         | 42 ++++++++++++++-
 2 files changed, 86 insertions(+), 18 deletions(-)

diff --git a/paimon-python/pypaimon/ray/ray_paimon.py b/paimon-python/pypaimon/ray/ray_paimon.py
index 3acbe91ace..0e796dd7c2 100644
--- a/paimon-python/pypaimon/ray/ray_paimon.py
+++ b/paimon-python/pypaimon/ray/ray_paimon.py
@@ -26,12 +26,26 @@ Usage::
     write_paimon(ds, "db.table", catalog_options={"warehouse": "/path"})
 """
 
-from typing import Any, Dict, List, Optional
-
-import ray.data
+import importlib
+from typing import Any, Dict, List, Optional, TYPE_CHECKING
 
 from pypaimon.common.predicate import Predicate
 
+if TYPE_CHECKING:
+    import ray.data
+
+
+def _require_ray_data():
+    try:
+        return importlib.import_module("ray.data")
+    except ModuleNotFoundError as e:
+        if e.name not in ("ray", "ray.data"):
+            raise
+        raise ImportError(
+            "PyPaimon Ray APIs require the 'ray' package. "
+            "Install it with: pip install pypaimon[ray]"
+        ) from e
+
 
 def read_paimon(
     table_identifier: str,
@@ -46,7 +60,7 @@ def read_paimon(
     concurrency: Optional[int] = None,
     override_num_blocks: Optional[int] = None,
     **read_args,
-) -> ray.data.Dataset:
+) -> "ray.data.Dataset":
     """Read a Paimon table into a Ray Dataset.
 
     Args:
@@ -68,8 +82,11 @@ def read_paimon(
     Returns:
         A ``ray.data.Dataset`` containing the table data.
     """
+    ray_data = _require_ray_data()
+
     from pypaimon.read.datasource.ray_datasource import RayDatasource
     from pypaimon.read.datasource.split_provider import CatalogSplitProvider
+    from pypaimon.schema.data_types import PyarrowFieldParser
 
     if snapshot_id is not None and tag_name is not None:
         raise ValueError(
@@ -81,18 +98,29 @@ def read_paimon(
             "override_num_blocks must be at least 1, got {}".format(override_num_blocks)
         )
 
-    datasource = RayDatasource(
-        CatalogSplitProvider(
-            table_identifier=table_identifier,
-            catalog_options=catalog_options,
-            predicate=filter,
-            projection=projection,
-            limit=limit,
-            snapshot_id=snapshot_id,
-            tag_name=tag_name,
-        )
+    split_provider = CatalogSplitProvider(
+        table_identifier=table_identifier,
+        catalog_options=catalog_options,
+        predicate=filter,
+        projection=projection,
+        limit=limit,
+        snapshot_id=snapshot_id,
+        tag_name=tag_name,
     )
-    ds = ray.data.read_datasource(
+
+    if not split_provider.splits():
+        schema = PyarrowFieldParser.from_paimon_schema(
+            split_provider.read_type()
+        )
+        import pyarrow
+        empty_table = pyarrow.Table.from_arrays(
+            [pyarrow.array([], type=field.type) for field in schema],
+            schema=schema,
+        )
+        return ray_data.from_arrow(empty_table)
+
+    datasource = RayDatasource(split_provider)
+    ds = ray_data.read_datasource(
         datasource,
         ray_remote_args=ray_remote_args,
         concurrency=concurrency,
@@ -107,7 +135,7 @@ def read_paimon(
 
 
 def write_paimon(
-    dataset: ray.data.Dataset,
+    dataset: "ray.data.Dataset",
     table_identifier: str,
     catalog_options: Dict[str, str],
     *,
@@ -139,6 +167,8 @@ def write_paimon(
             preserves the legacy small-file optimization and its single
             group memory bound for HASH_FIXED primary-key tables.
     """
+    _require_ray_data()
+
     from pypaimon.catalog.catalog_factory import CatalogFactory
     from pypaimon.ray.shuffle import maybe_apply_repartition
     from pypaimon.write.ray_datasink import PaimonDatasink
diff --git a/paimon-python/pypaimon/tests/ray_integration_test.py b/paimon-python/pypaimon/tests/ray_integration_test.py
index 275d810dd3..225dc710d6 100644
--- a/paimon-python/pypaimon/tests/ray_integration_test.py
+++ b/paimon-python/pypaimon/tests/ray_integration_test.py
@@ -19,6 +19,7 @@ import os
 import shutil
 import tempfile
 import unittest
+from unittest.mock import patch
 
 import pyarrow as pa
 import ray
@@ -183,10 +184,13 @@ class RayIntegrationTest(unittest.TestCase):
         self.assertLess(limited_count, 10)
 
     def test_read_paimon_empty_table(self):
-        """read_paimon() on a table with no data returns an empty dataset."""
+        """read_paimon() on an empty table preserves the table schema."""
         from pypaimon.ray import read_paimon
 
-        pa_schema = pa.schema([('id', pa.int32())])
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+        ])
         identifier = 'default.test_read_empty'
         catalog = CatalogFactory.create(self.catalog_options)
         schema = Schema.from_pyarrow_schema(pa_schema)
@@ -194,6 +198,40 @@ class RayIntegrationTest(unittest.TestCase):
 
         ds = read_paimon(identifier, self.catalog_options)
         self.assertEqual(ds.count(), 0)
+        self.assertEqual(ds.schema().names, ['id', 'name'])
+
+    def test_read_paimon_empty_table_with_projection(self):
+        """read_paimon() applies projection to empty table schemas."""
+        from pypaimon.ray import read_paimon
+
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('value', pa.int64()),
+        ])
+        identifier = 'default.test_read_empty_projection'
+        catalog = CatalogFactory.create(self.catalog_options)
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        catalog.create_table(identifier, schema, False)
+
+        ds = read_paimon(
+            identifier, self.catalog_options, projection=['id', 'value']
+        )
+        self.assertEqual(ds.count(), 0)
+        self.assertEqual(ds.schema().names, ['id', 'value'])
+
+    def test_missing_ray_dependency_has_install_hint(self):
+        """Ray facade surfaces an actionable install hint when Ray is absent."""
+        from pypaimon.ray.ray_paimon import _require_ray_data
+
+        error = ModuleNotFoundError("No module named 'ray'")
+        error.name = 'ray'
+
+        with patch('importlib.import_module', side_effect=error):
+            with self.assertRaises(ImportError) as ctx:
+                _require_ray_data()
+
+        self.assertIn('pip install pypaimon[ray]', str(ctx.exception))
 
     def test_read_paimon_with_snapshot_id(self):
         """read_paimon(snapshot_id=N) time-travels to that snapshot."""

Reply via email to