This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4133576330 [python] initialize daft connector for paimon (#7844)
4133576330 is described below

commit 4133576330ff7bbc71c4caf60414fe5fa7f79d34
Author: ChengHui Chen <[email protected]>
AuthorDate: Sat May 16 00:21:03 2026 +0800

    [python] initialize daft connector for paimon (#7844)
    
    Initialize `paimon-daft` module in the Paimon main repository.
    
    Based on discussion from https://github.com/Eventual-Inc/Daft/pull/6839,
    the community decided to maintain the Daft-Paimon connector in the
    Paimon repository going forward — for more efficient maintenance and
    stronger ownership by the Paimon community.
    
    This is the init PR that migrates the connector code from the Daft
    repository and adds basic project infrastructure.
---
 .github/workflows/paimon-python-checks.yml         |   2 +-
 docs/content/pypaimon/daft.md                      | 231 +++++++++
 paimon-python/dev/lint-python.sh                   |   7 +-
 paimon-python/pypaimon/daft/__init__.py            |  31 ++
 paimon-python/pypaimon/daft/daft_blob.py           |  86 ++++
 paimon-python/pypaimon/daft/daft_catalog.py        | 285 +++++++++++
 paimon-python/pypaimon/daft/daft_compat.py         |  60 +++
 paimon-python/pypaimon/daft/daft_datasink.py       | 125 +++++
 paimon-python/pypaimon/daft/daft_datasource.py     | 316 ++++++++++++
 paimon-python/pypaimon/daft/daft_io_config.py      |  89 ++++
 paimon-python/pypaimon/daft/daft_paimon.py         | 161 ++++++
 .../pypaimon/daft/daft_predicate_visitor.py        | 248 +++++++++
 .../pypaimon/tests/daft/daft_catalog_rest_test.py  | 233 +++++++++
 .../pypaimon/tests/daft/daft_catalog_test.py       | 333 ++++++++++++
 .../pypaimon/tests/daft/daft_data_test.py          | 557 +++++++++++++++++++++
 .../pypaimon/tests/daft/daft_sink_test.py          | 416 +++++++++++++++
 paimon-python/pypaimon/write/writer/data_writer.py |   4 +-
 paimon-python/setup.py                             |   3 +
 18 files changed, 3184 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/paimon-python-checks.yml 
b/.github/workflows/paimon-python-checks.yml
index 5302d1267d..580ea4cccc 100755
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -132,7 +132,7 @@ jobs:
           else
             python -m pip install --upgrade pip
             pip install torch --index-url https://download.pytorch.org/whl/cpu
-            python -m pip install pyroaring readerwriterlock==1.0.9 
fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 
fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 
numpy==1.24.3 pandas==2.0.3 pylance==0.39.0 cramjam flake8==4.0.1 pytest~=7.0 
py4j==0.10.9.9 requests parameterized==0.9.0
+            python -m pip install pyroaring readerwriterlock==1.0.9 
fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 
fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 
numpy==1.24.3 pandas==2.0.3 pylance==0.39.0 cramjam flake8==4.0.1 pytest~=7.0 
py4j==0.10.9.9 requests parameterized==0.9.0 'daft>=0.7.6'
             python -m pip install 'lumina-data>=${{ env.LUMINA_DATA_VERSION 
}}' -i https://pypi.org/simple/
             if python -c "import sys; sys.exit(0 if sys.version_info >= (3, 
11) else 1)"; then
               python -m pip install vortex-data
diff --git a/docs/content/pypaimon/daft.md b/docs/content/pypaimon/daft.md
new file mode 100644
index 0000000000..b2f4043bbe
--- /dev/null
+++ b/docs/content/pypaimon/daft.md
@@ -0,0 +1,231 @@
+---
+title: "Daft"
+weight: 4
+type: docs
+aliases:
+  - /pypaimon/daft.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Daft
+
+[Daft](https://www.daft.io/) is a distributed DataFrame engine for Python.
+
+This requires `daft` to be installed:
+
+```bash
+pip install pypaimon[daft]
+```
+
+`pypaimon.daft` exposes a top-level `read_paimon` / `write_paimon` API that
+takes a table identifier and catalog options directly.
+
+## Read
+
+### `read_paimon` (recommended)
+
+```python
+from pypaimon.daft import read_paimon
+
+df = read_paimon(
+    "database_name.table_name",
+    catalog_options={"warehouse": "/path/to/warehouse"},
+)
+
+df.show()
+```
+
+`read_paimon` opens its own catalog and resolves the table in a single call.
+
+The returned DataFrame is lazy. Use standard Daft operations for filtering,
+projection, and limit — they are automatically pushed down into the Paimon scan
+via Daft's DataSource protocol:
+
+```python
+import daft
+
+df = read_paimon(
+    "database_name.table_name",
+    catalog_options={"warehouse": "/path/to/warehouse"},
+)
+
+# Filter pushdown (partition pruning + file-level skipping)
+df = df.where(daft.col("dt") == "2024-01-01")
+
+# Projection pushdown (only requested columns are read from disk)
+df = df.select("id", "name")
+
+# Limit pushdown
+df = df.limit(100)
+
+df.show()
+```
+
+**Time travel:**
+
+```python
+# Read a specific snapshot.
+df = read_paimon(
+    "database_name.table_name",
+    catalog_options={"warehouse": "/path/to/warehouse"},
+    snapshot_id=42,
+)
+
+# Read a tagged snapshot.
+df = read_paimon(
+    "database_name.table_name",
+    catalog_options={"warehouse": "/path/to/warehouse"},
+    tag_name="release-2026-04",
+)
+```
+
+`snapshot_id` and `tag_name` are mutually exclusive.
+
+**Parameters:**
+- `table_identifier`: full table name, e.g. `"db_name.table_name"`.
+- `catalog_options`: dictionary of catalog options passed to `CatalogFactory.create()`,
+  e.g. `{"warehouse": "/path/to/warehouse"}`.
+- `snapshot_id`: optional snapshot id to time-travel to. Mutually
+  exclusive with `tag_name`.
+- `tag_name`: optional tag name to time-travel to. Mutually
+  exclusive with `snapshot_id`.
+- `io_config`: optional Daft `IOConfig` for accessing object storage.
+  If `None`, will be inferred from the catalog options.
+
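+For tables on object stores, credentials are inferred from the catalog options
+automatically:
+
+```python
+df = read_paimon(
+    "my_db.my_table",
+    catalog_options={
+        "warehouse": "s3://my-bucket/warehouse",
+        "fs.s3.accessKeyId": "...",
+        "fs.s3.accessKeySecret": "...",
+    },
+)
+df.show()
+```
+
+Alternatively, pass an explicit `IOConfig` through the `io_config` parameter:
+
+```python
+from daft.io import IOConfig, S3Config
+
+df = read_paimon(
+    "my_db.my_table",
+    catalog_options={"warehouse": "s3://my-bucket/warehouse"},
+    io_config=IOConfig(s3=S3Config(key_id="...", access_key="...")),
+)
+df.show()
+```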
+
+**Features:**
+- Append-only tables with Parquet format use Daft's native high-performance 
Parquet reader.
+- Primary-key tables that require LSM merge fall back to pypaimon's built-in 
reader.
+- Partition pruning, predicate pushdown, projection pushdown, and limit 
pushdown are all supported.
+
+## Write
+
+### `write_paimon` (recommended)
+
+```python
+import daft
+from pypaimon.daft import write_paimon
+
+df = daft.from_pydict({
+    "id": [1, 2, 3],
+    "name": ["alice", "bob", "charlie"],
+    "dt": ["2024-01-01", "2024-01-01", "2024-01-01"],
+})
+
+write_paimon(
+    df,
+    "database_name.table_name",
+    catalog_options={"warehouse": "/path/to/warehouse"},
+)
+```
+
+`write_paimon` opens its own catalog, resolves the table, and commits the
+write through Daft's DataSink API.
+
+**Overwrite mode:**
+
+```python
+write_paimon(
+    df,
+    "database_name.table_name",
+    catalog_options={"warehouse": "/path/to/warehouse"},
+    mode="overwrite",
+)
+```
+
+**Parameters:**
+- `df`: the Daft DataFrame to write.
+- `table_identifier`: full table name, e.g. `"db_name.table_name"`.
+- `catalog_options`: dictionary of catalog options passed to `CatalogFactory.create()`.
+- `mode`: write mode — `"append"` (default) or `"overwrite"`.
+
+## Catalog Abstraction
+
+Paimon catalogs can integrate with Daft's unified `Catalog` / `Table` 
interfaces:
+
+```python
+import pypaimon
+from pypaimon.daft import PaimonCatalog
+
+inner = pypaimon.CatalogFactory.create({"warehouse": "/path/to/warehouse"})
+catalog = PaimonCatalog(inner, name="my_paimon")
+
+# Browse
+catalog.list_namespaces()
+catalog.list_tables()
+
+# Read / write through catalog
+table = catalog.get_table("my_db.my_table")
+df = table.read()
+table.append(df)
+table.overwrite(df)
+```
+
+You can also wrap a single table directly:
+
+```python
+from pypaimon.daft import PaimonTable
+
+inner_table = inner.get_table("my_db.my_table")
+table = PaimonTable(inner_table)
+df = table.read()
+```
+
+### Creating Tables
+
+```python
+import daft
+from daft.io.partitioning import PartitionField
+
+schema = daft.from_pydict({"id": [1], "name": ["a"], "dt": 
["2024-01-01"]}).schema()
+dt_field = schema["dt"]
+partition_fields = [PartitionField.create(dt_field)]
+
+table = catalog.create_table(
+    "my_db.new_table",
+    schema,
+    partition_fields=partition_fields,
+)
+
+# Primary-key table
+table = catalog.create_table(
+    "my_db.pk_table",
+    schema,
+    properties={"primary_keys": ["id", "dt"]},
+    partition_fields=partition_fields,
+)
+```
diff --git a/paimon-python/dev/lint-python.sh b/paimon-python/dev/lint-python.sh
index a8e21c2d34..ffee46cbef 100755
--- a/paimon-python/dev/lint-python.sh
+++ b/paimon-python/dev/lint-python.sh
@@ -152,7 +152,12 @@ function license_check() {
 
 # Flake8 check
 function flake8_check() {
-    local PYTHON_SOURCE="$(find . \( -path ./dev -o -path ./.tox -o -path 
./.venv \) -prune -o -type f -name "*.py" -print )"
+    local PRUNE_PATHS="\( -path ./dev -o -path ./.tox -o -path ./.venv"
+    if python -c "import sys; sys.exit(0 if sys.version_info < (3, 10) else 
1)" 2>/dev/null; then
+        PRUNE_PATHS="$PRUNE_PATHS -o -path ./pypaimon/daft -o -path 
./pypaimon/tests/daft"
+    fi
+    PRUNE_PATHS="$PRUNE_PATHS \)"
+    local PYTHON_SOURCE="$(eval "find . $PRUNE_PATHS -prune -o -type f -name 
'*.py' -print")"
 
     print_function "STAGE" "flake8 checks"
     if [ ! -f "$FLAKE8_PATH" ]; then
diff --git a/paimon-python/pypaimon/daft/__init__.py 
b/paimon-python/pypaimon/daft/__init__.py
new file mode 100644
index 0000000000..e9651dd477
--- /dev/null
+++ b/paimon-python/pypaimon/daft/__init__.py
@@ -0,0 +1,31 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pypaimon.daft.daft_paimon import read_paimon, write_paimon
+
+__all__ = ["read_paimon", "write_paimon", "PaimonCatalog", "PaimonTable"]
+
+
+def __getattr__(name):
+    """Resolve ``PaimonCatalog`` / ``PaimonTable`` on first attribute access.
+
+    Both classes are imported from ``pypaimon.daft.daft_catalog`` only when one
+    of them is requested, then stored in the module globals so that later
+    lookups find them directly. Any other name raises ``AttributeError``.
+    """
+    if name not in ("PaimonCatalog", "PaimonTable"):
+        raise AttributeError(f"module 'pypaimon.daft' has no attribute {name!r}")
+
+    from pypaimon.daft.daft_catalog import PaimonCatalog, PaimonTable
+
+    module_ns = globals()
+    module_ns.update(PaimonCatalog=PaimonCatalog, PaimonTable=PaimonTable)
+    return module_ns[name]
diff --git a/paimon-python/pypaimon/daft/daft_blob.py 
b/paimon-python/pypaimon/daft/daft_blob.py
new file mode 100644
index 0000000000..156cda52bf
--- /dev/null
+++ b/paimon-python/pypaimon/daft/daft_blob.py
@@ -0,0 +1,86 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Utilities for deserializing Paimon BlobDescriptor bytes into FileReference 
arrays."""
+
+from __future__ import annotations
+
+import struct
+
+import pyarrow as pa
+
+FILE_PHYSICAL_TYPE = pa.struct(
+    [
+        pa.field("url", pa.large_utf8()),
+        pa.field("io_config", pa.large_binary()),
+        pa.field("offset", pa.int64()),
+        pa.field("length", pa.int64()),
+    ]
+)
+
+
+def _deserialize_one(data: bytes) -> tuple[str, int, int]:
+    """Decode one serialized BlobDescriptor into ``(url, offset, length)``.
+
+    Layout as read here (all little-endian): a 1-byte version, 8 bytes that are
+    skipped when the version is greater than 1, a 4-byte unsigned URI length,
+    the UTF-8 URI bytes, then two signed 64-bit integers (offset, length).
+    """
+    version = data[0]
+    # Versions above 1 carry an 8-byte magic right after the version byte.
+    cursor = 9 if version > 1 else 1
+
+    (uri_len,) = struct.unpack_from("<I", data, cursor)
+    uri_start = cursor + 4
+    uri_end = uri_start + uri_len
+    uri = data[uri_start:uri_end].decode("utf-8")
+
+    # Offset and length are adjacent, so both are read in one call.
+    offset, length = struct.unpack_from("<qq", data, uri_end)
+    return uri, offset, length
+
+
+def blob_column_to_file_array(column: pa.Array) -> pa.Array:
+    """Turn a column of serialized BlobDescriptors into a FileReference-style struct array.
+
+    Each valid element is decoded with ``_deserialize_one``; null elements yield
+    null ``url`` / ``offset`` / ``length`` values. The ``io_config`` child is
+    always filled with nulls.
+    """
+    rows = [
+        _deserialize_one(item.as_py()) if item is not None and item.is_valid else (None, None, None)
+        for item in column
+    ]
+    url_values: list[str | None] = [row[0] for row in rows]
+    offset_values: list[int | None] = [row[1] for row in rows]
+    length_values: list[int | None] = [row[2] for row in rows]
+
+    children = [
+        pa.array(url_values, type=pa.large_utf8()),
+        pa.nulls(len(rows), type=pa.large_binary()),
+        pa.array(offset_values, type=pa.int64()),
+        pa.array(length_values, type=pa.int64()),
+    ]
+    return pa.StructArray.from_arrays(children, names=["url", "io_config", "offset", "length"])
diff --git a/paimon-python/pypaimon/daft/daft_catalog.py 
b/paimon-python/pypaimon/daft/daft_catalog.py
new file mode 100644
index 0000000000..aaf3f6879d
--- /dev/null
+++ b/paimon-python/pypaimon/daft/daft_catalog.py
@@ -0,0 +1,285 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Paimon catalog and table wrappers for Daft's Catalog/Table interfaces."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+import pyarrow as pa
+from pypaimon.catalog.catalog import Catalog as InnerCatalog
+from pypaimon.catalog.catalog_exception import (
+    DatabaseNotExistException,
+    TableNotExistException,
+)
+from pypaimon.table.table import Table as InnerTable
+
+from daft.catalog import Catalog, Function, Identifier, NotFoundError, 
Properties, Schema, Table
+
+if TYPE_CHECKING:
+    from collections.abc import Callable
+
+    from daft.dataframe import DataFrame
+    from daft.io.partitioning import PartitionField
+
+
+class PaimonCatalog(Catalog):
+    """Daft ``Catalog`` implementation that delegates to a pypaimon catalog.
+
+    Daft identifiers are translated with ``_to_paimon_ident`` before being handed
+    to the wrapped catalog, and pypaimon's "not exist" exceptions are mapped to
+    Daft's ``NotFoundError`` in the drop/get paths.
+    """
+
+    _inner: InnerCatalog
+    _name: str
+    _catalog_options: dict[str, str]
+
+    def __init__(
+        self,
+        inner: InnerCatalog,
+        name: str = "paimon",
+        catalog_options: dict[str, str] | None = None,
+    ) -> None:
+        """Wrap ``inner``.
+
+        Args:
+            inner: the pypaimon catalog to delegate to.
+            name: value reported by the ``name`` property.
+            catalog_options: options handed to every ``PaimonTable`` created by
+                this catalog. When ``None`` they are looked up on ``inner``
+                (``inner.catalog_options`` first, then ``inner.context.options``)
+                and converted with ``to_map()``; an empty dict is used when
+                neither is available.
+        """
+        self._inner = inner
+        self._name = name
+        if catalog_options is not None:
+            self._catalog_options = catalog_options
+        else:
+            opts = getattr(inner, "catalog_options", None)
+            if opts is None:
+                ctx = getattr(inner, "context", None)
+                opts = getattr(ctx, "options", None) if ctx is not None else None
+            self._catalog_options = opts.to_map() if opts is not None and hasattr(opts, "to_map") else {}
+
+    @property
+    def name(self) -> str:
+        """Name given to this catalog at construction."""
+        return self._name
+
+    ###
+    # create_*
+    ###
+
+    def _create_function(self, ident: Identifier, function: Function | Callable[..., Any]) -> None:
+        """Always raises ``NotImplementedError``."""
+        raise NotImplementedError("Paimon does not support function registration.")
+
+    def _create_namespace(self, ident: Identifier) -> None:
+        """Create a Paimon database for ``ident`` (``ignore_if_exists=False``)."""
+        db_name = _to_paimon_ident(ident)
+        self._inner.create_database(db_name, ignore_if_exists=False)
+
+    def _create_table(
+        self,
+        ident: Identifier,
+        schema: Schema,
+        properties: Properties | None = None,
+        partition_fields: list[PartitionField] | None = None,
+    ) -> Table:
+        """Create a Paimon table and return it wrapped as a ``PaimonTable``.
+
+        Args:
+            ident: identifier of the table to create.
+            schema: Daft schema; converted to PyArrow with large strings
+                narrowed by ``_cast_large_types``.
+            properties: optional properties. The ``"primary_keys"`` entry is
+                used as the primary-key list; every other entry is stringified
+                and passed as a table option.
+            partition_fields: optional partition fields; their field names
+                become the partition keys.
+        """
+        import pypaimon
+
+        props = properties or {}
+        pa_schema = _cast_large_types(schema.to_pyarrow_schema())
+        partition_keys = [pf.field.name for pf in (partition_fields or [])]
+        primary_keys = list(props.get("primary_keys", []))
+        options = {k: str(v) for k, v in props.items() if k != "primary_keys"}
+
+        paimon_schema = pypaimon.Schema.from_pyarrow_schema(
+            pa_schema,
+            partition_keys=partition_keys,
+            primary_keys=primary_keys,
+            options=options,
+        )
+
+        paimon_ident = _to_paimon_ident(ident)
+        self._inner.create_table(paimon_ident, paimon_schema, ignore_if_exists=False)
+
+        inner_table = self._inner.get_table(paimon_ident)
+        return PaimonTable(inner_table, catalog_options=self._catalog_options)
+
+    ###
+    # drop_*
+    ###
+
+    def _drop_namespace(self, ident: Identifier) -> None:
+        """Drop a database; raises ``NotFoundError`` if it does not exist."""
+        db_name = _to_paimon_ident(ident)
+        try:
+            self._inner.drop_database(db_name, ignore_if_not_exists=False)
+        except DatabaseNotExistException as ex:
+            raise NotFoundError(f"Namespace '{db_name}' not found.") from ex
+
+    def _drop_table(self, ident: Identifier) -> None:
+        """Drop a table; raises ``NotFoundError`` if it does not exist."""
+        paimon_ident = _to_paimon_ident(ident)
+        try:
+            self._inner.drop_table(paimon_ident, ignore_if_not_exists=False)
+        except TableNotExistException as ex:
+            raise NotFoundError(f"Table '{paimon_ident}' not found.") from ex
+
+    ###
+    # has_*
+    ###
+
+    def _has_namespace(self, ident: Identifier) -> bool:
+        """Return whether the wrapped catalog can resolve the database."""
+        db_name = _to_paimon_ident(ident)
+        try:
+            self._inner.get_database(db_name)
+            return True
+        except DatabaseNotExistException:
+            return False
+
+    def _has_table(self, ident: Identifier) -> bool:
+        """Return whether the wrapped catalog can resolve the table."""
+        paimon_ident = _to_paimon_ident(ident)
+        try:
+            self._inner.get_table(paimon_ident)
+            return True
+        except (TableNotExistException, DatabaseNotExistException):
+            return False
+
+    ###
+    # get_*
+    ###
+
+    def _get_function(self, ident: Identifier) -> Function:
+        """Always raises ``NotFoundError``."""
+        raise NotFoundError(f"Function '{ident}' not found in catalog '{self.name}'")
+
+    def _get_table(self, ident: Identifier) -> PaimonTable:
+        """Look up a table and wrap it as a ``PaimonTable``.
+
+        Raises:
+            NotFoundError: if the table or its database does not exist.
+        """
+        paimon_ident = _to_paimon_ident(ident)
+        try:
+            inner = self._inner.get_table(paimon_ident)
+        # Same "missing" conditions as _has_table, so both lookups agree.
+        except (TableNotExistException, DatabaseNotExistException) as ex:
+            raise NotFoundError(f"Table '{paimon_ident}' not found.") from ex
+        return PaimonTable(inner, catalog_options=self._catalog_options)
+
+    ###
+    # list_*
+    ###
+
+    def _list_namespaces(self, pattern: str | None = None) -> list[Identifier]:
+        """List databases, optionally keeping only names starting with ``pattern``."""
+        databases: list[str] = self._inner.list_databases()
+        return [Identifier(db) for db in databases if pattern is None or db.startswith(pattern)]
+
+    def _list_tables(self, pattern: str | None = None) -> list[Identifier]:
+        """List tables of every database as ``(db, table)`` identifiers.
+
+        When ``pattern`` is given, only identifiers whose string form starts with
+        it are returned.
+        """
+        result = []
+        for db in self._inner.list_databases():
+            for table_name in self._inner.list_tables(db):
+                ident = Identifier(db, table_name)
+                if pattern is None or str(ident).startswith(pattern):
+                    result.append(ident)
+        return result
+
+
+class PaimonTable(Table):
+    """Daft ``Table`` that delegates to a wrapped pypaimon table."""
+
+    _inner: InnerTable
+    _catalog_options: dict[str, str]
+
+    def __init__(self, inner: InnerTable, catalog_options: dict[str, str] | None = None) -> None:
+        self._inner = inner
+        # A missing or empty mapping is replaced by a fresh empty dict.
+        self._catalog_options = catalog_options if catalog_options else {}
+
+    @property
+    def name(self) -> str:
+        """The ``object`` component of the wrapped table's identifier."""
+        return self._inner.identifier.object
+
+    @property
+    def primary_keys(self) -> list[str]:
+        """Copy of the wrapped table's primary keys."""
+        return [*self._inner.primary_keys]
+
+    @property
+    def partition_keys(self) -> list[str]:
+        """Copy of the wrapped table's partition keys."""
+        return [*self._inner.partition_keys]
+
+    @property
+    def is_primary_key_table(self) -> bool:
+        return self._inner.is_primary_key_table
+
+    @property
+    def bucket_count(self) -> int:
+        """The wrapped table's ``total_buckets``."""
+        return self._inner.total_buckets
+
+    @property
+    def table_options(self) -> dict[str, str]:
+        """Copy of the wrapped table's options as a plain dict."""
+        option_map = self._inner.options.options.to_map()
+        return dict(option_map)
+
+    def schema(self) -> Schema:
+        """Schema of the DataFrame produced by ``read()``."""
+        frame = self.read()
+        return frame.schema()
+
+    def read(self, **options: Any) -> DataFrame:
+        """Return a DataFrame over this table using the stored catalog options."""
+        from pypaimon.daft.daft_paimon import _read_table
+
+        # Options are validated against an empty set.
+        Table._validate_options("Paimon read", options, set())
+        return _read_table(self._inner, catalog_options=self._catalog_options)
+
+    def _write(self, df: DataFrame, options: dict[str, Any], mode: str) -> None:
+        """Validate ``options`` (against an empty set) and write ``df`` with ``mode``."""
+        from pypaimon.daft.daft_paimon import _write_table
+
+        Table._validate_options("Paimon write", options, set())
+        _write_table(df, self._inner, mode=mode)
+
+    def append(self, df: DataFrame, **options: Any) -> None:
+        """Write ``df`` to the table in ``"append"`` mode."""
+        self._write(df, options, "append")
+
+    def overwrite(self, df: DataFrame, **options: Any) -> None:
+        """Write ``df`` to the table in ``"overwrite"`` mode."""
+        self._write(df, options, "overwrite")
+
+    def truncate(self) -> None:
+        """Remove all data from this table."""
+        commit = self._inner.new_batch_write_builder().new_commit()
+        try:
+            commit.truncate_table()
+        finally:
+            commit.close()
+
+    def truncate_partitions(self, partitions: list[dict[str, str]]) -> None:
+        """Remove data from specific partitions."""
+        commit = self._inner.new_batch_write_builder().new_commit()
+        try:
+            commit.truncate_partitions(partitions)
+        finally:
+            commit.close()
+
+
+def _to_paimon_ident(ident: Identifier) -> str:
+    """Convert a Daft identifier to a pypaimon identifier string.
+
+    - 1 part  (table,)              -> 'table'
+    - 2 parts (db, table)           -> 'db.table'
+    - 3 parts (catalog, db, table)  -> 'db.table'  (catalog prefix stripped)
+
+    A value that is not an ``Identifier`` (e.g. an already-formatted string) is
+    returned unchanged.
+
+    Raises:
+        ValueError: if the identifier has more than three parts; previously such
+            an identifier silently collapsed to its first part.
+    """
+    if not isinstance(ident, Identifier):
+        return ident
+
+    parts = tuple(ident)
+    if len(parts) > 3:
+        raise ValueError(
+            f"Paimon identifiers have at most 3 parts (catalog, database, table), got {len(parts)}: {ident}"
+        )
+    if len(parts) >= 2:
+        # Keep only the trailing (db, table) pair; a leading catalog part is dropped.
+        return f"{parts[-2]}.{parts[-1]}"
+    return str(parts[0])
+
+
+def _cast_large_types(arrow_schema: pa.Schema) -> pa.Schema:
+    """Return a pypaimon-compatible version of ``arrow_schema``.
+
+    pypaimon doesn't support large_string, so top-level large_string fields are
+    replaced by regular string fields (name, nullability and metadata kept).
+    large_binary is kept as-is because pypaimon 1.4+ maps it to the BLOB type.
+    The input schema object itself is returned when nothing needs converting.
+    """
+
+    def _narrow(field: pa.Field) -> pa.Field:
+        dtype = pa.string() if pa.types.is_large_string(field.type) else field.type
+        return pa.field(field.name, dtype, nullable=field.nullable, metadata=field.metadata)
+
+    if not any(pa.types.is_large_string(field.type) for field in arrow_schema):
+        return arrow_schema
+    return pa.schema([_narrow(field) for field in arrow_schema], metadata=arrow_schema.metadata)
diff --git a/paimon-python/pypaimon/daft/daft_compat.py 
b/paimon-python/pypaimon/daft/daft_compat.py
new file mode 100644
index 0000000000..cfc3e35c36
--- /dev/null
+++ b/paimon-python/pypaimon/daft/daft_compat.py
@@ -0,0 +1,60 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Daft version compatibility checks."""
+
+from __future__ import annotations
+
+from importlib.metadata import version
+
+_daft_version: tuple[int, ...] | None = None
+
+
+def _parse_daft_version() -> tuple[int, ...]:
+    """Return the installed ``daft`` version as a tuple of ints.
+
+    Only the first three dot-separated segments are considered, and for each one
+    only its leading run of digits is kept (a segment without leading digits is
+    skipped). The result is cached in the module-level ``_daft_version``.
+    """
+    global _daft_version
+    if _daft_version is None:
+        numbers = []
+        for segment in version("daft").split(".")[:3]:
+            end = 0
+            while end < len(segment) and segment[end].isdigit():
+                end += 1
+            if end:
+                numbers.append(int(segment[:end]))
+        _daft_version = tuple(numbers)
+    return _daft_version
+
+
+def has_file_range_reads() -> bool:
+    """True if the installed Daft supports File offset/length (>= 0.7.11)."""
+    minimum_version = (0, 7, 11)
+    installed_version = _parse_daft_version()
+    return installed_version >= minimum_version
+
+
+def require_file_range_reads(feature: str = "BLOB columns") -> None:
+    """Raise if Daft is too old for File offset/length support (requires >= 0.7.11).
+
+    Args:
+        feature: name of the feature, used as the subject of the error message.
+
+    Raises:
+        NotImplementedError: if ``has_file_range_reads()`` is false.
+    """
+    if has_file_range_reads():
+        return
+
+    found = ".".join(map(str, _parse_daft_version()))
+    raise NotImplementedError(
+        f"{feature} require daft >= 0.7.11 (File offset/length support), "
+        f"but found {found}. "
+        f"Please upgrade: pip install 'daft>=0.7.11'"
+    )
diff --git a/paimon-python/pypaimon/daft/daft_datasink.py 
b/paimon-python/pypaimon/daft/daft_datasink.py
new file mode 100644
index 0000000000..1f37cc4165
--- /dev/null
+++ b/paimon-python/pypaimon/daft/daft_datasink.py
@@ -0,0 +1,125 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+import pyarrow as pa
+from daft.datatype import DataType
+from daft.io.sink import DataSink, WriteResult
+from daft.recordbatch.micropartition import MicroPartition
+from daft.schema import Schema
+
+if TYPE_CHECKING:
+    from collections.abc import Iterator
+
+    from pypaimon.table.file_store_table import FileStoreTable
+
+
+class PaimonDataSink(DataSink[list[Any]]):
+    """Daft ``DataSink`` that writes into an Apache Paimon table.
+
+    File I/O and commit logic are delegated to pypaimon's BatchTableWrite /
+    BatchTableCommit APIs. Each `write()` call (one per parallel worker) opens
+    its own BatchTableWrite, feeds it the incoming micro-partitions and then
+    calls `prepare_commit()` to obtain CommitMessage objects. `finalize()`
+    collects the CommitMessages of all workers and performs a single atomic
+    commit.
+    """
+
+    def __init__(self, table: FileStoreTable, mode: str = "append") -> None:
+        if mode != "append" and mode != "overwrite":
+            raise ValueError(f"Only 'append' or 'overwrite' mode is supported, got: {mode!r}")
+        self._table = table
+        self._mode = mode
+
+        from pypaimon.schema.data_types import PyarrowFieldParser
+
+        # Arrow schema that incoming batches are cast to when their types differ.
+        self._target_schema: pa.Schema = PyarrowFieldParser.from_paimon_schema(table.fields)
+        self._write_builder = table.new_batch_write_builder()
+        if mode == "overwrite":
+            self._write_builder.overwrite({})
+
+    def name(self) -> str:
+        return "Paimon Write"
+
+    def schema(self) -> Schema:
+        """Schema of the summary micro-partition returned by ``finalize``."""
+        summary_columns = [
+            ("operation", DataType.string()),
+            ("rows", DataType.int64()),
+            ("file_size", DataType.int64()),
+            ("file_name", DataType.string()),
+        ]
+        return Schema._from_field_name_and_types(summary_columns)
+
+    def write(self, micropartitions: Iterator[MicroPartition]) -> Iterator[WriteResult[list[Any]]]:
+        """Write all micro-partitions and yield one ``WriteResult`` with the commit messages."""
+        writer = self._write_builder.new_write()
+
+        # (column index, target type) pairs needing a cast; derived from the first
+        # batch only and reused for every later batch. None means "not inspected yet".
+        mismatched: list[tuple[int, pa.DataType]] | None = None
+
+        row_count = 0
+        byte_count = 0
+        try:
+            for partition in micropartitions:
+                for record_batch in partition.get_record_batches():
+                    arrow_batch = record_batch.to_arrow_record_batch()
+                    if mismatched is None:
+                        mismatched = [
+                            (idx, wanted.type)
+                            for idx, wanted in enumerate(self._target_schema)
+                            if arrow_batch.column(idx).type != wanted.type
+                        ]
+                    if mismatched:
+                        columns = list(arrow_batch.columns)
+                        for idx, wanted_type in mismatched:
+                            columns[idx] = columns[idx].cast(wanted_type)
+                        arrow_batch = pa.RecordBatch.from_arrays(columns, schema=self._target_schema)
+                    writer.write_arrow_batch(arrow_batch)
+                    row_count += arrow_batch.num_rows
+                    byte_count += arrow_batch.nbytes
+            messages = writer.prepare_commit()
+        finally:
+            writer.close()
+
+        yield WriteResult(
+            result=list(messages),
+            bytes_written=byte_count,
+            rows_written=row_count,
+        )
+
+    def finalize(self, write_results: list[WriteResult[list[Any]]]) -> MicroPartition:
+        """Commit every collected message once and return a per-file summary."""
+        messages = [message for write_result in write_results for message in write_result.result]
+
+        committer = self._write_builder.new_commit()
+        try:
+            committer.commit(messages)
+        finally:
+            committer.close()
+
+        label = "OVERWRITE" if self._mode == "overwrite" else "ADD"
+        new_files = [data_file for message in messages for data_file in message.new_files]
+
+        summary = {
+            "operation": pa.array([label] * len(new_files), type=pa.string()),
+            "rows": pa.array([data_file.row_count for data_file in new_files], type=pa.int64()),
+            "file_size": pa.array([data_file.file_size for data_file in new_files], type=pa.int64()),
+            "file_name": pa.array([data_file.file_name for data_file in new_files], type=pa.string()),
+        }
+        return MicroPartition.from_pydict(summary)
diff --git a/paimon-python/pypaimon/daft/daft_datasource.py 
b/paimon-python/pypaimon/daft/daft_datasource.py
new file mode 100644
index 0000000000..f6fb6f8f4c
--- /dev/null
+++ b/paimon-python/pypaimon/daft/daft_datasource.py
@@ -0,0 +1,316 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING, Any
+from urllib.parse import urlparse
+
+import daft
+from daft.dependencies import pa
+from daft.expressions import ExpressionsProjection
+from daft.io.partitioning import PartitionField
+from daft.io.source import DataSource, DataSourceTask
+from daft.logical.schema import Schema
+from daft.recordbatch import RecordBatch
+
+from pypaimon.daft.daft_compat import require_file_range_reads
+from pypaimon.daft.daft_predicate_visitor import convert_filters_to_paimon
+
+if TYPE_CHECKING:
+    from collections.abc import AsyncIterator
+
+    from pypaimon.common.predicate import Predicate
+    from pypaimon.read.split import Split
+    from pypaimon.table.file_store_table import FileStoreTable
+
+    from daft.daft import PyExpr, StorageConfig
+    from daft.io.pushdowns import Pushdowns
+
+logger = logging.getLogger(__name__)
+
+PAIMON_FILE_FORMAT_PARQUET = "parquet"
+PAIMON_FILE_FORMAT_ORC = "orc"
+PAIMON_FILE_FORMAT_AVRO = "avro"
+
+
+class _PaimonPKSplitTask(DataSourceTask):
+    """Task that reads one split through pypaimon's own reader.
+
+    Chosen for splits that Daft's native Parquet reader cannot handle
+    directly, e.g. primary-key splits where ``split.raw_convertible`` is
+    False (an LSM-tree merge is needed) or tables stored in a non-Parquet
+    format (ORC, Avro). pypaimon performs the merge internally.
+    """
+
+    def __init__(
+        self,
+        table: FileStoreTable,
+        split: Split,
+        schema: Schema,
+        blob_column_names: set[str] | None = None,
+    ) -> None:
+        """Remember the table, the split to read and the output schema.
+
+        Args:
+            table: Paimon table used to create the reader.
+            split: The single split this task reads.
+            schema: Daft schema reported by :attr:`schema`.
+            blob_column_names: Names of columns holding blob descriptors;
+                ``None`` or an empty set disables blob conversion.
+        """
+        self._table = table
+        self._split = split
+        self._schema = schema
+        self._blob_column_names = (
+            blob_column_names if blob_column_names else set()
+        )
+
+    @property
+    def schema(self) -> Schema:
+        """Daft schema supplied at construction time."""
+        return self._schema
+
+    async def read(self) -> AsyncIterator[RecordBatch]:
+        """Yield the split's rows as Daft RecordBatches, one per Arrow batch."""
+        blob_names = self._blob_column_names
+        arrow_reader = (
+            self._table.new_read_builder()
+            .new_read()
+            .to_arrow_batch_reader([self._split])
+        )
+        for arrow_batch in iter(arrow_reader.read_next_batch, None):
+            if not blob_names:
+                yield RecordBatch.from_arrow_record_batches(
+                    [arrow_batch], arrow_batch.schema
+                )
+                continue
+            # Blob columns: rewrite to the File struct layout first, then
+            # cast the resulting struct columns to Daft's file dtype.
+            converted = _convert_blob_columns(arrow_batch, blob_names)
+            daft_batch = RecordBatch.from_arrow_record_batches(
+                [converted], converted.schema
+            )
+            yield _cast_blob_columns_to_file(daft_batch, blob_names)
+
+
+def _convert_blob_columns(
+    batch: pa.RecordBatch, blob_column_names: set[str]
+) -> pa.RecordBatch:
+    """Swap serialized BlobDescriptor columns for the File struct layout.
+
+    Only columns whose name is in ``blob_column_names`` and whose Arrow type
+    is ``binary`` or ``large_binary`` are converted; every other column is
+    passed through unchanged.
+    """
+    from pypaimon.daft.daft_blob import (
+        FILE_PHYSICAL_TYPE,
+        blob_column_to_file_array,
+    )
+
+    def _is_serialized_blob(field: pa.Field) -> bool:
+        if field.name not in blob_column_names:
+            return False
+        return pa.types.is_large_binary(field.type) or pa.types.is_binary(
+            field.type
+        )
+
+    out_arrays = []
+    out_fields = []
+    for column, field in zip(batch.columns, batch.schema):
+        if _is_serialized_blob(field):
+            column = blob_column_to_file_array(column)
+            field = pa.field(
+                field.name, FILE_PHYSICAL_TYPE, nullable=field.nullable
+            )
+        out_arrays.append(column)
+        out_fields.append(field)
+    return pa.RecordBatch.from_arrays(out_arrays, schema=pa.schema(out_fields))
+
+
+def _cast_blob_columns_to_file(
+    rb: RecordBatch, blob_column_names: set[str]
+) -> RecordBatch:
+    """Return a RecordBatch whose blob columns are cast to DataType.file().
+
+    Columns not named in ``blob_column_names`` are carried over as-is.
+    """
+    from daft.datatype import DataType
+
+    target_dtype = DataType.file()
+    rebuilt = {}
+    for index, field in enumerate(rb.schema()):
+        series = rb.get_column(index)
+        rebuilt[field.name] = (
+            series.cast(target_dtype)
+            if field.name in blob_column_names
+            else series
+        )
+    return RecordBatch.from_pydict(rebuilt)
+
+
+class PaimonDataSource(DataSource):
+    """DataSource for Apache Paimon tables.
+
+    Uses pypaimon for catalog metadata and scan planning (file listing,
+    partition pruning, statistics-based file skipping), then yields
+    DataSourceTask objects executed by Daft's native Parquet reader.
+
+    Splits that cannot be read natively (non-Parquet format, blob columns,
+    deletion vectors, or primary-key splits needing an LSM-tree merge) are
+    yielded as _PaimonPKSplitTask, which delegates to pypaimon's reader.
+    """
+
+    def __init__(
+        self,
+        table: FileStoreTable,
+        storage_config: StorageConfig,
+        catalog_options: dict[str, str],
+    ) -> None:
+        """Derive the Daft schema and cache table properties used by scans.
+
+        Args:
+            table: The Paimon table to read.
+            storage_config: Daft storage configuration handed to each
+                native Parquet task.
+            catalog_options: Catalog options; only ``"warehouse"`` is read
+                here, to learn the URI scheme of data files.
+        """
+        self._table = table
+        self._storage_config = storage_config
+        self._catalog_options = catalog_options
+
+        from pypaimon.schema.data_types import PyarrowFieldParser
+
+        pa_schema = PyarrowFieldParser.from_paimon_schema(table.fields)
+
+        # Every large_binary column is treated as a blob column.
+        self._blob_column_names: set[str] = {
+            field.name
+            for field in pa_schema
+            if pa.types.is_large_binary(field.type)
+        }
+        self._has_blob_columns = bool(self._blob_column_names)
+
+        if self._has_blob_columns:
+            require_file_range_reads()
+            from daft.datatype import DataType
+
+            # Expose blob columns to Daft as the file dtype.
+            base_schema = Schema.from_pyarrow_schema(pa_schema)
+            fields = []
+            for f in base_schema:
+                if f.name in self._blob_column_names:
+                    fields.append((f.name, DataType.file()))
+                else:
+                    fields.append((f.name, f.dtype))
+            self._schema = Schema.from_field_name_and_types(fields)
+        else:
+            self._schema = Schema.from_pyarrow_schema(pa_schema)
+
+        warehouse = catalog_options.get("warehouse", "")
+        self._warehouse_scheme = urlparse(warehouse).scheme
+
+        self._file_format = table.options.file_format().lower()
+        self._is_parquet = self._file_format == PAIMON_FILE_FORMAT_PARQUET
+
+        self._partition_field_arrow_types: dict[str, pa.DataType] = (
+            {
+                f.name: PyarrowFieldParser.from_paimon_type(f.type)
+                for f in table.partition_keys_fields
+            }
+            if table.partition_keys
+            else {}
+        )
+
+        # Set by push_filters(); consumed by get_tasks().
+        self._paimon_predicate: Predicate | None = None
+
+    @property
+    def name(self) -> str:
+        """Display name that includes the table path when available."""
+        table_path = getattr(self._table, "table_path", None)
+        return f"PaimonDataSource({table_path})"
+
+    @property
+    def schema(self) -> Schema:
+        """Daft schema of the table (blob columns use the file dtype)."""
+        return self._schema
+
+    def get_partition_fields(self) -> list[PartitionField]:
+        """Return a PartitionField for each schema field that is a partition key."""
+        partition_key_names = set(self._table.partition_keys)
+        return [
+            PartitionField.create(f)
+            for f in self._schema
+            if f.name in partition_key_names
+        ]
+
+    def push_filters(
+        self, filters: list[PyExpr]
+    ) -> tuple[list[PyExpr], list[PyExpr]]:
+        """Push filters down to Paimon scan.
+
+        Converts Daft expressions to Paimon predicates where possible and
+        stores the combined predicate for the next get_tasks() call.
+        Returns (pushed_filters, remaining_filters).
+        """
+        pushed_filters, remaining_filters, paimon_predicate = (
+            convert_filters_to_paimon(self._table, filters)
+        )
+
+        self._paimon_predicate = paimon_predicate
+
+        if pushed_filters:
+            logger.debug(
+                "Paimon filter pushdown: %d filters pushed, %d remaining",
+                len(pushed_filters),
+                len(remaining_filters),
+            )
+
+        return pushed_filters, remaining_filters
+
+    async def get_tasks(
+        self, pushdowns: Pushdowns
+    ) -> AsyncIterator[DataSourceTask]:
+        """Plan the scan and yield one task per data file or fallback split.
+
+        Args:
+            pushdowns: Column, filter, partition-filter and limit pushdowns
+                supplied by Daft.
+
+        Yields:
+            ``DataSourceTask.parquet`` tasks for files readable natively,
+            otherwise a ``_PaimonPKSplitTask`` for the whole split.
+        """
+        read_table = self._table
+        if self._has_blob_columns:
+            read_table = self._table.copy({"blob-as-descriptor": "true"})
+
+        read_builder = read_table.new_read_builder()
+
+        if pushdowns.columns is not None:
+            read_builder = read_builder.with_projection(
+                list(pushdowns.columns)
+            )
+
+        # A limit given to the Paimon scan prunes splits by raw row count
+        # during planning. That is only safe when nothing filters rows
+        # afterwards; otherwise the kept splits may hold fewer matching
+        # rows than the limit. Daft still applies the limit itself.
+        has_any_filter = (
+            pushdowns.filters is not None
+            or pushdowns.partition_filters is not None
+            or self._paimon_predicate is not None
+        )
+        if pushdowns.limit is not None and not has_any_filter:
+            read_builder = read_builder.with_limit(pushdowns.limit)
+
+        if self._paimon_predicate is not None:
+            read_builder = read_builder.with_filter(self._paimon_predicate)
+            logger.debug(
+                "Applied Paimon filter pushdown predicate: %s",
+                self._paimon_predicate,
+            )
+
+        partitioned = bool(self._table.partition_keys)
+
+        if partitioned and pushdowns.partition_filters is None:
+            logger.warning(
+                "%s has partition keys %s but no partition filter was "
+                "specified. "
+                "This will result in a full table scan.",
+                self.name,
+                list(self._table.partition_keys),
+            )
+
+        plan = read_builder.new_scan().plan()
+
+        # Partition values are identical for all splits of one partition,
+        # so build the single-row batch once per distinct partition.
+        pv_cache: dict[tuple[Any, ...], RecordBatch | None] = {}
+
+        partition_projection = (
+            ExpressionsProjection([pushdowns.partition_filters])
+            if pushdowns.partition_filters is not None
+            else None
+        )
+
+        for split in plan.splits():
+            pv = None
+            if partitioned and partition_projection is not None:
+                pv = self._partition_values_for(split, pv_cache)
+                # Skip splits whose partition fails the partition filter.
+                if pv is not None and len(pv.filter(partition_projection)) == 0:
+                    continue
+
+            _deletion_files = getattr(split, "data_deletion_files", None)
+            has_deletion_vectors = _deletion_files is not None and any(
+                df is not None for df in _deletion_files
+            )
+
+            can_use_native_reader = (
+                self._is_parquet
+                and not self._has_blob_columns
+                and (
+                    not self._table.is_primary_key_table
+                    or split.raw_convertible
+                )
+                and not has_deletion_vectors
+            )
+
+            if can_use_native_reader:
+                if partitioned and pv is None:
+                    pv = self._partition_values_for(split, pv_cache)
+
+                for data_file in split.files:
+                    file_uri = self._build_file_uri(data_file.file_path)
+                    yield DataSourceTask.parquet(
+                        path=file_uri,
+                        schema=self._schema,
+                        pushdowns=pushdowns,
+                        num_rows=data_file.row_count,
+                        size_bytes=data_file.file_size,
+                        partition_values=pv,
+                        storage_config=self._storage_config,
+                    )
+            else:
+                if not self._is_parquet:
+                    reason = "non-parquet format"
+                elif self._has_blob_columns:
+                    reason = "blob columns present"
+                elif has_deletion_vectors:
+                    reason = "deletion vectors present"
+                else:
+                    reason = "LSM merge required"
+                logger.debug(
+                    "Split with %d files using pypaimon fallback (%s).",
+                    len(split.files),
+                    reason,
+                )
+                yield _PaimonPKSplitTask(
+                    read_table, split, self._schema, self._blob_column_names
+                )
+
+    def _partition_values_for(
+        self,
+        split: Split,
+        cache: dict[tuple[Any, ...], RecordBatch | None],
+    ) -> RecordBatch | None:
+        """Return the partition-value batch for ``split``, memoised in ``cache``."""
+        key = tuple(sorted(split.partition.to_dict().items()))
+        if key not in cache:
+            cache[key] = self._build_partition_values(split)
+        return cache[key]
+
+    def _build_file_uri(self, file_path: str) -> str:
+        """Reconstruct a full URI from a (potentially scheme-stripped) file_path.
+
+        A path that already carries a scheme is returned unchanged so the
+        scheme is never prepended twice. Otherwise the warehouse scheme is
+        used, falling back to ``file`` for local warehouses.
+        """
+        if "://" in file_path:
+            return file_path
+        scheme = self._warehouse_scheme or "file"
+        return f"{scheme}://{file_path}"
+
+    def _build_partition_values(
+        self, split: Split
+    ) -> daft.recordbatch.RecordBatch | None:
+        """Build a single-row RecordBatch encoding the partition values for a split.
+
+        Returns ``None`` when the table has no partition keys.
+        """
+        if not self._table.partition_keys:
+            return None
+
+        partition_dict = split.partition.to_dict()
+        arrays: dict[str, daft.Series] = {}
+        for pfield in self._table.partition_keys_fields:
+            value = partition_dict.get(pfield.name)
+            arrow_type = self._partition_field_arrow_types[pfield.name]
+            arrays[pfield.name] = daft.Series.from_arrow(
+                pa.array([value], type=arrow_type), name=pfield.name
+            )
+
+        if not arrays:
+            return None
+        return daft.recordbatch.RecordBatch.from_pydict(arrays)
diff --git a/paimon-python/pypaimon/daft/daft_io_config.py b/paimon-python/pypaimon/daft/daft_io_config.py
new file mode 100644
index 0000000000..ef3da7e98d
--- /dev/null
+++ b/paimon-python/pypaimon/daft/daft_io_config.py
@@ -0,0 +1,89 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from __future__ import annotations
+
+from urllib.parse import urlparse
+
+from daft.io import IOConfig, S3Config
+
+
+def _convert_paimon_catalog_options_to_io_config(
+    catalog_options: dict[str, str],
+) -> IOConfig | None:
+    """Convert pypaimon catalog options to Daft IOConfig.
+
+    pypaimon supports only S3-like (s3://, s3a://, s3n://, oss://), HDFS,
+    and local (file://).
+
+    OSS uses Daft's OpenDAL backend (opendal_backends={"oss": {...}})
+    rather than S3Config, because Daft routes oss:// URIs to OpenDAL's OSS
+    operator, not the S3-compatible path.
+
+    Args:
+        catalog_options: Catalog options; ``"warehouse"`` selects the
+            backend and the ``fs.oss.*`` / ``fs.s3.*`` keys supply the
+            connection settings.
+
+    Returns:
+        An IOConfig when at least one relevant option is present,
+        otherwise ``None``.
+    """
+    warehouse = catalog_options.get("warehouse", "")
+    parsed = urlparse(warehouse)
+
+    if parsed.scheme == "oss":
+        oss_cfg: dict[str, str] = {}
+
+        if parsed.netloc:
+            oss_cfg["bucket"] = parsed.netloc
+
+        endpoint = catalog_options.get("fs.oss.endpoint")
+        if endpoint:
+            # A bare host name gets an https:// prefix.
+            if not endpoint.startswith(("http://", "https://")):
+                endpoint = f"https://{endpoint}"
+            oss_cfg["endpoint"] = endpoint
+
+        # Remaining options are copied verbatim under their OpenDAL names.
+        passthrough = (
+            ("fs.oss.accessKeyId", "access_key_id"),
+            ("fs.oss.accessKeySecret", "access_key_secret"),
+            ("fs.oss.region", "region"),
+            ("fs.oss.securityToken", "security_token"),
+        )
+        for option_key, opendal_key in passthrough:
+            value = catalog_options.get(option_key)
+            if value:
+                oss_cfg[opendal_key] = value
+
+        return IOConfig(opendal_backends={"oss": oss_cfg}) if oss_cfg else None
+
+    # S3-compatible (s3://, s3a://, s3n://)
+    s3_kwargs = {
+        "endpoint_url": catalog_options.get("fs.s3.endpoint"),
+        "region_name": catalog_options.get("fs.s3.region"),
+        "key_id": catalog_options.get("fs.s3.accessKeyId"),
+        "access_key": catalog_options.get("fs.s3.accessKeySecret"),
+        "session_token": catalog_options.get("fs.s3.securityToken"),
+    }
+    if all(value is None for value in s3_kwargs.values()):
+        return None
+    return IOConfig(s3=S3Config(**s3_kwargs))
diff --git a/paimon-python/pypaimon/daft/daft_paimon.py b/paimon-python/pypaimon/daft/daft_paimon.py
new file mode 100644
index 0000000000..b14050f884
--- /dev/null
+++ b/paimon-python/pypaimon/daft/daft_paimon.py
@@ -0,0 +1,161 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""
+Top-level API for reading and writing Paimon tables with Daft DataFrames.
+
+Usage::
+
+    from pypaimon.daft import read_paimon, write_paimon
+
+    df = read_paimon("db.table", catalog_options={"warehouse": "/path"})
+    write_paimon(df, "db.table", catalog_options={"warehouse": "/path"})
+"""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Dict, Optional
+
+if TYPE_CHECKING:
+    import daft
+
+    from pypaimon.table.file_store_table import FileStoreTable
+
+
+def _read_table(
+    table: FileStoreTable,
+    catalog_options: Dict[str, str] | None = None,
+    io_config=None,
+    snapshot_id: int | None = None,
+    tag_name: str | None = None,
+) -> "daft.DataFrame":
+    """Build a lazy Daft DataFrame over an already-resolved Paimon table.
+
+    Raises:
+        ValueError: If both ``snapshot_id`` and ``tag_name`` are given.
+    """
+    both_travel_args = snapshot_id is not None and tag_name is not None
+    if both_travel_args:
+        raise ValueError(
+            "snapshot_id and tag_name cannot be set at the same time"
+        )
+
+    from daft import context, runners
+    from daft.daft import StorageConfig
+
+    from pypaimon.daft.daft_datasource import PaimonDataSource
+    from pypaimon.daft.daft_io_config import (
+        _convert_paimon_catalog_options_to_io_config,
+    )
+
+    # Time travel is expressed as scan options on a copy of the table.
+    time_travel: dict[str, str] = {}
+    if snapshot_id is not None:
+        time_travel["scan.snapshot-id"] = str(snapshot_id)
+    if tag_name is not None:
+        time_travel["scan.tag-name"] = tag_name
+    if time_travel:
+        table = table.copy(time_travel)
+
+    options = {} if catalog_options is None else catalog_options
+
+    # IOConfig precedence: explicit argument, then catalog options, then
+    # Daft's planning-config default.
+    if not io_config:
+        io_config = _convert_paimon_catalog_options_to_io_config(options)
+    if not io_config:
+        planning_config = context.get_context().daft_planning_config
+        io_config = planning_config.default_io_config
+
+    runner_name = runners.get_or_create_runner().name
+    storage_config = StorageConfig(runner_name != "ray", io_config)
+
+    # Only the warehouse location is forwarded to the data source.
+    warehouse = options.get("warehouse", "")
+    scan_options = {}
+    if warehouse:
+        scan_options["warehouse"] = warehouse
+
+    data_source = PaimonDataSource(
+        table, storage_config=storage_config, catalog_options=scan_options
+    )
+    return data_source.read()
+
+
+def _write_table(
+    df: "daft.DataFrame",
+    table: FileStoreTable,
+    mode: str = "append",
+) -> "daft.DataFrame":
+    """Send ``df`` to ``table`` through a PaimonDataSink using ``mode``."""
+    from pypaimon.daft.daft_datasink import PaimonDataSink
+
+    sink = PaimonDataSink(table, mode)
+    return df.write_sink(sink)
+
+
+def read_paimon(
+    table_identifier: str,
+    catalog_options: Dict[str, str],
+    *,
+    snapshot_id: Optional[int] = None,
+    tag_name: Optional[str] = None,
+    io_config=None,
+) -> "daft.DataFrame":
+    """Open a Paimon table as a lazy Daft DataFrame.
+
+    The returned DataFrame is lazy and planned by Daft's optimizer, so the
+    usual DataFrame operations (.select, .where, .limit) are pushed down
+    into the Paimon scan through Daft's DataSource protocol.
+
+    Args:
+        table_identifier: Full table name, e.g. ``"db_name.table_name"``.
+        catalog_options: Options passed to ``CatalogFactory.create()``,
+            e.g. ``{"warehouse": "/path/to/warehouse"}``.
+        snapshot_id: Snapshot id to time-travel to; cannot be combined
+            with ``tag_name``.
+        tag_name: Tag name to time-travel to; cannot be combined with
+            ``snapshot_id``.
+        io_config: Daft IOConfig for object storage access. When None it
+            is inferred from the catalog options.
+
+    Returns:
+        A lazy ``daft.DataFrame`` backed by this Paimon table.
+    """
+    from pypaimon.catalog.catalog_factory import CatalogFactory
+
+    resolved_table = CatalogFactory.create(catalog_options).get_table(
+        table_identifier
+    )
+    return _read_table(
+        resolved_table,
+        catalog_options=catalog_options,
+        io_config=io_config,
+        snapshot_id=snapshot_id,
+        tag_name=tag_name,
+    )
+
+
+def write_paimon(
+    df: "daft.DataFrame",
+    table_identifier: str,
+    catalog_options: Dict[str, str],
+    *,
+    mode: str = "append",
+) -> "daft.DataFrame":
+    """Write a Daft DataFrame into the named Paimon table.
+
+    Args:
+        df: The Daft DataFrame to write.
+        table_identifier: Full table name, e.g. ``"db_name.table_name"``.
+        catalog_options: Options passed to ``CatalogFactory.create()``.
+        mode: Write mode, either ``"append"`` or ``"overwrite"``.
+
+    Returns:
+        A summary ``daft.DataFrame`` with columns
+        (operation, rows, file_size, file_name).
+    """
+    from pypaimon.catalog.catalog_factory import CatalogFactory
+
+    target_table = CatalogFactory.create(catalog_options).get_table(
+        table_identifier
+    )
+    return _write_table(df, target_table, mode=mode)
diff --git a/paimon-python/pypaimon/daft/daft_predicate_visitor.py b/paimon-python/pypaimon/daft/daft_predicate_visitor.py
new file mode 100644
index 0000000000..0a3d03d5cf
--- /dev/null
+++ b/paimon-python/pypaimon/daft/daft_predicate_visitor.py
@@ -0,0 +1,248 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Filter conversion utilities for Paimon pushdowns.
+
+This module provides utilities to convert Daft expressions to Paimon predicates
+for filter pushdown optimization using the Visitor pattern.
+"""
+
+from __future__ import annotations
+
+import logging
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any
+
+from daft.expressions.visitor import PredicateVisitor
+
+if TYPE_CHECKING:
+    from pypaimon.common.predicate import Predicate
+    from pypaimon.common.predicate_builder import PredicateBuilder
+    from pypaimon.table.file_store_table import FileStoreTable
+
+    from daft.daft import PyExpr
+    from daft.expressions import Expression
+
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass(frozen=True, slots=True)
+class _ColRef:
+    """Marker wrapping a column name produced when the visitor folds a column.
+
+    Lets the predicate visitor tell a column reference apart from a plain
+    literal value when both come back from visiting sub-expressions.
+    """
+
+    # Name of the referenced column.
+    name: str
+
+
+class PaimonPredicateVisitor(PredicateVisitor[Any]):
+    """Tree fold visitor that converts Daft expressions to Paimon predicates.
+
+    Leaf nodes return their values (_ColRef for columns, raw values for
+    literals). Predicate nodes return Paimon Predicate objects, or None if
+    unsupported.
+
+    A predicate is only produced when every operand has the expected kind
+    (column reference, literal value, or nested predicate). Anything else
+    yields None so the filter stays with Daft instead of being pushed down
+    in a form that could change query results.
+
+    Supported operations:
+    - Comparison: ==, !=, <, <=, >, >=
+    - Is null / Is not null
+    - Is in
+    - Between (inclusive)
+    - String: startswith, endswith, contains
+    - Logical: and, or
+    """
+
+    def __init__(self, builder: PredicateBuilder) -> None:
+        """Store the Paimon predicate builder used to create predicates."""
+        self._builder = builder
+
+    # -- Operand classification helpers --
+
+    @staticmethod
+    def _is_predicate(value: Any) -> bool:
+        """Return True when ``value`` is a Paimon Predicate object."""
+        # Imported lazily: the module-level import is type-checking only.
+        from pypaimon.common.predicate import Predicate
+
+        return isinstance(value, Predicate)
+
+    @classmethod
+    def _is_literal(cls, value: Any) -> bool:
+        """Return True when ``value`` is usable as a literal operand.
+
+        None is rejected because it is also what unsupported nodes (cast,
+        function, ...) fold to; column references and nested predicates
+        are not literals either.
+        """
+        if value is None or isinstance(value, _ColRef):
+            return False
+        return not cls._is_predicate(value)
+
+    # -- Leaf nodes --
+
+    def visit_col(self, name: str) -> _ColRef:
+        return _ColRef(name)
+
+    def visit_lit(self, value: Any) -> Any:
+        return value
+
+    def visit_alias(self, expr: Expression, alias: str) -> Any:
+        # An alias does not change the value; fold the wrapped expression.
+        return self.visit(expr)
+
+    def visit_cast(self, expr: Expression, dtype: Any) -> None:
+        return None
+
+    def visit_coalesce(self, args: list[Expression]) -> None:
+        return None
+
+    def visit_function(self, name: str, args: list[Expression]) -> None:
+        logger.debug(
+            "Function '%s' is not supported for Paimon pushdown", name
+        )
+        return None
+
+    # -- Logical operators --
+
+    def visit_and(
+        self, left: Expression, right: Expression
+    ) -> Predicate | None:
+        left_pred = self.visit(left)
+        right_pred = self.visit(right)
+        # Both sides must be real predicates; a bare column or literal
+        # operand is not something the builder can combine.
+        if self._is_predicate(left_pred) and self._is_predicate(right_pred):
+            return self._builder.and_predicates([left_pred, right_pred])
+        return None
+
+    def visit_or(
+        self, left: Expression, right: Expression
+    ) -> Predicate | None:
+        left_pred = self.visit(left)
+        right_pred = self.visit(right)
+        if self._is_predicate(left_pred) and self._is_predicate(right_pred):
+            return self._builder.or_predicates([left_pred, right_pred])
+        return None
+
+    def visit_not(self, expr: Expression) -> None:
+        return None
+
+    # -- Comparison operators --
+
+    def _cmp(
+        self, left: Expression, right: Expression, fn: Any, fn_swapped: Any
+    ) -> Predicate | None:
+        """Fold a binary comparison between one column and one literal.
+
+        If the column is on the right side (e.g. ``3 < col``), apply
+        ``fn_swapped`` instead so the operator is reversed along with the
+        operands. For symmetric operators (==, !=), ``fn`` and
+        ``fn_swapped`` are the same. Returns None when the other operand
+        is not a literal.
+        """
+        lhs, rhs = self.visit(left), self.visit(right)
+        if isinstance(lhs, _ColRef) and self._is_literal(rhs):
+            return fn(lhs.name, rhs)
+        if isinstance(rhs, _ColRef) and self._is_literal(lhs):
+            return fn_swapped(rhs.name, lhs)
+        return None
+
+    def visit_equal(
+        self, left: Expression, right: Expression
+    ) -> Predicate | None:
+        return self._cmp(
+            left, right, self._builder.equal, self._builder.equal
+        )
+
+    def visit_not_equal(
+        self, left: Expression, right: Expression
+    ) -> Predicate | None:
+        return self._cmp(
+            left, right, self._builder.not_equal, self._builder.not_equal
+        )
+
+    def visit_less_than(
+        self, left: Expression, right: Expression
+    ) -> Predicate | None:
+        return self._cmp(
+            left, right, self._builder.less_than, self._builder.greater_than
+        )
+
+    def visit_less_than_or_equal(
+        self, left: Expression, right: Expression
+    ) -> Predicate | None:
+        return self._cmp(
+            left,
+            right,
+            self._builder.less_or_equal,
+            self._builder.greater_or_equal,
+        )
+
+    def visit_greater_than(
+        self, left: Expression, right: Expression
+    ) -> Predicate | None:
+        return self._cmp(
+            left, right, self._builder.greater_than, self._builder.less_than
+        )
+
+    def visit_greater_than_or_equal(
+        self, left: Expression, right: Expression
+    ) -> Predicate | None:
+        return self._cmp(
+            left,
+            right,
+            self._builder.greater_or_equal,
+            self._builder.less_or_equal,
+        )
+
+    # -- Set/range predicates --
+
+    def visit_between(
+        self, expr: Expression, lower: Expression, upper: Expression
+    ) -> Predicate | None:
+        col = self.visit(expr)
+        if not isinstance(col, _ColRef):
+            return None
+        lower_val = self.visit(lower)
+        upper_val = self.visit(upper)
+        if not (self._is_literal(lower_val) and self._is_literal(upper_val)):
+            return None
+        return self._builder.between(col.name, lower_val, upper_val)
+
+    def visit_is_in(
+        self, expr: Expression, items: list[Expression]
+    ) -> Predicate | None:
+        col = self.visit(expr)
+        if not isinstance(col, _ColRef):
+            return None
+        values = [self.visit(item) for item in items]
+        if not all(self._is_literal(v) for v in values):
+            return None
+        return self._builder.is_in(col.name, values)
+
+    # -- Null predicates --
+
+    def visit_is_null(self, expr: Expression) -> Predicate | None:
+        col = self.visit(expr)
+        if not isinstance(col, _ColRef):
+            return None
+        return self._builder.is_null(col.name)
+
+    def visit_not_null(self, expr: Expression) -> Predicate | None:
+        col = self.visit(expr)
+        if not isinstance(col, _ColRef):
+            return None
+        return self._builder.is_not_null(col.name)
+
+    # -- String predicates --
+
+    def _string_predicate(
+        self, input: Expression, pattern: Expression, fn: Any
+    ) -> Predicate | None:
+        """Apply ``fn(column_name, text)`` for a column and a str literal.
+
+        Returns None unless ``input`` folds to a column and ``pattern`` to
+        a ``str``; stringifying any other operand would push a pattern the
+        user never wrote.
+        """
+        col = self.visit(input)
+        text = self.visit(pattern)
+        if not isinstance(col, _ColRef) or not isinstance(text, str):
+            return None
+        return fn(col.name, text)
+
+    def visit_starts_with(
+        self, input: Expression, prefix: Expression
+    ) -> Predicate | None:
+        return self._string_predicate(input, prefix, self._builder.startswith)
+
+    def visit_ends_with(
+        self, input: Expression, suffix: Expression
+    ) -> Predicate | None:
+        return self._string_predicate(input, suffix, self._builder.endswith)
+
+    def visit_contains(
+        self, input: Expression, substring: Expression
+    ) -> Predicate | None:
+        return self._string_predicate(
+            input, substring, self._builder.contains
+        )
+
+
+def convert_filters_to_paimon(
+    table: FileStoreTable,
+    py_filters: list[PyExpr] | PyExpr,
+) -> tuple[list[PyExpr], list[PyExpr], Predicate | None]:
+    """Convert Daft filters to Paimon predicate.
+
+    A filter counts as pushed only when the visitor produced an actual
+    Paimon Predicate for it. A filter that folds to a bare column
+    reference or literal is left for Daft to evaluate.
+
+    Args:
+        table: Paimon table object (used to create predicate builder)
+        py_filters: Single PyExpr filter or list of PyExpr filters to convert
+
+    Returns:
+        Tuple of (pushed_filters, remaining_filters, combined_predicate).
+        ``combined_predicate`` is the AND of all pushed predicates, or
+        None when nothing could be pushed.
+    """
+    from daft.expressions import Expression
+
+    # Needed at runtime for the isinstance check below; the module-level
+    # import of Predicate is type-checking only.
+    from pypaimon.common.predicate import Predicate
+
+    if not isinstance(py_filters, list):
+        py_filters = [py_filters]
+
+    if not py_filters:
+        return [], [], None
+
+    read_builder = table.new_read_builder()
+    predicate_builder = read_builder.new_predicate_builder()
+    converter = PaimonPredicateVisitor(predicate_builder)
+
+    pushed_filters: list[PyExpr] = []
+    remaining_filters: list[PyExpr] = []
+    predicates: list[Predicate] = []
+
+    for py_expr in py_filters:
+        expr = Expression._from_pyexpr(py_expr)
+        predicate = converter.visit(expr)
+
+        if isinstance(predicate, Predicate):
+            pushed_filters.append(py_expr)
+            predicates.append(predicate)
+        else:
+            remaining_filters.append(py_expr)
+            logger.debug("Filter %s cannot be pushed down to Paimon", expr)
+
+    # AND the pushed predicates together, left to right.
+    combined_predicate: Predicate | None = None
+    for pred in predicates:
+        if combined_predicate is None:
+            combined_predicate = pred
+        else:
+            combined_predicate = predicate_builder.and_predicates(
+                [combined_predicate, pred]
+            )
+
+    if pushed_filters:
+        logger.debug(
+            "Paimon filter pushdown: %d filters pushed, %d remaining",
+            len(pushed_filters),
+            len(remaining_filters),
+        )
+
+    return pushed_filters, remaining_filters, combined_predicate
diff --git a/paimon-python/pypaimon/tests/daft/daft_catalog_rest_test.py b/paimon-python/pypaimon/tests/daft/daft_catalog_rest_test.py
new file mode 100644
index 0000000000..63ba577531
--- /dev/null
+++ b/paimon-python/pypaimon/tests/daft/daft_catalog_rest_test.py
@@ -0,0 +1,233 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Unit tests for PaimonCatalog REST catalog path (using mocks, no real server needed)."""
+
+from __future__ import annotations
+
+from unittest.mock import MagicMock
+
+import pytest
+
+pypaimon = pytest.importorskip("pypaimon")
+daft = pytest.importorskip("daft")
+
+from daft.catalog import Identifier, NotFoundError
+from pypaimon.catalog.catalog_exception import (
+    DatabaseNotExistException,
+    TableNotExistException,
+)
+
+from pypaimon.daft.daft_catalog import PaimonCatalog
+
+# ---------------------------------------------------------------------------
+# Helpers: build a mock inner catalog that mimics RESTCatalog's interface
+# ---------------------------------------------------------------------------
+
+
+def _make_rest_inner(
+    databases: list[str] | None = None,
+    tables_by_db: dict[str, list[str]] | None = None,
+):
+    """Return a mock that quacks like a RESTCatalog."""
+    inner = MagicMock(spec=pypaimon.catalog.catalog.Catalog)
+
+    inner.list_databases = MagicMock(return_value=databases or [])
+    inner.list_tables = MagicMock(side_effect=lambda db: (tables_by_db or {}).get(db, []))
+    inner.drop_database = MagicMock()
+    inner.drop_table = MagicMock()
+
+    inner.get_database = MagicMock()
+    inner.create_database = MagicMock()
+    inner.get_table = MagicMock()
+    inner.create_table = MagicMock()
+
+    return inner
+
+
+# ---------------------------------------------------------------------------
+# _list_namespaces — REST path
+# ---------------------------------------------------------------------------
+
+
+def test_rest_list_namespaces_delegates_to_list_databases():
+    inner = _make_rest_inner(databases=["db_a", "db_b", "db_c"])
+    cat = PaimonCatalog(inner)
+
+    result = cat.list_namespaces()
+
+    inner.list_databases.assert_called_once()
+    assert Identifier("db_a") in result
+    assert Identifier("db_b") in result
+    assert Identifier("db_c") in result
+
+
+def test_rest_list_namespaces_with_pattern():
+    inner = _make_rest_inner(databases=["prod_orders", "prod_users", "staging_data"])
+    cat = PaimonCatalog(inner)
+
+    result = cat.list_namespaces(pattern="prod")
+
+    assert all(str(r).startswith("prod") for r in result)
+    assert len(result) == 2
+
+
+def test_rest_list_namespaces_empty():
+    inner = _make_rest_inner(databases=[])
+    cat = PaimonCatalog(inner)
+    assert cat.list_namespaces() == []
+
+
+# ---------------------------------------------------------------------------
+# _list_tables — REST path
+# ---------------------------------------------------------------------------
+
+
+def test_rest_list_tables_delegates_to_list_databases_and_list_tables():
+    inner = _make_rest_inner(
+        databases=["db_a", "db_b"],
+        tables_by_db={"db_a": ["orders", "users"], "db_b": ["events"]},
+    )
+    cat = PaimonCatalog(inner)
+
+    result = cat.list_tables()
+
+    assert Identifier("db_a", "orders") in result
+    assert Identifier("db_a", "users") in result
+    assert Identifier("db_b", "events") in result
+    assert len(result) == 3
+
+
+def test_rest_list_tables_calls_list_tables_per_database():
+    inner = _make_rest_inner(
+        databases=["db_a", "db_b"],
+        tables_by_db={"db_a": ["t1"], "db_b": ["t2"]},
+    )
+    cat = PaimonCatalog(inner)
+    cat.list_tables()
+
+    assert inner.list_tables.call_count == 2
+    inner.list_tables.assert_any_call("db_a")
+    inner.list_tables.assert_any_call("db_b")
+
+
+def test_rest_list_tables_with_pattern():
+    inner = _make_rest_inner(
+        databases=["db_a"],
+        tables_by_db={"db_a": ["orders", "order_items", "users"]},
+    )
+    cat = PaimonCatalog(inner)
+
+    result = cat.list_tables(pattern="db_a.order")
+
+    assert Identifier("db_a", "orders") in result
+    assert Identifier("db_a", "order_items") in result
+    assert Identifier("db_a", "users") not in result
+
+
+def test_rest_list_tables_empty_database():
+    inner = _make_rest_inner(
+        databases=["empty_db"],
+        tables_by_db={"empty_db": []},
+    )
+    cat = PaimonCatalog(inner)
+    assert cat.list_tables() == []
+
+
+# ---------------------------------------------------------------------------
+# _drop_namespace — REST path
+# ---------------------------------------------------------------------------
+
+
+def test_rest_drop_namespace_delegates_to_drop_database():
+    inner = _make_rest_inner(databases=["my_db"])
+    cat = PaimonCatalog(inner)
+
+    cat.drop_namespace("my_db")
+
+    inner.drop_database.assert_called_once_with("my_db", ignore_if_not_exists=False)
+
+
+def test_rest_drop_namespace_not_found_raises_notfounderror():
+    inner = _make_rest_inner()
+    inner.drop_database.side_effect = DatabaseNotExistException("my_db")
+    cat = PaimonCatalog(inner)
+
+    with pytest.raises(NotFoundError):
+        cat.drop_namespace("my_db")
+
+
+# ---------------------------------------------------------------------------
+# _drop_table — REST path
+# ---------------------------------------------------------------------------
+
+
+def test_rest_drop_table_delegates_to_drop_table():
+    inner = _make_rest_inner()
+    cat = PaimonCatalog(inner)
+
+    cat.drop_table("my_db.my_table")
+
+    inner.drop_table.assert_called_once_with("my_db.my_table", ignore_if_not_exists=False)
+
+
+def test_rest_drop_table_not_found_raises_notfounderror():
+    inner = _make_rest_inner()
+    fake_ident = MagicMock()
+    fake_ident.get_full_name.return_value = "my_db.my_table"
+    inner.drop_table.side_effect = TableNotExistException(fake_ident)
+    cat = PaimonCatalog(inner)
+
+    with pytest.raises(NotFoundError):
+        cat.drop_table("my_db.my_table")
+
+
+# ---------------------------------------------------------------------------
+# _has_namespace — strips catalog prefix from multi-part identifiers
+# ---------------------------------------------------------------------------
+
+
+def test_has_namespace_single_part():
+    inner = _make_rest_inner()
+    inner.get_database.return_value = MagicMock()
+    cat = PaimonCatalog(inner)
+
+    assert cat.has_namespace("my_db") is True
+    inner.get_database.assert_called_once_with("my_db")
+
+
+def test_has_namespace_not_found_returns_false():
+    inner = _make_rest_inner()
+    inner.get_database.side_effect = DatabaseNotExistException("nope")
+    cat = PaimonCatalog(inner)
+
+    assert cat.has_namespace("nope") is False
+
+
+# ---------------------------------------------------------------------------
+# _create_namespace — delegates properly
+# ---------------------------------------------------------------------------
+
+
+def test_create_namespace_single_part():
+    inner = _make_rest_inner()
+    cat = PaimonCatalog(inner)
+
+    cat.create_namespace("new_db")
+
+    inner.create_database.assert_called_once_with("new_db", ignore_if_exists=False)
diff --git a/paimon-python/pypaimon/tests/daft/daft_catalog_test.py b/paimon-python/pypaimon/tests/daft/daft_catalog_test.py
new file mode 100644
index 0000000000..0b34452ed4
--- /dev/null
+++ b/paimon-python/pypaimon/tests/daft/daft_catalog_test.py
@@ -0,0 +1,333 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Tests for PaimonCatalog and PaimonTable Daft catalog wrappers."""
+
+from __future__ import annotations
+
+import pyarrow as pa
+import pytest
+
+pypaimon = pytest.importorskip("pypaimon")
+daft = pytest.importorskip("daft")
+
+from pypaimon.daft.daft_catalog import PaimonCatalog, PaimonTable
+
+# ---------------------------------------------------------------------------
+# Fixtures
+# ---------------------------------------------------------------------------
+
+
+@pytest.fixture
+def inner_catalog(tmp_path):
+    """A bare pypaimon FileSystemCatalog with a 'test_db' database."""
+    catalog = pypaimon.CatalogFactory.create({"warehouse": str(tmp_path)})
+    catalog.create_database("test_db", ignore_if_exists=True)
+    return catalog, tmp_path
+
+
+@pytest.fixture
+def inner_catalog_with_table(inner_catalog):
+    """A pypaimon catalog pre-populated with an append-only partitioned table."""
+    catalog, tmp_path = inner_catalog
+    schema = pypaimon.Schema.from_pyarrow_schema(
+        pa.schema(
+            [
+                pa.field("id", pa.int64()),
+                pa.field("name", pa.string()),
+                pa.field("dt", pa.string()),
+            ]
+        ),
+        partition_keys=["dt"],
+        options={"bucket": "1", "file.format": "parquet"},
+    )
+    catalog.create_table("test_db.test_table", schema, ignore_if_exists=True)
+
+    # Pre-populate with data
+    table = catalog.get_table("test_db.test_table")
+    data = pa.table(
+        {
+            "id": pa.array([1, 2, 3], pa.int64()),
+            "name": pa.array(["alice", "bob", "carol"]),
+            "dt": pa.array(["2024-01-01", "2024-01-01", "2024-01-02"]),
+        }
+    )
+    write_builder = table.new_batch_write_builder()
+    table_write = write_builder.new_write()
+    table_commit = write_builder.new_commit()
+    try:
+        table_write.write_arrow(data)
+        commit_messages = table_write.prepare_commit()
+        table_commit.commit(commit_messages)
+    finally:
+        table_write.close()
+        table_commit.close()
+
+    return catalog, tmp_path
+
+
+@pytest.fixture
+def paimon_catalog(inner_catalog_with_table):
+    """Daft PaimonCatalog wrapping the pre-populated inner catalog."""
+    catalog, tmp_path = inner_catalog_with_table
+    return PaimonCatalog(catalog, name="test_paimon"), catalog, tmp_path
+
+
+# ---------------------------------------------------------------------------
+# PaimonCatalog — basic properties
+# ---------------------------------------------------------------------------
+
+
+def test_catalog_name(paimon_catalog):
+    daft_catalog, _, _ = paimon_catalog
+    assert daft_catalog.name == "test_paimon"
+
+
+def test_catalog_default_name(inner_catalog):
+    inner, _ = inner_catalog
+    daft_catalog = PaimonCatalog(inner)
+    assert daft_catalog.name == "paimon"
+
+
+# ---------------------------------------------------------------------------
+# PaimonCatalog — namespace operations
+# ---------------------------------------------------------------------------
+
+
+def test_catalog_has_namespace(paimon_catalog):
+    daft_catalog, _, _ = paimon_catalog
+    assert daft_catalog.has_namespace("test_db")
+    assert not daft_catalog.has_namespace("nonexistent_db")
+
+
+def test_catalog_list_namespaces(paimon_catalog):
+    from daft.catalog import Identifier
+
+    daft_catalog, _, _ = paimon_catalog
+    namespaces = daft_catalog.list_namespaces()
+    assert Identifier("test_db") in namespaces
+
+
+def test_catalog_create_namespace(tmp_path):
+    inner = pypaimon.CatalogFactory.create({"warehouse": str(tmp_path)})
+    daft_catalog = PaimonCatalog(inner)
+    daft_catalog.create_namespace("new_db")
+    assert daft_catalog.has_namespace("new_db")
+
+
+def test_catalog_create_namespace_if_not_exists(tmp_path):
+    inner = pypaimon.CatalogFactory.create({"warehouse": str(tmp_path)})
+    daft_catalog = PaimonCatalog(inner)
+    daft_catalog.create_namespace_if_not_exists("myns")
+    daft_catalog.create_namespace_if_not_exists("myns")  # should not raise
+    assert daft_catalog.has_namespace("myns")
+
+
+def test_catalog_drop_namespace(paimon_catalog):
+    daft_catalog, _, _ = paimon_catalog
+    daft_catalog.create_namespace("drop_me_db")
+    assert daft_catalog.has_namespace("drop_me_db")
+    daft_catalog.drop_namespace("drop_me_db")
+    assert not daft_catalog.has_namespace("drop_me_db")
+
+
+# ---------------------------------------------------------------------------
+# PaimonCatalog — table operations
+# ---------------------------------------------------------------------------
+
+
+def test_catalog_has_table(paimon_catalog):
+    daft_catalog, _, _ = paimon_catalog
+    assert daft_catalog.has_table("test_db.test_table")
+    assert not daft_catalog.has_table("test_db.nonexistent_table")
+    assert not daft_catalog.has_table("nonexistent_db.test_table")
+
+
+def test_catalog_list_tables(paimon_catalog):
+    from daft.catalog import Identifier
+
+    daft_catalog, _, _ = paimon_catalog
+    tables = daft_catalog.list_tables()
+    assert Identifier("test_db", "test_table") in tables
+
+
+def test_catalog_list_tables_with_pattern(paimon_catalog):
+    daft_catalog, _, _ = paimon_catalog
+    tables = daft_catalog.list_tables(pattern="test_db")
+    assert len(tables) > 0
+    tables_no_match = daft_catalog.list_tables(pattern="other_db")
+    assert len(tables_no_match) == 0
+
+
+def test_catalog_get_table(paimon_catalog):
+    daft_catalog, _, _ = paimon_catalog
+    table = daft_catalog.get_table("test_db.test_table")
+    assert table.name == "test_table"
+
+
+def test_catalog_get_table_not_found(paimon_catalog):
+    from daft.catalog import NotFoundError
+
+    daft_catalog, _, _ = paimon_catalog
+    with pytest.raises(NotFoundError):
+        daft_catalog.get_table("test_db.nonexistent_table")
+
+
+def test_catalog_drop_table(paimon_catalog):
+    daft_catalog, _, _ = paimon_catalog
+    assert daft_catalog.has_table("test_db.test_table")
+    daft_catalog.drop_table("test_db.test_table")
+    assert not daft_catalog.has_table("test_db.test_table")
+
+
+def test_catalog_create_table(tmp_path):
+    inner = pypaimon.CatalogFactory.create({"warehouse": str(tmp_path)})
+    inner.create_database("mydb", ignore_if_exists=True)
+    daft_catalog = PaimonCatalog(inner)
+
+    schema = daft.from_pydict({"id": [1, 2], "name": ["a", "b"]}).schema()
+    table = daft_catalog.create_table("mydb.new_table", schema)
+    assert table.name == "new_table"
+    assert daft_catalog.has_table("mydb.new_table")
+
+
+def test_catalog_create_table_with_partitions(tmp_path):
+    from daft.io.partitioning import PartitionField
+
+    inner = pypaimon.CatalogFactory.create({"warehouse": str(tmp_path)})
+    inner.create_database("mydb", ignore_if_exists=True)
+    daft_catalog = PaimonCatalog(inner)
+
+    df = daft.from_pydict({"id": [1], "name": ["a"], "dt": ["2024-01-01"]})
+    schema = df.schema()
+    dt_field = schema["dt"]
+    partition_fields = [PartitionField.create(dt_field)]
+    table = daft_catalog.create_table("mydb.part_table", schema, partition_fields=partition_fields)
+    assert table.name == "part_table"
+    assert table.partition_keys == ["dt"]
+
+
+# ---------------------------------------------------------------------------
+# PaimonTable — read / write
+# ---------------------------------------------------------------------------
+
+
+def test_table_read(paimon_catalog):
+    daft_catalog, _, _ = paimon_catalog
+    table = daft_catalog.get_table("test_db.test_table")
+    df = table.read()
+    result = df.sort("id").to_pydict()
+    assert result["id"] == [1, 2, 3]
+
+
+def test_table_append(paimon_catalog):
+    daft_catalog, _, _ = paimon_catalog
+    table = daft_catalog.get_table("test_db.test_table")
+    new_rows = daft.from_pydict({"id": [99], "name": ["zara"], "dt": ["2024-03-01"]})
+    table.append(new_rows)
+    ids = sorted(table.read().to_pydict()["id"])
+    assert 99 in ids
+
+
+def test_table_overwrite(paimon_catalog):
+    daft_catalog, _, _ = paimon_catalog
+    table = daft_catalog.get_table("test_db.test_table")
+    replacement = daft.from_pydict({"id": [100, 200], "name": ["p", "q"], "dt": ["2024-01-01", "2024-01-02"]})
+    table.overwrite(replacement)
+    result = sorted(table.read().to_pydict()["id"])
+    assert result == [100, 200]
+
+
+# ---------------------------------------------------------------------------
+# PaimonTable — direct wrapping
+# ---------------------------------------------------------------------------
+
+
+def test_table_direct_wrap(inner_catalog_with_table):
+    inner, _ = inner_catalog_with_table
+    inner_table = inner.get_table("test_db.test_table")
+    table = PaimonTable(inner_table)
+    assert table.name == "test_table"
+    df = table.read()
+    assert df.count_rows() == 3
+
+
+# ---------------------------------------------------------------------------
+# PaimonTable — properties
+# ---------------------------------------------------------------------------
+
+
+class TestPaimonTableProperties:
+    """Tests for PaimonTable properties."""
+
+    @pytest.fixture
+    def pk_catalog(self, tmp_path):
+        """Create a catalog with primary key table for testing properties."""
+        inner = pypaimon.CatalogFactory.create({"warehouse": str(tmp_path)})
+        inner.create_database("test_db", ignore_if_exists=True)
+
+        schema = pypaimon.Schema.from_pyarrow_schema(
+            pa.schema(
+                [
+                    pa.field("id", pa.int64()),
+                    pa.field("name", pa.string()),
+                    pa.field("value", pa.int64()),
+                ]
+            ),
+            primary_keys=["id"],
+            options={"bucket": "2"},
+        )
+        inner.create_table("test_db.pk_table", schema, ignore_if_exists=True)
+
+        schema2 = pypaimon.Schema.from_pyarrow_schema(
+            pa.schema(
+                [
+                    pa.field("id", pa.int64()),
+                    pa.field("name", pa.string()),
+                    pa.field("dt", pa.string()),
+                ]
+            ),
+            partition_keys=["dt"],
+            primary_keys=["id"],
+            options={"bucket": "1"},
+        )
+        inner.create_table("test_db.partitioned_pk", schema2, ignore_if_exists=True)
+
+        return PaimonCatalog(inner)
+
+    def test_append_only_table_properties(self, paimon_catalog):
+        daft_catalog, _, _ = paimon_catalog
+        table = daft_catalog.get_table("test_db.test_table")
+        assert table.is_primary_key_table is False
+        assert table.primary_keys == []
+        assert table.partition_keys == ["dt"]
+
+    def test_primary_key_table_properties(self, pk_catalog):
+        table = pk_catalog.get_table("test_db.pk_table")
+        assert table.is_primary_key_table is True
+        assert table.primary_keys == ["id"]
+        assert table.partition_keys == []
+        assert table.bucket_count == 2
+        assert table.table_options.get("bucket") == "2"
+
+    def test_partitioned_primary_key_table_properties(self, pk_catalog):
+        table = pk_catalog.get_table("test_db.partitioned_pk")
+        assert table.is_primary_key_table is True
+        assert table.primary_keys == ["id"]
+        assert table.partition_keys == ["dt"]
+        assert table.bucket_count == 1
diff --git a/paimon-python/pypaimon/tests/daft/daft_data_test.py b/paimon-python/pypaimon/tests/daft/daft_data_test.py
new file mode 100644
index 0000000000..e9e32efa93
--- /dev/null
+++ b/paimon-python/pypaimon/tests/daft/daft_data_test.py
@@ -0,0 +1,557 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Tests for _read_table() — reads from a local Paimon filesystem catalog.
+
+All tests run without Docker or external services; pypaimon is used directly
+to create and populate test tables, then _read_table() is validated.
+"""
+
+from __future__ import annotations
+
+import decimal
+
+import pyarrow as pa
+import pytest
+
+pypaimon = pytest.importorskip("pypaimon")
+daft = pytest.importorskip("daft")
+
+from daft import col
+
+from pypaimon.daft.daft_paimon import _read_table
+
+
+# ---------------------------------------------------------------------------
+# Helper
+# ---------------------------------------------------------------------------
+
+
+def _write_to_paimon(table, arrow_table, mode="append", overwrite_partition=None):
+    write_builder = table.new_batch_write_builder()
+    if mode == "overwrite":
+        write_builder.overwrite(overwrite_partition or {})
+    table_write = write_builder.new_write()
+    table_commit = write_builder.new_commit()
+    try:
+        table_write.write_arrow(arrow_table)
+        commit_messages = table_write.prepare_commit()
+        table_commit.commit(commit_messages)
+    finally:
+        table_write.close()
+        table_commit.close()
+
+
+# ---------------------------------------------------------------------------
+# Fixtures
+# ---------------------------------------------------------------------------
+
+
+@pytest.fixture(scope="function")
+def local_paimon_catalog(tmp_path):
+    catalog = pypaimon.CatalogFactory.create({"warehouse": str(tmp_path)})
+    catalog.create_database("test_db", ignore_if_exists=True)
+    return catalog, tmp_path
+
+
+@pytest.fixture
+def append_only_table(local_paimon_catalog):
+    catalog, tmp_path = local_paimon_catalog
+    schema = pypaimon.Schema.from_pyarrow_schema(
+        pa.schema([
+            pa.field("id", pa.int64()),
+            pa.field("name", pa.string()),
+            pa.field("value", pa.float64()),
+            pa.field("dt", pa.string()),
+        ]),
+        partition_keys=["dt"],
+        options={"bucket": "1", "file.format": "parquet"},
+    )
+    catalog.create_table("test_db.append_table", schema, ignore_if_exists=False)
+    return catalog.get_table("test_db.append_table"), tmp_path
+
+
+@pytest.fixture
+def pk_table(local_paimon_catalog):
+    catalog, tmp_path = local_paimon_catalog
+    schema = pypaimon.Schema.from_pyarrow_schema(
+        pa.schema([
+            pa.field("id", pa.int64()),
+            pa.field("name", pa.string()),
+            pa.field("dt", pa.string()),
+        ]),
+        partition_keys=["dt"],
+        primary_keys=["id", "dt"],
+        options={"bucket": "1", "file.format": "parquet"},
+    )
+    catalog.create_table("test_db.pk_table", schema, ignore_if_exists=False)
+    return catalog.get_table("test_db.pk_table"), tmp_path
+
+
+# ---------------------------------------------------------------------------
+# Basic read roundtrip
+# ---------------------------------------------------------------------------
+
+
+def test_read_paimon_basic(append_only_table):
+    """Write data via pypaimon, read back via _read_table(table), verify correctness."""
+    table, tmp_path = append_only_table
+    data = pa.table(
+        {
+            "id": pa.array([1, 2, 3], pa.int64()),
+            "name": pa.array(["alice", "bob", "charlie"], pa.string()),
+            "value": pa.array([1.1, 2.2, 3.3], pa.float64()),
+            "dt": pa.array(["2024-01-01", "2024-01-01", "2024-01-01"], pa.string()),
+        }
+    )
+    _write_to_paimon(table, data)
+
+    df = _read_table(table)
+    result = df.sort("id").to_arrow()
+
+    assert result.num_rows == 3
+    assert result.schema.field("id").type == pa.int64()
+    assert result.schema.field("name").type in (pa.string(), pa.large_string())
+    assert result.column("id").to_pylist() == [1, 2, 3]
+    assert result.column("name").to_pylist() == ["alice", "bob", "charlie"]
+
+
+def test_read_paimon_empty_table(append_only_table):
+    """Reading an empty table should return a DataFrame with the correct schema but zero rows."""
+    table, _ = append_only_table
+    df = _read_table(table)
+    result = df.to_arrow()
+
+    assert result.num_rows == 0
+    assert "id" in result.schema.names
+    assert "name" in result.schema.names
+    assert "dt" in result.schema.names
+
+
+def test_read_paimon_schema_matches(append_only_table):
+    """The schema reported by _read_table(table) should match the Paimon table schema."""
+    table, _ = append_only_table
+    df = _read_table(table)
+    schema = df.schema()
+
+    assert "id" in schema.column_names()
+    assert "name" in schema.column_names()
+    assert "value" in schema.column_names()
+    assert "dt" in schema.column_names()
+
+
+# ---------------------------------------------------------------------------
+# Multi-partition reads
+# ---------------------------------------------------------------------------
+
+
+def test_read_paimon_multiple_partitions(append_only_table):
+    """Data spread across multiple partitions should all be read back."""
+    table, _ = append_only_table
+    data = pa.table(
+        {
+            "id": pa.array([1, 2, 3, 4], pa.int64()),
+            "name": pa.array(["a", "b", "c", "d"], pa.string()),
+            "value": pa.array([1.0, 2.0, 3.0, 4.0], pa.float64()),
+            "dt": pa.array(["2024-01-01", "2024-01-01", "2024-01-02", "2024-01-02"], pa.string()),
+        }
+    )
+    _write_to_paimon(table, data)
+
+    df = _read_table(table)
+    result = df.sort("id").to_arrow()
+
+    assert result.num_rows == 4
+    assert sorted(result.column("id").to_pylist()) == [1, 2, 3, 4]
+    assert set(result.column("dt").to_pylist()) == {"2024-01-01", "2024-01-02"}
+
+
+def test_read_paimon_partition_column_present(append_only_table):
+    """Partition columns must appear in the result schema and have correct values."""
+    table, _ = append_only_table
+    data = pa.table(
+        {
+            "id": pa.array([10], pa.int64()),
+            "name": pa.array(["x"], pa.string()),
+            "value": pa.array([9.9], pa.float64()),
+            "dt": pa.array(["2024-03-15"], pa.string()),
+        }
+    )
+    _write_to_paimon(table, data)
+
+    df = _read_table(table)
+    result = df.to_arrow()
+
+    assert "dt" in result.schema.names
+    assert result.column("dt").to_pylist() == ["2024-03-15"]
+
+
+# ---------------------------------------------------------------------------
+# Column projection
+# ---------------------------------------------------------------------------
+
+
+def test_read_paimon_column_projection(append_only_table):
+    """Requesting a subset of columns should work without errors."""
+    table, _ = append_only_table
+    data = pa.table(
+        {
+            "id": pa.array([1, 2], pa.int64()),
+            "name": pa.array(["a", "b"], pa.string()),
+            "value": pa.array([1.0, 2.0], pa.float64()),
+            "dt": pa.array(["2024-01-01", "2024-01-01"], pa.string()),
+        }
+    )
+    _write_to_paimon(table, data)
+
+    df = _read_table(table).select("id", "name")
+    result = df.sort("id").to_arrow()
+
+    assert result.schema.names == ["id", "name"]
+    assert result.num_rows == 2
+    assert result.column("id").to_pylist() == [1, 2]
+
+
+def test_read_paimon_multiple_batches(local_paimon_catalog):
+    """Writing data in two separate batches (two snapshots) should read all rows."""
+    catalog, tmp_path = local_paimon_catalog
+    schema = pypaimon.Schema.from_pyarrow_schema(
+        pa.schema(
+            [
+                pa.field("id", pa.int64()),
+                pa.field("v", pa.string()),
+                pa.field("dt", pa.string()),
+            ]
+        ),
+        partition_keys=["dt"],
+        options={"bucket": "1", "file.format": "parquet"},
+    )
+    catalog.create_table("test_db.multi_batch", schema, ignore_if_exists=False)
+    table = catalog.get_table("test_db.multi_batch")
+
+    batch1 = pa.table({"id": [1, 2], "v": ["a", "b"], "dt": ["2024-01-01", "2024-01-01"]})
+    batch2 = pa.table({"id": [3, 4], "v": ["c", "d"], "dt": ["2024-01-02", "2024-01-02"]})
+
+    _write_to_paimon(table, batch1)
+    _write_to_paimon(table, batch2)
+
+    df = _read_table(table)
+    result = df.sort("id").to_arrow()
+
+    assert result.num_rows == 4
+    assert result.column("id").to_pylist() == [1, 2, 3, 4]
+
+
+# ---------------------------------------------------------------------------
+# Primary-key table (LSM merge fall-back)
+# ---------------------------------------------------------------------------
+
+
+def test_read_paimon_pk_table_basic(pk_table):
+    """Primary-key table: falls back to pypaimon reader, data should be correct."""
+    table, _ = pk_table
+    data = pa.table(
+        {
+            "id": pa.array([1, 2, 3], pa.int64()),
+            "name": pa.array(["x", "y", "z"], pa.string()),
+            "dt": pa.array(["2024-01-01", "2024-01-01", "2024-01-01"], pa.string()),
+        }
+    )
+    _write_to_paimon(table, data)
+
+    df = _read_table(table)
+    result = df.sort("id").to_arrow()
+
+    assert result.num_rows == 3
+    assert result.column("id").to_pylist() == [1, 2, 3]
+    assert result.column("name").to_pylist() == ["x", "y", "z"]
+
+
+def test_read_paimon_pk_table_deduplication(pk_table):
+    """Writing the same PK twice should result in the latest value being visible."""
+    table, _ = pk_table
+    batch1 = pa.table(
+        {
+            "id": pa.array([1, 2], pa.int64()),
+            "name": pa.array(["old_a", "old_b"], pa.string()),
+            "dt": pa.array(["2024-01-01", "2024-01-01"], pa.string()),
+        }
+    )
+    _write_to_paimon(table, batch1)
+
+    batch2 = pa.table(
+        {
+            "id": pa.array([1], pa.int64()),
+            "name": pa.array(["new_a"], pa.string()),
+            "dt": pa.array(["2024-01-01"], pa.string()),
+        }
+    )
+    _write_to_paimon(table, batch2)
+
+    df = _read_table(table)
+    result = df.sort("id").to_arrow()
+
+    assert result.num_rows == 2
+    id1_row = [row for row in zip(result.column("id").to_pylist(), result.column("name").to_pylist()) if row[0] == 1]
+    assert len(id1_row) == 1
+    assert id1_row[0][1] == "new_a"
+
+
+# ---------------------------------------------------------------------------
+# Filter pushdown
+# ---------------------------------------------------------------------------
+
+
+def test_read_paimon_partition_filter(append_only_table):
+    """Partition filter should prune partitions at scan time."""
+    table, _ = append_only_table
+    data = pa.table(
+        {
+            "id": pa.array([1, 2, 3, 4], pa.int64()),
+            "name": pa.array(["a", "b", "c", "d"], pa.string()),
+            "value": pa.array([1.0, 2.0, 3.0, 4.0], pa.float64()),
+            "dt": pa.array(["2024-01-01", "2024-01-01", "2024-01-02", "2024-01-02"], pa.string()),
+        }
+    )
+    _write_to_paimon(table, data)
+
+    df = _read_table(table).where(col("dt") == "2024-01-01")
+    result = df.sort("id").to_arrow()
+
+    assert result.num_rows == 2
+    assert result.column("id").to_pylist() == [1, 2]
+    assert all(dt == "2024-01-01" for dt in result.column("dt").to_pylist())
+
+
+def test_read_paimon_row_filter(append_only_table):
+    """Row-level filter should be applied after reading data."""
+    table, _ = append_only_table
+    data = pa.table(
+        {
+            "id": pa.array([1, 2, 3, 4], pa.int64()),
+            "name": pa.array(["alice", "bob", "charlie", "dave"], pa.string()),
+            "value": pa.array([10.0, 20.0, 30.0, 40.0], pa.float64()),
+            "dt": pa.array(["2024-01-01", "2024-01-01", "2024-01-01", "2024-01-01"], pa.string()),
+        }
+    )
+    _write_to_paimon(table, data)
+
+    df = _read_table(table).where(col("id") > 2)
+    result = df.sort("id").to_arrow()
+
+    assert result.num_rows == 2
+    assert result.column("id").to_pylist() == [3, 4]
+
+
+def test_read_paimon_combined_filter(append_only_table):
+    """Combined partition + row filter should work together."""
+    table, _ = append_only_table
+    data = pa.table(
+        {
+            "id": pa.array([1, 2, 3, 4, 5, 6], pa.int64()),
+            "name": pa.array(["a", "b", "c", "d", "e", "f"], pa.string()),
+            "value": pa.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], pa.float64()),
+            "dt": pa.array(
+                ["2024-01-01", "2024-01-01", "2024-01-02", "2024-01-02", "2024-01-03", "2024-01-03"],
+                pa.string(),
+            ),
+        }
+    )
+    _write_to_paimon(table, data)
+
+    df = _read_table(table).where((col("dt") == "2024-01-01") & (col("id") == 2))
+    result = df.to_arrow()
+
+    assert result.num_rows == 1
+    assert result.column("id").to_pylist() == [2]
+    assert result.column("dt").to_pylist() == ["2024-01-01"]
+
+
+# ---------------------------------------------------------------------------
+# Filter pushdown tests
+# ---------------------------------------------------------------------------
+
+
+class TestFilterPushdown:
+    """Tests for filter pushdown to Paimon."""
+
+    @pytest.fixture
+    def filter_table(self, local_paimon_catalog):
+        """Create a table for filter pushdown tests."""
+        catalog, tmp_path = local_paimon_catalog
+        pa_schema = pa.schema([("id", pa.int64()), ("value", pa.string())])
+        paimon_schema = pypaimon.Schema.from_pyarrow_schema(pa_schema)
+        catalog.create_table("test_db.filter_test", paimon_schema, ignore_if_exists=True)
+        table = catalog.get_table("test_db.filter_test")
+
+        data = pa.table({"id": [1, 2, 3, 4, 5], "value": ["a", "b", "c", "d", "e"]})
+        _write_to_paimon(table, data)
+        return table
+
+    def test_filter_pushdown_equal(self, filter_table):
+        df = _read_table(filter_table).where(col("id") == 3)
+        result = df.to_pydict()
+        assert result["id"] == [3]
+        assert result["value"] == ["c"]
+
+    def test_filter_pushdown_comparison(self, filter_table):
+        df = _read_table(filter_table).where(col("id") > 3)
+        result = df.to_pydict()
+        assert result["id"] == [4, 5]
+
+        df = _read_table(filter_table).where(col("id") <= 2)
+        result = df.to_pydict()
+        assert result["id"] == [1, 2]
+
+    def test_filter_pushdown_is_in(self, filter_table):
+        df = _read_table(filter_table).where(col("id").is_in([2, 4]))
+        result = df.to_pydict()
+        assert result["id"] == [2, 4]
+
+    def test_filter_pushdown_is_null(self, local_paimon_catalog):
+        catalog, tmp_path = local_paimon_catalog
+        pa_schema = pa.schema([("id", pa.int64()), ("value", pa.string())])
+        paimon_schema = pypaimon.Schema.from_pyarrow_schema(pa_schema)
+        catalog.create_table("test_db.filter_null", paimon_schema, ignore_if_exists=True)
+        table = catalog.get_table("test_db.filter_null")
+
+        data = pa.table({"id": [1, 2, 3], "value": ["a", None, "c"]})
+        _write_to_paimon(table, data)
+
+        df = _read_table(table).where(col("value").is_null())
+        result = df.to_pydict()
+        assert result["id"] == [2]
+
+    def test_filter_pushdown_string(self, filter_table):
+        df = _read_table(filter_table).where(col("value").startswith("a"))
+        result = df.to_pydict()
+        assert result["value"] == ["a"]
+
+        df = _read_table(filter_table).where(col("value").contains("b"))
+        result = df.to_pydict()
+        assert result["value"] == ["b"]
+
+    def test_filter_pushdown_combined(self, filter_table):
+        df = _read_table(filter_table).where((col("id") >= 2) & (col("id") <= 4))
+        result = df.to_pydict()
+        assert result["id"] == [2, 3, 4]
+
+
+# ---------------------------------------------------------------------------
+# Advanced data types
+# ---------------------------------------------------------------------------
+
+
+class TestNestedTypes:
+    """Tests for nested data types (list, map, struct)."""
+
+    def test_read_nested_list(self, local_paimon_catalog):
+        catalog, tmp_path = local_paimon_catalog
+        pa_schema = pa.schema([("id", pa.int64()), ("list_col", pa.list_(pa.int64()))])
+        paimon_schema = pypaimon.Schema.from_pyarrow_schema(pa_schema)
+        catalog.create_table("test_db.nested_list", paimon_schema, ignore_if_exists=True)
+        table = catalog.get_table("test_db.nested_list")
+
+        data = pa.table({"id": [1, 2], "list_col": [[1, 2, 3], [4, 5]]}, schema=pa_schema)
+        _write_to_paimon(table, data)
+
+        df = _read_table(table)
+        result = df.to_pydict()
+        assert len(result["id"]) == 2
+        assert result["list_col"][0] == [1, 2, 3]
+
+    def test_read_nested_map(self, local_paimon_catalog):
+        catalog, tmp_path = local_paimon_catalog
+        pa_schema = pa.schema([("id", pa.int64()), ("map_col", pa.map_(pa.string(), pa.int64()))])
+        paimon_schema = pypaimon.Schema.from_pyarrow_schema(pa_schema)
+        catalog.create_table("test_db.nested_map", paimon_schema, ignore_if_exists=True)
+        table = catalog.get_table("test_db.nested_map")
+
+        data = pa.table(
+            {"id": [1, 2], "map_col": [[("k1", 1), ("k2", 2)], [("k3", 3)]]},
+            schema=pa_schema,
+        )
+        _write_to_paimon(table, data)
+
+        df = _read_table(table)
+        result = df.to_pydict()
+        assert len(result["id"]) == 2
+
+
+class TestDecimalType:
+    """Tests for decimal type support."""
+
+    def test_read_decimal(self, local_paimon_catalog):
+        catalog, tmp_path = local_paimon_catalog
+        pa_schema = pa.schema([("id", pa.int64()), ("amount", pa.decimal128(10, 2))])
+        paimon_schema = pypaimon.Schema.from_pyarrow_schema(pa_schema)
+        catalog.create_table("test_db.decimal_table", paimon_schema, ignore_if_exists=True)
+        table = catalog.get_table("test_db.decimal_table")
+
+        data = pa.table(
+            {
+                "id": [1, 2, 3],
+                "amount": [decimal.Decimal("100.50"), decimal.Decimal("200.75"), decimal.Decimal("300.00")],
+            },
+            schema=pa_schema,
+        )
+        _write_to_paimon(table, data)
+
+        df = _read_table(table)
+        result = df.to_pydict()
+        assert len(result["id"]) == 3
+
+
+class TestNullValues:
+    """Tests for null value handling."""
+
+    def test_read_with_nulls(self, local_paimon_catalog):
+        catalog, tmp_path = local_paimon_catalog
+        pa_schema = pa.schema([("id", pa.int64()), ("name", pa.string()), ("value", pa.float64())])
+        paimon_schema = pypaimon.Schema.from_pyarrow_schema(pa_schema)
+        catalog.create_table("test_db.null_table", paimon_schema, ignore_if_exists=True)
+        table = catalog.get_table("test_db.null_table")
+
+        data = pa.table({"id": [1, 2, 3], "name": ["alice", None, "charlie"], "value": [1.1, 2.2, None]})
+        _write_to_paimon(table, data)
+
+        df = _read_table(table)
+        result = df.to_pydict()
+        assert len(result["id"]) == 3
+        assert result["name"][1] is None
+        assert result["value"][2] is None
+
+
+class TestCompositePrimaryKey:
+    """Tests for composite primary key tables."""
+
+    def test_read_composite_pk_table(self, local_paimon_catalog):
+        catalog, tmp_path = local_paimon_catalog
+        pa_schema = pa.schema([("pk1", pa.int64()), ("pk2", pa.string()), ("value", pa.float64())])
+        paimon_schema = pypaimon.Schema.from_pyarrow_schema(
+            pa_schema, primary_keys=["pk1", "pk2"], options={"bucket": "2"}
+        )
+        catalog.create_table("test_db.composite_pk", paimon_schema, ignore_if_exists=True)
+        table = catalog.get_table("test_db.composite_pk")
+
+        data = pa.table({"pk1": [1, 1, 2], "pk2": ["a", "b", "a"], "value": [1.1, 2.2, 3.3]})
+        _write_to_paimon(table, data)
+
+        df = _read_table(table)
+        result = df.to_pydict()
+        assert len(result["pk1"]) == 3
diff --git a/paimon-python/pypaimon/tests/daft/daft_sink_test.py b/paimon-python/pypaimon/tests/daft/daft_sink_test.py
new file mode 100644
index 0000000000..b5b9a171d3
--- /dev/null
+++ b/paimon-python/pypaimon/tests/daft/daft_sink_test.py
@@ -0,0 +1,416 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Tests for _write_table() — writes to a local Paimon filesystem catalog.
+
+All tests run without Docker or external services. Data written via Daft is
+verified by reading back with both _read_table() and pypaimon's native
+reader to ensure correctness.
+"""
+
+from __future__ import annotations
+
+import pyarrow as pa
+import pytest
+
+pypaimon = pytest.importorskip("pypaimon")
+daft = pytest.importorskip("daft")
+
+from pypaimon.daft.daft_compat import has_file_range_reads
+from pypaimon.daft.daft_catalog import PaimonTable
+from pypaimon.daft.daft_paimon import _read_table, _write_table
+
+requires_blob = pytest.mark.skipif(not has_file_range_reads(), reason="BLOB support requires daft >= 0.7.11")
+
+
+# ---------------------------------------------------------------------------
+# Helpers & Fixtures
+# ---------------------------------------------------------------------------
+
+
+def _write_to_paimon(table, arrow_table, mode="append", overwrite_partition=None):
+    write_builder = table.new_batch_write_builder()
+    if mode == "overwrite":
+        write_builder.overwrite(overwrite_partition or {})
+    table_write = write_builder.new_write()
+    table_commit = write_builder.new_commit()
+    try:
+        table_write.write_arrow(arrow_table)
+        commit_messages = table_write.prepare_commit()
+        table_commit.commit(commit_messages)
+    finally:
+        table_write.close()
+        table_commit.close()
+
+
+@pytest.fixture(scope="function")
+def local_paimon_catalog(tmp_path):
+    catalog = pypaimon.CatalogFactory.create({"warehouse": str(tmp_path)})
+    catalog.create_database("test_db", ignore_if_exists=True)
+    return catalog, tmp_path
+
+
+@pytest.fixture
+def append_only_table(local_paimon_catalog):
+    catalog, tmp_path = local_paimon_catalog
+    schema = pypaimon.Schema.from_pyarrow_schema(
+        pa.schema([
+            pa.field("id", pa.int64()),
+            pa.field("name", pa.string()),
+            pa.field("value", pa.float64()),
+            pa.field("dt", pa.string()),
+        ]),
+        partition_keys=["dt"],
+        options={"bucket": "1", "file.format": "parquet"},
+    )
+    catalog.create_table("test_db.append_table", schema, ignore_if_exists=False)
+    return catalog.get_table("test_db.append_table"), tmp_path
+
+
+@pytest.fixture
+def append_only_table_no_partition(local_paimon_catalog):
+    catalog, tmp_path = local_paimon_catalog
+    schema = pypaimon.Schema.from_pyarrow_schema(
+        pa.schema([
+            pa.field("id", pa.int64()),
+            pa.field("name", pa.string()),
+        ]),
+        options={"bucket": "1", "file.format": "parquet"},
+    )
+    catalog.create_table("test_db.append_no_part", schema, ignore_if_exists=False)
+    return catalog.get_table("test_db.append_no_part"), tmp_path
+
+
+@pytest.fixture
+def pk_table(local_paimon_catalog):
+    catalog, tmp_path = local_paimon_catalog
+    schema = pypaimon.Schema.from_pyarrow_schema(
+        pa.schema([
+            pa.field("id", pa.int64()),
+            pa.field("name", pa.string()),
+            pa.field("dt", pa.string()),
+        ]),
+        partition_keys=["dt"],
+        primary_keys=["id", "dt"],
+        options={"bucket": "1", "file.format": "parquet"},
+    )
+    catalog.create_table("test_db.pk_table", schema, ignore_if_exists=False)
+    return catalog.get_table("test_db.pk_table"), tmp_path
+
+
+# ---------------------------------------------------------------------------
+# Basic append
+# ---------------------------------------------------------------------------
+
+
+def test_write_paimon_append_basic(append_only_table):
+    """_write_table with mode='append' should persist data readable by _read_table."""
+    table, _ = append_only_table
+    df = daft.from_pydict(
+        {
+            "id": [1, 2, 3],
+            "name": ["alice", "bob", "charlie"],
+            "value": [1.1, 2.2, 3.3],
+            "dt": ["2024-01-01", "2024-01-01", "2024-01-01"],
+        }
+    )
+    _write_table(df, table)
+
+    result = _read_table(table).sort("id").to_arrow()
+    assert result.num_rows == 3
+    assert result.column("id").to_pylist() == [1, 2, 3]
+    assert result.column("name").to_pylist() == ["alice", "bob", "charlie"]
+
+
+def test_write_paimon_append_returns_summary(append_only_table):
+    """_write_table should return a DataFrame with operation metadata columns."""
+    table, _ = append_only_table
+    df = daft.from_pydict(
+        {
+            "id": [10, 20],
+            "name": ["x", "y"],
+            "value": [5.0, 6.0],
+            "dt": ["2024-02-01", "2024-02-01"],
+        }
+    )
+    result = _write_table(df, table)
+    result_dict = result.to_pydict()
+
+    assert "operation" in result_dict
+    assert "rows" in result_dict
+    assert "file_size" in result_dict
+    assert "file_name" in result_dict
+
+    assert all(op == "ADD" for op in result_dict["operation"])
+    assert sum(result_dict["rows"]) == 2
+    assert all(s > 0 for s in result_dict["file_size"])
+    assert all(len(fn) > 0 for fn in result_dict["file_name"])
+
+
+def test_write_paimon_append_multiple_times(append_only_table):
+    """Multiple append writes should accumulate rows."""
+    table, _ = append_only_table
+    df1 = daft.from_pydict({"id": [1], "name": ["a"], "value": [1.0], "dt": ["2024-01-01"]})
+    df2 = daft.from_pydict({"id": [2], "name": ["b"], "value": [2.0], "dt": ["2024-01-02"]})
+    _write_table(df1, table)
+    _write_table(df2, table)
+
+    result = _read_table(table).sort("id").to_arrow()
+    assert result.num_rows == 2
+    assert result.column("id").to_pylist() == [1, 2]
+
+
+def test_write_paimon_roundtrip_native_verify(append_only_table):
+    """Data written by Daft should also be readable via pypaimon's native reader."""
+    table, _ = append_only_table
+    df = daft.from_pydict(
+        {
+            "id": [7, 8, 9],
+            "name": ["p", "q", "r"],
+            "value": [7.0, 8.0, 9.0],
+            "dt": ["2024-05-01", "2024-05-01", "2024-05-01"],
+        }
+    )
+    _write_table(df, table)
+
+    # Verify via pypaimon native reader
+    read_builder = table.new_read_builder()
+    table_scan = read_builder.new_scan()
+    table_read = read_builder.new_read()
+    splits = table_scan.plan().splits()
+    arrow_table = table_read.to_arrow(splits)
+
+    assert arrow_table.num_rows == 3
+    ids = sorted(arrow_table.column("id").to_pylist())
+    assert ids == [7, 8, 9]
+
+
+# ---------------------------------------------------------------------------
+# Overwrite
+# ---------------------------------------------------------------------------
+
+
+def test_write_paimon_overwrite_full_unpartitioned(append_only_table_no_partition):
+    """mode='overwrite' on an unpartitioned table should replace all existing data."""
+    table, _ = append_only_table_no_partition
+    initial = daft.from_pydict({"id": [1, 2], "name": ["a", "b"]})
+    _write_table(initial, table)
+
+    replacement = daft.from_pydict({"id": [100], "name": ["z"]})
+    result = _write_table(replacement, table, mode="overwrite")
+    result_dict = result.to_pydict()
+    assert all(op == "OVERWRITE" for op in result_dict["operation"])
+
+    final = _read_table(table).to_arrow()
+    assert final.num_rows == 1
+    assert final.column("id").to_pylist() == [100]
+
+
+def test_write_paimon_overwrite_dynamic_partition(append_only_table):
+    """mode='overwrite' on a partitioned table should only replace touched partitions."""
+    table, _ = append_only_table
+    initial = daft.from_pydict(
+        {
+            "id": [1, 2, 3, 4],
+            "name": ["a", "b", "c", "d"],
+            "value": [1.0, 2.0, 3.0, 4.0],
+            "dt": ["2024-01-01", "2024-01-01", "2024-01-02", "2024-01-02"],
+        }
+    )
+    _write_table(initial, table)
+
+    replacement = daft.from_pydict({"id": [10], "name": ["x"], "value": [10.0], "dt": ["2024-01-01"]})
+    result = _write_table(replacement, table, mode="overwrite")
+    result_dict = result.to_pydict()
+    assert all(op == "OVERWRITE" for op in result_dict["operation"])
+
+    final = _read_table(table).sort("id").to_pydict()
+    assert final["id"] == [3, 4, 10]
+    assert final["dt"] == ["2024-01-02", "2024-01-02", "2024-01-01"]
+
+
+# ---------------------------------------------------------------------------
+# Error handling
+# ---------------------------------------------------------------------------
+
+
+def test_write_paimon_invalid_mode(append_only_table):
+    """An unsupported mode should raise a ValueError."""
+    table, _ = append_only_table
+    df = daft.from_pydict({"id": [1], "name": ["a"], "value": [1.0], "dt": ["2024-01-01"]})
+    with pytest.raises(ValueError, match="Only 'append' or 'overwrite' mode is supported"):
+        _write_table(df, table, mode="upsert")
+
+
+def test_write_paimon_pk_table(pk_table):
+    """Writing to a PK table should work and be readable back."""
+    table, _ = pk_table
+    df = daft.from_pydict(
+        {
+            "id": [1, 2, 3],
+            "name": ["x", "y", "z"],
+            "dt": ["2024-01-01", "2024-01-01", "2024-01-01"],
+        }
+    )
+    _write_table(df, table)
+
+    result = _read_table(table).sort("id").to_arrow()
+    assert result.num_rows == 3
+    assert result.column("id").to_pylist() == [1, 2, 3]
+
+
+# ---------------------------------------------------------------------------
+# Schema conversion tests
+# ---------------------------------------------------------------------------
+
+
+class TestSchemaConversion:
+    """Tests for schema conversion utilities."""
+
+    def test_write_large_string_conversion(self, local_paimon_catalog):
+        """Test that large_string columns are converted to string for pypaimon."""
+        catalog, tmp_path = local_paimon_catalog
+        pa_schema = pa.schema([("id", pa.int64()), ("text", pa.string())])
+        paimon_schema = pypaimon.Schema.from_pyarrow_schema(pa_schema)
+        catalog.create_table("test_db.large_str", paimon_schema, ignore_if_exists=True)
+        table = catalog.get_table("test_db.large_str")
+
+        df = daft.from_pydict({"id": [1, 2, 3], "text": ["a", "b", "c"]})
+        _write_table(df, table, mode="append")
+
+        result = _read_table(table).to_pydict()
+        assert result["id"] == [1, 2, 3]
+        assert result["text"] == ["a", "b", "c"]
+
+    def test_write_large_binary_conversion(self, local_paimon_catalog):
+        """Test that large_binary columns are converted to binary for pypaimon."""
+        catalog, tmp_path = local_paimon_catalog
+        pa_schema = pa.schema([("id", pa.int64()), ("data", pa.binary())])
+        paimon_schema = pypaimon.Schema.from_pyarrow_schema(pa_schema)
+        catalog.create_table("test_db.large_bin", paimon_schema, ignore_if_exists=True)
+        table = catalog.get_table("test_db.large_bin")
+
+        df = daft.from_pydict({"id": [1, 2], "data": [b"abc", b"def"]})
+        _write_table(df, table, mode="append")
+
+        result = _read_table(table).to_pydict()
+        assert result["id"] == [1, 2]
+
+
+# ---------------------------------------------------------------------------
+# Complex type tests
+# ---------------------------------------------------------------------------
+
+
+class TestComplexTypes:
+    """Tests for writing complex data types."""
+
+    def test_write_nested_list(self, local_paimon_catalog):
+        """Test writing Paimon table with list type."""
+        catalog, tmp_path = local_paimon_catalog
+        pa_schema = pa.schema([("id", pa.int64()), ("list_col", pa.list_(pa.int64()))])
+        paimon_schema = pypaimon.Schema.from_pyarrow_schema(pa_schema)
+        catalog.create_table("test_db.write_list", paimon_schema, ignore_if_exists=True)
+        table = catalog.get_table("test_db.write_list")
+
+        df = daft.from_pydict({"id": [1, 2], "list_col": [[1, 2, 3], [4, 5]]})
+        _write_table(df, table, mode="append")
+
+        result = _read_table(table).to_pydict()
+        assert result["id"] == [1, 2]
+
+
+@requires_blob
+class TestBlobType:
+    """Tests for BLOB type support (pypaimon 1.4+)."""
+
+    def test_write_read_blob_type(self, local_paimon_catalog):
+        """Test that BLOB columns are returned as FileReference objects."""
+        catalog, tmp_path = local_paimon_catalog
+        pa_schema = pa.schema([("id", pa.int64()), ("blob_data", pa.large_binary())])
+        paimon_schema = pypaimon.Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                "bucket": "1",
+                "file.format": "parquet",
+                "row-tracking.enabled": "true",
+                "data-evolution.enabled": "true",
+            },
+        )
+        catalog.create_table("test_db.blob_table", paimon_schema, ignore_if_exists=True)
+        table = catalog.get_table("test_db.blob_table")
+
+        df = daft.from_pydict({"id": [1, 2], "blob_data": [b"hello", b"world"]})
+        _write_table(df, table, mode="append")
+
+        result_df = _read_table(table).sort("id")
+        assert str(result_df.schema()["blob_data"].dtype) == "File[Unknown]"
+
+        result = result_df.to_pydict()
+        assert result["id"] == [1, 2]
+
+        blob_refs = result["blob_data"]
+        assert len(blob_refs) == 2
+        for ref in blob_refs:
+            assert isinstance(ref, daft.File)
+            assert isinstance(ref.path, str)
+            assert ".blob" in ref.path
+            assert ref.offset is not None
+            assert ref.length is not None
+
+
+# ---------------------------------------------------------------------------
+# Truncate tests
+# ---------------------------------------------------------------------------
+
+
+class TestTruncate:
+    """Tests for table truncate operations (pypaimon 1.4+)."""
+
+    def test_truncate_table(self, append_only_table):
+        """truncate() should remove all data from the table."""
+        table, _ = append_only_table
+        df = daft.from_pydict(
+            {"id": [1, 2, 3], "name": ["a", "b", "c"], "value": [1.0, 2.0, 3.0], "dt": ["2024-01-01"] * 3}
+        )
+        _write_table(df, table)
+        assert _read_table(table).count_rows() == 3
+
+        paimon_table = PaimonTable(table)
+        paimon_table.truncate()
+        assert _read_table(table).count_rows() == 0
+
+    def test_truncate_partitions(self, append_only_table):
+        """truncate_partitions() should remove only the specified partition data."""
+        table, _ = append_only_table
+        df = daft.from_pydict(
+            {
+                "id": [1, 2, 3, 4],
+                "name": ["a", "b", "c", "d"],
+                "value": [1.0, 2.0, 3.0, 4.0],
+                "dt": ["2024-01-01", "2024-01-01", "2024-01-02", "2024-01-02"],
+            }
+        )
+        _write_table(df, table)
+        assert _read_table(table).count_rows() == 4
+
+        paimon_table = PaimonTable(table)
+        paimon_table.truncate_partitions([{"dt": "2024-01-01"}])
+        result = _read_table(table).sort("id").to_pydict()
+        assert result["id"] == [3, 4]
+        assert result["dt"] == ["2024-01-02", "2024-01-02"]
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index ef8ef3a2df..f420a813c0 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -344,7 +344,9 @@ class DataWriter(ABC):
             }
         
         column_type = column_array.type
-        supports_minmax = not (pa.types.is_nested(column_type) or pa.types.is_map(column_type))
+        supports_minmax = not (
+            pa.types.is_nested(column_type) or pa.types.is_map(column_type) or pa.types.is_large_binary(column_type)
+        )
         
         if not supports_minmax:
             return {
diff --git a/paimon-python/setup.py b/paimon-python/setup.py
index cd0f80e8aa..f9ce76a934 100644
--- a/paimon-python/setup.py
+++ b/paimon-python/setup.py
@@ -157,6 +157,9 @@ setup(
         'torch': [
             'torch',
         ],
+        'daft': [
+            'daft>=0.7.6; python_version>="3.10"',
+        ],
         'oss': [
             'ossfs>=2021.8; python_version<"3.8"',
             'ossfs>=2023; python_version>="3.8"'

Reply via email to