This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ec63ed1cbf [python][daft] Add public API integration tests (#7990)
ec63ed1cbf is described below

commit ec63ed1cbf9af9c1bd395f6df184293f97caba4e
Author: QuakeWang <[email protected]>
AuthorDate: Thu May 28 16:54:03 2026 +0800

    [python][daft] Add public API integration tests (#7990)
---
 paimon-python/pypaimon/daft/daft_paimon.py         |   5 +
 .../pypaimon/tests/daft/daft_integration_test.py   | 236 +++++++++++++++++++++
 2 files changed, 241 insertions(+)

diff --git a/paimon-python/pypaimon/daft/daft_paimon.py 
b/paimon-python/pypaimon/daft/daft_paimon.py
index cde1e23d80..245cea534b 100644
--- a/paimon-python/pypaimon/daft/daft_paimon.py
+++ b/paimon-python/pypaimon/daft/daft_paimon.py
@@ -143,6 +143,11 @@ def read_paimon(
     Returns:
         A lazy ``daft.DataFrame`` backed by this Paimon table.
     """
+    if snapshot_id is not None and tag_name is not None:
+        raise ValueError(
+            "snapshot_id and tag_name cannot be set at the same time"
+        )
+
     from pypaimon.catalog.catalog_factory import CatalogFactory
 
     catalog = CatalogFactory.create(catalog_options)
diff --git a/paimon-python/pypaimon/tests/daft/daft_integration_test.py 
b/paimon-python/pypaimon/tests/daft/daft_integration_test.py
new file mode 100644
index 0000000000..3c5709739d
--- /dev/null
+++ b/paimon-python/pypaimon/tests/daft/daft_integration_test.py
@@ -0,0 +1,236 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Tests for the public pypaimon.daft read_paimon() / write_paimon() API."""
+
+from __future__ import annotations
+
+import pyarrow as pa
+import pytest
+
+pypaimon = pytest.importorskip("pypaimon")
+daft = pytest.importorskip("daft")
+
+from daft import col
+
+from pypaimon.daft import read_paimon, write_paimon
+
+
[email protected]
+def catalog_options(tmp_path):
+    options = {"warehouse": str(tmp_path)}
+    catalog = pypaimon.CatalogFactory.create(options)
+    catalog.create_database("test_db", ignore_if_exists=True)
+    return options
+
+
+def _create_table(
+    catalog_options,
+    table_name: str,
+    pa_schema: pa.Schema,
+    *,
+    partition_keys: list[str] | None = None,
+    options: dict[str, str] | None = None,
+):
+    identifier = f"test_db.{table_name}"
+    catalog = pypaimon.CatalogFactory.create(catalog_options)
+    schema = pypaimon.Schema.from_pyarrow_schema(
+        pa_schema,
+        partition_keys=partition_keys,
+        options=options,
+    )
+    catalog.create_table(identifier, schema, ignore_if_exists=False)
+    return identifier, catalog.get_table(identifier)
+
+
+def _write_arrow(table, data: pa.Table) -> None:
+    write_builder = table.new_batch_write_builder()
+    table_write = write_builder.new_write()
+    table_commit = write_builder.new_commit()
+    try:
+        table_write.write_arrow(data)
+        table_commit.commit(table_write.prepare_commit())
+    finally:
+        table_write.close()
+        table_commit.close()
+
+
+def _create_and_populate_table(
+    catalog_options,
+    table_name: str,
+    data: pa.Table,
+    *,
+    partition_keys: list[str] | None = None,
+    options: dict[str, str] | None = None,
+) -> str:
+    identifier, table = _create_table(
+        catalog_options,
+        table_name,
+        data.schema,
+        partition_keys=partition_keys,
+        options=options,
+    )
+    _write_arrow(table, data)
+    return identifier
+
+
+def test_read_paimon_basic(catalog_options):
+    data = pa.table(
+        {
+            "id": pa.array([1, 2, 3], pa.int64()),
+            "name": pa.array(["alice", "bob", "carol"], pa.string()),
+            "value": pa.array([10, 20, 30], pa.int64()),
+        }
+    )
+    identifier = _create_and_populate_table(catalog_options, "read_basic", 
data)
+
+    result = read_paimon(identifier, catalog_options).sort("id").to_pydict()
+
+    assert result == {
+        "id": [1, 2, 3],
+        "name": ["alice", "bob", "carol"],
+        "value": [10, 20, 30],
+    }
+
+
+def test_read_paimon_projection(catalog_options):
+    data = pa.table(
+        {
+            "id": pa.array([1, 2], pa.int64()),
+            "name": pa.array(["alice", "bob"], pa.string()),
+            "value": pa.array([10, 20], pa.int64()),
+        }
+    )
+    identifier = _create_and_populate_table(catalog_options, 
"read_projection", data)
+
+    result = read_paimon(identifier, catalog_options).select("id", 
"name").sort("id").to_pydict()
+
+    assert result == {
+        "id": [1, 2],
+        "name": ["alice", "bob"],
+    }
+
+
+def test_read_paimon_filter(catalog_options):
+    data = pa.table(
+        {
+            "id": pa.array([1, 2, 3, 4], pa.int64()),
+            "category": pa.array(["A", "B", "A", "C"], pa.string()),
+            "amount": pa.array([100, 200, 150, 300], pa.int64()),
+        }
+    )
+    identifier = _create_and_populate_table(catalog_options, "read_filter", 
data)
+
+    result = (
+        read_paimon(identifier, catalog_options)
+        .where((col("category") == "A") & (col("amount") >= 120))
+        .sort("id")
+        .to_pydict()
+    )
+
+    assert result == {
+        "id": [3],
+        "category": ["A"],
+        "amount": [150],
+    }
+
+
+def test_read_paimon_limit(catalog_options):
+    data = pa.table(
+        {
+            "id": pa.array(list(range(10)), pa.int64()),
+            "name": pa.array([f"name-{i}" for i in range(10)], pa.string()),
+        }
+    )
+    identifier = _create_and_populate_table(catalog_options, "read_limit", 
data)
+
+    result = read_paimon(identifier, catalog_options).limit(3).to_pydict()
+
+    assert len(result["id"]) == 3
+
+
+def test_read_paimon_with_snapshot_id(catalog_options):
+    pa_schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
+    identifier, table = _create_table(catalog_options, "read_snapshot_id", 
pa_schema)
+    _write_arrow(table, pa.table({"id": [1], "name": ["first"]}, 
schema=pa_schema))
+    _write_arrow(table, pa.table({"id": [2], "name": ["second"]}, 
schema=pa_schema))
+
+    latest = read_paimon(identifier, catalog_options).sort("id").to_pydict()
+    snap1 = read_paimon(identifier, catalog_options, snapshot_id=1).to_pydict()
+
+    assert latest["id"] == [1, 2]
+    assert snap1 == {"id": [1], "name": ["first"]}
+
+
+def test_read_paimon_with_tag_name(catalog_options):
+    pa_schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
+    identifier, table = _create_table(catalog_options, "read_tag_name", 
pa_schema)
+    _write_arrow(table, pa.table({"id": [1], "name": ["tagged"]}, 
schema=pa_schema))
+    table.create_tag("v1")
+    _write_arrow(table, pa.table({"id": [2], "name": ["latest"]}, 
schema=pa_schema))
+
+    result = read_paimon(identifier, catalog_options, 
tag_name="v1").to_pydict()
+
+    assert result == {"id": [1], "name": ["tagged"]}
+
+
+def 
test_read_paimon_rejects_snapshot_id_and_tag_name_together(catalog_options):
+    with pytest.raises(ValueError, match="snapshot_id and tag_name cannot be 
set at the same time"):
+        read_paimon(
+            "test_db.dummy",
+            catalog_options,
+            snapshot_id=1,
+            tag_name="v1",
+        )
+
+
+def test_write_paimon_append(catalog_options):
+    pa_schema = pa.schema([
+        ("id", pa.int64()),
+        ("name", pa.string()),
+    ])
+    identifier, _ = _create_table(catalog_options, "write_append", pa_schema)
+    df = daft.from_pydict({"id": [1, 2, 3], "name": ["a", "b", "c"]})
+
+    write_paimon(df, identifier, catalog_options)
+
+    result = read_paimon(identifier, catalog_options).sort("id").to_pydict()
+    assert result == {"id": [1, 2, 3], "name": ["a", "b", "c"]}
+
+
+def test_write_paimon_overwrite(catalog_options):
+    pa_schema = pa.schema([
+        ("id", pa.int64()),
+        ("name", pa.string()),
+    ])
+    identifier, _ = _create_table(catalog_options, "write_overwrite", 
pa_schema)
+    write_paimon(
+        daft.from_pydict({"id": [1, 2], "name": ["old-a", "old-b"]}),
+        identifier,
+        catalog_options,
+    )
+
+    write_paimon(
+        daft.from_pydict({"id": [3], "name": ["new"]}),
+        identifier,
+        catalog_options,
+        mode="overwrite",
+    )
+
+    result = read_paimon(identifier, catalog_options).to_pydict()
+    assert result == {"id": [3], "name": ["new"]}

Reply via email to