This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4133576330 [python] initialize daft connector for paimon (#7844)
4133576330 is described below
commit 4133576330ff7bbc71c4caf60414fe5fa7f79d34
Author: ChengHui Chen <[email protected]>
AuthorDate: Sat May 16 00:21:03 2026 +0800
[python] initialize daft connector for paimon (#7844)
Initialize `paimon-daft` module in the Paimon main repository.
Based on discussion from https://github.com/Eventual-Inc/Daft/pull/6839,
the community decided to maintain the Daft-Paimon connector in the
Paimon repository going forward — for more efficient maintenance and
stronger ownership by the Paimon community.
This is the initial PR: it migrates the connector code from the Daft
repository and adds basic project infrastructure.
---
.github/workflows/paimon-python-checks.yml | 2 +-
docs/content/pypaimon/daft.md | 231 +++++++++
paimon-python/dev/lint-python.sh | 7 +-
paimon-python/pypaimon/daft/__init__.py | 31 ++
paimon-python/pypaimon/daft/daft_blob.py | 86 ++++
paimon-python/pypaimon/daft/daft_catalog.py | 285 +++++++++++
paimon-python/pypaimon/daft/daft_compat.py | 60 +++
paimon-python/pypaimon/daft/daft_datasink.py | 125 +++++
paimon-python/pypaimon/daft/daft_datasource.py | 316 ++++++++++++
paimon-python/pypaimon/daft/daft_io_config.py | 89 ++++
paimon-python/pypaimon/daft/daft_paimon.py | 161 ++++++
.../pypaimon/daft/daft_predicate_visitor.py | 248 +++++++++
.../pypaimon/tests/daft/daft_catalog_rest_test.py | 233 +++++++++
.../pypaimon/tests/daft/daft_catalog_test.py | 333 ++++++++++++
.../pypaimon/tests/daft/daft_data_test.py | 557 +++++++++++++++++++++
.../pypaimon/tests/daft/daft_sink_test.py | 416 +++++++++++++++
paimon-python/pypaimon/write/writer/data_writer.py | 4 +-
paimon-python/setup.py | 3 +
18 files changed, 3184 insertions(+), 3 deletions(-)
diff --git a/.github/workflows/paimon-python-checks.yml
b/.github/workflows/paimon-python-checks.yml
index 5302d1267d..580ea4cccc 100755
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -132,7 +132,7 @@ jobs:
else
python -m pip install --upgrade pip
pip install torch --index-url https://download.pytorch.org/whl/cpu
- python -m pip install pyroaring readerwriterlock==1.0.9
fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0
fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2
numpy==1.24.3 pandas==2.0.3 pylance==0.39.0 cramjam flake8==4.0.1 pytest~=7.0
py4j==0.10.9.9 requests parameterized==0.9.0
+ python -m pip install pyroaring readerwriterlock==1.0.9
fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0
fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2
numpy==1.24.3 pandas==2.0.3 pylance==0.39.0 cramjam flake8==4.0.1 pytest~=7.0
py4j==0.10.9.9 requests parameterized==0.9.0 'daft>=0.7.6'
python -m pip install 'lumina-data>=${{ env.LUMINA_DATA_VERSION
}}' -i https://pypi.org/simple/
if python -c "import sys; sys.exit(0 if sys.version_info >= (3,
11) else 1)"; then
python -m pip install vortex-data
diff --git a/docs/content/pypaimon/daft.md b/docs/content/pypaimon/daft.md
new file mode 100644
index 0000000000..b2f4043bbe
--- /dev/null
+++ b/docs/content/pypaimon/daft.md
@@ -0,0 +1,231 @@
+---
+title: "Daft"
+weight: 4
+type: docs
+aliases:
+ - /pypaimon/daft.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Daft
+
+[Daft](https://www.daft.io/) is a distributed DataFrame engine for Python.
+
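+This requires Daft, which can be installed together with pypaimon through the
+`daft` extra (the quotes keep shells such as zsh from expanding the brackets):
+
+```bash
+pip install 'pypaimon[daft]'
+```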
+
+`pypaimon.daft` exposes a top-level `read_paimon` / `write_paimon` API that
+takes a table identifier and catalog options directly.
+
+## Read
+
+### `read_paimon` (recommended)
+
+```python
+from pypaimon.daft import read_paimon
+
+df = read_paimon(
+ "database_name.table_name",
+ catalog_options={"warehouse": "/path/to/warehouse"},
+)
+
+df.show()
+```
+
+`read_paimon` opens its own catalog and resolves the table in a single call.
+
+The returned DataFrame is lazy. Use standard Daft operations for filtering,
+projection, and limit — they are automatically pushed down into the Paimon scan
+via Daft's DataSource protocol:
+
+```python
+import daft
+
+df = read_paimon(
+ "database_name.table_name",
+ catalog_options={"warehouse": "/path/to/warehouse"},
+)
+
+# Filter pushdown (partition pruning + file-level skipping)
+df = df.where(daft.col("dt") == "2024-01-01")
+
+# Projection pushdown (only requested columns are read from disk)
+df = df.select("id", "name")
+
+# Limit pushdown
+df = df.limit(100)
+
+df.show()
+```
+
+**Time travel:**
+
+```python
+# Read a specific snapshot.
+df = read_paimon(
+ "database_name.table_name",
+ catalog_options={"warehouse": "/path/to/warehouse"},
+ snapshot_id=42,
+)
+
+# Read a tagged snapshot.
+df = read_paimon(
+ "database_name.table_name",
+ catalog_options={"warehouse": "/path/to/warehouse"},
+ tag_name="release-2026-04",
+)
+```
+
+`snapshot_id` and `tag_name` are mutually exclusive.
+
+**Parameters:**
+- `table_identifier`: full table name, e.g. `"db_name.table_name"`.
+- `catalog_options`: kwargs forwarded to `CatalogFactory.create()`,
+ e.g. `{"warehouse": "/path/to/warehouse"}`.
+- `snapshot_id`: optional snapshot id to time-travel to. Mutually
+ exclusive with `tag_name`.
+- `tag_name`: optional tag name to time-travel to. Mutually
+ exclusive with `snapshot_id`.
+- `io_config`: optional Daft `IOConfig` for accessing object storage.
+ If `None`, will be inferred from the catalog options.
+
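+For tables on object stores, credentials are inferred from the catalog options
+automatically:
+
+```python
+df = read_paimon(
+    "my_db.my_table",
+    catalog_options={
+        "warehouse": "s3://my-bucket/warehouse",
+        "fs.s3.accessKeyId": "...",
+        "fs.s3.accessKeySecret": "...",
+    },
+)
+df.show()
+```
+
+Alternatively, pass an explicit Daft `IOConfig` via `io_config`:
+
+```python
+from daft.io import IOConfig, S3Config
+
+df = read_paimon(
+    "my_db.my_table",
+    catalog_options={"warehouse": "s3://my-bucket/warehouse"},
+    io_config=IOConfig(s3=S3Config(key_id="...", access_key="...")),
+)
+df.show()
+```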
+
+**Features:**
+- Append-only tables with Parquet format use Daft's native high-performance
+  Parquet reader.
+- Primary-key tables that require LSM merge fall back to pypaimon's built-in
+  reader.
+- Partition pruning, predicate pushdown, projection pushdown, and limit
+  pushdown are all supported.
+
+## Write
+
+### `write_paimon` (recommended)
+
+```python
+import daft
+from pypaimon.daft import write_paimon
+
+df = daft.from_pydict({
+ "id": [1, 2, 3],
+ "name": ["alice", "bob", "charlie"],
+ "dt": ["2024-01-01", "2024-01-01", "2024-01-01"],
+})
+
+write_paimon(
+ df,
+ "database_name.table_name",
+ catalog_options={"warehouse": "/path/to/warehouse"},
+)
+```
+
+`write_paimon` opens its own catalog, resolves the table, and commits the
+write through Daft's DataSink API.
+
+**Overwrite mode:**
+
+```python
+write_paimon(
+ df,
+ "database_name.table_name",
+ catalog_options={"warehouse": "/path/to/warehouse"},
+ mode="overwrite",
+)
+```
+
+**Parameters:**
+- `df`: the Daft DataFrame to write.
+- `table_identifier`: full table name, e.g. `"db_name.table_name"`.
+- `catalog_options`: kwargs forwarded to `CatalogFactory.create()`.
+- `mode`: write mode — `"append"` (default) or `"overwrite"`.
+
+## Catalog Abstraction
+
+Paimon catalogs can be used through Daft's unified `Catalog` / `Table`
+interfaces:
+
+```python
+import pypaimon
+from pypaimon.daft import PaimonCatalog
+
+inner = pypaimon.CatalogFactory.create({"warehouse": "/path/to/warehouse"})
+catalog = PaimonCatalog(inner, name="my_paimon")
+
+# Browse
+catalog.list_namespaces()
+catalog.list_tables()
+
+# Read / write through catalog
+table = catalog.get_table("my_db.my_table")
+df = table.read()
+table.append(df)
+table.overwrite(df)
+```
+
+You can also wrap a single table directly:
+
+```python
+from pypaimon.daft import PaimonTable
+
+inner_table = inner.get_table("my_db.my_table")
+table = PaimonTable(inner_table)
+df = table.read()
+```
+
+### Creating Tables
+
+```python
+import daft
+from daft.io.partitioning import PartitionField
+
+schema = daft.from_pydict({"id": [1], "name": ["a"], "dt":
["2024-01-01"]}).schema()
+dt_field = schema["dt"]
+partition_fields = [PartitionField.create(dt_field)]
+
+table = catalog.create_table(
+ "my_db.new_table",
+ schema,
+ partition_fields=partition_fields,
+)
+
+# Primary-key table
+table = catalog.create_table(
+ "my_db.pk_table",
+ schema,
+ properties={"primary_keys": ["id", "dt"]},
+ partition_fields=partition_fields,
+)
+```
diff --git a/paimon-python/dev/lint-python.sh b/paimon-python/dev/lint-python.sh
index a8e21c2d34..ffee46cbef 100755
--- a/paimon-python/dev/lint-python.sh
+++ b/paimon-python/dev/lint-python.sh
@@ -152,7 +152,12 @@ function license_check() {
# Flake8 check
function flake8_check() {
- local PYTHON_SOURCE="$(find . \( -path ./dev -o -path ./.tox -o -path
./.venv \) -prune -o -type f -name "*.py" -print )"
+ local PRUNE_PATHS="\( -path ./dev -o -path ./.tox -o -path ./.venv"
+ if python -c "import sys; sys.exit(0 if sys.version_info < (3, 10) else
1)" 2>/dev/null; then
+ PRUNE_PATHS="$PRUNE_PATHS -o -path ./pypaimon/daft -o -path
./pypaimon/tests/daft"
+ fi
+ PRUNE_PATHS="$PRUNE_PATHS \)"
+ local PYTHON_SOURCE="$(eval "find . $PRUNE_PATHS -prune -o -type f -name
'*.py' -print")"
print_function "STAGE" "flake8 checks"
if [ ! -f "$FLAKE8_PATH" ]; then
diff --git a/paimon-python/pypaimon/daft/__init__.py
b/paimon-python/pypaimon/daft/__init__.py
new file mode 100644
index 0000000000..e9651dd477
--- /dev/null
+++ b/paimon-python/pypaimon/daft/__init__.py
@@ -0,0 +1,31 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pypaimon.daft.daft_paimon import read_paimon, write_paimon
+
# Public API of pypaimon.daft. PaimonCatalog and PaimonTable are not imported
# eagerly at the top of this module; they are resolved on first attribute
# access by the module-level __getattr__ defined in this file.
__all__ = ["read_paimon", "write_paimon", "PaimonCatalog", "PaimonTable"]
+
+
def __getattr__(name):
    """Resolve ``PaimonCatalog`` / ``PaimonTable`` lazily (PEP 562 module hook).

    The import of ``pypaimon.daft.daft_catalog`` is deferred until one of the
    two names is first requested. Both classes are then stored in this module's
    globals, so subsequent lookups find them directly and skip this hook.

    Raises:
        AttributeError: for any other attribute name.
    """
    lazy_names = ("PaimonCatalog", "PaimonTable")
    if name not in lazy_names:
        raise AttributeError(f"module 'pypaimon.daft' has no attribute {name!r}")

    from pypaimon.daft.daft_catalog import PaimonCatalog, PaimonTable

    namespace = globals()
    namespace.update(PaimonCatalog=PaimonCatalog, PaimonTable=PaimonTable)
    return namespace[name]
diff --git a/paimon-python/pypaimon/daft/daft_blob.py
b/paimon-python/pypaimon/daft/daft_blob.py
new file mode 100644
index 0000000000..156cda52bf
--- /dev/null
+++ b/paimon-python/pypaimon/daft/daft_blob.py
@@ -0,0 +1,86 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Utilities for deserializing Paimon BlobDescriptor bytes into FileReference
arrays."""
+
+from __future__ import annotations
+
+import struct
+
+import pyarrow as pa
+
# Arrow struct layout of a Daft File reference: the URL, a serialized
# io_config (filled with nulls by blob_column_to_file_array below), and the
# offset/length pair decoded from a Paimon BlobDescriptor. All fields nullable.
_FILE_PHYSICAL_FIELDS = (
    ("url", pa.large_utf8()),
    ("io_config", pa.large_binary()),
    ("offset", pa.int64()),
    ("length", pa.int64()),
)
FILE_PHYSICAL_TYPE = pa.struct([pa.field(field_name, field_type) for field_name, field_type in _FILE_PHYSICAL_FIELDS])
+
+
+def _deserialize_one(data: bytes) -> tuple[str, int, int]:
+ """Deserialize a single BlobDescriptor -> (url, offset, length)."""
+ pos = 0
+ version = data[pos]
+ pos += 1
+
+ if version > 1:
+ pos += 8 # skip magic
+
+ uri_len = struct.unpack_from("<I", data, pos)[0]
+ pos += 4
+
+ uri = data[pos:pos + uri_len].decode("utf-8")
+ pos += uri_len
+
+ offset = struct.unpack_from("<q", data, pos)[0]
+ pos += 8
+
+ length = struct.unpack_from("<q", data, pos)[0]
+ return uri, offset, length
+
+
def blob_column_to_file_array(column: pa.Array) -> pa.Array:
    """Convert a column of serialized BlobDescriptors into File-reference structs.

    Each non-null value is decoded with ``_deserialize_one``. A null input row
    yields a row whose ``url``, ``offset`` and ``length`` are null. The
    ``io_config`` child is always null.

    Args:
        column: binary column (``large_binary``) of serialized BlobDescriptors;
            a ``ChunkedArray`` works as well.

    Returns:
        A ``StructArray`` with children ``url`` (large_utf8), ``io_config``
        (large_binary), ``offset`` (int64) and ``length`` (int64) — the same
        layout as ``FILE_PHYSICAL_TYPE``.
    """
    urls: list[str | None] = []
    offsets: list[int | None] = []
    lengths: list[int | None] = []

    # to_pylist() converts the whole column at once (bytes / None), instead of
    # materializing a pyarrow Scalar object for every element.
    for raw in column.to_pylist():
        if raw is None:
            urls.append(None)
            offsets.append(None)
            lengths.append(None)
        else:
            uri, off, length = _deserialize_one(raw)
            urls.append(uri)
            offsets.append(off)
            lengths.append(length)

    n = len(urls)
    return pa.StructArray.from_arrays(
        [
            pa.array(urls, type=pa.large_utf8()),
            pa.nulls(n, type=pa.large_binary()),
            pa.array(offsets, type=pa.int64()),
            pa.array(lengths, type=pa.int64()),
        ],
        names=["url", "io_config", "offset", "length"],
    )
diff --git a/paimon-python/pypaimon/daft/daft_catalog.py
b/paimon-python/pypaimon/daft/daft_catalog.py
new file mode 100644
index 0000000000..aaf3f6879d
--- /dev/null
+++ b/paimon-python/pypaimon/daft/daft_catalog.py
@@ -0,0 +1,285 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Paimon catalog and table wrappers for Daft's Catalog/Table interfaces."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+import pyarrow as pa
+from pypaimon.catalog.catalog import Catalog as InnerCatalog
+from pypaimon.catalog.catalog_exception import (
+ DatabaseNotExistException,
+ TableNotExistException,
+)
+from pypaimon.table.table import Table as InnerTable
+
+from daft.catalog import Catalog, Function, Identifier, NotFoundError,
Properties, Schema, Table
+
+if TYPE_CHECKING:
+ from collections.abc import Callable
+
+ from daft.dataframe import DataFrame
+ from daft.io.partitioning import PartitionField
+
+
class PaimonCatalog(Catalog):
    """Daft ``Catalog`` implementation backed by a pypaimon catalog.

    Daft namespaces map to Paimon databases and Daft tables map to Paimon
    tables. The catalog options are handed to every :class:`PaimonTable`
    returned by this catalog, which forwards them to its read path.
    """

    _inner: InnerCatalog
    _name: str
    _catalog_options: dict[str, str]

    def __init__(
        self,
        inner: InnerCatalog,
        name: str = "paimon",
        catalog_options: dict[str, str] | None = None,
    ) -> None:
        """Wrap ``inner`` as a Daft catalog.

        Args:
            inner: The pypaimon catalog to delegate to.
            name: Catalog name reported to Daft.
            catalog_options: Options passed on to the tables this catalog
                returns. When omitted they are recovered from
                ``inner.catalog_options`` or ``inner.context.options``: an
                options object is flattened with ``to_map()`` and a plain dict
                is copied; anything else yields an empty dict.
        """
        self._inner = inner
        self._name = name
        if catalog_options is None:
            catalog_options = self._options_from_inner(inner)
        self._catalog_options = catalog_options

    @staticmethod
    def _options_from_inner(inner: InnerCatalog) -> dict[str, str]:
        """Best-effort recovery of the options ``inner`` was created with."""
        opts = getattr(inner, "catalog_options", None)
        if opts is None:
            opts = getattr(getattr(inner, "context", None), "options", None)
        if opts is None:
            return {}
        if hasattr(opts, "to_map"):
            return opts.to_map()
        if isinstance(opts, dict):
            # A plain dict has no to_map(); copy it instead of dropping it.
            return dict(opts)
        return {}

    @property
    def name(self) -> str:
        return self._name

    ###
    # create_*
    ###

    def _create_function(self, ident: Identifier, function: Function | Callable[..., Any]) -> None:
        raise NotImplementedError("Paimon does not support function registration.")

    def _create_namespace(self, ident: Identifier) -> None:
        db_name = _to_paimon_ident(ident)
        self._inner.create_database(db_name, ignore_if_exists=False)

    def _create_table(
        self,
        ident: Identifier,
        schema: Schema,
        properties: Properties | None = None,
        partition_fields: list[PartitionField] | None = None,
    ) -> Table:
        """Create a Paimon table and return it wrapped as a :class:`PaimonTable`.

        ``properties["primary_keys"]`` (a list of column names, or a
        comma-separated string) becomes the primary key. Every other property
        is stringified and stored as a Paimon table option.
        """
        import pypaimon

        pa_schema = _cast_large_types(schema.to_pyarrow_schema())
        partition_keys = [pf.field.name for pf in (partition_fields or [])]

        props = dict(properties or {})
        raw_primary_keys = props.pop("primary_keys", None)
        if raw_primary_keys is None:
            primary_keys: list[str] = []
        elif isinstance(raw_primary_keys, str):
            # list("id,dt") would split into single characters; parse it as CSV.
            primary_keys = [key.strip() for key in raw_primary_keys.split(",") if key.strip()]
        else:
            primary_keys = list(raw_primary_keys)
        options = {k: str(v) for k, v in props.items()}

        paimon_schema = pypaimon.Schema.from_pyarrow_schema(
            pa_schema,
            partition_keys=partition_keys,
            primary_keys=primary_keys,
            options=options,
        )

        paimon_ident = _to_paimon_ident(ident)
        self._inner.create_table(paimon_ident, paimon_schema, ignore_if_exists=False)

        inner_table = self._inner.get_table(paimon_ident)
        return PaimonTable(inner_table, catalog_options=self._catalog_options)

    ###
    # drop_*
    ###

    def _drop_namespace(self, ident: Identifier) -> None:
        db_name = _to_paimon_ident(ident)
        try:
            self._inner.drop_database(db_name, ignore_if_not_exists=False)
        except DatabaseNotExistException as ex:
            raise NotFoundError(f"Namespace '{db_name}' not found.") from ex

    def _drop_table(self, ident: Identifier) -> None:
        paimon_ident = _to_paimon_ident(ident)
        try:
            self._inner.drop_table(paimon_ident, ignore_if_not_exists=False)
        except TableNotExistException as ex:
            raise NotFoundError(f"Table '{paimon_ident}' not found.") from ex

    ###
    # has_*
    ###

    def _has_namespace(self, ident: Identifier) -> bool:
        db_name = _to_paimon_ident(ident)
        try:
            self._inner.get_database(db_name)
            return True
        except DatabaseNotExistException:
            return False

    def _has_table(self, ident: Identifier) -> bool:
        paimon_ident = _to_paimon_ident(ident)
        try:
            self._inner.get_table(paimon_ident)
            return True
        except (TableNotExistException, DatabaseNotExistException):
            return False

    ###
    # get_*
    ###

    def _get_function(self, ident: Identifier) -> Function:
        raise NotFoundError(f"Function '{ident}' not found in catalog '{self.name}'")

    def _get_table(self, ident: Identifier) -> PaimonTable:
        """Look up a table; a missing table *or* database raises ``NotFoundError``."""
        paimon_ident = _to_paimon_ident(ident)
        try:
            inner = self._inner.get_table(paimon_ident)
        except (TableNotExistException, DatabaseNotExistException) as ex:
            # Same exception set as _has_table, with a message like _drop_table.
            raise NotFoundError(f"Table '{paimon_ident}' not found.") from ex
        return PaimonTable(inner, catalog_options=self._catalog_options)

    ###
    # list_*
    ###

    def _list_namespaces(self, pattern: str | None = None) -> list[Identifier]:
        databases: list[str] = self._inner.list_databases()
        return [Identifier(db) for db in databases if pattern is None or db.startswith(pattern)]

    def _list_tables(self, pattern: str | None = None) -> list[Identifier]:
        result = []
        for db in self._inner.list_databases():
            for table_name in self._inner.list_tables(db):
                ident = Identifier(db, table_name)
                if pattern is None or str(ident).startswith(pattern):
                    result.append(ident)
        return result
+
+
class PaimonTable(Table):
    """Daft ``Table`` view over a single pypaimon table.

    Reads and writes are delegated to ``pypaimon.daft.daft_paimon``;
    truncation runs through a pypaimon batch commit.
    """

    _inner: InnerTable
    _catalog_options: dict[str, str]

    def __init__(self, inner: InnerTable, catalog_options: dict[str, str] | None = None) -> None:
        self._inner = inner
        self._catalog_options = catalog_options if catalog_options else {}

    # -- metadata --------------------------------------------------------

    @property
    def name(self) -> str:
        """Unqualified table name (the ``object`` part of the identifier)."""
        return self._inner.identifier.object

    @property
    def primary_keys(self) -> list[str]:
        return list(self._inner.primary_keys)

    @property
    def partition_keys(self) -> list[str]:
        return list(self._inner.partition_keys)

    @property
    def is_primary_key_table(self) -> bool:
        return self._inner.is_primary_key_table

    @property
    def bucket_count(self) -> int:
        return self._inner.total_buckets

    @property
    def table_options(self) -> dict[str, str]:
        option_map = self._inner.options.options.to_map()
        return dict(option_map)

    def schema(self) -> Schema:
        """Schema of the DataFrame produced by :meth:`read`."""
        frame = self.read()
        return frame.schema()

    # -- read / write ----------------------------------------------------

    def read(self, **options: Any) -> DataFrame:
        from pypaimon.daft.daft_paimon import _read_table

        Table._validate_options("Paimon read", options, set())
        return _read_table(self._inner, catalog_options=self._catalog_options)

    def append(self, df: DataFrame, **options: Any) -> None:
        self._paimon_write(df, "append", options)

    def overwrite(self, df: DataFrame, **options: Any) -> None:
        self._paimon_write(df, "overwrite", options)

    def _paimon_write(self, df: DataFrame, mode: str, options: dict[str, Any]) -> None:
        """Validate (no options are accepted) and write ``df`` in ``mode``."""
        from pypaimon.daft.daft_paimon import _write_table

        Table._validate_options("Paimon write", options, set())
        _write_table(df, self._inner, mode=mode)

    # -- truncation ------------------------------------------------------

    def truncate(self) -> None:
        """Remove all data from this table."""
        self._with_batch_commit(lambda commit: commit.truncate_table())

    def truncate_partitions(self, partitions: list[dict[str, str]]) -> None:
        """Remove data from specific partitions."""
        self._with_batch_commit(lambda commit: commit.truncate_partitions(partitions))

    def _with_batch_commit(self, action: Callable[[Any], None]) -> None:
        """Open a batch commit, apply ``action`` to it, and always close it."""
        commit = self._inner.new_batch_write_builder().new_commit()
        try:
            action(commit)
        finally:
            commit.close()
+
+
def _to_paimon_ident(ident: Identifier) -> str:
    """Convert a Daft identifier to a pypaimon identifier string.

    - 1 part  (table,)              -> 'table'
    - 2 parts (db, table)           -> 'db.table'
    - 3 parts (catalog, db, table)  -> 'db.table' (catalog prefix stripped)

    Values that are not a Daft ``Identifier`` (e.g. plain strings) are
    returned unchanged.

    Raises:
        ValueError: if the identifier has zero or more than three parts.
            Such identifiers were previously mapped silently to their
            first part.
    """
    if not isinstance(ident, Identifier):
        return ident
    parts = tuple(ident)
    if not 1 <= len(parts) <= 3:
        raise ValueError(
            f"Cannot map identifier '{ident}' with {len(parts)} parts to a Paimon identifier; "
            "expected 'table', 'db.table' or 'catalog.db.table'."
        )
    if len(parts) == 1:
        return str(parts[0])
    # For both 2 and 3 parts the Paimon identifier is the trailing 'db.table'.
    return ".".join(str(part) for part in parts[-2:])
+
+
def _cast_large_types(arrow_schema: pa.Schema) -> pa.Schema:
    """Make a PyArrow schema acceptable to pypaimon.

    Top-level ``large_string`` fields are rewritten as ``string`` because
    pypaimon does not support ``large_string``. ``large_binary`` is left
    untouched since pypaimon 1.4+ maps it to the BLOB type. Field names,
    nullability and metadata, as well as schema metadata, are preserved.

    The input schema object itself is returned when nothing needs rewriting.
    """
    if not any(pa.types.is_large_string(f.type) for f in arrow_schema):
        return arrow_schema

    rewritten = [
        pa.field(
            f.name,
            pa.string() if pa.types.is_large_string(f.type) else f.type,
            nullable=f.nullable,
            metadata=f.metadata,
        )
        for f in arrow_schema
    ]
    return pa.schema(rewritten, metadata=arrow_schema.metadata)
diff --git a/paimon-python/pypaimon/daft/daft_compat.py
b/paimon-python/pypaimon/daft/daft_compat.py
new file mode 100644
index 0000000000..cfc3e35c36
--- /dev/null
+++ b/paimon-python/pypaimon/daft/daft_compat.py
@@ -0,0 +1,60 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Daft version compatibility checks."""
+
+from __future__ import annotations
+
+from importlib.metadata import version
+
+_daft_version: tuple[int, ...] | None = None
+
+
+def _parse_daft_version() -> tuple[int, ...]:
+ global _daft_version
+ if _daft_version is not None:
+ return _daft_version
+ raw = version("daft")
+ parts = []
+ for seg in raw.split(".")[:3]:
+ digits = ""
+ for c in seg:
+ if c.isdigit():
+ digits += c
+ else:
+ break
+ if digits:
+ parts.append(int(digits))
+ _daft_version = tuple(parts)
+ return _daft_version
+
+
def has_file_range_reads() -> bool:
    """Report whether the installed Daft supports File offset/length (>= 0.7.11)."""
    minimum_version = (0, 7, 11)
    return minimum_version <= _parse_daft_version()
+
+
def require_file_range_reads(feature: str = "BLOB columns") -> None:
    """Fail fast when Daft is too old for File offset/length support.

    Args:
        feature: Human-readable name of the feature needing the support; it
            is used as the subject of the error message.

    Raises:
        NotImplementedError: if the installed Daft is older than 0.7.11.
    """
    if has_file_range_reads():
        return
    found = ".".join(map(str, _parse_daft_version()))
    raise NotImplementedError(
        f"{feature} require daft >= 0.7.11 (File offset/length support), "
        f"but found {found}. "
        f"Please upgrade: pip install 'daft>=0.7.11'"
    )
diff --git a/paimon-python/pypaimon/daft/daft_datasink.py
b/paimon-python/pypaimon/daft/daft_datasink.py
new file mode 100644
index 0000000000..1f37cc4165
--- /dev/null
+++ b/paimon-python/pypaimon/daft/daft_datasink.py
@@ -0,0 +1,125 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+import pyarrow as pa
+from daft.datatype import DataType
+from daft.io.sink import DataSink, WriteResult
+from daft.recordbatch.micropartition import MicroPartition
+from daft.schema import Schema
+
+if TYPE_CHECKING:
+ from collections.abc import Iterator
+
+ from pypaimon.table.file_store_table import FileStoreTable
+
+
class PaimonDataSink(DataSink[list[Any]]):
    """DataSink for writing data to an Apache Paimon table.

    Delegates all file I/O and commit logic to pypaimon's BatchTableWrite /
    BatchTableCommit APIs. Each `write()` call (one per parallel worker) opens
    an independent BatchTableWrite, accumulates micro-partitions, then calls
    `prepare_commit()` to obtain CommitMessage objects. `finalize()` gathers
    all CommitMessages from all workers and performs a single atomic commit.

    Incoming batches are aligned to the table's Arrow schema before writing.
    If every table column is present by name, columns are matched by name, so
    a DataFrame whose column order differs from the table's is still written
    correctly. Otherwise they are matched by position. Columns whose type
    differs from the table's are cast.
    """

    def __init__(self, table: FileStoreTable, mode: str = "append") -> None:
        """Prepare a batch write builder for ``table``.

        Args:
            table: Target pypaimon table.
            mode: ``"append"`` or ``"overwrite"``. Overwrite calls
                ``overwrite({})`` on the write builder (no static partition
                spec is given).

        Raises:
            ValueError: if ``mode`` is not one of the supported values.
        """
        if mode not in ("append", "overwrite"):
            raise ValueError(f"Only 'append' or 'overwrite' mode is supported, got: {mode!r}")
        self._table = table
        self._mode = mode

        from pypaimon.schema.data_types import PyarrowFieldParser

        # Arrow equivalent of the table's Paimon fields; batches are coerced to it.
        self._target_schema: pa.Schema = PyarrowFieldParser.from_paimon_schema(table.fields)
        self._write_builder = table.new_batch_write_builder()
        if mode == "overwrite":
            self._write_builder.overwrite({})

    def name(self) -> str:
        return "Paimon Write"

    def schema(self) -> Schema:
        """Schema of the summary produced by :meth:`finalize` (one row per new file)."""
        return Schema._from_field_name_and_types(
            [
                ("operation", DataType.string()),
                ("rows", DataType.int64()),
                ("file_size", DataType.int64()),
                ("file_name", DataType.string()),
            ]
        )

    def _plan_alignment(self, batch: pa.RecordBatch) -> list[tuple[int, pa.DataType | None]]:
        """For each target column return (source column index, type to cast to or None)."""
        source_names = batch.schema.names
        positions = {col_name: idx for idx, col_name in enumerate(source_names)}
        # Match by name only when names are unique and cover the whole table schema.
        match_by_name = len(positions) == len(source_names) and all(
            field.name in positions for field in self._target_schema
        )
        plan: list[tuple[int, pa.DataType | None]] = []
        for i, field in enumerate(self._target_schema):
            src = positions[field.name] if match_by_name else i
            src_type = batch.column(src).type
            plan.append((src, None if src_type == field.type else field.type))
        return plan

    def write(self, micropartitions: Iterator[MicroPartition]) -> Iterator[WriteResult[list[Any]]]:
        """Write this worker's micro-partitions and yield its commit messages.

        Yields exactly one ``WriteResult`` whose ``result`` is the list of
        CommitMessages from ``prepare_commit()``. The table writer is closed
        even if writing fails.
        """
        table_write = self._write_builder.new_write()

        # Derived from the first batch; later batches are assumed to share its schema.
        plan: list[tuple[int, pa.DataType | None]] | None = None
        passthrough = False

        total_rows = 0
        total_bytes = 0
        try:
            for mp in micropartitions:
                for rb in mp.get_record_batches():
                    batch = rb.to_arrow_record_batch()
                    if plan is None:
                        plan = self._plan_alignment(batch)
                        # Already in table order with identical types: write unchanged.
                        passthrough = batch.num_columns == len(plan) and all(
                            src == i and cast is None for i, (src, cast) in enumerate(plan)
                        )
                    if not passthrough:
                        arrays = [
                            batch.column(src) if cast is None else batch.column(src).cast(cast)
                            for src, cast in plan
                        ]
                        batch = pa.RecordBatch.from_arrays(arrays, schema=self._target_schema)
                    table_write.write_arrow_batch(batch)
                    total_rows += batch.num_rows
                    total_bytes += batch.nbytes
            commit_messages = table_write.prepare_commit()
        finally:
            table_write.close()

        yield WriteResult(
            result=list(commit_messages),
            bytes_written=total_bytes,
            rows_written=total_rows,
        )

    def finalize(self, write_results: list[WriteResult[list[Any]]]) -> MicroPartition:
        """Commit all workers' messages in one commit and summarize the new files."""
        all_commit_messages = [msg for wr in write_results for msg in wr.result]

        table_commit = self._write_builder.new_commit()
        try:
            table_commit.commit(all_commit_messages)
        finally:
            table_commit.close()

        operation_label = "OVERWRITE" if self._mode == "overwrite" else "ADD"
        all_files = [f for msg in all_commit_messages for f in msg.new_files]

        return MicroPartition.from_pydict(
            {
                "operation": pa.array([operation_label] * len(all_files), type=pa.string()),
                "rows": pa.array([f.row_count for f in all_files], type=pa.int64()),
                "file_size": pa.array([f.file_size for f in all_files], type=pa.int64()),
                "file_name": pa.array([f.file_name for f in all_files], type=pa.string()),
            }
        )
diff --git a/paimon-python/pypaimon/daft/daft_datasource.py
b/paimon-python/pypaimon/daft/daft_datasource.py
new file mode 100644
index 0000000000..f6fb6f8f4c
--- /dev/null
+++ b/paimon-python/pypaimon/daft/daft_datasource.py
@@ -0,0 +1,316 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING, Any
+from urllib.parse import urlparse
+
+import daft
+from daft.dependencies import pa
+from daft.expressions import ExpressionsProjection
+from daft.io.partitioning import PartitionField
+from daft.io.source import DataSource, DataSourceTask
+from daft.logical.schema import Schema
+from daft.recordbatch import RecordBatch
+
+from pypaimon.daft.daft_compat import require_file_range_reads
+from pypaimon.daft.daft_predicate_visitor import convert_filters_to_paimon
+
+if TYPE_CHECKING:
+ from collections.abc import AsyncIterator
+
+ from pypaimon.common.predicate import Predicate
+ from pypaimon.read.split import Split
+ from pypaimon.table.file_store_table import FileStoreTable
+
+ from daft.daft import PyExpr, StorageConfig
+ from daft.io.pushdowns import Pushdowns
+
logger = logging.getLogger(__name__)

# Lower-case Paimon data-file format names, compared against the table's file-format option.
PAIMON_FILE_FORMAT_PARQUET = "parquet"
PAIMON_FILE_FORMAT_ORC = "orc"
PAIMON_FILE_FORMAT_AVRO = "avro"
+
+
class _PaimonPKSplitTask(DataSourceTask):
    """DataSourceTask that reads one whole split through pypaimon's own reader.

    PaimonDataSource falls back to this task whenever a split cannot be handed to
    Daft's native Parquet reader file by file. That happens for non-Parquet
    formats (ORC, Avro), blob columns, deletion vectors, and primary-key splits
    that are not raw-convertible and so need an LSM-tree merge. pypaimon's reader
    performs that merge internally.

    Args:
        table: Table used to build the pypaimon reader.
        split: The single split this task reads.
        schema: Daft schema reported for the produced batches.
        blob_column_names: Columns to convert from serialized blob descriptors to
            Daft's ``DataType.file()``. ``None`` or empty means no conversion.
    """

    def __init__(
        self,
        table: FileStoreTable,
        split: Split,
        schema: Schema,
        blob_column_names: set[str] | None = None,
    ) -> None:
        self._table = table
        self._split = split
        self._schema = schema
        self._blob_column_names = blob_column_names if blob_column_names else set()

    @property
    def schema(self) -> Schema:
        """Daft schema of the batches yielded by ``read``."""
        return self._schema

    async def read(self) -> AsyncIterator[RecordBatch]:
        """Yield the split's rows as Daft RecordBatches, one per Arrow batch."""
        arrow_reader = self._table.new_read_builder().new_read().to_arrow_batch_reader([self._split])
        blob_names = self._blob_column_names

        # Pull batches from the reader until it is exhausted.
        for arrow_batch in iter(arrow_reader.read_next_batch, None):
            if not blob_names:
                yield RecordBatch.from_arrow_record_batches([arrow_batch], arrow_batch.schema)
                continue
            converted = _convert_blob_columns(arrow_batch, blob_names)
            daft_batch = RecordBatch.from_arrow_record_batches([converted], converted.schema)
            yield _cast_blob_columns_to_file(daft_batch, blob_names)
+
+
def _convert_blob_columns(batch: pa.RecordBatch, blob_column_names: set[str]) -> pa.RecordBatch:
    """Rewrite blob columns of *batch* from serialized BlobDescriptor bytes to the File struct layout.

    A column is converted only if its name is in *blob_column_names* and its
    Arrow type is binary or large_binary. Converted columns go through
    ``blob_column_to_file_array`` and are retyped to ``FILE_PHYSICAL_TYPE``,
    keeping their nullability. All other columns pass through untouched, and
    column order is preserved.
    """
    from pypaimon.daft.daft_blob import FILE_PHYSICAL_TYPE, blob_column_to_file_array

    def _convert(field, column):
        # Returns the (array, field) pair that should appear in the output batch.
        if field.name in blob_column_names and (
            pa.types.is_large_binary(field.type) or pa.types.is_binary(field.type)
        ):
            return (
                blob_column_to_file_array(column),
                pa.field(field.name, FILE_PHYSICAL_TYPE, nullable=field.nullable),
            )
        return column, field

    converted = [_convert(field, batch.column(position)) for position, field in enumerate(batch.schema)]
    out_arrays = [array for array, _ in converted]
    out_fields = [field for _, field in converted]
    return pa.RecordBatch.from_arrays(out_arrays, schema=pa.schema(out_fields))
+
+
def _cast_blob_columns_to_file(rb: RecordBatch, blob_column_names: set[str]) -> RecordBatch:
    """Return a new RecordBatch with each column named in *blob_column_names* cast to ``DataType.file()``.

    Other columns are carried over unchanged, and column order follows ``rb.schema()``.
    """
    from daft.datatype import DataType

    file_dtype = DataType.file()
    columns = {
        field.name: (
            rb.get_column(index).cast(file_dtype)
            if field.name in blob_column_names
            else rb.get_column(index)
        )
        for index, field in enumerate(rb.schema())
    }
    return RecordBatch.from_pydict(columns)
+
+
class PaimonDataSource(DataSource):
    """DataSource for Apache Paimon tables.

    Uses pypaimon for catalog metadata and scan planning (file listing,
    partition pruning, statistics-based file skipping), then yields
    DataSourceTask objects executed by Daft's native Parquet reader.

    Splits that cannot be read file by file fall back to a _PaimonPKSplitTask,
    which delegates back to pypaimon's reader. These are non-Parquet formats,
    blob columns, deletion vectors, and primary-key splits that still need an
    LSM-tree merge.
    """

    def __init__(
        self,
        table: FileStoreTable,
        storage_config: StorageConfig,
        catalog_options: dict[str, str],
    ) -> None:
        """Build the Daft schema and cached scan metadata for *table*.

        Args:
            table: The pypaimon table to scan.
            storage_config: Daft storage config forwarded to native Parquet tasks.
            catalog_options: Catalog options. Only ``"warehouse"`` is read here, to
                recover the URI scheme of data file paths.
        """
        self._table = table
        self._storage_config = storage_config
        self._catalog_options = catalog_options

        from pypaimon.schema.data_types import PyarrowFieldParser

        pa_schema = PyarrowFieldParser.from_paimon_schema(table.fields)

        # Columns whose Arrow type is large_binary are treated as blob columns.
        self._blob_column_names: set[str] = {
            field.name for field in pa_schema if pa.types.is_large_binary(field.type)
        }
        self._has_blob_columns = bool(self._blob_column_names)

        if self._has_blob_columns:
            require_file_range_reads()
            from daft.datatype import DataType

            base_schema = Schema.from_pyarrow_schema(pa_schema)
            fields = []
            for f in base_schema:
                if f.name in self._blob_column_names:
                    fields.append((f.name, DataType.file()))
                else:
                    fields.append((f.name, f.dtype))
            self._schema = Schema.from_field_name_and_types(fields)
        else:
            self._schema = Schema.from_pyarrow_schema(pa_schema)

        warehouse = catalog_options.get("warehouse", "")
        self._warehouse_scheme = urlparse(warehouse).scheme

        self._file_format = table.options.file_format().lower()
        self._is_parquet = self._file_format == PAIMON_FILE_FORMAT_PARQUET

        self._partition_field_arrow_types: dict[str, pa.DataType] = (
            {f.name: PyarrowFieldParser.from_paimon_type(f.type) for f in table.partition_keys_fields}
            if table.partition_keys
            else {}
        )

        # Set by push_filters() and applied to the scan in get_tasks().
        self._paimon_predicate: Predicate | None = None

    @property
    def name(self) -> str:
        table_path = getattr(self._table, "table_path", None)
        return f"PaimonDataSource({table_path})"

    @property
    def schema(self) -> Schema:
        return self._schema

    def get_partition_fields(self) -> list[PartitionField]:
        """Return the schema fields that are Paimon partition keys."""
        partition_key_names = set(self._table.partition_keys)
        return [PartitionField.create(f) for f in self._schema if f.name in partition_key_names]

    def push_filters(self, filters: list[PyExpr]) -> tuple[list[PyExpr], list[PyExpr]]:
        """Push filters down to Paimon scan.

        Converts Daft expressions to Paimon predicates where possible.
        Returns (pushed_filters, remaining_filters).
        """
        pushed_filters, remaining_filters, paimon_predicate = convert_filters_to_paimon(self._table, filters)

        self._paimon_predicate = paimon_predicate

        if pushed_filters:
            logger.debug(
                "Paimon filter pushdown: %d filters pushed, %d remaining",
                len(pushed_filters),
                len(remaining_filters),
            )

        return pushed_filters, remaining_filters

    async def get_tasks(self, pushdowns: Pushdowns) -> AsyncIterator[DataSourceTask]:
        """Plan the Paimon scan and yield Daft tasks for it.

        Splits whose partition values fail ``pushdowns.partition_filters`` are
        skipped. Splits eligible for the native reader yield one Parquet task per
        data file. Every other split yields a single _PaimonPKSplitTask.
        """
        read_table = self._table
        if self._has_blob_columns:
            # Blob columns are read as serialized descriptors and converted later.
            read_table = self._table.copy({"blob-as-descriptor": "true"})

        read_builder = read_table.new_read_builder()

        if pushdowns.columns is not None:
            read_builder = read_builder.with_projection(list(pushdowns.columns))

        if pushdowns.limit is not None:
            read_builder = read_builder.with_limit(pushdowns.limit)

        if self._paimon_predicate is not None:
            read_builder = read_builder.with_filter(self._paimon_predicate)
            logger.debug(
                "Applied Paimon filter pushdown predicate: %s",
                self._paimon_predicate,
            )

        if self._table.partition_keys and pushdowns.partition_filters is None:
            logger.warning(
                "%s has partition keys %s but no partition filter was specified. "
                "This will result in a full table scan.",
                self.name,
                list(self._table.partition_keys),
            )

        plan = read_builder.new_scan().plan()

        # Partition-value batches keyed by the split's partition dict; built once per partition.
        pv_cache: dict[tuple[Any, ...], RecordBatch | None] = {}

        for split in plan.splits():
            if self._table.partition_keys and pushdowns.partition_filters is not None:
                pv_key = tuple(sorted(split.partition.to_dict().items()))
                if pv_key not in pv_cache:
                    pv_cache[pv_key] = self._build_partition_values(split)
                pv = pv_cache[pv_key]
                if pv is not None and len(pv.filter(ExpressionsProjection([pushdowns.partition_filters]))) == 0:
                    continue

            _deletion_files = getattr(split, "data_deletion_files", None)
            has_deletion_vectors = _deletion_files is not None and any(df is not None for df in _deletion_files)

            can_use_native_reader = (
                self._is_parquet
                and not self._has_blob_columns
                and (not self._table.is_primary_key_table or split.raw_convertible)
                and not has_deletion_vectors
            )

            if can_use_native_reader:
                pv = None
                if self._table.partition_keys:
                    pv_key = tuple(sorted(split.partition.to_dict().items()))
                    if pv_key not in pv_cache:
                        pv_cache[pv_key] = self._build_partition_values(split)
                    pv = pv_cache[pv_key]

                for data_file in split.files:
                    file_uri = self._build_file_uri(data_file.file_path)
                    yield DataSourceTask.parquet(
                        path=file_uri,
                        schema=self._schema,
                        pushdowns=pushdowns,
                        num_rows=data_file.row_count,
                        size_bytes=data_file.file_size,
                        partition_values=pv,
                        storage_config=self._storage_config,
                    )
            else:
                if not self._is_parquet:
                    reason = "non-parquet format"
                elif self._has_blob_columns:
                    reason = "blob columns present"
                elif has_deletion_vectors:
                    reason = "deletion vectors present"
                else:
                    reason = "LSM merge required"
                logger.debug(
                    "Split with %d files using pypaimon fallback (%s).",
                    len(split.files),
                    reason,
                )
                yield _PaimonPKSplitTask(read_table, split, self._schema, self._blob_column_names)

    def _build_file_uri(self, file_path: str) -> str:
        """Reconstruct a full URI from a (potentially scheme-stripped) file_path.

        A path that already carries a scheme (``scheme://...``) is returned
        unchanged, because prefixing it again would produce an unreadable URI
        such as ``s3://s3://bucket/...``. A scheme-less path gets the warehouse
        scheme, or ``file://`` when the warehouse has no scheme.
        """
        if "://" in file_path:
            return file_path
        if self._warehouse_scheme:
            return f"{self._warehouse_scheme}://{file_path}"
        return f"file://{file_path}"

    def _build_partition_values(self, split: Split) -> daft.recordbatch.RecordBatch | None:
        """Build a single-row RecordBatch encoding the partition values for a split.

        Returns None for unpartitioned tables.
        """
        if not self._table.partition_keys:
            return None

        partition_dict = split.partition.to_dict()
        arrays: dict[str, daft.Series] = {}
        for pfield in self._table.partition_keys_fields:
            value = partition_dict.get(pfield.name)
            arrow_type = self._partition_field_arrow_types[pfield.name]
            arrays[pfield.name] = daft.Series.from_arrow(pa.array([value], type=arrow_type), name=pfield.name)

        if not arrays:
            return None
        return daft.recordbatch.RecordBatch.from_pydict(arrays)
diff --git a/paimon-python/pypaimon/daft/daft_io_config.py
b/paimon-python/pypaimon/daft/daft_io_config.py
new file mode 100644
index 0000000000..ef3da7e98d
--- /dev/null
+++ b/paimon-python/pypaimon/daft/daft_io_config.py
@@ -0,0 +1,89 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from __future__ import annotations
+
+from urllib.parse import urlparse
+
+from daft.io import IOConfig, S3Config
+
+
def _convert_paimon_catalog_options_to_io_config(catalog_options: dict[str, str]) -> IOConfig | None:
    """Translate pypaimon catalog options into a Daft IOConfig.

    pypaimon supports S3-like (s3://, s3a://, s3n://, oss://), HDFS, and local
    (file://) storage.

    For an ``oss://`` warehouse the bucket (the URI netloc) and any non-empty
    ``fs.oss.*`` credentials go into Daft's OpenDAL backend
    (``opendal_backends={"oss": {...}}``) rather than S3Config, because Daft
    routes oss:// URIs to OpenDAL's OSS operator, not the S3-compatible path.
    An endpoint without an http(s) scheme is prefixed with ``https://``.

    For any other warehouse, the ``fs.s3.*`` options are mapped onto S3Config.
    None is returned when none of them is present.
    """
    warehouse = catalog_options.get("warehouse", "")
    parsed = urlparse(warehouse) if warehouse else None
    scheme = parsed.scheme if parsed is not None else ""

    if scheme == "oss":
        oss_cfg: dict[str, str] = {}
        if parsed.netloc:
            oss_cfg["bucket"] = parsed.netloc

        endpoint = catalog_options.get("fs.oss.endpoint")
        if endpoint:
            has_scheme = endpoint.startswith(("http://", "https://"))
            oss_cfg["endpoint"] = endpoint if has_scheme else f"https://{endpoint}"

        for option_key, cfg_key in (
            ("fs.oss.accessKeyId", "access_key_id"),
            ("fs.oss.accessKeySecret", "access_key_secret"),
            ("fs.oss.region", "region"),
            ("fs.oss.securityToken", "security_token"),
        ):
            value = catalog_options.get(option_key)
            if value:
                oss_cfg[cfg_key] = value

        if not oss_cfg:
            return None
        return IOConfig(opendal_backends={"oss": oss_cfg})

    # S3-compatible (s3://, s3a://, s3n://)
    s3_kwargs = {
        "endpoint_url": catalog_options.get("fs.s3.endpoint"),
        "region_name": catalog_options.get("fs.s3.region"),
        "key_id": catalog_options.get("fs.s3.accessKeyId"),
        "access_key": catalog_options.get("fs.s3.accessKeySecret"),
        "session_token": catalog_options.get("fs.s3.securityToken"),
    }
    if all(value is None for value in s3_kwargs.values()):
        return None
    return IOConfig(s3=S3Config(**s3_kwargs))
diff --git a/paimon-python/pypaimon/daft/daft_paimon.py
b/paimon-python/pypaimon/daft/daft_paimon.py
new file mode 100644
index 0000000000..b14050f884
--- /dev/null
+++ b/paimon-python/pypaimon/daft/daft_paimon.py
@@ -0,0 +1,161 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""
+Top-level API for reading and writing Paimon tables with Daft DataFrames.
+
+Usage::
+
+ from pypaimon.daft import read_paimon, write_paimon
+
+ df = read_paimon("db.table", catalog_options={"warehouse": "/path"})
+ write_paimon(df, "db.table", catalog_options={"warehouse": "/path"})
+"""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Dict, Optional
+
+if TYPE_CHECKING:
+ import daft
+
+ from pypaimon.table.file_store_table import FileStoreTable
+
+
+def _read_table(
+ table: FileStoreTable,
+ catalog_options: Dict[str, str] | None = None,
+ io_config=None,
+ snapshot_id: int | None = None,
+ tag_name: str | None = None,
+) -> "daft.DataFrame":
+ """Read a Paimon table object into a lazy Daft DataFrame."""
+ if snapshot_id is not None and tag_name is not None:
+ raise ValueError(
+ "snapshot_id and tag_name cannot be set at the same time"
+ )
+
+ from daft import context, runners
+ from daft.daft import StorageConfig
+
+ from pypaimon.daft.daft_datasource import PaimonDataSource
+ from pypaimon.daft.daft_io_config import (
+ _convert_paimon_catalog_options_to_io_config,
+ )
+
+ travel_options: dict[str, str] = {}
+ if snapshot_id is not None:
+ travel_options["scan.snapshot-id"] = str(snapshot_id)
+ if tag_name is not None:
+ travel_options["scan.tag-name"] = tag_name
+ if travel_options:
+ table = table.copy(travel_options)
+
+ if catalog_options is None:
+ catalog_options = {}
+
+ io_config = io_config or
_convert_paimon_catalog_options_to_io_config(catalog_options)
+ io_config = io_config or
context.get_context().daft_planning_config.default_io_config
+
+ multithreaded_io = runners.get_or_create_runner().name != "ray"
+ storage_config = StorageConfig(multithreaded_io, io_config)
+
+ warehouse = catalog_options.get("warehouse", "")
+ scan_catalog_options = {"warehouse": warehouse} if warehouse else {}
+
+ source = PaimonDataSource(
+ table, storage_config=storage_config,
catalog_options=scan_catalog_options
+ )
+ return source.read()
+
+
def _write_table(
    df: "daft.DataFrame",
    table: FileStoreTable,
    mode: str = "append",
) -> "daft.DataFrame":
    """Write *df* into a Paimon table object via a PaimonDataSink.

    Args:
        df: The DataFrame to write.
        table: Target pypaimon table.
        mode: Write mode, forwarded unchanged to PaimonDataSink.

    Returns:
        The DataFrame returned by ``df.write_sink``.
    """
    from pypaimon.daft.daft_datasink import PaimonDataSink

    sink = PaimonDataSink(table, mode)
    return df.write_sink(sink)
+
+
def read_paimon(
    table_identifier: str,
    catalog_options: Dict[str, str],
    *,
    snapshot_id: Optional[int] = None,
    tag_name: Optional[str] = None,
    io_config=None,
) -> "daft.DataFrame":
    """Read a Paimon table into a lazy Daft DataFrame.

    Returns a lazy DataFrame backed by Daft's optimizer. Use standard
    DataFrame operations (.select, .where, .limit) for projection,
    filtering, and limit. They are automatically pushed down into the
    Paimon scan via Daft's DataSource protocol.

    Args:
        table_identifier: Full table name, e.g. ``"db_name.table_name"``.
        catalog_options: Options passed to ``CatalogFactory.create()``,
            e.g. ``{"warehouse": "/path/to/warehouse"}``.
        snapshot_id: Optional snapshot id to time-travel to. Mutually
            exclusive with ``tag_name``.
        tag_name: Optional tag name to time-travel to. Mutually
            exclusive with ``snapshot_id``.
        io_config: Optional Daft IOConfig for accessing object storage.
            If None, it is inferred from the catalog options.

    Returns:
        A lazy ``daft.DataFrame`` backed by this Paimon table.
    """
    from pypaimon.catalog.catalog_factory import CatalogFactory

    table = CatalogFactory.create(catalog_options).get_table(table_identifier)
    return _read_table(
        table,
        catalog_options=catalog_options,
        io_config=io_config,
        snapshot_id=snapshot_id,
        tag_name=tag_name,
    )
+
+
def write_paimon(
    df: "daft.DataFrame",
    table_identifier: str,
    catalog_options: Dict[str, str],
    *,
    mode: str = "append",
) -> "daft.DataFrame":
    """Write a Daft DataFrame to a Paimon table.

    Args:
        df: The Daft DataFrame to write.
        table_identifier: Full table name, e.g. ``"db_name.table_name"``.
        catalog_options: Options passed to ``CatalogFactory.create()``.
        mode: Write mode, either ``"append"`` or ``"overwrite"``.

    Returns:
        A summary ``daft.DataFrame`` with columns
        (operation, rows, file_size, file_name).
    """
    from pypaimon.catalog.catalog_factory import CatalogFactory

    target_table = CatalogFactory.create(catalog_options).get_table(table_identifier)
    return _write_table(df, target_table, mode=mode)
diff --git a/paimon-python/pypaimon/daft/daft_predicate_visitor.py
b/paimon-python/pypaimon/daft/daft_predicate_visitor.py
new file mode 100644
index 0000000000..0a3d03d5cf
--- /dev/null
+++ b/paimon-python/pypaimon/daft/daft_predicate_visitor.py
@@ -0,0 +1,248 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Filter conversion utilities for Paimon pushdowns.
+
+This module provides utilities to convert Daft expressions to Paimon predicates
+for filter pushdown optimization using the Visitor pattern.
+"""
+
+from __future__ import annotations
+
+import logging
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any
+
+from daft.expressions.visitor import PredicateVisitor
+
+if TYPE_CHECKING:
+ from pypaimon.common.predicate import Predicate
+ from pypaimon.common.predicate_builder import PredicateBuilder
+ from pypaimon.table.file_store_table import FileStoreTable
+
+ from daft.daft import PyExpr
+ from daft.expressions import Expression
+
+
# Module logger; pushdown decisions in this module are reported at DEBUG level.
logger = logging.getLogger(__name__)
+
+
+@dataclass(frozen=True, slots=True)
+class _ColRef:
+ """Column reference marker to distinguish columns from literal values in
the tree fold."""
+
+ name: str
+
+
class PaimonPredicateVisitor(PredicateVisitor[Any]):
    """Tree fold visitor that converts Daft expressions to Paimon predicates.

    Leaf nodes return their values (_ColRef for columns, raw values for literals).
    Predicate nodes return Paimon Predicate objects, or None if unsupported.

    ``None`` is ambiguous in this fold. A null literal folds to None, and so does
    every unsupported sub-expression (cast, coalesce, function, not). Operand
    positions therefore accept only real literal values: None, column references
    and nested predicates are rejected. Such filters stay with Daft instead of
    being pushed with a bogus literal (e.g. ``greater_than(col, None)`` for
    ``col > lit(x).cast(t)``).

    Supported operations:
    - Comparison: ==, !=, <, <=, >, >=
    - Is null / Is not null
    - Is in
    - Between (inclusive)
    - String: startswith, endswith, contains
    - Logical: and, or
    """

    def __init__(self, builder: PredicateBuilder) -> None:
        self._builder = builder

    # -- Operand classification --

    @staticmethod
    def _is_predicate(value: Any) -> bool:
        """Return True if *value* is a Paimon Predicate (as produced by the builder)."""
        from pypaimon.common.predicate import Predicate

        return isinstance(value, Predicate)

    @classmethod
    def _is_literal(cls, value: Any) -> bool:
        """Return True if a folded operand may be pushed as a literal value.

        Rejects None (null literal or unsupported sub-expression), column
        references, and nested predicates.
        """
        return value is not None and not isinstance(value, _ColRef) and not cls._is_predicate(value)

    # -- Leaf nodes --

    def visit_col(self, name: str) -> _ColRef:
        return _ColRef(name)

    def visit_lit(self, value: Any) -> Any:
        return value

    def visit_alias(self, expr: Expression, alias: str) -> Any:
        return self.visit(expr)

    def visit_cast(self, expr: Expression, dtype: Any) -> None:
        return None

    def visit_coalesce(self, args: list[Expression]) -> None:
        return None

    def visit_function(self, name: str, args: list[Expression]) -> None:
        logger.debug("Function '%s' is not supported for Paimon pushdown", name)
        return None

    # -- Logical operators --

    def visit_and(self, left: Expression, right: Expression) -> Predicate | None:
        left_pred = self.visit(left)
        right_pred = self.visit(right)
        # Both sides must be real predicates. A bare boolean column or a literal folds
        # to _ColRef / a raw value, which the predicate builder cannot combine.
        if self._is_predicate(left_pred) and self._is_predicate(right_pred):
            return self._builder.and_predicates([left_pred, right_pred])
        return None

    def visit_or(self, left: Expression, right: Expression) -> Predicate | None:
        left_pred = self.visit(left)
        right_pred = self.visit(right)
        if self._is_predicate(left_pred) and self._is_predicate(right_pred):
            return self._builder.or_predicates([left_pred, right_pred])
        return None

    def visit_not(self, expr: Expression) -> None:
        return None

    # -- Comparison operators --

    def _cmp(self, left: Expression, right: Expression, fn: Any, fn_swapped: Any) -> Predicate | None:
        """Fold a binary comparison: extract col ref and literal value, then apply fn.

        If the column is on the right side (e.g. ``3 < col``), apply ``fn_swapped``
        instead so the operator is reversed along with the operands. For symmetric
        operators (==, !=), ``fn`` and ``fn_swapped`` are the same. Returns None
        unless exactly one side is a column and the other a usable literal.
        """
        lhs, rhs = self.visit(left), self.visit(right)
        if isinstance(lhs, _ColRef) and self._is_literal(rhs):
            return fn(lhs.name, rhs)
        if isinstance(rhs, _ColRef) and self._is_literal(lhs):
            return fn_swapped(rhs.name, lhs)
        return None

    def visit_equal(self, left: Expression, right: Expression) -> Predicate | None:
        return self._cmp(left, right, self._builder.equal, self._builder.equal)

    def visit_not_equal(self, left: Expression, right: Expression) -> Predicate | None:
        return self._cmp(left, right, self._builder.not_equal, self._builder.not_equal)

    def visit_less_than(self, left: Expression, right: Expression) -> Predicate | None:
        return self._cmp(left, right, self._builder.less_than, self._builder.greater_than)

    def visit_less_than_or_equal(self, left: Expression, right: Expression) -> Predicate | None:
        return self._cmp(left, right, self._builder.less_or_equal, self._builder.greater_or_equal)

    def visit_greater_than(self, left: Expression, right: Expression) -> Predicate | None:
        return self._cmp(left, right, self._builder.greater_than, self._builder.less_than)

    def visit_greater_than_or_equal(self, left: Expression, right: Expression) -> Predicate | None:
        return self._cmp(left, right, self._builder.greater_or_equal, self._builder.less_or_equal)

    # -- Set/range predicates --

    def visit_between(self, expr: Expression, lower: Expression, upper: Expression) -> Predicate | None:
        col = self.visit(expr)
        if not isinstance(col, _ColRef):
            return None
        lower_val = self.visit(lower)
        upper_val = self.visit(upper)
        if not (self._is_literal(lower_val) and self._is_literal(upper_val)):
            return None
        return self._builder.between(col.name, lower_val, upper_val)

    def visit_is_in(self, expr: Expression, items: list[Expression]) -> Predicate | None:
        col = self.visit(expr)
        if not isinstance(col, _ColRef):
            return None
        values = [self.visit(item) for item in items]
        if not all(self._is_literal(v) for v in values):
            return None
        return self._builder.is_in(col.name, values)

    # -- Null predicates --

    def visit_is_null(self, expr: Expression) -> Predicate | None:
        col = self.visit(expr)
        return self._builder.is_null(col.name) if isinstance(col, _ColRef) else None

    def visit_not_null(self, expr: Expression) -> Predicate | None:
        col = self.visit(expr)
        return self._builder.is_not_null(col.name) if isinstance(col, _ColRef) else None

    # -- String predicates --

    def _string_predicate(self, operand: Expression, pattern: Expression, fn: Any) -> Predicate | None:
        """Fold ``col.<op>(pattern)``; the pattern must fold to a ``str`` literal."""
        col = self.visit(operand)
        if not isinstance(col, _ColRef):
            return None
        value = self.visit(pattern)
        # Refuse non-string patterns. str() would turn a null/unsupported operand
        # into the text "None" and a column reference into its repr.
        if not isinstance(value, str):
            return None
        return fn(col.name, value)

    def visit_starts_with(self, input: Expression, prefix: Expression) -> Predicate | None:
        return self._string_predicate(input, prefix, self._builder.startswith)

    def visit_ends_with(self, input: Expression, suffix: Expression) -> Predicate | None:
        return self._string_predicate(input, suffix, self._builder.endswith)

    def visit_contains(self, input: Expression, substring: Expression) -> Predicate | None:
        return self._string_predicate(input, substring, self._builder.contains)
+
+
def convert_filters_to_paimon(
    table: FileStoreTable,
    py_filters: list[PyExpr] | PyExpr,
) -> tuple[list[PyExpr], list[PyExpr], Predicate | None]:
    """Convert Daft filters to a single Paimon predicate.

    Each filter is converted independently. Only filters that fold to a real
    Paimon Predicate are pushed. A filter that folds to anything else stays with
    Daft: None (unsupported), a bare boolean column (``_ColRef``), or a raw
    literal.

    Args:
        table: Paimon table object (used to create predicate builder)
        py_filters: Single PyExpr filter or list of PyExpr filters to convert

    Returns:
        Tuple of (pushed_filters, remaining_filters, combined_predicate), where
        combined_predicate is the conjunction of all pushed filters, or None if
        nothing was pushed.
    """
    from daft.expressions import Expression

    from pypaimon.common.predicate import Predicate as _PaimonPredicate

    if not isinstance(py_filters, list):
        py_filters = [py_filters]

    if not py_filters:
        return [], [], None

    read_builder = table.new_read_builder()
    predicate_builder = read_builder.new_predicate_builder()
    converter = PaimonPredicateVisitor(predicate_builder)

    pushed_filters: list[PyExpr] = []
    remaining_filters: list[PyExpr] = []
    predicates: list[Predicate] = []

    for py_expr in py_filters:
        expr = Expression._from_pyexpr(py_expr)
        predicate = converter.visit(expr)

        if isinstance(predicate, _PaimonPredicate):
            pushed_filters.append(py_expr)
            predicates.append(predicate)
        else:
            remaining_filters.append(py_expr)
            logger.debug("Filter %s cannot be pushed down to Paimon", expr)

    # One flat AND over all pushed predicates, instead of a left-deep chain of
    # binary ANDs that nests one level deeper per filter.
    combined_predicate: Predicate | None = None
    if len(predicates) == 1:
        combined_predicate = predicates[0]
    elif predicates:
        combined_predicate = predicate_builder.and_predicates(predicates)

    if pushed_filters:
        logger.debug(
            "Paimon filter pushdown: %d filters pushed, %d remaining",
            len(pushed_filters),
            len(remaining_filters),
        )

    return pushed_filters, remaining_filters, combined_predicate
diff --git a/paimon-python/pypaimon/tests/daft/daft_catalog_rest_test.py
b/paimon-python/pypaimon/tests/daft/daft_catalog_rest_test.py
new file mode 100644
index 0000000000..63ba577531
--- /dev/null
+++ b/paimon-python/pypaimon/tests/daft/daft_catalog_rest_test.py
@@ -0,0 +1,233 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Unit tests for PaimonCatalog REST catalog path (using mocks, no real server
needed)."""
+
+from __future__ import annotations
+
+from unittest.mock import MagicMock
+
+import pytest
+
+pypaimon = pytest.importorskip("pypaimon")
+daft = pytest.importorskip("daft")
+
+from daft.catalog import Identifier, NotFoundError
+from pypaimon.catalog.catalog_exception import (
+ DatabaseNotExistException,
+ TableNotExistException,
+)
+
+from pypaimon.daft.daft_catalog import PaimonCatalog
+
+# ---------------------------------------------------------------------------
+# Helpers: build a mock inner catalog that mimics RESTCatalog's interface
+# ---------------------------------------------------------------------------
+
+
+def _make_rest_inner(
+ databases: list[str] | None = None,
+ tables_by_db: dict[str, list[str]] | None = None,
+):
+ """Return a mock that quacks like a RESTCatalog."""
+ inner = MagicMock(spec=pypaimon.catalog.catalog.Catalog)
+
+ inner.list_databases = MagicMock(return_value=databases or [])
+ inner.list_tables = MagicMock(side_effect=lambda db: (tables_by_db or
{}).get(db, []))
+ inner.drop_database = MagicMock()
+ inner.drop_table = MagicMock()
+
+ inner.get_database = MagicMock()
+ inner.create_database = MagicMock()
+ inner.get_table = MagicMock()
+ inner.create_table = MagicMock()
+
+ return inner
+
+
+# ---------------------------------------------------------------------------
+# _list_namespaces — REST path
+# ---------------------------------------------------------------------------
+
+
+def test_rest_list_namespaces_delegates_to_list_databases():
+ inner = _make_rest_inner(databases=["db_a", "db_b", "db_c"])
+ cat = PaimonCatalog(inner)
+
+ result = cat.list_namespaces()
+
+ inner.list_databases.assert_called_once()
+ assert Identifier("db_a") in result
+ assert Identifier("db_b") in result
+ assert Identifier("db_c") in result
+
+
+def test_rest_list_namespaces_with_pattern():
+ inner = _make_rest_inner(databases=["prod_orders", "prod_users",
"staging_data"])
+ cat = PaimonCatalog(inner)
+
+ result = cat.list_namespaces(pattern="prod")
+
+ assert all(str(r).startswith("prod") for r in result)
+ assert len(result) == 2
+
+
+def test_rest_list_namespaces_empty():
+ inner = _make_rest_inner(databases=[])
+ cat = PaimonCatalog(inner)
+ assert cat.list_namespaces() == []
+
+
+# ---------------------------------------------------------------------------
+# _list_tables — REST path
+# ---------------------------------------------------------------------------
+
+
+def test_rest_list_tables_delegates_to_list_databases_and_list_tables():
+ inner = _make_rest_inner(
+ databases=["db_a", "db_b"],
+ tables_by_db={"db_a": ["orders", "users"], "db_b": ["events"]},
+ )
+ cat = PaimonCatalog(inner)
+
+ result = cat.list_tables()
+
+ assert Identifier("db_a", "orders") in result
+ assert Identifier("db_a", "users") in result
+ assert Identifier("db_b", "events") in result
+ assert len(result) == 3
+
+
+def test_rest_list_tables_calls_list_tables_per_database():
+ inner = _make_rest_inner(
+ databases=["db_a", "db_b"],
+ tables_by_db={"db_a": ["t1"], "db_b": ["t2"]},
+ )
+ cat = PaimonCatalog(inner)
+ cat.list_tables()
+
+ assert inner.list_tables.call_count == 2
+ inner.list_tables.assert_any_call("db_a")
+ inner.list_tables.assert_any_call("db_b")
+
+
+def test_rest_list_tables_with_pattern():
+ inner = _make_rest_inner(
+ databases=["db_a"],
+ tables_by_db={"db_a": ["orders", "order_items", "users"]},
+ )
+ cat = PaimonCatalog(inner)
+
+ result = cat.list_tables(pattern="db_a.order")
+
+ assert Identifier("db_a", "orders") in result
+ assert Identifier("db_a", "order_items") in result
+ assert Identifier("db_a", "users") not in result
+
+
+def test_rest_list_tables_empty_database():
+ inner = _make_rest_inner(
+ databases=["empty_db"],
+ tables_by_db={"empty_db": []},
+ )
+ cat = PaimonCatalog(inner)
+ assert cat.list_tables() == []
+
+
+# ---------------------------------------------------------------------------
+# _drop_namespace — REST path
+# ---------------------------------------------------------------------------
+
+
+def test_rest_drop_namespace_delegates_to_drop_database():
+ inner = _make_rest_inner(databases=["my_db"])
+ cat = PaimonCatalog(inner)
+
+ cat.drop_namespace("my_db")
+
+ inner.drop_database.assert_called_once_with("my_db",
ignore_if_not_exists=False)
+
+
+def test_rest_drop_namespace_not_found_raises_notfounderror():
+ inner = _make_rest_inner()
+ inner.drop_database.side_effect = DatabaseNotExistException("my_db")
+ cat = PaimonCatalog(inner)
+
+ with pytest.raises(NotFoundError):
+ cat.drop_namespace("my_db")
+
+
+# ---------------------------------------------------------------------------
+# _drop_table — REST path
+# ---------------------------------------------------------------------------
+
+
+def test_rest_drop_table_delegates_to_drop_table():
+ inner = _make_rest_inner()
+ cat = PaimonCatalog(inner)
+
+ cat.drop_table("my_db.my_table")
+
+ inner.drop_table.assert_called_once_with("my_db.my_table",
ignore_if_not_exists=False)
+
+
+def test_rest_drop_table_not_found_raises_notfounderror():
+ inner = _make_rest_inner()
+ fake_ident = MagicMock()
+ fake_ident.get_full_name.return_value = "my_db.my_table"
+ inner.drop_table.side_effect = TableNotExistException(fake_ident)
+ cat = PaimonCatalog(inner)
+
+ with pytest.raises(NotFoundError):
+ cat.drop_table("my_db.my_table")
+
+
+# ---------------------------------------------------------------------------
+# _has_namespace — strips catalog prefix from multi-part identifiers
+# ---------------------------------------------------------------------------
+
+
+def test_has_namespace_single_part():
+ inner = _make_rest_inner()
+ inner.get_database.return_value = MagicMock()
+ cat = PaimonCatalog(inner)
+
+ assert cat.has_namespace("my_db") is True
+ inner.get_database.assert_called_once_with("my_db")
+
+
+def test_has_namespace_not_found_returns_false():
+ inner = _make_rest_inner()
+ inner.get_database.side_effect = DatabaseNotExistException("nope")
+ cat = PaimonCatalog(inner)
+
+ assert cat.has_namespace("nope") is False
+
+
+# ---------------------------------------------------------------------------
+# _create_namespace — delegates properly
+# ---------------------------------------------------------------------------
+
+
+def test_create_namespace_single_part():
+ inner = _make_rest_inner()
+ cat = PaimonCatalog(inner)
+
+ cat.create_namespace("new_db")
+
+ inner.create_database.assert_called_once_with("new_db",
ignore_if_exists=False)
diff --git a/paimon-python/pypaimon/tests/daft/daft_catalog_test.py b/paimon-python/pypaimon/tests/daft/daft_catalog_test.py
new file mode 100644
index 0000000000..0b34452ed4
--- /dev/null
+++ b/paimon-python/pypaimon/tests/daft/daft_catalog_test.py
@@ -0,0 +1,333 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Tests for PaimonCatalog and PaimonTable Daft catalog wrappers."""
+
+from __future__ import annotations
+
+import pyarrow as pa
+import pytest
+
+pypaimon = pytest.importorskip("pypaimon")
+daft = pytest.importorskip("daft")
+
+from pypaimon.daft.daft_catalog import PaimonCatalog, PaimonTable
+
+# ---------------------------------------------------------------------------
+# Fixtures
+# ---------------------------------------------------------------------------
+
+
[email protected]
+def inner_catalog(tmp_path):
+ """A bare pypaimon FileSystemCatalog with a 'test_db' database."""
+ catalog = pypaimon.CatalogFactory.create({"warehouse": str(tmp_path)})
+ catalog.create_database("test_db", ignore_if_exists=True)
+ return catalog, tmp_path
+
+
[email protected]
+def inner_catalog_with_table(inner_catalog):
+ """A pypaimon catalog pre-populated with an append-only partitioned
table."""
+ catalog, tmp_path = inner_catalog
+ schema = pypaimon.Schema.from_pyarrow_schema(
+ pa.schema(
+ [
+ pa.field("id", pa.int64()),
+ pa.field("name", pa.string()),
+ pa.field("dt", pa.string()),
+ ]
+ ),
+ partition_keys=["dt"],
+ options={"bucket": "1", "file.format": "parquet"},
+ )
+ catalog.create_table("test_db.test_table", schema, ignore_if_exists=True)
+
+ # Pre-populate with data
+ table = catalog.get_table("test_db.test_table")
+ data = pa.table(
+ {
+ "id": pa.array([1, 2, 3], pa.int64()),
+ "name": pa.array(["alice", "bob", "carol"]),
+ "dt": pa.array(["2024-01-01", "2024-01-01", "2024-01-02"]),
+ }
+ )
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ try:
+ table_write.write_arrow(data)
+ commit_messages = table_write.prepare_commit()
+ table_commit.commit(commit_messages)
+ finally:
+ table_write.close()
+ table_commit.close()
+
+ return catalog, tmp_path
+
+
[email protected]
+def paimon_catalog(inner_catalog_with_table):
+ """Daft PaimonCatalog wrapping the pre-populated inner catalog."""
+ catalog, tmp_path = inner_catalog_with_table
+ return PaimonCatalog(catalog, name="test_paimon"), catalog, tmp_path
+
+
+# ---------------------------------------------------------------------------
+# PaimonCatalog — basic properties
+# ---------------------------------------------------------------------------
+
+
+def test_catalog_name(paimon_catalog):
+ daft_catalog, _, _ = paimon_catalog
+ assert daft_catalog.name == "test_paimon"
+
+
+def test_catalog_default_name(inner_catalog):
+ inner, _ = inner_catalog
+ daft_catalog = PaimonCatalog(inner)
+ assert daft_catalog.name == "paimon"
+
+
+# ---------------------------------------------------------------------------
+# PaimonCatalog — namespace operations
+# ---------------------------------------------------------------------------
+
+
+def test_catalog_has_namespace(paimon_catalog):
+ daft_catalog, _, _ = paimon_catalog
+ assert daft_catalog.has_namespace("test_db")
+ assert not daft_catalog.has_namespace("nonexistent_db")
+
+
+def test_catalog_list_namespaces(paimon_catalog):
+ from daft.catalog import Identifier
+
+ daft_catalog, _, _ = paimon_catalog
+ namespaces = daft_catalog.list_namespaces()
+ assert Identifier("test_db") in namespaces
+
+
+def test_catalog_create_namespace(tmp_path):
+ inner = pypaimon.CatalogFactory.create({"warehouse": str(tmp_path)})
+ daft_catalog = PaimonCatalog(inner)
+ daft_catalog.create_namespace("new_db")
+ assert daft_catalog.has_namespace("new_db")
+
+
+def test_catalog_create_namespace_if_not_exists(tmp_path):
+ inner = pypaimon.CatalogFactory.create({"warehouse": str(tmp_path)})
+ daft_catalog = PaimonCatalog(inner)
+ daft_catalog.create_namespace_if_not_exists("myns")
+ daft_catalog.create_namespace_if_not_exists("myns") # should not raise
+ assert daft_catalog.has_namespace("myns")
+
+
+def test_catalog_drop_namespace(paimon_catalog):
+ daft_catalog, _, _ = paimon_catalog
+ daft_catalog.create_namespace("drop_me_db")
+ assert daft_catalog.has_namespace("drop_me_db")
+ daft_catalog.drop_namespace("drop_me_db")
+ assert not daft_catalog.has_namespace("drop_me_db")
+
+
+# ---------------------------------------------------------------------------
+# PaimonCatalog — table operations
+# ---------------------------------------------------------------------------
+
+
+def test_catalog_has_table(paimon_catalog):
+ daft_catalog, _, _ = paimon_catalog
+ assert daft_catalog.has_table("test_db.test_table")
+ assert not daft_catalog.has_table("test_db.nonexistent_table")
+ assert not daft_catalog.has_table("nonexistent_db.test_table")
+
+
+def test_catalog_list_tables(paimon_catalog):
+ from daft.catalog import Identifier
+
+ daft_catalog, _, _ = paimon_catalog
+ tables = daft_catalog.list_tables()
+ assert Identifier("test_db", "test_table") in tables
+
+
+def test_catalog_list_tables_with_pattern(paimon_catalog):
+ daft_catalog, _, _ = paimon_catalog
+ tables = daft_catalog.list_tables(pattern="test_db")
+ assert len(tables) > 0
+ tables_no_match = daft_catalog.list_tables(pattern="other_db")
+ assert len(tables_no_match) == 0
+
+
+def test_catalog_get_table(paimon_catalog):
+ daft_catalog, _, _ = paimon_catalog
+ table = daft_catalog.get_table("test_db.test_table")
+ assert table.name == "test_table"
+
+
+def test_catalog_get_table_not_found(paimon_catalog):
+ from daft.catalog import NotFoundError
+
+ daft_catalog, _, _ = paimon_catalog
+ with pytest.raises(NotFoundError):
+ daft_catalog.get_table("test_db.nonexistent_table")
+
+
+def test_catalog_drop_table(paimon_catalog):
+ daft_catalog, _, _ = paimon_catalog
+ assert daft_catalog.has_table("test_db.test_table")
+ daft_catalog.drop_table("test_db.test_table")
+ assert not daft_catalog.has_table("test_db.test_table")
+
+
+def test_catalog_create_table(tmp_path):
+ inner = pypaimon.CatalogFactory.create({"warehouse": str(tmp_path)})
+ inner.create_database("mydb", ignore_if_exists=True)
+ daft_catalog = PaimonCatalog(inner)
+
+ schema = daft.from_pydict({"id": [1, 2], "name": ["a", "b"]}).schema()
+ table = daft_catalog.create_table("mydb.new_table", schema)
+ assert table.name == "new_table"
+ assert daft_catalog.has_table("mydb.new_table")
+
+
+def test_catalog_create_table_with_partitions(tmp_path):
+ from daft.io.partitioning import PartitionField
+
+ inner = pypaimon.CatalogFactory.create({"warehouse": str(tmp_path)})
+ inner.create_database("mydb", ignore_if_exists=True)
+ daft_catalog = PaimonCatalog(inner)
+
+ df = daft.from_pydict({"id": [1], "name": ["a"], "dt": ["2024-01-01"]})
+ schema = df.schema()
+ dt_field = schema["dt"]
+ partition_fields = [PartitionField.create(dt_field)]
+ table = daft_catalog.create_table("mydb.part_table", schema,
partition_fields=partition_fields)
+ assert table.name == "part_table"
+ assert table.partition_keys == ["dt"]
+
+
+# ---------------------------------------------------------------------------
+# PaimonTable — read / write
+# ---------------------------------------------------------------------------
+
+
+def test_table_read(paimon_catalog):
+ daft_catalog, _, _ = paimon_catalog
+ table = daft_catalog.get_table("test_db.test_table")
+ df = table.read()
+ result = df.sort("id").to_pydict()
+ assert result["id"] == [1, 2, 3]
+
+
+def test_table_append(paimon_catalog):
+ daft_catalog, _, _ = paimon_catalog
+ table = daft_catalog.get_table("test_db.test_table")
+ new_rows = daft.from_pydict({"id": [99], "name": ["zara"], "dt":
["2024-03-01"]})
+ table.append(new_rows)
+ ids = sorted(table.read().to_pydict()["id"])
+ assert 99 in ids
+
+
+def test_table_overwrite(paimon_catalog):
+ daft_catalog, _, _ = paimon_catalog
+ table = daft_catalog.get_table("test_db.test_table")
+ replacement = daft.from_pydict({"id": [100, 200], "name": ["p", "q"],
"dt": ["2024-01-01", "2024-01-02"]})
+ table.overwrite(replacement)
+ result = sorted(table.read().to_pydict()["id"])
+ assert result == [100, 200]
+
+
+# ---------------------------------------------------------------------------
+# PaimonTable — direct wrapping
+# ---------------------------------------------------------------------------
+
+
+def test_table_direct_wrap(inner_catalog_with_table):
+ inner, _ = inner_catalog_with_table
+ inner_table = inner.get_table("test_db.test_table")
+ table = PaimonTable(inner_table)
+ assert table.name == "test_table"
+ df = table.read()
+ assert df.count_rows() == 3
+
+
+# ---------------------------------------------------------------------------
+# PaimonTable — properties
+# ---------------------------------------------------------------------------
+
+
+class TestPaimonTableProperties:
+ """Tests for PaimonTable properties."""
+
+ @pytest.fixture
+ def pk_catalog(self, tmp_path):
+ """Create a catalog with primary key table for testing properties."""
+ inner = pypaimon.CatalogFactory.create({"warehouse": str(tmp_path)})
+ inner.create_database("test_db", ignore_if_exists=True)
+
+ schema = pypaimon.Schema.from_pyarrow_schema(
+ pa.schema(
+ [
+ pa.field("id", pa.int64()),
+ pa.field("name", pa.string()),
+ pa.field("value", pa.int64()),
+ ]
+ ),
+ primary_keys=["id"],
+ options={"bucket": "2"},
+ )
+ inner.create_table("test_db.pk_table", schema, ignore_if_exists=True)
+
+ schema2 = pypaimon.Schema.from_pyarrow_schema(
+ pa.schema(
+ [
+ pa.field("id", pa.int64()),
+ pa.field("name", pa.string()),
+ pa.field("dt", pa.string()),
+ ]
+ ),
+ partition_keys=["dt"],
+ primary_keys=["id"],
+ options={"bucket": "1"},
+ )
+ inner.create_table("test_db.partitioned_pk", schema2,
ignore_if_exists=True)
+
+ return PaimonCatalog(inner)
+
+ def test_append_only_table_properties(self, paimon_catalog):
+ daft_catalog, _, _ = paimon_catalog
+ table = daft_catalog.get_table("test_db.test_table")
+ assert table.is_primary_key_table is False
+ assert table.primary_keys == []
+ assert table.partition_keys == ["dt"]
+
+ def test_primary_key_table_properties(self, pk_catalog):
+ table = pk_catalog.get_table("test_db.pk_table")
+ assert table.is_primary_key_table is True
+ assert table.primary_keys == ["id"]
+ assert table.partition_keys == []
+ assert table.bucket_count == 2
+ assert table.table_options.get("bucket") == "2"
+
+ def test_partitioned_primary_key_table_properties(self, pk_catalog):
+ table = pk_catalog.get_table("test_db.partitioned_pk")
+ assert table.is_primary_key_table is True
+ assert table.primary_keys == ["id"]
+ assert table.partition_keys == ["dt"]
+ assert table.bucket_count == 1
diff --git a/paimon-python/pypaimon/tests/daft/daft_data_test.py b/paimon-python/pypaimon/tests/daft/daft_data_test.py
new file mode 100644
index 0000000000..e9e32efa93
--- /dev/null
+++ b/paimon-python/pypaimon/tests/daft/daft_data_test.py
@@ -0,0 +1,557 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Tests for _read_table() — reads from a local Paimon filesystem catalog.
+
+All tests run without Docker or external services; pypaimon is used directly
+to create and populate test tables, then _read_table() is validated.
+"""
+
+from __future__ import annotations
+
+import decimal
+
+import pyarrow as pa
+import pytest
+
+pypaimon = pytest.importorskip("pypaimon")
+daft = pytest.importorskip("daft")
+
+from daft import col
+
+from pypaimon.daft.daft_paimon import _read_table
+
+
+# ---------------------------------------------------------------------------
+# Helper
+# ---------------------------------------------------------------------------
+
+
+def _write_to_paimon(table, arrow_table, mode="append",
overwrite_partition=None):
+ write_builder = table.new_batch_write_builder()
+ if mode == "overwrite":
+ write_builder.overwrite(overwrite_partition or {})
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ try:
+ table_write.write_arrow(arrow_table)
+ commit_messages = table_write.prepare_commit()
+ table_commit.commit(commit_messages)
+ finally:
+ table_write.close()
+ table_commit.close()
+
+
+# ---------------------------------------------------------------------------
+# Fixtures
+# ---------------------------------------------------------------------------
+
+
[email protected](scope="function")
+def local_paimon_catalog(tmp_path):
+ catalog = pypaimon.CatalogFactory.create({"warehouse": str(tmp_path)})
+ catalog.create_database("test_db", ignore_if_exists=True)
+ return catalog, tmp_path
+
+
[email protected]
+def append_only_table(local_paimon_catalog):
+ catalog, tmp_path = local_paimon_catalog
+ schema = pypaimon.Schema.from_pyarrow_schema(
+ pa.schema([
+ pa.field("id", pa.int64()),
+ pa.field("name", pa.string()),
+ pa.field("value", pa.float64()),
+ pa.field("dt", pa.string()),
+ ]),
+ partition_keys=["dt"],
+ options={"bucket": "1", "file.format": "parquet"},
+ )
+ catalog.create_table("test_db.append_table", schema,
ignore_if_exists=False)
+ return catalog.get_table("test_db.append_table"), tmp_path
+
+
[email protected]
+def pk_table(local_paimon_catalog):
+ catalog, tmp_path = local_paimon_catalog
+ schema = pypaimon.Schema.from_pyarrow_schema(
+ pa.schema([
+ pa.field("id", pa.int64()),
+ pa.field("name", pa.string()),
+ pa.field("dt", pa.string()),
+ ]),
+ partition_keys=["dt"],
+ primary_keys=["id", "dt"],
+ options={"bucket": "1", "file.format": "parquet"},
+ )
+ catalog.create_table("test_db.pk_table", schema, ignore_if_exists=False)
+ return catalog.get_table("test_db.pk_table"), tmp_path
+
+
+# ---------------------------------------------------------------------------
+# Basic read roundtrip
+# ---------------------------------------------------------------------------
+
+
+def test_read_paimon_basic(append_only_table):
+ """Write data via pypaimon, read back via _read_table(table), verify
correctness."""
+ table, tmp_path = append_only_table
+ data = pa.table(
+ {
+ "id": pa.array([1, 2, 3], pa.int64()),
+ "name": pa.array(["alice", "bob", "charlie"], pa.string()),
+ "value": pa.array([1.1, 2.2, 3.3], pa.float64()),
+ "dt": pa.array(["2024-01-01", "2024-01-01", "2024-01-01"],
pa.string()),
+ }
+ )
+ _write_to_paimon(table, data)
+
+ df = _read_table(table)
+ result = df.sort("id").to_arrow()
+
+ assert result.num_rows == 3
+ assert result.schema.field("id").type == pa.int64()
+ assert result.schema.field("name").type in (pa.string(), pa.large_string())
+ assert result.column("id").to_pylist() == [1, 2, 3]
+ assert result.column("name").to_pylist() == ["alice", "bob", "charlie"]
+
+
+def test_read_paimon_empty_table(append_only_table):
+ """Reading an empty table should return a DataFrame with the correct
schema but zero rows."""
+ table, _ = append_only_table
+ df = _read_table(table)
+ result = df.to_arrow()
+
+ assert result.num_rows == 0
+ assert "id" in result.schema.names
+ assert "name" in result.schema.names
+ assert "dt" in result.schema.names
+
+
+def test_read_paimon_schema_matches(append_only_table):
+ """The schema reported by _read_table(table) should match the Paimon table
schema."""
+ table, _ = append_only_table
+ df = _read_table(table)
+ schema = df.schema()
+
+ assert "id" in schema.column_names()
+ assert "name" in schema.column_names()
+ assert "value" in schema.column_names()
+ assert "dt" in schema.column_names()
+
+
+# ---------------------------------------------------------------------------
+# Multi-partition reads
+# ---------------------------------------------------------------------------
+
+
+def test_read_paimon_multiple_partitions(append_only_table):
+ """Data spread across multiple partitions should all be read back."""
+ table, _ = append_only_table
+ data = pa.table(
+ {
+ "id": pa.array([1, 2, 3, 4], pa.int64()),
+ "name": pa.array(["a", "b", "c", "d"], pa.string()),
+ "value": pa.array([1.0, 2.0, 3.0, 4.0], pa.float64()),
+ "dt": pa.array(["2024-01-01", "2024-01-01", "2024-01-02",
"2024-01-02"], pa.string()),
+ }
+ )
+ _write_to_paimon(table, data)
+
+ df = _read_table(table)
+ result = df.sort("id").to_arrow()
+
+ assert result.num_rows == 4
+ assert sorted(result.column("id").to_pylist()) == [1, 2, 3, 4]
+ assert set(result.column("dt").to_pylist()) == {"2024-01-01", "2024-01-02"}
+
+
+def test_read_paimon_partition_column_present(append_only_table):
+ """Partition columns must appear in the result schema and have correct
values."""
+ table, _ = append_only_table
+ data = pa.table(
+ {
+ "id": pa.array([10], pa.int64()),
+ "name": pa.array(["x"], pa.string()),
+ "value": pa.array([9.9], pa.float64()),
+ "dt": pa.array(["2024-03-15"], pa.string()),
+ }
+ )
+ _write_to_paimon(table, data)
+
+ df = _read_table(table)
+ result = df.to_arrow()
+
+ assert "dt" in result.schema.names
+ assert result.column("dt").to_pylist() == ["2024-03-15"]
+
+
+# ---------------------------------------------------------------------------
+# Column projection
+# ---------------------------------------------------------------------------
+
+
+def test_read_paimon_column_projection(append_only_table):
+ """Requesting a subset of columns should work without errors."""
+ table, _ = append_only_table
+ data = pa.table(
+ {
+ "id": pa.array([1, 2], pa.int64()),
+ "name": pa.array(["a", "b"], pa.string()),
+ "value": pa.array([1.0, 2.0], pa.float64()),
+ "dt": pa.array(["2024-01-01", "2024-01-01"], pa.string()),
+ }
+ )
+ _write_to_paimon(table, data)
+
+ df = _read_table(table).select("id", "name")
+ result = df.sort("id").to_arrow()
+
+ assert result.schema.names == ["id", "name"]
+ assert result.num_rows == 2
+ assert result.column("id").to_pylist() == [1, 2]
+
+
+def test_read_paimon_multiple_batches(local_paimon_catalog):
+ """Writing data in two separate batches (two snapshots) should read all
rows."""
+ catalog, tmp_path = local_paimon_catalog
+ schema = pypaimon.Schema.from_pyarrow_schema(
+ pa.schema(
+ [
+ pa.field("id", pa.int64()),
+ pa.field("v", pa.string()),
+ pa.field("dt", pa.string()),
+ ]
+ ),
+ partition_keys=["dt"],
+ options={"bucket": "1", "file.format": "parquet"},
+ )
+ catalog.create_table("test_db.multi_batch", schema, ignore_if_exists=False)
+ table = catalog.get_table("test_db.multi_batch")
+
+ batch1 = pa.table({"id": [1, 2], "v": ["a", "b"], "dt": ["2024-01-01",
"2024-01-01"]})
+ batch2 = pa.table({"id": [3, 4], "v": ["c", "d"], "dt": ["2024-01-02",
"2024-01-02"]})
+
+ _write_to_paimon(table, batch1)
+ _write_to_paimon(table, batch2)
+
+ df = _read_table(table)
+ result = df.sort("id").to_arrow()
+
+ assert result.num_rows == 4
+ assert result.column("id").to_pylist() == [1, 2, 3, 4]
+
+
+# ---------------------------------------------------------------------------
+# Primary-key table (LSM merge fall-back)
+# ---------------------------------------------------------------------------
+
+
+def test_read_paimon_pk_table_basic(pk_table):
+ """Primary-key table: falls back to pypaimon reader, data should be
correct."""
+ table, _ = pk_table
+ data = pa.table(
+ {
+ "id": pa.array([1, 2, 3], pa.int64()),
+ "name": pa.array(["x", "y", "z"], pa.string()),
+ "dt": pa.array(["2024-01-01", "2024-01-01", "2024-01-01"],
pa.string()),
+ }
+ )
+ _write_to_paimon(table, data)
+
+ df = _read_table(table)
+ result = df.sort("id").to_arrow()
+
+ assert result.num_rows == 3
+ assert result.column("id").to_pylist() == [1, 2, 3]
+ assert result.column("name").to_pylist() == ["x", "y", "z"]
+
+
+def test_read_paimon_pk_table_deduplication(pk_table):
+ """Writing the same PK twice should result in the latest value being
visible."""
+ table, _ = pk_table
+ batch1 = pa.table(
+ {
+ "id": pa.array([1, 2], pa.int64()),
+ "name": pa.array(["old_a", "old_b"], pa.string()),
+ "dt": pa.array(["2024-01-01", "2024-01-01"], pa.string()),
+ }
+ )
+ _write_to_paimon(table, batch1)
+
+ batch2 = pa.table(
+ {
+ "id": pa.array([1], pa.int64()),
+ "name": pa.array(["new_a"], pa.string()),
+ "dt": pa.array(["2024-01-01"], pa.string()),
+ }
+ )
+ _write_to_paimon(table, batch2)
+
+ df = _read_table(table)
+ result = df.sort("id").to_arrow()
+
+ assert result.num_rows == 2
+ id1_row = [row for row in zip(result.column("id").to_pylist(),
result.column("name").to_pylist()) if row[0] == 1]
+ assert len(id1_row) == 1
+ assert id1_row[0][1] == "new_a"
+
+
+# ---------------------------------------------------------------------------
+# Filter pushdown
+# ---------------------------------------------------------------------------
+
+
+def test_read_paimon_partition_filter(append_only_table):
+ """Partition filter should prune partitions at scan time."""
+ table, _ = append_only_table
+ data = pa.table(
+ {
+ "id": pa.array([1, 2, 3, 4], pa.int64()),
+ "name": pa.array(["a", "b", "c", "d"], pa.string()),
+ "value": pa.array([1.0, 2.0, 3.0, 4.0], pa.float64()),
+ "dt": pa.array(["2024-01-01", "2024-01-01", "2024-01-02",
"2024-01-02"], pa.string()),
+ }
+ )
+ _write_to_paimon(table, data)
+
+ df = _read_table(table).where(col("dt") == "2024-01-01")
+ result = df.sort("id").to_arrow()
+
+ assert result.num_rows == 2
+ assert result.column("id").to_pylist() == [1, 2]
+ assert all(dt == "2024-01-01" for dt in result.column("dt").to_pylist())
+
+
+def test_read_paimon_row_filter(append_only_table):
+ """Row-level filter should be applied after reading data."""
+ table, _ = append_only_table
+ data = pa.table(
+ {
+ "id": pa.array([1, 2, 3, 4], pa.int64()),
+ "name": pa.array(["alice", "bob", "charlie", "dave"], pa.string()),
+ "value": pa.array([10.0, 20.0, 30.0, 40.0], pa.float64()),
+ "dt": pa.array(["2024-01-01", "2024-01-01", "2024-01-01",
"2024-01-01"], pa.string()),
+ }
+ )
+ _write_to_paimon(table, data)
+
+ df = _read_table(table).where(col("id") > 2)
+ result = df.sort("id").to_arrow()
+
+ assert result.num_rows == 2
+ assert result.column("id").to_pylist() == [3, 4]
+
+
+def test_read_paimon_combined_filter(append_only_table):
+ """Combined partition + row filter should work together."""
+ table, _ = append_only_table
+ data = pa.table(
+ {
+ "id": pa.array([1, 2, 3, 4, 5, 6], pa.int64()),
+ "name": pa.array(["a", "b", "c", "d", "e", "f"], pa.string()),
+ "value": pa.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], pa.float64()),
+ "dt": pa.array(
+ ["2024-01-01", "2024-01-01", "2024-01-02", "2024-01-02",
"2024-01-03", "2024-01-03"],
+ pa.string(),
+ ),
+ }
+ )
+ _write_to_paimon(table, data)
+
+ df = _read_table(table).where((col("dt") == "2024-01-01") & (col("id") ==
2))
+ result = df.to_arrow()
+
+ assert result.num_rows == 1
+ assert result.column("id").to_pylist() == [2]
+ assert result.column("dt").to_pylist() == ["2024-01-01"]
+
+
+# ---------------------------------------------------------------------------
+# Filter pushdown tests
+# ---------------------------------------------------------------------------
+
+
+class TestFilterPushdown:
+    """Tests for filter pushdown to Paimon."""
+
+    @pytest.fixture
+    def filter_table(self, local_paimon_catalog):
+        """Create a table for filter pushdown tests."""
+        catalog, tmp_path = local_paimon_catalog
+        pa_schema = pa.schema([("id", pa.int64()), ("value", pa.string())])
+        paimon_schema = pypaimon.Schema.from_pyarrow_schema(pa_schema)
+        catalog.create_table("test_db.filter_test", paimon_schema, ignore_if_exists=True)
+        table = catalog.get_table("test_db.filter_test")
+
+        data = pa.table({"id": [1, 2, 3, 4, 5], "value": ["a", "b", "c", "d", "e"]})
+        _write_to_paimon(table, data)
+        return table
+
+    def test_filter_pushdown_equal(self, filter_table):
+        df = _read_table(filter_table).where(col("id") == 3)
+        result = df.to_pydict()
+        assert result["id"] == [3]
+        assert result["value"] == ["c"]
+
+    def test_filter_pushdown_comparison(self, filter_table):
+        df = _read_table(filter_table).where(col("id") > 3)
+        result = df.to_pydict()
+        assert result["id"] == [4, 5]
+
+        df = _read_table(filter_table).where(col("id") <= 2)
+        result = df.to_pydict()
+        assert result["id"] == [1, 2]
+
+    def test_filter_pushdown_is_in(self, filter_table):
+        df = _read_table(filter_table).where(col("id").is_in([2, 4]))
+        result = df.to_pydict()
+        assert result["id"] == [2, 4]
+
+    def test_filter_pushdown_is_null(self, local_paimon_catalog):
+        catalog, tmp_path = local_paimon_catalog
+        pa_schema = pa.schema([("id", pa.int64()), ("value", pa.string())])
+        paimon_schema = pypaimon.Schema.from_pyarrow_schema(pa_schema)
+        catalog.create_table("test_db.filter_null", paimon_schema, ignore_if_exists=True)
+        table = catalog.get_table("test_db.filter_null")
+
+        data = pa.table({"id": [1, 2, 3], "value": ["a", None, "c"]})
+        _write_to_paimon(table, data)
+
+        df = _read_table(table).where(col("value").is_null())
+        result = df.to_pydict()
+        assert result["id"] == [2]
+
+    def test_filter_pushdown_string(self, filter_table):
+        df = _read_table(filter_table).where(col("value").startswith("a"))
+        result = df.to_pydict()
+        assert result["value"] == ["a"]
+
+        df = _read_table(filter_table).where(col("value").contains("b"))
+        result = df.to_pydict()
+        assert result["value"] == ["b"]
+
+    def test_filter_pushdown_combined(self, filter_table):
+        df = _read_table(filter_table).where((col("id") >= 2) & (col("id") <= 4))
+        result = df.to_pydict()
+        assert result["id"] == [2, 3, 4]
+
+
+# ---------------------------------------------------------------------------
+# Advanced data types
+# ---------------------------------------------------------------------------
+
+
+class TestNestedTypes:
+    """Tests for nested data types (list, map, struct)."""
+
+    def test_read_nested_list(self, local_paimon_catalog):
+        catalog, tmp_path = local_paimon_catalog
+        pa_schema = pa.schema([("id", pa.int64()), ("list_col", pa.list_(pa.int64()))])
+        paimon_schema = pypaimon.Schema.from_pyarrow_schema(pa_schema)
+        catalog.create_table("test_db.nested_list", paimon_schema, ignore_if_exists=True)
+        table = catalog.get_table("test_db.nested_list")
+
+        data = pa.table({"id": [1, 2], "list_col": [[1, 2, 3], [4, 5]]}, schema=pa_schema)
+        _write_to_paimon(table, data)
+
+        df = _read_table(table)
+        result = df.to_pydict()
+        assert len(result["id"]) == 2
+        assert result["list_col"][0] == [1, 2, 3]
+
+    def test_read_nested_map(self, local_paimon_catalog):
+        catalog, tmp_path = local_paimon_catalog
+        pa_schema = pa.schema([("id", pa.int64()), ("map_col", pa.map_(pa.string(), pa.int64()))])
+        paimon_schema = pypaimon.Schema.from_pyarrow_schema(pa_schema)
+        catalog.create_table("test_db.nested_map", paimon_schema, ignore_if_exists=True)
+        table = catalog.get_table("test_db.nested_map")
+
+        data = pa.table(
+            {"id": [1, 2], "map_col": [[("k1", 1), ("k2", 2)], [("k3", 3)]]},
+            schema=pa_schema,
+        )
+        _write_to_paimon(table, data)
+
+        df = _read_table(table)
+        result = df.to_pydict()
+        assert len(result["id"]) == 2
+
+
+class TestDecimalType:
+    """Tests for decimal type support."""
+
+    def test_read_decimal(self, local_paimon_catalog):
+        catalog, tmp_path = local_paimon_catalog
+        pa_schema = pa.schema([("id", pa.int64()), ("amount", pa.decimal128(10, 2))])
+        paimon_schema = pypaimon.Schema.from_pyarrow_schema(pa_schema)
+        catalog.create_table("test_db.decimal_table", paimon_schema, ignore_if_exists=True)
+        table = catalog.get_table("test_db.decimal_table")
+
+        data = pa.table(
+            {
+                "id": [1, 2, 3],
+                "amount": [decimal.Decimal("100.50"), decimal.Decimal("200.75"), decimal.Decimal("300.00")],
+            },
+            schema=pa_schema,
+        )
+        _write_to_paimon(table, data)
+
+        df = _read_table(table)
+        result = df.to_pydict()
+        assert len(result["id"]) == 3
+
+
+class TestNullValues:
+    """Tests for null value handling."""
+
+    def test_read_with_nulls(self, local_paimon_catalog):
+        catalog, tmp_path = local_paimon_catalog
+        pa_schema = pa.schema([("id", pa.int64()), ("name", pa.string()), ("value", pa.float64())])
+        paimon_schema = pypaimon.Schema.from_pyarrow_schema(pa_schema)
+        catalog.create_table("test_db.null_table", paimon_schema, ignore_if_exists=True)
+        table = catalog.get_table("test_db.null_table")
+
+        data = pa.table({"id": [1, 2, 3], "name": ["alice", None, "charlie"], "value": [1.1, 2.2, None]})
+        _write_to_paimon(table, data)
+
+        df = _read_table(table)
+        result = df.to_pydict()
+        assert len(result["id"]) == 3
+        assert result["name"][1] is None
+        assert result["value"][2] is None
+
+
+class TestCompositePrimaryKey:
+    """Tests for composite primary key tables."""
+
+    def test_read_composite_pk_table(self, local_paimon_catalog):
+        catalog, tmp_path = local_paimon_catalog
+        pa_schema = pa.schema([("pk1", pa.int64()), ("pk2", pa.string()), ("value", pa.float64())])
+        paimon_schema = pypaimon.Schema.from_pyarrow_schema(
+            pa_schema, primary_keys=["pk1", "pk2"], options={"bucket": "2"}
+        )
+        catalog.create_table("test_db.composite_pk", paimon_schema, ignore_if_exists=True)
+        table = catalog.get_table("test_db.composite_pk")
+
+        data = pa.table({"pk1": [1, 1, 2], "pk2": ["a", "b", "a"], "value": [1.1, 2.2, 3.3]})
+        _write_to_paimon(table, data)
+
+        df = _read_table(table)
+        result = df.to_pydict()
+        assert len(result["pk1"]) == 3
diff --git a/paimon-python/pypaimon/tests/daft/daft_sink_test.py b/paimon-python/pypaimon/tests/daft/daft_sink_test.py
new file mode 100644
index 0000000000..b5b9a171d3
--- /dev/null
+++ b/paimon-python/pypaimon/tests/daft/daft_sink_test.py
@@ -0,0 +1,416 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Tests for _write_table() — writes to a local Paimon filesystem catalog.
+
+All tests run without Docker or external services. Data written via Daft is
+verified by reading back with both _read_table() and pypaimon's native
+reader to ensure correctness.
+"""
+
+from __future__ import annotations
+
+import pyarrow as pa
+import pytest
+
+pypaimon = pytest.importorskip("pypaimon")
+daft = pytest.importorskip("daft")
+
+from pypaimon.daft.daft_compat import has_file_range_reads
+from pypaimon.daft.daft_catalog import PaimonTable
+from pypaimon.daft.daft_paimon import _read_table, _write_table
+
+requires_blob = pytest.mark.skipif(not has_file_range_reads(), reason="BLOB support requires daft >= 0.7.11")
+
+
+# ---------------------------------------------------------------------------
+# Helpers & Fixtures
+# ---------------------------------------------------------------------------
+
+
+def _write_to_paimon(table, arrow_table, mode="append", overwrite_partition=None):
+    write_builder = table.new_batch_write_builder()
+    if mode == "overwrite":
+        write_builder.overwrite(overwrite_partition or {})
+    table_write = write_builder.new_write()
+    table_commit = write_builder.new_commit()
+    try:
+        table_write.write_arrow(arrow_table)
+        commit_messages = table_write.prepare_commit()
+        table_commit.commit(commit_messages)
+    finally:
+        table_write.close()
+        table_commit.close()
+
+
+@pytest.fixture(scope="function")
+def local_paimon_catalog(tmp_path):
+    catalog = pypaimon.CatalogFactory.create({"warehouse": str(tmp_path)})
+    catalog.create_database("test_db", ignore_if_exists=True)
+    return catalog, tmp_path
+
+
+@pytest.fixture
+def append_only_table(local_paimon_catalog):
+    catalog, tmp_path = local_paimon_catalog
+    schema = pypaimon.Schema.from_pyarrow_schema(
+        pa.schema([
+            pa.field("id", pa.int64()),
+            pa.field("name", pa.string()),
+            pa.field("value", pa.float64()),
+            pa.field("dt", pa.string()),
+        ]),
+        partition_keys=["dt"],
+        options={"bucket": "1", "file.format": "parquet"},
+    )
+    catalog.create_table("test_db.append_table", schema, ignore_if_exists=False)
+    return catalog.get_table("test_db.append_table"), tmp_path
+
+
+@pytest.fixture
+def append_only_table_no_partition(local_paimon_catalog):
+    catalog, tmp_path = local_paimon_catalog
+    schema = pypaimon.Schema.from_pyarrow_schema(
+        pa.schema([
+            pa.field("id", pa.int64()),
+            pa.field("name", pa.string()),
+        ]),
+        options={"bucket": "1", "file.format": "parquet"},
+    )
+    catalog.create_table("test_db.append_no_part", schema, ignore_if_exists=False)
+    return catalog.get_table("test_db.append_no_part"), tmp_path
+
+
+@pytest.fixture
+def pk_table(local_paimon_catalog):
+    catalog, tmp_path = local_paimon_catalog
+    schema = pypaimon.Schema.from_pyarrow_schema(
+        pa.schema([
+            pa.field("id", pa.int64()),
+            pa.field("name", pa.string()),
+            pa.field("dt", pa.string()),
+        ]),
+        partition_keys=["dt"],
+        primary_keys=["id", "dt"],
+        options={"bucket": "1", "file.format": "parquet"},
+    )
+    catalog.create_table("test_db.pk_table", schema, ignore_if_exists=False)
+    return catalog.get_table("test_db.pk_table"), tmp_path
+
+
+# ---------------------------------------------------------------------------
+# Basic append
+# ---------------------------------------------------------------------------
+
+
+def test_write_paimon_append_basic(append_only_table):
+    """_write_table with mode='append' should persist data readable by _read_table."""
+    table, _ = append_only_table
+    df = daft.from_pydict(
+        {
+            "id": [1, 2, 3],
+            "name": ["alice", "bob", "charlie"],
+            "value": [1.1, 2.2, 3.3],
+            "dt": ["2024-01-01", "2024-01-01", "2024-01-01"],
+        }
+    )
+    _write_table(df, table)
+
+    result = _read_table(table).sort("id").to_arrow()
+    assert result.num_rows == 3
+    assert result.column("id").to_pylist() == [1, 2, 3]
+    assert result.column("name").to_pylist() == ["alice", "bob", "charlie"]
+
+
+def test_write_paimon_append_returns_summary(append_only_table):
+    """_write_table should return a DataFrame with operation metadata columns."""
+    table, _ = append_only_table
+    df = daft.from_pydict(
+        {
+            "id": [10, 20],
+            "name": ["x", "y"],
+            "value": [5.0, 6.0],
+            "dt": ["2024-02-01", "2024-02-01"],
+        }
+    )
+    result = _write_table(df, table)
+    result_dict = result.to_pydict()
+
+    assert "operation" in result_dict
+    assert "rows" in result_dict
+    assert "file_size" in result_dict
+    assert "file_name" in result_dict
+
+    assert all(op == "ADD" for op in result_dict["operation"])
+    assert sum(result_dict["rows"]) == 2
+    assert all(s > 0 for s in result_dict["file_size"])
+    assert all(len(fn) > 0 for fn in result_dict["file_name"])
+
+
+def test_write_paimon_append_multiple_times(append_only_table):
+    """Multiple append writes should accumulate rows."""
+    table, _ = append_only_table
+    df1 = daft.from_pydict({"id": [1], "name": ["a"], "value": [1.0], "dt": ["2024-01-01"]})
+    df2 = daft.from_pydict({"id": [2], "name": ["b"], "value": [2.0], "dt": ["2024-01-02"]})
+    _write_table(df1, table)
+    _write_table(df2, table)
+
+    result = _read_table(table).sort("id").to_arrow()
+    assert result.num_rows == 2
+    assert result.column("id").to_pylist() == [1, 2]
+
+
+def test_write_paimon_roundtrip_native_verify(append_only_table):
+    """Data written by Daft should also be readable via pypaimon's native reader."""
+    table, _ = append_only_table
+    df = daft.from_pydict(
+        {
+            "id": [7, 8, 9],
+            "name": ["p", "q", "r"],
+            "value": [7.0, 8.0, 9.0],
+            "dt": ["2024-05-01", "2024-05-01", "2024-05-01"],
+        }
+    )
+    _write_table(df, table)
+
+    # Verify via pypaimon native reader
+    read_builder = table.new_read_builder()
+    table_scan = read_builder.new_scan()
+    table_read = read_builder.new_read()
+    splits = table_scan.plan().splits()
+    arrow_table = table_read.to_arrow(splits)
+
+    assert arrow_table.num_rows == 3
+    ids = sorted(arrow_table.column("id").to_pylist())
+    assert ids == [7, 8, 9]
+
+
+# ---------------------------------------------------------------------------
+# Overwrite
+# ---------------------------------------------------------------------------
+
+
+def test_write_paimon_overwrite_full_unpartitioned(append_only_table_no_partition):
+    """mode='overwrite' on an unpartitioned table should replace all existing data."""
+    table, _ = append_only_table_no_partition
+    initial = daft.from_pydict({"id": [1, 2], "name": ["a", "b"]})
+    _write_table(initial, table)
+
+    replacement = daft.from_pydict({"id": [100], "name": ["z"]})
+    result = _write_table(replacement, table, mode="overwrite")
+    result_dict = result.to_pydict()
+    assert all(op == "OVERWRITE" for op in result_dict["operation"])
+
+    final = _read_table(table).to_arrow()
+    assert final.num_rows == 1
+    assert final.column("id").to_pylist() == [100]
+
+
+def test_write_paimon_overwrite_dynamic_partition(append_only_table):
+    """mode='overwrite' on a partitioned table should only replace touched partitions."""
+    table, _ = append_only_table
+    initial = daft.from_pydict(
+        {
+            "id": [1, 2, 3, 4],
+            "name": ["a", "b", "c", "d"],
+            "value": [1.0, 2.0, 3.0, 4.0],
+            "dt": ["2024-01-01", "2024-01-01", "2024-01-02", "2024-01-02"],
+        }
+    )
+    _write_table(initial, table)
+
+    replacement = daft.from_pydict({"id": [10], "name": ["x"], "value": [10.0], "dt": ["2024-01-01"]})
+    result = _write_table(replacement, table, mode="overwrite")
+    result_dict = result.to_pydict()
+    assert all(op == "OVERWRITE" for op in result_dict["operation"])
+
+    final = _read_table(table).sort("id").to_pydict()
+    assert final["id"] == [3, 4, 10]
+    assert final["dt"] == ["2024-01-02", "2024-01-02", "2024-01-01"]
+
+
+# ---------------------------------------------------------------------------
+# Error handling
+# ---------------------------------------------------------------------------
+
+
+def test_write_paimon_invalid_mode(append_only_table):
+    """An unsupported mode should raise a ValueError."""
+    table, _ = append_only_table
+    df = daft.from_pydict({"id": [1], "name": ["a"], "value": [1.0], "dt": ["2024-01-01"]})
+    with pytest.raises(ValueError, match="Only 'append' or 'overwrite' mode is supported"):
+        _write_table(df, table, mode="upsert")
+
+
+def test_write_paimon_pk_table(pk_table):
+    """Writing to a PK table should work and be readable back."""
+    table, _ = pk_table
+    df = daft.from_pydict(
+        {
+            "id": [1, 2, 3],
+            "name": ["x", "y", "z"],
+            "dt": ["2024-01-01", "2024-01-01", "2024-01-01"],
+        }
+    )
+    _write_table(df, table)
+
+    result = _read_table(table).sort("id").to_arrow()
+    assert result.num_rows == 3
+    assert result.column("id").to_pylist() == [1, 2, 3]
+
+
+# ---------------------------------------------------------------------------
+# Schema conversion tests
+# ---------------------------------------------------------------------------
+
+
+class TestSchemaConversion:
+    """Tests for schema conversion utilities."""
+
+    def test_write_large_string_conversion(self, local_paimon_catalog):
+        """Test that large_string columns are converted to string for pypaimon."""
+        catalog, tmp_path = local_paimon_catalog
+        pa_schema = pa.schema([("id", pa.int64()), ("text", pa.string())])
+        paimon_schema = pypaimon.Schema.from_pyarrow_schema(pa_schema)
+        catalog.create_table("test_db.large_str", paimon_schema, ignore_if_exists=True)
+        table = catalog.get_table("test_db.large_str")
+
+        df = daft.from_pydict({"id": [1, 2, 3], "text": ["a", "b", "c"]})
+        _write_table(df, table, mode="append")
+
+        result = _read_table(table).to_pydict()
+        assert result["id"] == [1, 2, 3]
+        assert result["text"] == ["a", "b", "c"]
+
+    def test_write_large_binary_conversion(self, local_paimon_catalog):
+        """Test that large_binary columns are converted to binary for pypaimon."""
+        catalog, tmp_path = local_paimon_catalog
+        pa_schema = pa.schema([("id", pa.int64()), ("data", pa.binary())])
+        paimon_schema = pypaimon.Schema.from_pyarrow_schema(pa_schema)
+        catalog.create_table("test_db.large_bin", paimon_schema, ignore_if_exists=True)
+        table = catalog.get_table("test_db.large_bin")
+
+        df = daft.from_pydict({"id": [1, 2], "data": [b"abc", b"def"]})
+        _write_table(df, table, mode="append")
+
+        result = _read_table(table).to_pydict()
+        assert result["id"] == [1, 2]
+
+
+# ---------------------------------------------------------------------------
+# Complex type tests
+# ---------------------------------------------------------------------------
+
+
+class TestComplexTypes:
+    """Tests for writing complex data types."""
+
+    def test_write_nested_list(self, local_paimon_catalog):
+        """Test writing Paimon table with list type."""
+        catalog, tmp_path = local_paimon_catalog
+        pa_schema = pa.schema([("id", pa.int64()), ("list_col", pa.list_(pa.int64()))])
+        paimon_schema = pypaimon.Schema.from_pyarrow_schema(pa_schema)
+        catalog.create_table("test_db.write_list", paimon_schema, ignore_if_exists=True)
+        table = catalog.get_table("test_db.write_list")
+
+        df = daft.from_pydict({"id": [1, 2], "list_col": [[1, 2, 3], [4, 5]]})
+        _write_table(df, table, mode="append")
+
+        result = _read_table(table).to_pydict()
+        assert result["id"] == [1, 2]
+
+
+@requires_blob
+class TestBlobType:
+    """Tests for BLOB type support (pypaimon 1.4+)."""
+
+    def test_write_read_blob_type(self, local_paimon_catalog):
+        """Test that BLOB columns are returned as FileReference objects."""
+        catalog, tmp_path = local_paimon_catalog
+        pa_schema = pa.schema([("id", pa.int64()), ("blob_data", pa.large_binary())])
+        paimon_schema = pypaimon.Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                "bucket": "1",
+                "file.format": "parquet",
+                "row-tracking.enabled": "true",
+                "data-evolution.enabled": "true",
+            },
+        )
+        catalog.create_table("test_db.blob_table", paimon_schema, ignore_if_exists=True)
+        table = catalog.get_table("test_db.blob_table")
+
+        df = daft.from_pydict({"id": [1, 2], "blob_data": [b"hello", b"world"]})
+        _write_table(df, table, mode="append")
+
+        result_df = _read_table(table).sort("id")
+        assert str(result_df.schema()["blob_data"].dtype) == "File[Unknown]"
+
+        result = result_df.to_pydict()
+        assert result["id"] == [1, 2]
+
+        blob_refs = result["blob_data"]
+        assert len(blob_refs) == 2
+        for ref in blob_refs:
+            assert isinstance(ref, daft.File)
+            assert isinstance(ref.path, str)
+            assert ".blob" in ref.path
+            assert ref.offset is not None
+            assert ref.length is not None
+
+
+# ---------------------------------------------------------------------------
+# Truncate tests
+# ---------------------------------------------------------------------------
+
+
+class TestTruncate:
+    """Tests for table truncate operations (pypaimon 1.4+)."""
+
+    def test_truncate_table(self, append_only_table):
+        """truncate() should remove all data from the table."""
+        table, _ = append_only_table
+        df = daft.from_pydict(
+            {"id": [1, 2, 3], "name": ["a", "b", "c"], "value": [1.0, 2.0, 3.0], "dt": ["2024-01-01"] * 3}
+        )
+        _write_table(df, table)
+        assert _read_table(table).count_rows() == 3
+
+        paimon_table = PaimonTable(table)
+        paimon_table.truncate()
+        assert _read_table(table).count_rows() == 0
+
+    def test_truncate_partitions(self, append_only_table):
+        """truncate_partitions() should remove only the specified partition data."""
+        table, _ = append_only_table
+        df = daft.from_pydict(
+            {
+                "id": [1, 2, 3, 4],
+                "name": ["a", "b", "c", "d"],
+                "value": [1.0, 2.0, 3.0, 4.0],
+                "dt": ["2024-01-01", "2024-01-01", "2024-01-02", "2024-01-02"],
+            }
+        )
+        _write_table(df, table)
+        assert _read_table(table).count_rows() == 4
+
+        paimon_table = PaimonTable(table)
+        paimon_table.truncate_partitions([{"dt": "2024-01-01"}])
+        result = _read_table(table).sort("id").to_pydict()
+        assert result["id"] == [3, 4]
+        assert result["dt"] == ["2024-01-02", "2024-01-02"]
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index ef8ef3a2df..f420a813c0 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -344,7 +344,9 @@ class DataWriter(ABC):
}
column_type = column_array.type
- supports_minmax = not (pa.types.is_nested(column_type) or
pa.types.is_map(column_type))
+ supports_minmax = not (
+ pa.types.is_nested(column_type) or pa.types.is_map(column_type) or
pa.types.is_large_binary(column_type)
+ )
if not supports_minmax:
return {
diff --git a/paimon-python/setup.py b/paimon-python/setup.py
index cd0f80e8aa..f9ce76a934 100644
--- a/paimon-python/setup.py
+++ b/paimon-python/setup.py
@@ -157,6 +157,9 @@ setup(
'torch': [
'torch',
],
+ 'daft': [
+ 'daft>=0.7.6; python_version>="3.10"',
+ ],
'oss': [
'ossfs>=2021.8; python_version<"3.8"',
'ossfs>=2023; python_version>="3.8"'