This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3eb5a4da5f [python][ray] Preserve schema for empty reads (#8118)
3eb5a4da5f is described below
commit 3eb5a4da5f631f4dff661dcae500f32efbd4309f
Author: QuakeWang <[email protected]>
AuthorDate: Thu Jun 4 16:36:30 2026 +0800
[python][ray] Preserve schema for empty reads (#8118)
The top-level Ray `read_paimon` API planned reads through
`RayDatasource`. When a table scan produced no splits,
`RayDatasource.get_read_tasks()` returned no read tasks, so Ray could
create an empty dataset without the Paimon table schema.
This was inconsistent with `TableRead.to_ray()`, which already returns
an empty Arrow-backed Ray dataset with the planned read schema.
This PR makes `read_paimon` use the planned `read_type` to build an
empty Arrow table when there are no splits, so empty reads preserve
schema and projection. It also lazily imports `ray.data` and reports an
actionable `pypaimon[ray]` install hint when Ray is missing.
---
paimon-python/pypaimon/ray/ray_paimon.py | 62 ++++++++++++++++------
.../pypaimon/tests/ray_integration_test.py | 42 ++++++++++++++-
2 files changed, 86 insertions(+), 18 deletions(-)
diff --git a/paimon-python/pypaimon/ray/ray_paimon.py b/paimon-python/pypaimon/ray/ray_paimon.py
index 3acbe91ace..0e796dd7c2 100644
--- a/paimon-python/pypaimon/ray/ray_paimon.py
+++ b/paimon-python/pypaimon/ray/ray_paimon.py
@@ -26,12 +26,26 @@ Usage::
write_paimon(ds, "db.table", catalog_options={"warehouse": "/path"})
"""
-from typing import Any, Dict, List, Optional
-
-import ray.data
+import importlib
+from typing import Any, Dict, List, Optional, TYPE_CHECKING
from pypaimon.common.predicate import Predicate
+if TYPE_CHECKING:
+ import ray.data
+
+
+def _require_ray_data():
+ try:
+ return importlib.import_module("ray.data")
+ except ModuleNotFoundError as e:
+ if e.name not in ("ray", "ray.data"):
+ raise
+ raise ImportError(
+ "PyPaimon Ray APIs require the 'ray' package. "
+ "Install it with: pip install pypaimon[ray]"
+ ) from e
+
def read_paimon(
table_identifier: str,
@@ -46,7 +60,7 @@ def read_paimon(
concurrency: Optional[int] = None,
override_num_blocks: Optional[int] = None,
**read_args,
-) -> ray.data.Dataset:
+) -> "ray.data.Dataset":
"""Read a Paimon table into a Ray Dataset.
Args:
@@ -68,8 +82,11 @@ def read_paimon(
Returns:
A ``ray.data.Dataset`` containing the table data.
"""
+ ray_data = _require_ray_data()
+
from pypaimon.read.datasource.ray_datasource import RayDatasource
from pypaimon.read.datasource.split_provider import CatalogSplitProvider
+ from pypaimon.schema.data_types import PyarrowFieldParser
if snapshot_id is not None and tag_name is not None:
raise ValueError(
@@ -81,18 +98,29 @@ def read_paimon(
"override_num_blocks must be at least 1, got {}".format(override_num_blocks)
)
- datasource = RayDatasource(
- CatalogSplitProvider(
- table_identifier=table_identifier,
- catalog_options=catalog_options,
- predicate=filter,
- projection=projection,
- limit=limit,
- snapshot_id=snapshot_id,
- tag_name=tag_name,
- )
+ split_provider = CatalogSplitProvider(
+ table_identifier=table_identifier,
+ catalog_options=catalog_options,
+ predicate=filter,
+ projection=projection,
+ limit=limit,
+ snapshot_id=snapshot_id,
+ tag_name=tag_name,
)
- ds = ray.data.read_datasource(
+
+ if not split_provider.splits():
+ schema = PyarrowFieldParser.from_paimon_schema(
+ split_provider.read_type()
+ )
+ import pyarrow
+ empty_table = pyarrow.Table.from_arrays(
+ [pyarrow.array([], type=field.type) for field in schema],
+ schema=schema,
+ )
+ return ray_data.from_arrow(empty_table)
+
+ datasource = RayDatasource(split_provider)
+ ds = ray_data.read_datasource(
datasource,
ray_remote_args=ray_remote_args,
concurrency=concurrency,
@@ -107,7 +135,7 @@ def read_paimon(
def write_paimon(
- dataset: ray.data.Dataset,
+ dataset: "ray.data.Dataset",
table_identifier: str,
catalog_options: Dict[str, str],
*,
@@ -139,6 +167,8 @@ def write_paimon(
preserves the legacy small-file optimization and its single
group memory bound for HASH_FIXED primary-key tables.
"""
+ _require_ray_data()
+
from pypaimon.catalog.catalog_factory import CatalogFactory
from pypaimon.ray.shuffle import maybe_apply_repartition
from pypaimon.write.ray_datasink import PaimonDatasink
diff --git a/paimon-python/pypaimon/tests/ray_integration_test.py b/paimon-python/pypaimon/tests/ray_integration_test.py
index 275d810dd3..225dc710d6 100644
--- a/paimon-python/pypaimon/tests/ray_integration_test.py
+++ b/paimon-python/pypaimon/tests/ray_integration_test.py
@@ -19,6 +19,7 @@ import os
import shutil
import tempfile
import unittest
+from unittest.mock import patch
import pyarrow as pa
import ray
@@ -183,10 +184,13 @@ class RayIntegrationTest(unittest.TestCase):
self.assertLess(limited_count, 10)
def test_read_paimon_empty_table(self):
- """read_paimon() on a table with no data returns an empty dataset."""
+ """read_paimon() on an empty table preserves the table schema."""
from pypaimon.ray import read_paimon
- pa_schema = pa.schema([('id', pa.int32())])
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ])
identifier = 'default.test_read_empty'
catalog = CatalogFactory.create(self.catalog_options)
schema = Schema.from_pyarrow_schema(pa_schema)
@@ -194,6 +198,40 @@ class RayIntegrationTest(unittest.TestCase):
ds = read_paimon(identifier, self.catalog_options)
self.assertEqual(ds.count(), 0)
+ self.assertEqual(ds.schema().names, ['id', 'name'])
+
+ def test_read_paimon_empty_table_with_projection(self):
+ """read_paimon() applies projection to empty table schemas."""
+ from pypaimon.ray import read_paimon
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('value', pa.int64()),
+ ])
+ identifier = 'default.test_read_empty_projection'
+ catalog = CatalogFactory.create(self.catalog_options)
+ schema = Schema.from_pyarrow_schema(pa_schema)
+ catalog.create_table(identifier, schema, False)
+
+ ds = read_paimon(
+ identifier, self.catalog_options, projection=['id', 'value']
+ )
+ self.assertEqual(ds.count(), 0)
+ self.assertEqual(ds.schema().names, ['id', 'value'])
+
+ def test_missing_ray_dependency_has_install_hint(self):
+ """Ray facade surfaces an actionable install hint when Ray is absent."""
+ from pypaimon.ray.ray_paimon import _require_ray_data
+
+ error = ModuleNotFoundError("No module named 'ray'")
+ error.name = 'ray'
+
+ with patch('importlib.import_module', side_effect=error):
+ with self.assertRaises(ImportError) as ctx:
+ _require_ray_data()
+
+ self.assertIn('pip install pypaimon[ray]', str(ctx.exception))
def test_read_paimon_with_snapshot_id(self):
"""read_paimon(snapshot_id=N) time-travels to that snapshot."""