This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 76412d0e3e [python][daft] Add daft on ray e2e test and documentation (#8173)
76412d0e3e is described below

commit 76412d0e3e6d3dbc0b7b3d956052f372ca67d6bc
Author: XiaoHongbo <[email protected]>
AuthorDate: Tue Jun 9 12:10:47 2026 +0800

    [python][daft] Add daft on ray e2e test and documentation (#8173)
---
 docs/docs/pypaimon/daft.md                         |  56 ++++-
 docs/docs/pypaimon/ray-data.md                     |   3 +
 .../pypaimon/tests/daft/daft_on_ray_e2e_test.py    | 225 +++++++++++++++++++++
 3 files changed, 283 insertions(+), 1 deletion(-)

diff --git a/docs/docs/pypaimon/daft.md b/docs/docs/pypaimon/daft.md
index ae158c3283..d02590d155 100644
--- a/docs/docs/pypaimon/daft.md
+++ b/docs/docs/pypaimon/daft.md
@@ -26,11 +26,18 @@ under the License.
 # Daft
 
 [Daft](https://www.daft.io/) is a distributed DataFrame engine for Python.
+See also the [Daft Paimon connector documentation](https://docs.daft.ai/en/stable/connectors/paimon/).
 
 This requires `daft` to be installed:
 
 ```bash
-pip install pypaimon[daft]
+pip install 'pypaimon[daft]'
+```
+
+To execute Daft plans on Ray, install both extras:
+
+```bash
+pip install 'pypaimon[daft,ray]'
 ```
 
 `pypaimon.daft` exposes a top-level `read_paimon` / `write_paimon` API that
@@ -165,12 +172,59 @@ write_paimon(
 )
 ```
 
+For unpartitioned tables, overwrite replaces the table contents. For
+partitioned tables, overwrite follows Paimon's dynamic partition overwrite
+semantics by default: only partitions present in the input DataFrame are
+replaced, and existing partitions not present in the input are kept.
+
 **Parameters:**
 - `df`: the Daft DataFrame to write.
 - `table_identifier`: full table name, e.g. `"db_name.table_name"`.
 - `catalog_options`: kwargs forwarded to `CatalogFactory.create()`.
 - `mode`: write mode — `"append"` (default) or `"overwrite"`.
 
+## Running Daft on Ray
+
+`pypaimon.daft` works with Daft's Ray runner. Configure the runner before the
+first Daft execution in the process:
+
+```python
+import daft
+import ray
+from daft import runners
+from pypaimon.daft import read_paimon, write_paimon
+
+ray.init()  # use address="auto" to connect to an existing Ray cluster
+runners.set_runner_ray()
+
+df = daft.from_pydict({
+    "id": [1, 2, 3],
+    "name": ["alice", "bob", "charlie"],
+    "dt": ["2024-01-01", "2024-01-01", "2024-01-02"],
+})
+
+write_paimon(
+    df,
+    "database_name.table_name",
+    catalog_options={"warehouse": "/path/to/warehouse"},
+)
+
+result = (
+    read_paimon(
+        "database_name.table_name",
+        catalog_options={"warehouse": "/path/to/warehouse"},
+    )
+    .where(daft.col("dt") == "2024-01-01")
+    .select("id", "name")
+)
+
+result.show()
+```
+
+Use `pypaimon.daft` when your application is written with Daft DataFrames and
+you want Daft to schedule the execution on Ray. Use `pypaimon.ray` instead when
+your application directly reads or writes Ray Datasets.
+
 ## Catalog Abstraction
 
 Paimon catalogs can integrate with Daft's unified `Catalog` / `Table` interfaces:
diff --git a/docs/docs/pypaimon/ray-data.md b/docs/docs/pypaimon/ray-data.md
index a6d98f8fcc..03364aa743 100644
--- a/docs/docs/pypaimon/ray-data.md
+++ b/docs/docs/pypaimon/ray-data.md
@@ -34,6 +34,9 @@ Ray's built-in Iceberg integration. The lower-level `TableRead.to_ray()` and
 already resolved a `(read_builder, splits)` pair or constructed a
 `table_write` via the regular pypaimon API.
 
+If your application uses Daft DataFrames and only needs Ray as Daft's execution
+backend, see [Running Daft on Ray](./daft#running-daft-on-ray).
+
 ## Read
 
 ### `read_paimon` (recommended)
diff --git a/paimon-python/pypaimon/tests/daft/daft_on_ray_e2e_test.py b/paimon-python/pypaimon/tests/daft/daft_on_ray_e2e_test.py
new file mode 100644
index 0000000000..5dd1bcf3fd
--- /dev/null
+++ b/paimon-python/pypaimon/tests/daft/daft_on_ray_e2e_test.py
@@ -0,0 +1,225 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""End-to-end tests for pypaimon.daft with Daft's Ray runner.
+
+Daft runners are process-global and cannot be switched after initialization.
+Run the Ray-runner scenario in a fresh Python subprocess.
+"""
+
+from __future__ import annotations
+
+import importlib.util
+import os
+import subprocess
+import sys
+import textwrap
+
+import pytest
+
+pytestmark = pytest.mark.skipif(
+    importlib.util.find_spec("daft") is None
+    or importlib.util.find_spec("ray") is None,
+    reason="Daft-on-Ray e2e requires both daft and ray",
+)
+
+
+def test_daft_on_ray_read_write_e2e():
+    python_root = os.path.abspath(
+        os.path.join(os.path.dirname(__file__), "..", "..", "..")
+    )
+    script = textwrap.dedent(
+        r'''
+        import os
+        import shutil
+        import tempfile
+
+        os.environ.setdefault("RAY_TMPDIR", "/tmp")
+
+        import daft
+        import pyarrow as pa
+        import ray
+        from daft import runners
+        from pypaimon import CatalogFactory, Schema
+        from pypaimon.daft import PaimonCatalog, read_paimon, write_paimon
+
+        root = tempfile.mkdtemp(prefix="paimon-daft-on-ray-")
+        try:
+            warehouse = os.path.join(root, "warehouse")
+            catalog_options = {"warehouse": warehouse}
+            catalog = CatalogFactory.create(catalog_options)
+            catalog.create_database("test_db", False)
+
+            ray.init(
+                num_cpus=2,
+                include_dashboard=False,
+                ignore_reinit_error=True,
+            )
+            runners.set_runner_ray()
+            assert runners.get_or_create_runner().name == "ray"
+
+            schema = Schema.from_pyarrow_schema(
+                pa.schema([
+                    pa.field("id", pa.int64()),
+                    pa.field("name", pa.string()),
+                    pa.field("dt", pa.string()),
+                ]),
+                partition_keys=["dt"],
+                options={"file.format": "parquet"},
+            )
+            catalog.create_table("test_db.t", schema, False)
+
+            df = daft.from_pydict({
+                "id": [3, 1, 2, 4],
+                "name": ["c", "a", "b", "d"],
+                "dt": [
+                    "2026-06-07",
+                    "2026-06-07",
+                    "2026-06-07",
+                    "2026-06-08",
+                ],
+            }).into_partitions(2)
+
+            summary = write_paimon(
+                df,
+                "test_db.t",
+                catalog_options=catalog_options,
+            ).to_arrow()
+            assert sum(summary.column("rows").to_pylist()) == 4
+
+            result = (
+                read_paimon("test_db.t", catalog_options=catalog_options)
+                .where(daft.col("dt") == "2026-06-07")
+                .select("id", "name")
+                .sort("id")
+                .to_arrow()
+            )
+            assert result.to_pydict() == {
+                "id": [1, 2, 3],
+                "name": ["a", "b", "c"],
+            }
+
+            write_paimon(
+                daft.from_pydict({
+                    "id": [10],
+                    "name": ["z"],
+                    "dt": ["2026-06-07"],
+                }),
+                "test_db.t",
+                catalog_options=catalog_options,
+                mode="overwrite",
+            ).to_arrow()
+            same_partition = (
+                read_paimon("test_db.t", catalog_options=catalog_options)
+                .sort("id")
+                .to_arrow()
+            )
+            assert same_partition.to_pydict() == {
+                "id": [4, 10],
+                "name": ["d", "z"],
+                "dt": ["2026-06-08", "2026-06-07"],
+            }
+
+            write_paimon(
+                daft.from_pydict({
+                    "id": [20],
+                    "name": ["new"],
+                    "dt": ["2026-06-09"],
+                }),
+                "test_db.t",
+                catalog_options=catalog_options,
+                mode="overwrite",
+            ).to_arrow()
+            new_partition = (
+                read_paimon("test_db.t", catalog_options=catalog_options)
+                .sort("id")
+                .to_arrow()
+            )
+            assert new_partition.to_pydict() == {
+                "id": [4, 10, 20],
+                "name": ["d", "z", "new"],
+                "dt": ["2026-06-08", "2026-06-07", "2026-06-09"],
+            }
+
+            unpartitioned_schema = Schema.from_pyarrow_schema(
+                pa.schema([
+                    pa.field("id", pa.int64()),
+                    pa.field("name", pa.string()),
+                ]),
+                options={"file.format": "parquet"},
+            )
+            catalog.create_table(
+                "test_db.unpartitioned",
+                unpartitioned_schema,
+                False,
+            )
+            write_paimon(
+                daft.from_pydict({"id": [1, 2], "name": ["a", "b"]}),
+                "test_db.unpartitioned",
+                catalog_options=catalog_options,
+            ).to_arrow()
+            write_paimon(
+                daft.from_pydict({"id": [30], "name": ["only"]}),
+                "test_db.unpartitioned",
+                catalog_options=catalog_options,
+                mode="overwrite",
+            ).to_arrow()
+            unpartitioned = read_paimon(
+                "test_db.unpartitioned",
+                catalog_options=catalog_options,
+            ).to_arrow()
+            assert unpartitioned.to_pydict() == {
+                "id": [30],
+                "name": ["only"],
+            }
+
+            daft_catalog = PaimonCatalog(
+                CatalogFactory.create(catalog_options),
+                name="paimon",
+            )
+            wrapper_result = (
+                daft_catalog.get_table("test_db.unpartitioned")
+                .read()
+                .to_arrow()
+            )
+            assert wrapper_result.to_pydict() == {
+                "id": [30],
+                "name": ["only"],
+            }
+        finally:
+            if ray.is_initialized():
+                ray.shutdown()
+            shutil.rmtree(root, ignore_errors=True)
+        '''
+    )
+
+    env = os.environ.copy()
+    env["PYTHONPATH"] = os.pathsep.join(
+        [python_root, env.get("PYTHONPATH", "")]
+    ).rstrip(os.pathsep)
+    result = subprocess.run(
+        [sys.executable, "-c", script],
+        capture_output=True,
+        env=env,
+        text=True,
+        timeout=120,
+    )
+    assert result.returncode == 0, (
+        f"stdout:\n{result.stdout}\n"
+        f"stderr:\n{result.stderr}"
+    )

Reply via email to