This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 76412d0e3e [python][daft] Add daft on ray e2e test and documentation (#8173)
76412d0e3e is described below
commit 76412d0e3e6d3dbc0b7b3d956052f372ca67d6bc
Author: XiaoHongbo <[email protected]>
AuthorDate: Tue Jun 9 12:10:47 2026 +0800
[python][daft] Add daft on ray e2e test and documentation (#8173)
---
docs/docs/pypaimon/daft.md | 56 ++++-
docs/docs/pypaimon/ray-data.md | 3 +
.../pypaimon/tests/daft/daft_on_ray_e2e_test.py | 225 +++++++++++++++++++++
3 files changed, 283 insertions(+), 1 deletion(-)
diff --git a/docs/docs/pypaimon/daft.md b/docs/docs/pypaimon/daft.md
index ae158c3283..d02590d155 100644
--- a/docs/docs/pypaimon/daft.md
+++ b/docs/docs/pypaimon/daft.md
@@ -26,11 +26,18 @@ under the License.
# Daft
[Daft](https://www.daft.io/) is a distributed DataFrame engine for Python.
+See also the [Daft Paimon connector documentation](https://docs.daft.ai/en/stable/connectors/paimon/).
This requires `daft` to be installed:
```bash
-pip install pypaimon[daft]
+pip install 'pypaimon[daft]'
+```
+
+To execute Daft plans on Ray, install both extras:
+
+```bash
+pip install 'pypaimon[daft,ray]'
```
`pypaimon.daft` exposes a top-level `read_paimon` / `write_paimon` API that
@@ -165,12 +172,59 @@ write_paimon(
)
```
+For unpartitioned tables, overwrite replaces the table contents. For
+partitioned tables, overwrite follows Paimon's dynamic partition overwrite
+semantics by default: only partitions present in the input DataFrame are
+replaced, and existing partitions not present in the input are kept.
+
**Parameters:**
- `df`: the Daft DataFrame to write.
- `table_identifier`: full table name, e.g. `"db_name.table_name"`.
- `catalog_options`: kwargs forwarded to `CatalogFactory.create()`.
- `mode`: write mode — `"append"` (default) or `"overwrite"`.
+## Running Daft on Ray
+
+`pypaimon.daft` works with Daft's Ray runner. Daft's runner is process-global
+and cannot be switched once it has been initialized, so configure the Ray
+runner before the first Daft execution in the process:
+
+```python
+import daft
+import ray
+from daft import runners
+from pypaimon.daft import read_paimon, write_paimon
+
+ray.init() # use address="auto" to connect to an existing Ray cluster
+runners.set_runner_ray()
+
+df = daft.from_pydict({
+ "id": [1, 2, 3],
+ "name": ["alice", "bob", "charlie"],
+ "dt": ["2024-01-01", "2024-01-01", "2024-01-02"],
+})
+
+write_paimon(
+ df,
+ "database_name.table_name",
+ catalog_options={"warehouse": "/path/to/warehouse"},
+)
+
+result = (
+ read_paimon(
+ "database_name.table_name",
+ catalog_options={"warehouse": "/path/to/warehouse"},
+ )
+ .where(daft.col("dt") == "2024-01-01")
+ .select("id", "name")
+)
+
+result.show()
+```
+
+Use `pypaimon.daft` when your application is written with Daft DataFrames and
+you want Daft to schedule the execution on Ray. Use `pypaimon.ray` instead when
+your application directly reads or writes Ray Datasets.
+
## Catalog Abstraction
Paimon catalogs can integrate with Daft's unified `Catalog` / `Table`
interfaces:
diff --git a/docs/docs/pypaimon/ray-data.md b/docs/docs/pypaimon/ray-data.md
index a6d98f8fcc..03364aa743 100644
--- a/docs/docs/pypaimon/ray-data.md
+++ b/docs/docs/pypaimon/ray-data.md
@@ -34,6 +34,9 @@ Ray's built-in Iceberg integration. The lower-level `TableRead.to_ray()` and
already resolved a `(read_builder, splits)` pair or constructed a
`table_write` via the regular pypaimon API.
+If your application uses Daft DataFrames and only needs Ray as Daft's execution
+backend, see [Running Daft on Ray](./daft#running-daft-on-ray).
+
## Read
### `read_paimon` (recommended)
diff --git a/paimon-python/pypaimon/tests/daft/daft_on_ray_e2e_test.py b/paimon-python/pypaimon/tests/daft/daft_on_ray_e2e_test.py
new file mode 100644
index 0000000000..5dd1bcf3fd
--- /dev/null
+++ b/paimon-python/pypaimon/tests/daft/daft_on_ray_e2e_test.py
@@ -0,0 +1,225 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""End-to-end tests for pypaimon.daft with Daft's Ray runner.
+
+Daft runners are process-global and cannot be switched after initialization.
+Run the Ray-runner scenario in a fresh Python subprocess.
+"""
+
+from __future__ import annotations
+
+import importlib.util
+import os
+import subprocess
+import sys
+import textwrap
+
+import pytest
+
+pytestmark = pytest.mark.skipif(
+ importlib.util.find_spec("daft") is None
+ or importlib.util.find_spec("ray") is None,
+ reason="Daft-on-Ray e2e requires both daft and ray",
+)
+
+
+def test_daft_on_ray_read_write_e2e():
+ python_root = os.path.abspath(
+ os.path.join(os.path.dirname(__file__), "..", "..", "..")
+ )
+ script = textwrap.dedent(
+ r'''
+ import os
+ import shutil
+ import tempfile
+
+ os.environ.setdefault("RAY_TMPDIR", "/tmp")
+
+ import daft
+ import pyarrow as pa
+ import ray
+ from daft import runners
+ from pypaimon import CatalogFactory, Schema
+ from pypaimon.daft import PaimonCatalog, read_paimon, write_paimon
+
+ root = tempfile.mkdtemp(prefix="paimon-daft-on-ray-")
+ try:
+ warehouse = os.path.join(root, "warehouse")
+ catalog_options = {"warehouse": warehouse}
+ catalog = CatalogFactory.create(catalog_options)
+ catalog.create_database("test_db", False)
+
+ ray.init(
+ num_cpus=2,
+ include_dashboard=False,
+ ignore_reinit_error=True,
+ )
+ runners.set_runner_ray()
+ assert runners.get_or_create_runner().name == "ray"
+
+ schema = Schema.from_pyarrow_schema(
+ pa.schema([
+ pa.field("id", pa.int64()),
+ pa.field("name", pa.string()),
+ pa.field("dt", pa.string()),
+ ]),
+ partition_keys=["dt"],
+ options={"file.format": "parquet"},
+ )
+ catalog.create_table("test_db.t", schema, False)
+
+ df = daft.from_pydict({
+ "id": [3, 1, 2, 4],
+ "name": ["c", "a", "b", "d"],
+ "dt": [
+ "2026-06-07",
+ "2026-06-07",
+ "2026-06-07",
+ "2026-06-08",
+ ],
+ }).into_partitions(2)
+
+ summary = write_paimon(
+ df,
+ "test_db.t",
+ catalog_options=catalog_options,
+ ).to_arrow()
+ assert sum(summary.column("rows").to_pylist()) == 4
+
+ result = (
+ read_paimon("test_db.t", catalog_options=catalog_options)
+ .where(daft.col("dt") == "2026-06-07")
+ .select("id", "name")
+ .sort("id")
+ .to_arrow()
+ )
+ assert result.to_pydict() == {
+ "id": [1, 2, 3],
+ "name": ["a", "b", "c"],
+ }
+
+ write_paimon(
+ daft.from_pydict({
+ "id": [10],
+ "name": ["z"],
+ "dt": ["2026-06-07"],
+ }),
+ "test_db.t",
+ catalog_options=catalog_options,
+ mode="overwrite",
+ ).to_arrow()
+ same_partition = (
+ read_paimon("test_db.t", catalog_options=catalog_options)
+ .sort("id")
+ .to_arrow()
+ )
+ assert same_partition.to_pydict() == {
+ "id": [4, 10],
+ "name": ["d", "z"],
+ "dt": ["2026-06-08", "2026-06-07"],
+ }
+
+ write_paimon(
+ daft.from_pydict({
+ "id": [20],
+ "name": ["new"],
+ "dt": ["2026-06-09"],
+ }),
+ "test_db.t",
+ catalog_options=catalog_options,
+ mode="overwrite",
+ ).to_arrow()
+ new_partition = (
+ read_paimon("test_db.t", catalog_options=catalog_options)
+ .sort("id")
+ .to_arrow()
+ )
+ assert new_partition.to_pydict() == {
+ "id": [4, 10, 20],
+ "name": ["d", "z", "new"],
+ "dt": ["2026-06-08", "2026-06-07", "2026-06-09"],
+ }
+
+ unpartitioned_schema = Schema.from_pyarrow_schema(
+ pa.schema([
+ pa.field("id", pa.int64()),
+ pa.field("name", pa.string()),
+ ]),
+ options={"file.format": "parquet"},
+ )
+ catalog.create_table(
+ "test_db.unpartitioned",
+ unpartitioned_schema,
+ False,
+ )
+ write_paimon(
+ daft.from_pydict({"id": [1, 2], "name": ["a", "b"]}),
+ "test_db.unpartitioned",
+ catalog_options=catalog_options,
+ ).to_arrow()
+ write_paimon(
+ daft.from_pydict({"id": [30], "name": ["only"]}),
+ "test_db.unpartitioned",
+ catalog_options=catalog_options,
+ mode="overwrite",
+ ).to_arrow()
+ unpartitioned = read_paimon(
+ "test_db.unpartitioned",
+ catalog_options=catalog_options,
+ ).to_arrow()
+ assert unpartitioned.to_pydict() == {
+ "id": [30],
+ "name": ["only"],
+ }
+
+ daft_catalog = PaimonCatalog(
+ CatalogFactory.create(catalog_options),
+ name="paimon",
+ )
+ wrapper_result = (
+ daft_catalog.get_table("test_db.unpartitioned")
+ .read()
+ .to_arrow()
+ )
+ assert wrapper_result.to_pydict() == {
+ "id": [30],
+ "name": ["only"],
+ }
+ finally:
+ if ray.is_initialized():
+ ray.shutdown()
+ shutil.rmtree(root, ignore_errors=True)
+ '''
+ )
+
+ env = os.environ.copy()
+ env["PYTHONPATH"] = os.pathsep.join(
+ [python_root, env.get("PYTHONPATH", "")]
+ ).rstrip(os.pathsep)
+ result = subprocess.run(
+ [sys.executable, "-c", script],
+ capture_output=True,
+ env=env,
+ text=True,
+ timeout=120,
+ )
+ assert result.returncode == 0, (
+ f"stdout:\n{result.stdout}\n"
+ f"stderr:\n{result.stderr}"
+ )