This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new eee080d0d0 [python] Add self-contained Ray datasource and top-level
read_paimon/write_paimon API (#7740)
eee080d0d0 is described below
commit eee080d0d0b206474f9f37d510721cfad3feb07a
Author: chaoyang <[email protected]>
AuthorDate: Sat May 9 09:09:20 2026 +0800
[python] Add self-contained Ray datasource and top-level
read_paimon/write_paimon API (#7740)
Today, reading a Paimon table into a Ray Dataset requires the caller to
first build a TableRead by hand:
```python
catalog = CatalogFactory.create({"warehouse": ...})
table = catalog.get_table("db.table")
rb = table.new_read_builder()
read = rb.new_read()
splits = rb.new_scan().plan().splits()
ds = read.to_ray(splits)
```
That works for the existing `TableRead.to_ray()` helper, but it forces
every user to repeat the same catalog → table → builder boilerplate. The
Iceberg integration has long had a single-line
`IcebergDatasource(table_identifier, catalog_options, ...)`, and that's
the missing surface here.
This PR makes `RayDatasource` self-contained and adds a top-level facade
so reading and writing a Paimon table from Ray is one call.
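The core idea is that the datasource asks a provider object for its planning state rather than holding a `TableRead` directly. A minimal sketch of that pattern follows. It does not import pypaimon or Ray: `SplitSource`, `LazySplitSource`, `FixedSplitSource`, `describe`, and the `planner` callback are hypothetical stand-ins for illustration, not the classes added by this commit.

```python
from abc import ABC, abstractmethod
from typing import Callable, List, Optional


class SplitSource(ABC):
    """Hypothetical stand-in for a provider of planned splits."""

    @abstractmethod
    def splits(self) -> List[str]:
        """Return the planned splits (plain strings in this sketch)."""

    @abstractmethod
    def display_name(self) -> str:
        """Return a short, human-readable name for the source."""


class LazySplitSource(SplitSource):
    """Plans on first access and caches the result."""

    def __init__(self, identifier: str, planner: Callable[[str], List[str]]):
        self._identifier = identifier
        # Assumption: planner stands in for catalog lookup plus scan planning.
        self._planner = planner
        self._cached: Optional[List[str]] = None
        self.plan_calls = 0  # exposed only so the caching is observable

    def splits(self) -> List[str]:
        if self._cached is None:
            self.plan_calls += 1
            self._cached = self._planner(self._identifier)
        return self._cached

    def display_name(self) -> str:
        return self._identifier


class FixedSplitSource(SplitSource):
    """Wraps splits that the caller has already planned."""

    def __init__(self, identifier: str, splits: List[str]):
        self._identifier = identifier
        self._splits = splits

    def splits(self) -> List[str]:
        return self._splits

    def display_name(self) -> str:
        return self._identifier


def describe(source: SplitSource) -> str:
    # The consumer depends only on the abstract interface.
    return "PaimonTable({}) with {} split(s)".format(
        source.display_name(), len(source.splits())
    )
```

Under these assumptions, constructing a `LazySplitSource` does no planning; the first `splits()` call runs the planner once and later calls reuse the cached list.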
---
docs/content/pypaimon/ray-data.md | 177 ++++++++++---
paimon-python/pypaimon/ray/__init__.py | 21 ++
paimon-python/pypaimon/ray/ray_paimon.py | 124 +++++++++
.../pypaimon/read/datasource/ray_datasource.py | 83 +++---
.../pypaimon/read/datasource/split_provider.py | 164 ++++++++++++
paimon-python/pypaimon/read/table_read.py | 11 +-
.../pypaimon/tests/ray_integration_test.py | 291 +++++++++++++++++++++
.../pypaimon/tests/split_provider_test.py | 178 +++++++++++++
8 files changed, 973 insertions(+), 76 deletions(-)
diff --git a/docs/content/pypaimon/ray-data.md b/docs/content/pypaimon/ray-data.md
index 2cc728756a..4e049248de 100644
--- a/docs/content/pypaimon/ray-data.md
+++ b/docs/content/pypaimon/ray-data.md
@@ -27,15 +27,26 @@ under the License.
# Ray Data
-## Read
-
This requires `ray` to be installed.
-You can convert the splits into a Ray Dataset and handle it by Ray Data API for distributed processing:
+`pypaimon.ray` exposes a top-level `read_paimon` / `write_paimon` facade that
+takes a table identifier and catalog options directly, mirroring the shape of
+Ray's built-in Iceberg integration. The lower-level `TableRead.to_ray()` and
+`TableWrite.write_ray()` entry points remain available for callers that have
+already resolved a `(read_builder, splits)` pair or constructed a
+`table_write` via the regular pypaimon API.
+
+## Read
+
+### `read_paimon` (recommended)
```python
-table_read = read_builder.new_read()
-ray_dataset = table_read.to_ray(splits)
+from pypaimon.ray import read_paimon
+
+ray_dataset = read_paimon(
+ "database_name.table_name",
+ catalog_options={"warehouse": "/path/to/warehouse"},
+)
print(ray_dataset)
# MaterializedDataset(num_blocks=1, num_rows=9, schema={f0: int32, f1: string})
@@ -52,65 +63,154 @@ print(ray_dataset.to_pandas())
# ...
```
-The `to_ray()` method supports Ray Data API parameters for distributed processing:
+`read_paimon` opens its own catalog and resolves the table, so it is the
+single-call equivalent of the four-step `CatalogFactory.create → get_table →
+new_read_builder → to_ray` boilerplate.
+
+**Projection and limit:**
```python
-# Basic usage
-ray_dataset = table_read.to_ray(splits)
+ray_dataset = read_paimon(
+ "database_name.table_name",
+ catalog_options={"warehouse": "/path/to/warehouse"},
+ projection=["id", "score"],
+ limit=1000,
+)
+```
-# Specify number of output blocks
-ray_dataset = table_read.to_ray(splits, override_num_blocks=4)
+**Distribution / scheduling:**
-# Configure Ray remote arguments
+```python
+ray_dataset = read_paimon(
+ "database_name.table_name",
+ catalog_options={"warehouse": "/path/to/warehouse"},
+ override_num_blocks=4,
+ ray_remote_args={"num_cpus": 2, "max_retries": 3},
+ concurrency=8,
+)
+```
+
+**Parameters:**
+- `table_identifier`: full table name, e.g. `"db_name.table_name"`.
+- `catalog_options`: kwargs forwarded to `CatalogFactory.create()`,
+ e.g. `{"warehouse": "/path/to/warehouse"}`.
+- `filter`: optional `Predicate` to push down into the scan.
+- `projection`: optional list of column names to read.
+- `limit`: optional row limit applied at scan planning time.
+- `override_num_blocks`: optional override for the number of output blocks.
+ Must be `>= 1`.
+- `ray_remote_args`: optional kwargs passed to `ray.remote()` in read tasks
+ (e.g. `{"num_cpus": 2, "max_retries": 3}`).
+- `concurrency`: optional max number of Ray tasks to run concurrently.
+- `**read_args`: additional kwargs forwarded to `ray.data.read_datasource`
+ (e.g. `per_task_row_limit` in Ray 2.52.0+).
+
+### `TableRead.to_ray()` (lower-level)
+
+If you already have a `read_builder` and `splits`, you can convert them to a
+Ray Dataset directly:
+
+```python
+table_read = read_builder.new_read()
+splits = read_builder.new_scan().plan().splits()
ray_dataset = table_read.to_ray(
splits,
override_num_blocks=4,
- ray_remote_args={"num_cpus": 2, "max_retries": 3}
+ ray_remote_args={"num_cpus": 2, "max_retries": 3},
)
-
-# Use Ray Data operations
-mapped_dataset = ray_dataset.map(lambda row: {'value': row['value'] * 2})
-filtered_dataset = ray_dataset.filter(lambda row: row['score'] > 80)
-df = ray_dataset.to_pandas()
```
-**Parameters:**
-- `override_num_blocks`: Optional override for the number of output blocks. By default,
- Ray automatically determines the optimal number.
-- `ray_remote_args`: Optional kwargs passed to `ray.remote()` in read tasks
- (e.g., `{"num_cpus": 2, "max_retries": 3}`).
-- `concurrency`: Optional max number of Ray tasks to run concurrently. By default,
- dynamically decided based on available resources.
-- `**read_args`: Additional kwargs passed to the datasource (e.g., `per_task_row_limit`
- in Ray 2.52.0+).
+`to_ray()` accepts the same `override_num_blocks`, `ray_remote_args`,
+`concurrency`, and `**read_args` parameters as `read_paimon`.
-**Ray Block Size Configuration:**
+### Ray Block Size Configuration
-If you need to configure Ray's block size (e.g., when Paimon splits exceed Ray's default
-128MB block size), set it before calling `to_ray()`:
+If you need to configure Ray's block size (e.g., when Paimon splits exceed
+Ray's default 128MB block size), set it on the `DataContext` before calling
+either `read_paimon` or `to_ray`:
```python
from ray.data import DataContext
ctx = DataContext.get_current()
ctx.target_max_block_size = 256 * 1024 * 1024 # 256MB (default is 128MB)
-ray_dataset = table_read.to_ray(splits)
```
-See [Ray Data API Documentation](https://docs.ray.io/en/latest/data/api/doc/ray.data.read_datasource.html) for more details.
+See the [Ray Data API documentation](https://docs.ray.io/en/latest/data/api/doc/ray.data.read_datasource.html)
+for more details.
## Write
+### `write_paimon` (recommended)
+
+```python
+import ray
+from pypaimon.ray import write_paimon
+
+ray_dataset = ray.data.read_json("/path/to/data.jsonl")
+
+write_paimon(
+ ray_dataset,
+ "database_name.table_name",
+ catalog_options={"warehouse": "/path/to/warehouse"},
+)
+```
+
+`write_paimon` opens its own catalog, resolves the table, and commits the
+write through Ray's Datasink API — there is no separate `prepare_commit` or
+`close` step to run.
+
+**Overwrite mode:**
+
+```python
+write_paimon(
+ ray_dataset,
+ "database_name.table_name",
+ catalog_options={"warehouse": "/path/to/warehouse"},
+ overwrite=True,
+)
+```
+
+**Distribution / scheduling:**
+
```python
+write_paimon(
+ ray_dataset,
+ "database_name.table_name",
+ catalog_options={"warehouse": "/path/to/warehouse"},
+ concurrency=4,
+ ray_remote_args={"num_cpus": 2},
+)
+```
+
+**Parameters:**
+- `dataset`: the Ray Dataset to write.
+- `table_identifier`: full table name, e.g. `"db_name.table_name"`.
+- `catalog_options`: kwargs forwarded to `CatalogFactory.create()`.
+- `overwrite`: if `True`, overwrite existing data in the table.
+- `concurrency`: optional max number of Ray write tasks to run concurrently.
+- `ray_remote_args`: optional kwargs passed to `ray.remote()` in write tasks
+ (e.g. `{"num_cpus": 2}`).
+
+### `TableWrite.write_ray()` (lower-level)
+
+If you have already constructed a `table_write` from a write builder, you can
+hand a Ray Dataset directly to it. `write_ray()` commits through the Ray
+Datasink API, so there is no `prepare_commit` / `commit` step to run for the
+Ray write itself — just close the writer when you are done with it:
+
+```python
+import ray
+
table = catalog.get_table('database_name.table_name')
-# 1. Create table write and commit
+# 1. Create table write and commit (commit is only needed for non-Ray writes
+# on the same table_write instance — see below).
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
-# 2 Write Ray Dataset (requires ray to be installed)
-import ray
+# 2. Write Ray Dataset
ray_dataset = ray.data.read_json("/path/to/data.jsonl")
table_write.write_ray(ray_dataset, overwrite=False, concurrency=2)
# Parameters:
@@ -118,8 +218,6 @@ table_write.write_ray(ray_dataset, overwrite=False, concurrency=2)
# - overwrite: Whether to overwrite existing data (default: False)
# - concurrency: Optional max number of concurrent Ray tasks
# - ray_remote_args: Optional kwargs passed to ray.remote() (e.g., {"num_cpus": 2})
-# Note: write_ray() handles commit internally through Ray Datasink API.
-# Skip steps 3-4 if using write_ray() - just close the writer.
# 3. Commit data (required for write_pandas/write_arrow/write_arrow_batch only)
commit_messages = table_write.prepare_commit()
@@ -130,8 +228,11 @@ table_write.close()
table_commit.close()
```
-By default, the data will be appended to table. If you want to overwrite table, you should use `TableWrite#overwrite`
-API:
+### Overwrite at builder level
+
+The recommended way to overwrite via `write_paimon` is the `overwrite=True`
+flag above. When using the lower-level builder API, you can also configure
+overwrite mode on the write builder itself:
```python
# overwrite whole table
diff --git a/paimon-python/pypaimon/ray/__init__.py b/paimon-python/pypaimon/ray/__init__.py
new file mode 100644
index 0000000000..cb5307efd9
--- /dev/null
+++ b/paimon-python/pypaimon/ray/__init__.py
@@ -0,0 +1,21 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pypaimon.ray.ray_paimon import read_paimon, write_paimon
+
+__all__ = ["read_paimon", "write_paimon"]
diff --git a/paimon-python/pypaimon/ray/ray_paimon.py b/paimon-python/pypaimon/ray/ray_paimon.py
new file mode 100644
index 0000000000..5ea2d21096
--- /dev/null
+++ b/paimon-python/pypaimon/ray/ray_paimon.py
@@ -0,0 +1,124 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""
+Top-level API for reading and writing Paimon tables with Ray Datasets.
+
+Usage::
+
+ from pypaimon.ray import read_paimon, write_paimon
+
+ ds = read_paimon("db.table", catalog_options={"warehouse": "/path"})
+ write_paimon(ds, "db.table", catalog_options={"warehouse": "/path"})
+"""
+
+from typing import Any, Dict, List, Optional
+
+import ray.data
+
+from pypaimon.common.predicate import Predicate
+
+
+def read_paimon(
+ table_identifier: str,
+ catalog_options: Dict[str, str],
+ *,
+ filter: Optional[Predicate] = None,
+ projection: Optional[List[str]] = None,
+ limit: Optional[int] = None,
+ ray_remote_args: Optional[Dict[str, Any]] = None,
+ concurrency: Optional[int] = None,
+ override_num_blocks: Optional[int] = None,
+ **read_args,
+) -> ray.data.Dataset:
+ """Read a Paimon table into a Ray Dataset.
+
+ Args:
+ table_identifier: Full table name, e.g. ``"db_name.table_name"``.
+ catalog_options: Options passed to ``CatalogFactory.create()``,
+ e.g. ``{"warehouse": "/path/to/warehouse"}``.
+ filter: Optional predicate to push down into the scan.
+ projection: Optional list of column names to read.
+ limit: Optional row limit for the scan.
+ ray_remote_args: Optional kwargs passed to ``ray.remote`` in read tasks.
+ concurrency: Optional max number of Ray read tasks to run concurrently.
+ override_num_blocks: Optional override for the number of output blocks.
+ **read_args: Additional kwargs forwarded to ``ray.data.read_datasource``.
+
+ Returns:
+ A ``ray.data.Dataset`` containing the table data.
+ """
+ from pypaimon.read.datasource.ray_datasource import RayDatasource
+ from pypaimon.read.datasource.split_provider import CatalogSplitProvider
+
+ if override_num_blocks is not None and override_num_blocks < 1:
+ raise ValueError(
+ "override_num_blocks must be at least 1, got {}".format(override_num_blocks)
+ )
+
+ datasource = RayDatasource(
+ CatalogSplitProvider(
+ table_identifier=table_identifier,
+ catalog_options=catalog_options,
+ predicate=filter,
+ projection=projection,
+ limit=limit,
+ )
+ )
+ return ray.data.read_datasource(
+ datasource,
+ ray_remote_args=ray_remote_args,
+ concurrency=concurrency,
+ override_num_blocks=override_num_blocks,
+ **read_args,
+ )
+
+
+def write_paimon(
+ dataset: ray.data.Dataset,
+ table_identifier: str,
+ catalog_options: Dict[str, str],
+ *,
+ overwrite: bool = False,
+ concurrency: Optional[int] = None,
+ ray_remote_args: Optional[Dict[str, Any]] = None,
+) -> None:
+ """Write a Ray Dataset to a Paimon table.
+
+ Args:
+ dataset: The Ray Dataset to write.
+ table_identifier: Full table name, e.g. ``"db_name.table_name"``.
+ catalog_options: Options passed to ``CatalogFactory.create()``.
+ overwrite: If ``True``, overwrite existing data in the table.
+ concurrency: Optional max number of Ray write tasks to run concurrently.
+ ray_remote_args: Optional kwargs passed to ``ray.remote`` in write tasks.
+ """
+ from pypaimon.catalog.catalog_factory import CatalogFactory
+ from pypaimon.write.ray_datasink import PaimonDatasink
+
+ catalog = CatalogFactory.create(catalog_options)
+ table = catalog.get_table(table_identifier)
+
+ datasink = PaimonDatasink(table, overwrite=overwrite)
+
+ write_kwargs = {}
+ if ray_remote_args is not None:
+ write_kwargs["ray_remote_args"] = ray_remote_args
+ if concurrency is not None:
+ write_kwargs["concurrency"] = concurrency
+
+ dataset.write_datasink(datasink, **write_kwargs)
diff --git a/paimon-python/pypaimon/read/datasource/ray_datasource.py b/paimon-python/pypaimon/read/datasource/ray_datasource.py
index 33ba6904e5..b08d4c1e50 100644
--- a/paimon-python/pypaimon/read/datasource/ray_datasource.py
+++ b/paimon-python/pypaimon/read/datasource/ray_datasource.py
@@ -22,15 +22,15 @@ import heapq
import itertools
import logging
from functools import partial
-from typing import List, Optional, Iterable
+from typing import Iterable, List, Optional
import pyarrow
from packaging.version import parse
import ray
from ray.data.datasource import Datasource
+from pypaimon.read.datasource.split_provider import SplitProvider
from pypaimon.read.split import Split
-from pypaimon.read.table_read import TableRead
from pypaimon.schema.data_types import PyarrowFieldParser
logger = logging.getLogger(__name__)
@@ -41,35 +41,45 @@ RAY_VERSION_PER_TASK_ROW_LIMIT = "2.52.0" # per_task_row_limit parameter introd
class RayDatasource(Datasource):
- """
- Ray Data Datasource implementation for reading Paimon tables.
+ """Ray Data ``Datasource`` implementation for reading Paimon tables.
+
+ Holds a :class:`SplitProvider` that supplies the four planning artefacts
+ needed to build read tasks (table, splits, read_type, predicate). Two
+ provider implementations exist today:
- This datasource enables distributed parallel reading of Paimon table splits,
- allowing Ray to read multiple splits concurrently across the cluster.
+ * :class:`CatalogSplitProvider` — resolves a fully-qualified table
+ identifier through the catalog and runs the ``ReadBuilder`` plan.
+ Used by the public :func:`pypaimon.ray.read_paimon` facade.
+ * :class:`PreResolvedSplitProvider` — wraps an already-resolved
+ ``(table, splits, read_type, predicate)`` tuple. Used by the legacy
+ ``TableRead.to_ray()`` bridge to skip a second catalog round-trip.
+
+ Both providers are cheap to instantiate; they defer the catalog
+ round-trip and split planning until the first read.
"""
- def __init__(self, table_read: TableRead, splits: List[Split]):
- """
- Initialize PaimonDatasource.
+ def __init__(self, split_provider: SplitProvider):
+ """Initialize a RayDatasource.
Args:
- table_read: TableRead instance for reading data
- splits: List of splits to read
+ split_provider: The :class:`SplitProvider` that supplies the
+ table, splits, read_type, and predicate. Construct one with
+ :class:`CatalogSplitProvider` (from a table identifier +
+ catalog options) or :class:`PreResolvedSplitProvider` (from
+ an already-resolved ``TableRead``).
"""
- self.table_read = table_read
- self.splits = splits
+ self._split_provider = split_provider
self._schema = None
def get_name(self) -> str:
- identifier = self.table_read.table.identifier
- table_name = identifier.get_full_name() if hasattr(identifier, 'get_full_name') else str(identifier)
- return f"PaimonTable({table_name})"
+ return f"PaimonTable({self._split_provider.display_name()})"
def estimate_inmemory_data_size(self) -> Optional[int]:
- if not self.splits:
+ splits = self._split_provider.splits()
+ if not splits:
return 0
- total_size = sum(split.file_size for split in self.splits)
+ total_size = sum(split.file_size for split in splits)
return total_size if total_size > 0 else None
@staticmethod
@@ -108,22 +118,26 @@ class RayDatasource(Datasource):
if parallelism < 1:
raise ValueError(f"parallelism must be at least 1, got {parallelism}")
+ # Pull provider state into locals once: avoids capturing self in the
+ # ReadTask closure (see ray-project/ray#49107) and amortises the
+ # provider-method dispatch over all chunks.
+ table = self._split_provider.table()
+ predicate = self._split_provider.predicate()
+ read_type = self._split_provider.read_type()
+ splits = self._split_provider.splits()
+ if not splits:
+ return []
+
if self._schema is None:
- self._schema = PyarrowFieldParser.from_paimon_schema(self.table_read.read_type)
+ self._schema = PyarrowFieldParser.from_paimon_schema(read_type)
+ schema = self._schema
- if parallelism > len(self.splits):
- parallelism = len(self.splits)
+ if parallelism > len(splits):
+ parallelism = len(splits)
logger.warning(
f"Reducing the parallelism to {parallelism}, as that is the number of splits"
)
- # Store necessary information for creating readers in Ray workers
- # Extract these to avoid serializing the entire self object in closures
- table = self.table_read.table
- predicate = self.table_read.predicate
- read_type = self.table_read.read_type
- schema = self._schema
-
# Create a partial function to avoid capturing self in closure
# This reduces serialization overhead (see https://github.com/ray-project/ray/issues/49107)
def _get_read_task(
@@ -163,7 +177,7 @@ class RayDatasource(Datasource):
read_tasks = []
# Distribute splits across tasks using load balancing algorithm
- for chunk_splits in self._distribute_splits_into_equal_chunks(self.splits, parallelism):
+ for chunk_splits in self._distribute_splits_into_equal_chunks(splits, parallelism):
if not chunk_splits:
continue
@@ -174,14 +188,9 @@ class RayDatasource(Datasource):
for split in chunk_splits:
if predicate is None:
# Only estimate rows if no predicate (predicate filtering changes row count)
- row_count = None
- if hasattr(split, 'merged_row_count'):
- merged_count = split.merged_row_count()
- if merged_count is not None:
- row_count = merged_count
- if row_count is None and hasattr(split, 'row_count') and split.row_count > 0:
- row_count = split.row_count
- if row_count is not None and row_count > 0:
+ merged = split.merged_row_count()
+ row_count = merged if merged is not None else split.row_count
+ if row_count > 0:
total_rows += row_count
if hasattr(split, 'file_size') and split.file_size > 0:
total_size += split.file_size
diff --git a/paimon-python/pypaimon/read/datasource/split_provider.py b/paimon-python/pypaimon/read/datasource/split_provider.py
new file mode 100644
index 0000000000..491e8127d2
--- /dev/null
+++ b/paimon-python/pypaimon/read/datasource/split_provider.py
@@ -0,0 +1,164 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""SplitProvider abstraction used by ``RayDatasource``.
+
+The datasource only needs four things to build read tasks: the underlying
+table, the planned splits, the scan read type, and the optional predicate.
+``SplitProvider`` decouples how those four items are obtained so the same
+datasource can serve both the public ``read_paimon`` facade (which only has
+a table identifier + catalog options) and the legacy ``TableRead.to_ray()``
+bridge (which already has a fully resolved ``TableRead``).
+"""
+
+from abc import ABC, abstractmethod
+from typing import Dict, List, Optional
+
+from pypaimon.read.split import Split
+
+
+class SplitProvider(ABC):
+ """Source of the planning artefacts required by ``RayDatasource``."""
+
+ @abstractmethod
+ def table(self):
+ """Return the ``FileStoreTable`` to read."""
+
+ @abstractmethod
+ def splits(self) -> List[Split]:
+ """Return the planned splits."""
+
+ @abstractmethod
+ def read_type(self):
+ """Return the scan read type (row / record type)."""
+
+ @abstractmethod
+ def predicate(self):
+ """Return the scan-time predicate, or ``None``."""
+
+ @abstractmethod
+ def display_name(self) -> str:
+ """Return a short, human-readable name for the source.
+
+ Used by ``RayDatasource.get_name()`` so the datasource doesn't have
+ to peek at concrete provider types to format its name.
+ """
+
+
+class CatalogSplitProvider(SplitProvider):
+ """Plan splits from a fully-qualified table identifier and catalog options.
+
+ Resolves the catalog and the table lazily on first access, then runs a
+ single ``ReadBuilder`` plan to populate splits + read type together. The
+ same provider should be reused across calls — the planning is cached.
+ """
+
+ def __init__(
+ self,
+ table_identifier: str,
+ catalog_options: Dict[str, str],
+ predicate=None,
+ projection: Optional[List[str]] = None,
+ limit: Optional[int] = None,
+ ):
+ if not table_identifier:
+ raise ValueError("table_identifier is required")
+ if catalog_options is None:
+ raise ValueError("catalog_options is required")
+ self._table_identifier = table_identifier
+ self._catalog_options = catalog_options
+ self._predicate = predicate
+ self._projection = projection
+ self._limit = limit
+ self._table_cached = None
+ self._splits_cached = None
+ self._read_type_cached = None
+
+ def _ensure_table(self):
+ if self._table_cached is None:
+ from pypaimon.catalog.catalog_factory import CatalogFactory
+ catalog = CatalogFactory.create(self._catalog_options)
+ self._table_cached = catalog.get_table(self._table_identifier)
+ return self._table_cached
+
+ def _ensure_planned(self):
+ if self._splits_cached is not None and self._read_type_cached is not None:
+ return
+ from pypaimon.read.read_builder import ReadBuilder
+ rb = ReadBuilder(self._ensure_table())
+ if self._predicate is not None:
+ rb = rb.with_filter(self._predicate)
+ if self._projection is not None:
+ rb = rb.with_projection(self._projection)
+ if self._limit is not None:
+ rb = rb.with_limit(self._limit)
+ self._read_type_cached = rb.read_type()
+ self._splits_cached = rb.new_scan().plan().splits()
+
+ @property
+ def table_identifier(self) -> str:
+ return self._table_identifier
+
+ def table(self):
+ return self._ensure_table()
+
+ def splits(self) -> List[Split]:
+ self._ensure_planned()
+ return self._splits_cached
+
+ def read_type(self):
+ self._ensure_planned()
+ return self._read_type_cached
+
+ def predicate(self):
+ return self._predicate
+
+ def display_name(self) -> str:
+ return self._table_identifier
+
+
+class PreResolvedSplitProvider(SplitProvider):
+ """Wrap an already-planned ``(table, splits, read_type, predicate)`` tuple.
+
+ Used by ``TableRead.to_ray()`` where the caller has already built a
+ ``TableRead`` and planned splits, so the catalog round-trip should be
+ skipped.
+ """
+
+ def __init__(self, table, splits: List[Split], read_type, predicate=None):
+ self._table = table
+ self._splits = splits
+ self._read_type = read_type
+ self._predicate = predicate
+
+ def table(self):
+ return self._table
+
+ def splits(self) -> List[Split]:
+ return self._splits
+
+ def read_type(self):
+ return self._read_type
+
+ def predicate(self):
+ return self._predicate
+
+ def display_name(self) -> str:
+ identifier = self._table.identifier
+ if hasattr(identifier, 'get_full_name'):
+ return identifier.get_full_name()
+ return str(identifier)
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index 0aef1d3ca9..40cc337aaa 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -232,7 +232,16 @@ class TableRead:
raise ValueError(f"override_num_blocks must be at least 1, got {override_num_blocks}")
from pypaimon.read.datasource.ray_datasource import RayDatasource
- datasource = RayDatasource(self, splits)
+ from pypaimon.read.datasource.split_provider import PreResolvedSplitProvider
+
+ datasource = RayDatasource(
+ PreResolvedSplitProvider(
+ table=self.table,
+ splits=splits,
+ read_type=self.read_type,
+ predicate=self.predicate,
+ )
+ )
return ray.data.read_datasource(
datasource,
ray_remote_args=ray_remote_args,
diff --git a/paimon-python/pypaimon/tests/ray_integration_test.py b/paimon-python/pypaimon/tests/ray_integration_test.py
new file mode 100644
index 0000000000..1b8e2df505
--- /dev/null
+++ b/paimon-python/pypaimon/tests/ray_integration_test.py
@@ -0,0 +1,291 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+import ray
+
+from pypaimon import CatalogFactory, Schema
+
+
+class RayIntegrationTest(unittest.TestCase):
+ """Tests for the top-level read_paimon() / write_paimon() API."""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tempdir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+ cls.catalog_options = {'warehouse': cls.warehouse}
+
+ catalog = CatalogFactory.create(cls.catalog_options)
+ catalog.create_database('default', True)
+
+ if not ray.is_initialized():
+ ray.init(ignore_reinit_error=True, num_cpus=2)
+
+ @classmethod
+ def tearDownClass(cls):
+ try:
+ if ray.is_initialized():
+ ray.shutdown()
+ except Exception:
+ pass
+ try:
+ shutil.rmtree(cls.tempdir)
+ except OSError:
+ pass
+
+ def _create_and_populate_table(self, table_name, pa_schema, data_dict,
+ primary_keys=None, partition_keys=None, options=None):
+ """Helper to create a table and write a single batch of data."""
+ identifier = 'default.{}'.format(table_name)
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ primary_keys=primary_keys,
+ partition_keys=partition_keys,
+ options=options,
+ )
+ catalog = CatalogFactory.create(self.catalog_options)
+ catalog.create_table(identifier, schema, False)
+ table = catalog.get_table(identifier)
+
+ test_data = pa.Table.from_pydict(data_dict, schema=pa_schema)
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ commit = write_builder.new_commit()
+ commit.commit(commit_messages)
+ writer.close()
+
+ return identifier
+
+ def test_read_paimon_basic(self):
+ """read_paimon() reads back the data we wrote."""
+ from pypaimon.ray import read_paimon
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('value', pa.int64()),
+ ])
+ identifier = self._create_and_populate_table(
+ 'test_read_basic', pa_schema,
+ {'id': [1, 2, 3], 'name': ['a', 'b', 'c'], 'value': [10, 20, 30]},
+ )
+
+ ds = read_paimon(identifier, self.catalog_options, override_num_blocks=1)
+ self.assertEqual(ds.count(), 3)
+
+ df = ds.to_pandas().sort_values('id').reset_index(drop=True)
+ self.assertEqual(list(df['id']), [1, 2, 3])
+ self.assertEqual(list(df['name']), ['a', 'b', 'c'])
+
+ def test_read_paimon_with_projection(self):
+ """read_paimon() respects column projection."""
+ from pypaimon.ray import read_paimon
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('value', pa.int64()),
+ ])
+ identifier = self._create_and_populate_table(
+ 'test_read_proj', pa_schema,
+ {'id': [1, 2], 'name': ['a', 'b'], 'value': [10, 20]},
+ )
+
+ ds = read_paimon(identifier, self.catalog_options, projection=['id', 'name'])
+ df = ds.to_pandas()
+ self.assertEqual(set(df.columns), {'id', 'name'})
+ self.assertEqual(len(df), 2)
+
+ def test_read_paimon_with_filter(self):
+ """read_paimon() pushes down a predicate filter."""
+ from pypaimon.ray import read_paimon
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('category', pa.string()),
+ ])
+ identifier = self._create_and_populate_table(
+ 'test_read_filter', pa_schema,
+ {'id': [1, 2, 3], 'category': ['A', 'B', 'A']},
+ )
+
+ catalog = CatalogFactory.create(self.catalog_options)
+ table = catalog.get_table(identifier)
+ pb = table.new_read_builder().new_predicate_builder()
+ predicate = pb.equal('category', 'A')
+
+ ds = read_paimon(identifier, self.catalog_options, filter=predicate)
+ self.assertEqual(ds.count(), 2)
+ df = ds.to_pandas()
+ self.assertEqual(set(df['category'].tolist()), {'A'})
+
+ def test_read_paimon_with_limit(self):
+ """``read_paimon(limit=N)`` propagates the limit into the scan plan.
+
+ Writes 10 rows across two partitions (5 + 5) so the scan produces two
+ raw-convertible splits. ``limit=3`` causes ``FileScanner`` to drop the
+ second split once the first already covers the limit, so the Ray
+ Dataset contains strictly fewer than the full 10 rows.
+
+ We assert ``< 10`` (not ``== N``) because Paimon's scan-time limit is
+ a per-split cap — whole-split granularity at this layer — not a
+ row-exact hard limit. Row-exact short-circuiting in the reader is a
+ separate follow-up.
+ """
+ from pypaimon.ray import read_paimon
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('part', pa.string()),
+ ('value', pa.string()),
+ ])
+ identifier = self._create_and_populate_table(
+ 'test_read_limit', pa_schema,
+ {
+ 'id': list(range(10)),
+ 'part': ['a'] * 5 + ['b'] * 5,
+ 'value': [str(i) for i in range(10)],
+ },
+ partition_keys=['part'],
+ )
+
+ # Sanity baseline: the full unbounded scan returns all 10 rows.
+ ds_full = read_paimon(identifier, self.catalog_options)
+ self.assertEqual(ds_full.count(), 10)
+
+ # With limit=3, the scan plan drops the second partition's split
+ # once the first split's row count already covers the limit.
+ ds = read_paimon(identifier, self.catalog_options, limit=3)
+ limited_count = ds.count()
+ self.assertGreater(limited_count, 0)
+ self.assertLess(limited_count, 10)
+
+ def test_read_paimon_empty_table(self):
+ """read_paimon() on a table with no data returns an empty dataset."""
+ from pypaimon.ray import read_paimon
+
+ pa_schema = pa.schema([('id', pa.int32())])
+ identifier = 'default.test_read_empty'
+ catalog = CatalogFactory.create(self.catalog_options)
+ schema = Schema.from_pyarrow_schema(pa_schema)
+ catalog.create_table(identifier, schema, False)
+
+ ds = read_paimon(identifier, self.catalog_options)
+ self.assertEqual(ds.count(), 0)
+
+ def test_write_paimon_basic(self):
+ """write_paimon() writes data that read_paimon() can round-trip."""
+ from pypaimon.ray import read_paimon, write_paimon
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ])
+ identifier = 'default.test_write_basic'
+ catalog = CatalogFactory.create(self.catalog_options)
+ schema = Schema.from_pyarrow_schema(pa_schema)
+ catalog.create_table(identifier, schema, False)
+
+ source = pa.Table.from_pydict(
+ {'id': [1, 2, 3], 'name': ['x', 'y', 'z']}, schema=pa_schema,
+ )
+ ds = ray.data.from_arrow(source)
+ write_paimon(ds, identifier, self.catalog_options)
+
+ result = read_paimon(identifier, self.catalog_options)
+ self.assertEqual(result.count(), 3)
+ df = result.to_pandas().sort_values('id').reset_index(drop=True)
+ self.assertEqual(list(df['name']), ['x', 'y', 'z'])
+
+ def test_write_paimon_overwrite(self):
+ """write_paimon(overwrite=True) replaces existing data."""
+ from pypaimon.ray import read_paimon, write_paimon
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('val', pa.int64()),
+ ])
+ identifier = 'default.test_write_overwrite'
+ catalog = CatalogFactory.create(self.catalog_options)
+ schema = Schema.from_pyarrow_schema(pa_schema)
+ catalog.create_table(identifier, schema, False)
+
+ ds1 = ray.data.from_arrow(
+ pa.Table.from_pydict({'id': [1, 2], 'val': [10, 20]}, schema=pa_schema)
+ )
+ write_paimon(ds1, identifier, self.catalog_options)
+
+ ds2 = ray.data.from_arrow(
+ pa.Table.from_pydict({'id': [3], 'val': [30]}, schema=pa_schema)
+ )
+ write_paimon(ds2, identifier, self.catalog_options, overwrite=True)
+
+ result = read_paimon(identifier, self.catalog_options)
+ self.assertEqual(result.count(), 1)
+ df = result.to_pandas()
+ self.assertEqual(list(df['id']), [3])
+
+ def test_read_paimon_primary_key(self):
+ """read_paimon() merges PK rows correctly after an upsert."""
+ from pypaimon.ray import read_paimon
+
+ pa_schema = pa.schema([
+ pa.field('id', pa.int32(), nullable=False),
+ ('name', pa.string()),
+ ])
+ identifier = self._create_and_populate_table(
+ 'test_read_pk', pa_schema,
+ {'id': [1, 2, 3], 'name': ['a', 'b', 'c']},
+ primary_keys=['id'],
+ options={'bucket': '2'},
+ )
+
+ catalog = CatalogFactory.create(self.catalog_options)
+ table = catalog.get_table(identifier)
+ update = pa.Table.from_pydict({'id': [1, 4], 'name': ['a2', 'd']}, schema=pa_schema)
+ wb = table.new_batch_write_builder()
+ w = wb.new_write()
+ w.write_arrow(update)
+ msgs = w.prepare_commit()
+ wb.new_commit().commit(msgs)
+ w.close()
+
+ ds = read_paimon(identifier, self.catalog_options)
+ self.assertEqual(ds.count(), 4)
+ df = ds.to_pandas().sort_values('id').reset_index(drop=True)
+ self.assertEqual(list(df['name']), ['a2', 'b', 'c', 'd'])
+
+ def test_read_paimon_invalid_override_num_blocks(self):
+ """override_num_blocks below 1 is rejected with a clear error."""
+ from pypaimon.ray import read_paimon
+
+ with self.assertRaises(ValueError):
+ read_paimon('default.does_not_matter', self.catalog_options,
+ override_num_blocks=0)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/paimon-python/pypaimon/tests/split_provider_test.py b/paimon-python/pypaimon/tests/split_provider_test.py
new file mode 100644
index 0000000000..31152f28a6
--- /dev/null
+++ b/paimon-python/pypaimon/tests/split_provider_test.py
@@ -0,0 +1,178 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.read.datasource.split_provider import (
+ CatalogSplitProvider,
+ PreResolvedSplitProvider,
+)
+
+
+class SplitProviderTest(unittest.TestCase):
+ """Unit tests for the two SplitProvider implementations."""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tempdir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+ cls.catalog_options = {'warehouse': cls.warehouse}
+
+ catalog = CatalogFactory.create(cls.catalog_options)
+ catalog.create_database('default', True)
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ])
+ cls.identifier = 'default.split_provider_test'
+ schema = Schema.from_pyarrow_schema(pa_schema)
+ catalog.create_table(cls.identifier, schema, False)
+ table = catalog.get_table(cls.identifier)
+
+ data = pa.Table.from_pydict(
+ {'id': [1, 2, 3], 'name': ['a', 'b', 'c']}, schema=pa_schema
+ )
+ wb = table.new_batch_write_builder()
+ writer = wb.new_write()
+ writer.write_arrow(data)
+ wb.new_commit().commit(writer.prepare_commit())
+ writer.close()
+
+ @classmethod
+ def tearDownClass(cls):
+ try:
+ shutil.rmtree(cls.tempdir)
+ except OSError:
+ pass
+
+ def test_catalog_provider_resolves_table_and_splits(self):
+ """CatalogSplitProvider does the catalog→table→ReadBuilder→Scan dance lazily."""
+ provider = CatalogSplitProvider(
+ table_identifier=self.identifier,
+ catalog_options=self.catalog_options,
+ )
+
+ self.assertIsNone(provider._table_cached)
+ self.assertIsNone(provider._splits_cached)
+ self.assertIsNone(provider._read_type_cached)
+
+ table = provider.table()
+ self.assertIsNotNone(table)
+ self.assertIs(provider.table(), table) # cached
+
+ splits = provider.splits()
+ self.assertGreater(len(splits), 0)
+ self.assertIs(provider.splits(), splits) # cached
+ self.assertIsNotNone(provider.read_type())
+ self.assertIsNone(provider.predicate())
+
+ def test_catalog_provider_propagates_projection(self):
+ """``projection`` reaches ``ReadBuilder.with_projection`` (visible via read_type)."""
+ provider = CatalogSplitProvider(
+ table_identifier=self.identifier,
+ catalog_options=self.catalog_options,
+ projection=['id'],
+ )
+
+ read_type = provider.read_type()
+ field_names = [f.name for f in read_type]
+ self.assertEqual(field_names, ['id'])
+
+ def test_catalog_provider_propagates_predicate(self):
+ """``predicate`` is held on the provider and surfaced via predicate()."""
+ catalog = CatalogFactory.create(self.catalog_options)
+ table = catalog.get_table(self.identifier)
+ pb = table.new_read_builder().new_predicate_builder()
+ pred = pb.equal('id', 2)
+
+ provider = CatalogSplitProvider(
+ table_identifier=self.identifier,
+ catalog_options=self.catalog_options,
+ predicate=pred,
+ )
+
+ self.assertIs(provider.predicate(), pred)
+
+ def test_catalog_provider_propagates_limit(self):
+ """``limit`` reaches ``ReadBuilder.with_limit``: splits are pruned once
+ the per-split row budget is met. Uses a fresh partitioned table so
+ each commit produces its own split."""
+ pa_schema = pa.schema([('id', pa.int32()), ('name', pa.string())])
+ identifier = 'default.split_provider_limit'
+ schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['id'])
+ catalog = CatalogFactory.create(self.catalog_options)
+ catalog.create_table(identifier, schema, False)
+ table = catalog.get_table(identifier)
+ for i in range(3):
+ data = pa.Table.from_pydict({'id': [i], 'name': [f'r{i}']}, schema=pa_schema)
+ wb = table.new_batch_write_builder()
+ writer = wb.new_write()
+ writer.write_arrow(data)
+ wb.new_commit().commit(writer.prepare_commit())
+ writer.close()
+
+ unlimited = CatalogSplitProvider(
+ table_identifier=identifier, catalog_options=self.catalog_options,
+ )
+ limited = CatalogSplitProvider(
+ table_identifier=identifier, catalog_options=self.catalog_options,
+ limit=1,
+ )
+
+ # Three single-row commits → three splits; limit=1 prunes after the
+ # first split meets the budget.
+ self.assertEqual(len(unlimited.splits()), 3)
+ self.assertLess(len(limited.splits()), len(unlimited.splits()))
+
+ def test_catalog_provider_requires_identifier_and_options(self):
+ with self.assertRaises(ValueError):
+ CatalogSplitProvider(
+ table_identifier='', catalog_options=self.catalog_options
+ )
+ with self.assertRaises(ValueError):
+ CatalogSplitProvider(
+ table_identifier=self.identifier, catalog_options=None
+ )
+
+ def test_pre_resolved_provider_returns_inputs(self):
+ """PreResolvedSplitProvider just hands back what it was given."""
+ catalog = CatalogFactory.create(self.catalog_options)
+ table = catalog.get_table(self.identifier)
+ rb = table.new_read_builder()
+ splits = rb.new_scan().plan().splits()
+ read_type = rb.read_type()
+
+ provider = PreResolvedSplitProvider(
+ table=table, splits=splits, read_type=read_type, predicate=None
+ )
+
+ self.assertIs(provider.table(), table)
+ self.assertIs(provider.splits(), splits)
+ self.assertIs(provider.read_type(), read_type)
+ self.assertIsNone(provider.predicate())
+
+
+if __name__ == '__main__':
+ unittest.main()
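
The limit tests above assert "fewer splits" or "fewer than 10 rows" rather than an exact row count, because the scan-time limit works at whole-split granularity. The sketch below illustrates that behaviour in isolation. It is not Paimon's `FileScanner` code: `prune_splits_by_limit` is a hypothetical helper, and it assumes the planner visits splits in order and stops once the accumulated row count covers the limit.

```python
from typing import List, Sequence


def prune_splits_by_limit(split_row_counts: Sequence[int], limit: int) -> List[int]:
    """Return the indices of the splits kept under a whole-split limit.

    Hypothetical illustration only: splits are kept in order until the rows
    already covered reach ``limit``; a kept split is never truncated.
    """
    if limit < 0:
        raise ValueError("limit must be non-negative")
    kept: List[int] = []
    rows_covered = 0
    for index, row_count in enumerate(split_row_counts):
        if rows_covered >= limit:
            break  # the budget is already met, drop the remaining splits
        kept.append(index)
        rows_covered += row_count
    return kept


# Two partitions of 5 rows each, limit=3: only the first split survives,
# so a reader would still see 5 rows, not 3.
row_counts = [5, 5]
kept = prune_splits_by_limit(row_counts, limit=3)
rows_read = sum(row_counts[i] for i in kept)
print(kept, rows_read)
```

Under these assumptions the result is more than `limit` rows but fewer than the full table, which is why the integration test checks `0 < count < 10` instead of `count == 3`.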