This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 98d913ec47 [python] Fix Daft fallback filter limit pushdown (#7965)
98d913ec47 is described below

commit 98d913ec47da283ed328f8fa8ff032073add418d
Author: QuakeWang <[email protected]>
AuthorDate: Tue May 26 15:16:28 2026 +0800

    [python] Fix Daft fallback filter limit pushdown (#7965)
    
    The Daft Paimon datasource used the configured read builder for scan
    planning, but fallback split tasks rebuilt a bare `TableRead`. As a
    result, fallback reads for PK merge, non-Parquet formats, BLOB columns,
    and deletion-vector paths could miss pushed predicate/projection/limit
    state.
    
    This was correctness-sensitive because Daft may treat pushed filters as
    already handled by the source. A query filtering a fallback table could
    plan the right split but still emit unfiltered rows from that split.
    There was also a related limit-ordering issue: applying `limit` inside
    the fallback reader while Daft still had remaining row or partition
    filters could truncate rows before those filters were evaluated.
    
    This patch makes fallback tasks use a configured `TableRead`, keeps the
    required filter columns available for fallback execution, and
    centralizes the source-side limit decision so `limit` is only pushed
    when it is safe for the source to apply it before returning rows.
---
 paimon-python/pypaimon/daft/daft_datasource.py     | 207 +++++++++++++++++--
 .../pypaimon/tests/daft/daft_data_test.py          | 222 +++++++++++++++++++++
 2 files changed, 417 insertions(+), 12 deletions(-)

diff --git a/paimon-python/pypaimon/daft/daft_datasource.py b/paimon-python/pypaimon/daft/daft_datasource.py
index ce063bf109..de4b22bda1 100644
--- a/paimon-python/pypaimon/daft/daft_datasource.py
+++ b/paimon-python/pypaimon/daft/daft_datasource.py
@@ -18,6 +18,7 @@
 
 from __future__ import annotations
 
+from dataclasses import dataclass
 import logging
 from typing import TYPE_CHECKING, Any
 from urllib.parse import urlparse
@@ -37,6 +38,7 @@ if TYPE_CHECKING:
     from collections.abc import AsyncIterator
 
     from pypaimon.common.predicate import Predicate
+    from pypaimon.read.table_read import TableRead
     from pypaimon.read.split import Split
     from pypaimon.table.file_store_table import FileStoreTable
 
@@ -50,6 +52,16 @@ PAIMON_FILE_FORMAT_ORC = "orc"
 PAIMON_FILE_FORMAT_AVRO = "avro"
 
 
+@dataclass(frozen=True, slots=True)
+class _ReadPushdownState:
+    reader_predicate: Predicate | None
+    planning_predicate: Predicate | None
+    requested_columns: list[str] | None
+    task_columns: list[str] | None
+    read_columns: list[str] | None
+    source_limit: int | None
+
+
 class _PaimonPKSplitTask(DataSourceTask):
     """DataSourceTask for PK-table splits that require LSM-tree merge.
 
@@ -60,14 +72,16 @@ class _PaimonPKSplitTask(DataSourceTask):
 
     def __init__(
         self,
-        table: FileStoreTable,
+        table_read: TableRead,
         split: Split,
         schema: Schema,
+        output_columns: list[str] | None = None,
         blob_column_names: set[str] | None = None,
     ) -> None:
-        self._table = table
+        self._table_read = table_read
         self._split = split
         self._schema = schema
+        self._output_columns = output_columns
         self._blob_column_names = blob_column_names or set()
 
     @property
@@ -75,9 +89,10 @@ class _PaimonPKSplitTask(DataSourceTask):
         return self._schema
 
     async def read(self) -> AsyncIterator[RecordBatch]:
-        table_read = self._table.new_read_builder().new_read()
-        reader = table_read.to_arrow_batch_reader([self._split])
+        reader = self._table_read.to_arrow_batch_reader([self._split])
         for batch in iter(reader.read_next_batch, None):
+            if self._output_columns is not None:
+                batch = batch.select(self._output_columns)
             if self._blob_column_names:
                 batch = _convert_blob_columns(batch, self._blob_column_names)
             rb = RecordBatch.from_arrow_record_batches([batch], batch.schema)
@@ -174,6 +189,7 @@ class PaimonDataSource(DataSource):
         )
 
         self._paimon_predicate: Predicate | None = None
+        self._remaining_filters: list[PyExpr] | None = None
 
     @property
     def name(self) -> str:
@@ -197,6 +213,7 @@ class PaimonDataSource(DataSource):
         pushed_filters, remaining_filters, paimon_predicate = 
convert_filters_to_paimon(self._table, filters)
 
         self._paimon_predicate = paimon_predicate
+        self._remaining_filters = remaining_filters
 
         if pushed_filters:
             logger.debug(
@@ -213,18 +230,19 @@ class PaimonDataSource(DataSource):
             read_table = self._table.copy({"blob-as-descriptor": "true"})
 
         read_builder = read_table.new_read_builder()
+        read_pushdowns = self._read_pushdown_state(read_table, pushdowns)
 
-        if pushdowns.columns is not None:
-            read_builder = 
read_builder.with_projection(list(pushdowns.columns))
+        if read_pushdowns.requested_columns is not None:
+            read_builder = 
read_builder.with_projection(read_pushdowns.requested_columns)
 
-        if pushdowns.limit is not None:
-            read_builder = read_builder.with_limit(pushdowns.limit)
+        if read_pushdowns.source_limit is not None:
+            read_builder = read_builder.with_limit(read_pushdowns.source_limit)
 
-        if self._paimon_predicate is not None:
-            read_builder = read_builder.with_filter(self._paimon_predicate)
+        if read_pushdowns.planning_predicate is not None:
+            read_builder = 
read_builder.with_filter(read_pushdowns.planning_predicate)
             logger.debug(
                 "Applied Paimon filter pushdown predicate: %s",
-                self._paimon_predicate,
+                read_pushdowns.planning_predicate,
             )
 
         if self._table.partition_keys and pushdowns.partition_filters is None:
@@ -291,7 +309,18 @@ class PaimonDataSource(DataSource):
                     len(split.files),
                     reason,
                 )
-                yield _PaimonPKSplitTask(read_table, split, self._schema, 
self._blob_column_names)
+                yield _PaimonPKSplitTask(
+                    self._fallback_read_builder(
+                        read_table,
+                        read_pushdowns.read_columns,
+                        read_pushdowns.source_limit,
+                        read_pushdowns.reader_predicate,
+                    ).new_read(),
+                    split,
+                    self._project_schema(read_pushdowns.task_columns),
+                    read_pushdowns.task_columns,
+                    self._blob_column_names,
+                )
 
     def _build_file_uri(self, file_path: str) -> str:
         """Reconstruct a full URI from a (potentially scheme-stripped) 
file_path."""
@@ -316,3 +345,157 @@ class PaimonDataSource(DataSource):
         if not arrays:
             return None
         return daft.recordbatch.RecordBatch.from_pydict(arrays)
+
+    def _valid_output_columns(self, columns: list[str] | None) -> list[str] | 
None:
+        if columns is None:
+            return None
+        schema_names = {field.name for field in self._schema}
+        return [name for name in columns if name in schema_names]
+
+    def _task_columns(
+        self,
+        table: FileStoreTable,
+        output_columns: list[str] | None,
+        pushdowns: Pushdowns,
+    ) -> list[str] | None:
+        if output_columns is None:
+            return None
+
+        task_columns = list(output_columns)
+        filter_required_column_names = getattr(pushdowns, 
"filter_required_column_names", None)
+        required_fields = filter_required_column_names() if 
filter_required_column_names else set()
+        return self._append_existing_columns(table, task_columns, 
required_fields)
+
+    def _fallback_read_columns(
+        self,
+        table: FileStoreTable,
+        task_columns: list[str] | None,
+        paimon_predicate: Predicate | None,
+    ) -> list[str] | None:
+        if task_columns is None:
+            return None
+
+        read_columns = list(task_columns)
+        if paimon_predicate is not None:
+            from pypaimon.read.push_down_utils import _get_all_fields
+
+            return self._append_existing_columns(table, read_columns, 
_get_all_fields(paimon_predicate))
+
+        return read_columns
+
+    @staticmethod
+    def _append_existing_columns(
+        table: FileStoreTable,
+        columns: list[str],
+        required_fields: set[str],
+    ) -> list[str]:
+        if not required_fields:
+            return columns
+        existing = set(columns)
+        columns.extend(
+            field.name
+            for field in table.fields
+            if field.name in required_fields and field.name not in existing
+        )
+        return columns
+
+    def _project_schema(self, columns: list[str] | None) -> Schema:
+        if columns is None:
+            return self._schema
+        field_map = {field.name: field for field in self._schema}
+        return Schema.from_field_name_and_types(
+            [(name, field_map[name].dtype) for name in columns if name in 
field_map]
+        )
+
+    def _fallback_read_builder(
+        self,
+        table: FileStoreTable,
+        read_columns: list[str] | None,
+        limit: int | None,
+        predicate: Predicate | None,
+    ) -> Any:
+        read_builder = table.new_read_builder()
+        if read_columns is not None:
+            read_builder = read_builder.with_projection(read_columns)
+        if limit is not None:
+            read_builder = read_builder.with_limit(limit)
+        if predicate is not None:
+            read_builder = read_builder.with_filter(predicate)
+        return read_builder
+
+    def _read_pushdown_state(
+        self,
+        table: FileStoreTable,
+        pushdowns: Pushdowns,
+    ) -> _ReadPushdownState:
+        reader_predicate, filters_consumed = 
self._pushdown_filter_state(pushdowns)
+        planning_predicate = self._planning_predicate(reader_predicate)
+        requested_columns = self._valid_output_columns(pushdowns.columns)
+        task_columns = self._task_columns(table, requested_columns, pushdowns)
+        read_columns = self._fallback_read_columns(table, task_columns, 
reader_predicate)
+        source_limit = self._source_limit(
+            pushdowns,
+            reader_predicate,
+            planning_predicate,
+            filters_consumed,
+        )
+        return _ReadPushdownState(
+            reader_predicate=reader_predicate,
+            planning_predicate=planning_predicate,
+            requested_columns=requested_columns,
+            task_columns=task_columns,
+            read_columns=read_columns,
+            source_limit=source_limit,
+        )
+
+    def _pushdown_filter_state(self, pushdowns: Pushdowns) -> tuple[Predicate 
| None, bool]:
+        if self._remaining_filters is not None:
+            return self._paimon_predicate, not self._remaining_filters
+        if pushdowns.filters is None:
+            return None, True
+
+        py_expr = getattr(pushdowns.filters, "_expr", pushdowns.filters)
+        _, remaining_filters, paimon_predicate = 
convert_filters_to_paimon(self._table, [py_expr])
+        return paimon_predicate, not remaining_filters
+
+    def _planning_predicate(self, pushdown_predicate: Predicate | None) -> 
Predicate | None:
+        if pushdown_predicate is None:
+            return None
+        if not self._can_plan_predicate(pushdown_predicate):
+            return None
+        if self._paimon_predicate is not None or 
self._requires_fallback_reader():
+            return pushdown_predicate
+        return None
+
+    @staticmethod
+    def _source_limit(
+        pushdowns: Pushdowns,
+        reader_predicate: Predicate | None,
+        planning_predicate: Predicate | None,
+        filters_consumed: bool,
+    ) -> int | None:
+        if pushdowns.limit is None:
+            return None
+        if pushdowns.partition_filters is not None:
+            return None
+        if not filters_consumed:
+            return None
+        if reader_predicate is not None and planning_predicate is None:
+            return None
+        return pushdowns.limit
+
+    def _requires_fallback_reader(self) -> bool:
+        return not self._is_parquet or self._has_blob_columns or 
self._table.is_primary_key_table
+
+    def _can_plan_predicate(self, predicate: Predicate) -> bool:
+        # Missing value null-count stats make isNull unsafe for scan planning.
+        if not self._predicate_contains_is_null(predicate):
+            return True
+        return self._table.is_primary_key_table and not 
self._table.options.deletion_vectors_enabled()
+
+    def _predicate_contains_is_null(self, predicate: Predicate) -> bool:
+        if predicate.method == "isNull":
+            return True
+        if predicate.method in ("and", "or"):
+            return any(self._predicate_contains_is_null(child) for child in 
predicate.literals or [])
+        return False
diff --git a/paimon-python/pypaimon/tests/daft/daft_data_test.py b/paimon-python/pypaimon/tests/daft/daft_data_test.py
index e9e32efa93..2eee5ba749 100644
--- a/paimon-python/pypaimon/tests/daft/daft_data_test.py
+++ b/paimon-python/pypaimon/tests/daft/daft_data_test.py
@@ -24,6 +24,7 @@ to create and populate test tables, then _read_table() is validated.
 
 from __future__ import annotations
 
+import asyncio
 import decimal
 
 import pyarrow as pa
@@ -57,6 +58,40 @@ def _write_to_paimon(table, arrow_table, mode="append", overwrite_partition=None
         table_commit.close()
 
 
+async def _read_paimon_source_batches(
+    table,
+    filter_expr=None,
+    columns=None,
+    limit=None,
+    call_push_filters=True,
+):
+    from daft import context, runners
+    from daft.daft import StorageConfig
+    from daft.io.pushdowns import Pushdowns
+
+    from pypaimon.daft.daft_datasource import PaimonDataSource
+
+    io_config = context.get_context().daft_planning_config.default_io_config
+    storage_config = StorageConfig(runners.get_or_create_runner().name != 
"ray", io_config)
+    source = PaimonDataSource(table, storage_config=storage_config, 
catalog_options={})
+
+    if filter_expr is not None and call_push_filters:
+        pushed_filters, remaining_filters = 
source.push_filters([filter_expr._expr])
+        assert pushed_filters
+        assert not remaining_filters
+
+    batches = []
+    fallback_task_count = 0
+    pushdowns = Pushdowns(filters=filter_expr, columns=columns, limit=limit)
+    async for task in source.get_tasks(pushdowns):
+        if type(task).__name__ == "_PaimonPKSplitTask":
+            fallback_task_count += 1
+        async for batch in task.read():
+            batches.append(batch.to_pydict())
+    assert fallback_task_count > 0
+    return batches
+
+
 # ---------------------------------------------------------------------------
 # Fixtures
 # ---------------------------------------------------------------------------
@@ -312,6 +347,193 @@ def test_read_paimon_pk_table_deduplication(pk_table):
     assert id1_row[0][1] == "new_a"
 
 
+def test_read_paimon_pk_fallback_applies_pushed_filter(pk_table):
+    """Fallback PK reads must apply filters pushed into the Paimon source."""
+    table, _ = pk_table
+    batch1 = pa.table(
+        {
+            "id": pa.array([1, 2], pa.int64()),
+            "name": pa.array(["old_a", "old_b"], pa.string()),
+            "dt": pa.array(["2024-01-01", "2024-01-01"], pa.string()),
+        }
+    )
+    _write_to_paimon(table, batch1)
+
+    batch2 = pa.table(
+        {
+            "id": pa.array([1], pa.int64()),
+            "name": pa.array(["new_a"], pa.string()),
+            "dt": pa.array(["2024-01-01"], pa.string()),
+        }
+    )
+    _write_to_paimon(table, batch2)
+
+    batches = asyncio.run(_read_paimon_source_batches(table, 
filter_expr=col("id") == 1))
+    ids = [value for batch in batches for value in batch["id"]]
+    names = [value for batch in batches for value in batch["name"]]
+
+    assert ids == [1]
+    assert names == ["new_a"]
+
+
+def test_read_paimon_pk_fallback_filters_before_projection(pk_table):
+    """Fallback reads keep filter columns available for Daft's upper filter."""
+    table, _ = pk_table
+    batch1 = pa.table(
+        {
+            "id": pa.array([1, 2], pa.int64()),
+            "name": pa.array(["old_a", "old_b"], pa.string()),
+            "dt": pa.array(["2024-01-01", "2024-01-01"], pa.string()),
+        }
+    )
+    _write_to_paimon(table, batch1)
+
+    batch2 = pa.table(
+        {
+            "id": pa.array([1], pa.int64()),
+            "name": pa.array(["new_a"], pa.string()),
+            "dt": pa.array(["2024-01-01"], pa.string()),
+        }
+    )
+    _write_to_paimon(table, batch2)
+
+    batches = asyncio.run(
+        _read_paimon_source_batches(
+            table,
+            filter_expr=col("id") == 1,
+            columns=["name"],
+        )
+    )
+
+    assert batches == [{"name": ["new_a"], "id": [1]}]
+
+
+def 
test_read_paimon_fallback_plans_pushdown_filter_without_push_filters(local_paimon_catalog):
+    """Fallback planning must use Pushdowns.filters even if push_filters was 
not called."""
+    catalog, _ = local_paimon_catalog
+    schema = pypaimon.Schema.from_pyarrow_schema(
+        pa.schema([
+            pa.field("id", pa.int64()),
+            pa.field("name", pa.string()),
+        ]),
+        options={
+            "file.format": "avro",
+            "source.split.target-size": "800b",
+            "source.split.open-file-cost": "600b",
+        },
+    )
+    catalog.create_table("test_db.avro_pushdown_filter", schema, 
ignore_if_exists=False)
+    table = catalog.get_table("test_db.avro_pushdown_filter")
+    _write_to_paimon(table, pa.table({"id": [1], "name": ["first"]}))
+    _write_to_paimon(table, pa.table({"id": [999], "name": ["match"]}))
+
+    batches = asyncio.run(
+        _read_paimon_source_batches(
+            table,
+            filter_expr=col("id") == 999,
+            limit=1,
+            call_push_filters=False,
+        )
+    )
+
+    assert batches == [{"id": [999], "name": ["match"]}]
+
+
+def 
test_read_paimon_fallback_keeps_limit_above_remaining_filter(local_paimon_catalog):
+    """Fallback reads must not apply limit before Daft evaluates remaining 
filters."""
+    catalog, _ = local_paimon_catalog
+    schema = pypaimon.Schema.from_pyarrow_schema(
+        pa.schema([
+            pa.field("id", pa.int64()),
+            pa.field("name", pa.string()),
+        ]),
+        options={"file.format": "avro"},
+    )
+    catalog.create_table("test_db.avro_remaining_filter_limit", schema, 
ignore_if_exists=False)
+    table = catalog.get_table("test_db.avro_remaining_filter_limit")
+    _write_to_paimon(table, pa.table({"id": [1, 999], "name": ["first", 
"match"]}))
+
+    result = _read_table(table).where(~(col("id") == 1)).limit(1).to_pydict()
+
+    assert result == {"id": [999], "name": ["match"]}
+
+
+def test_read_paimon_keeps_limit_above_partition_filter(append_only_table):
+    """Scan planning must not apply limit before datasource partition 
pruning."""
+    table, _ = append_only_table
+    _write_to_paimon(
+        table,
+        pa.table(
+            {
+                "id": pa.array([1], pa.int64()),
+                "name": pa.array(["first"], pa.string()),
+                "value": pa.array([1.0], pa.float64()),
+                "dt": pa.array(["2024-01-01"], pa.string()),
+            }
+        ),
+    )
+    _write_to_paimon(
+        table,
+        pa.table(
+            {
+                "id": pa.array([2], pa.int64()),
+                "name": pa.array(["match"], pa.string()),
+                "value": pa.array([2.0], pa.float64()),
+                "dt": pa.array(["2024-01-02"], pa.string()),
+            }
+        ),
+    )
+
+    result = _read_table(table).where(col("dt") == 
"2024-01-02").limit(1).to_pydict()
+
+    assert result["id"] == [2]
+
+
+def test_read_paimon_pk_fallback_filter_then_project_dataframe(pk_table):
+    """Daft may keep the filter above the source while pushing projection."""
+    table, _ = pk_table
+    batch1 = pa.table(
+        {
+            "id": pa.array([1, 2], pa.int64()),
+            "name": pa.array(["old_a", "old_b"], pa.string()),
+            "dt": pa.array(["2024-01-01", "2024-01-01"], pa.string()),
+        }
+    )
+    _write_to_paimon(table, batch1)
+
+    batch2 = pa.table(
+        {
+            "id": pa.array([1], pa.int64()),
+            "name": pa.array(["new_a"], pa.string()),
+            "dt": pa.array(["2024-01-01"], pa.string()),
+        }
+    )
+    _write_to_paimon(table, batch2)
+
+    result = _read_table(table).where(col("id") == 
1).select("name").to_pydict()
+
+    assert result == {"name": ["new_a"]}
+
+
+def test_read_paimon_pk_fallback_applies_limit(pk_table):
+    """Fallback PK reads must use the same limit as the planning read 
builder."""
+    table, _ = pk_table
+    data = pa.table(
+        {
+            "id": pa.array([1, 2, 3], pa.int64()),
+            "name": pa.array(["a", "b", "c"], pa.string()),
+            "dt": pa.array(["2024-01-01", "2024-01-01", "2024-01-01"], 
pa.string()),
+        }
+    )
+    _write_to_paimon(table, data)
+    _write_to_paimon(table, data)
+
+    batches = asyncio.run(_read_paimon_source_batches(table, limit=1))
+    ids = [value for batch in batches for value in batch["id"]]
+
+    assert len(ids) == 1
+
+
 # ---------------------------------------------------------------------------
 # Filter pushdown
 # ---------------------------------------------------------------------------

Reply via email to