This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 98d913ec47 [python] Fix Daft fallback filter limit pushdown (#7965)
98d913ec47 is described below
commit 98d913ec47da283ed328f8fa8ff032073add418d
Author: QuakeWang <[email protected]>
AuthorDate: Tue May 26 15:16:28 2026 +0800
[python] Fix Daft fallback filter limit pushdown (#7965)
The Daft Paimon datasource used the configured read builder for scan
planning, but fallback split tasks built their own `TableRead` from a
fresh, unconfigured read builder. As a result, fallback reads (used for
PK merge, non-Parquet formats, BLOB columns, and deletion-vector paths)
could drop the pushed predicate, projection, and limit.
This was a correctness bug, not just a performance issue: Daft may treat
pushed filters as already handled by the source and not re-apply them.
A query filtering a fallback table could therefore select the right
split but still emit unfiltered rows from that split.
There was also a related limit-ordering issue: applying `limit` at the
source (in scan planning or in the fallback reader) while Daft still had
remaining row or partition filters to evaluate could truncate the input
before those filters ran, dropping rows that should have matched.
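This patch makes fallback tasks use a configured `TableRead`, keeps the
columns required by the filters available for fallback execution, and
centralizes the source-side limit decision so `limit` is only pushed
when it is safe for the source to apply it before returning rows. It
also derives the Paimon predicate from `Pushdowns.filters` when
`push_filters` was not called, and skips scan-planning pushdown of
predicates containing `isNull` (missing null-count statistics make that
unsafe) except on primary-key tables without deletion vectors.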
---
paimon-python/pypaimon/daft/daft_datasource.py | 207 +++++++++++++++++--
.../pypaimon/tests/daft/daft_data_test.py | 222 +++++++++++++++++++++
2 files changed, 417 insertions(+), 12 deletions(-)
diff --git a/paimon-python/pypaimon/daft/daft_datasource.py b/paimon-python/pypaimon/daft/daft_datasource.py
index ce063bf109..de4b22bda1 100644
--- a/paimon-python/pypaimon/daft/daft_datasource.py
+++ b/paimon-python/pypaimon/daft/daft_datasource.py
@@ -18,6 +18,7 @@
from __future__ import annotations
+from dataclasses import dataclass
import logging
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse
@@ -37,6 +38,7 @@ if TYPE_CHECKING:
from collections.abc import AsyncIterator
from pypaimon.common.predicate import Predicate
+ from pypaimon.read.table_read import TableRead
from pypaimon.read.split import Split
from pypaimon.table.file_store_table import FileStoreTable
@@ -50,6 +52,16 @@ PAIMON_FILE_FORMAT_ORC = "orc"
PAIMON_FILE_FORMAT_AVRO = "avro"
+@dataclass(frozen=True, slots=True)
+class _ReadPushdownState:
+ reader_predicate: Predicate | None
+ planning_predicate: Predicate | None
+ requested_columns: list[str] | None
+ task_columns: list[str] | None
+ read_columns: list[str] | None
+ source_limit: int | None
+
+
class _PaimonPKSplitTask(DataSourceTask):
"""DataSourceTask for PK-table splits that require LSM-tree merge.
@@ -60,14 +72,16 @@ class _PaimonPKSplitTask(DataSourceTask):
def __init__(
self,
- table: FileStoreTable,
+ table_read: TableRead,
split: Split,
schema: Schema,
+ output_columns: list[str] | None = None,
blob_column_names: set[str] | None = None,
) -> None:
- self._table = table
+ self._table_read = table_read
self._split = split
self._schema = schema
+ self._output_columns = output_columns
self._blob_column_names = blob_column_names or set()
@property
@@ -75,9 +89,10 @@ class _PaimonPKSplitTask(DataSourceTask):
return self._schema
async def read(self) -> AsyncIterator[RecordBatch]:
- table_read = self._table.new_read_builder().new_read()
- reader = table_read.to_arrow_batch_reader([self._split])
+ reader = self._table_read.to_arrow_batch_reader([self._split])
for batch in iter(reader.read_next_batch, None):
+ if self._output_columns is not None:
+ batch = batch.select(self._output_columns)
if self._blob_column_names:
batch = _convert_blob_columns(batch, self._blob_column_names)
rb = RecordBatch.from_arrow_record_batches([batch], batch.schema)
@@ -174,6 +189,7 @@ class PaimonDataSource(DataSource):
)
self._paimon_predicate: Predicate | None = None
+ self._remaining_filters: list[PyExpr] | None = None
@property
def name(self) -> str:
@@ -197,6 +213,7 @@ class PaimonDataSource(DataSource):
pushed_filters, remaining_filters, paimon_predicate =
convert_filters_to_paimon(self._table, filters)
self._paimon_predicate = paimon_predicate
+ self._remaining_filters = remaining_filters
if pushed_filters:
logger.debug(
@@ -213,18 +230,19 @@ class PaimonDataSource(DataSource):
read_table = self._table.copy({"blob-as-descriptor": "true"})
read_builder = read_table.new_read_builder()
+ read_pushdowns = self._read_pushdown_state(read_table, pushdowns)
- if pushdowns.columns is not None:
- read_builder =
read_builder.with_projection(list(pushdowns.columns))
+ if read_pushdowns.requested_columns is not None:
+ read_builder =
read_builder.with_projection(read_pushdowns.requested_columns)
- if pushdowns.limit is not None:
- read_builder = read_builder.with_limit(pushdowns.limit)
+ if read_pushdowns.source_limit is not None:
+ read_builder = read_builder.with_limit(read_pushdowns.source_limit)
- if self._paimon_predicate is not None:
- read_builder = read_builder.with_filter(self._paimon_predicate)
+ if read_pushdowns.planning_predicate is not None:
+ read_builder =
read_builder.with_filter(read_pushdowns.planning_predicate)
logger.debug(
"Applied Paimon filter pushdown predicate: %s",
- self._paimon_predicate,
+ read_pushdowns.planning_predicate,
)
if self._table.partition_keys and pushdowns.partition_filters is None:
@@ -291,7 +309,18 @@ class PaimonDataSource(DataSource):
len(split.files),
reason,
)
- yield _PaimonPKSplitTask(read_table, split, self._schema,
self._blob_column_names)
+ yield _PaimonPKSplitTask(
+ self._fallback_read_builder(
+ read_table,
+ read_pushdowns.read_columns,
+ read_pushdowns.source_limit,
+ read_pushdowns.reader_predicate,
+ ).new_read(),
+ split,
+ self._project_schema(read_pushdowns.task_columns),
+ read_pushdowns.task_columns,
+ self._blob_column_names,
+ )
def _build_file_uri(self, file_path: str) -> str:
"""Reconstruct a full URI from a (potentially scheme-stripped)
file_path."""
@@ -316,3 +345,157 @@ class PaimonDataSource(DataSource):
if not arrays:
return None
return daft.recordbatch.RecordBatch.from_pydict(arrays)
+
+ def _valid_output_columns(self, columns: list[str] | None) -> list[str] |
None:
+ if columns is None:
+ return None
+ schema_names = {field.name for field in self._schema}
+ return [name for name in columns if name in schema_names]
+
+ def _task_columns(
+ self,
+ table: FileStoreTable,
+ output_columns: list[str] | None,
+ pushdowns: Pushdowns,
+ ) -> list[str] | None:
+ if output_columns is None:
+ return None
+
+ task_columns = list(output_columns)
+ filter_required_column_names = getattr(pushdowns,
"filter_required_column_names", None)
+ required_fields = filter_required_column_names() if
filter_required_column_names else set()
+ return self._append_existing_columns(table, task_columns,
required_fields)
+
+ def _fallback_read_columns(
+ self,
+ table: FileStoreTable,
+ task_columns: list[str] | None,
+ paimon_predicate: Predicate | None,
+ ) -> list[str] | None:
+ if task_columns is None:
+ return None
+
+ read_columns = list(task_columns)
+ if paimon_predicate is not None:
+ from pypaimon.read.push_down_utils import _get_all_fields
+
+ return self._append_existing_columns(table, read_columns,
_get_all_fields(paimon_predicate))
+
+ return read_columns
+
+ @staticmethod
+ def _append_existing_columns(
+ table: FileStoreTable,
+ columns: list[str],
+ required_fields: set[str],
+ ) -> list[str]:
+ if not required_fields:
+ return columns
+ existing = set(columns)
+ columns.extend(
+ field.name
+ for field in table.fields
+ if field.name in required_fields and field.name not in existing
+ )
+ return columns
+
+ def _project_schema(self, columns: list[str] | None) -> Schema:
+ if columns is None:
+ return self._schema
+ field_map = {field.name: field for field in self._schema}
+ return Schema.from_field_name_and_types(
+ [(name, field_map[name].dtype) for name in columns if name in
field_map]
+ )
+
+ def _fallback_read_builder(
+ self,
+ table: FileStoreTable,
+ read_columns: list[str] | None,
+ limit: int | None,
+ predicate: Predicate | None,
+ ) -> Any:
+ read_builder = table.new_read_builder()
+ if read_columns is not None:
+ read_builder = read_builder.with_projection(read_columns)
+ if limit is not None:
+ read_builder = read_builder.with_limit(limit)
+ if predicate is not None:
+ read_builder = read_builder.with_filter(predicate)
+ return read_builder
+
+ def _read_pushdown_state(
+ self,
+ table: FileStoreTable,
+ pushdowns: Pushdowns,
+ ) -> _ReadPushdownState:
+ reader_predicate, filters_consumed =
self._pushdown_filter_state(pushdowns)
+ planning_predicate = self._planning_predicate(reader_predicate)
+ requested_columns = self._valid_output_columns(pushdowns.columns)
+ task_columns = self._task_columns(table, requested_columns, pushdowns)
+ read_columns = self._fallback_read_columns(table, task_columns,
reader_predicate)
+ source_limit = self._source_limit(
+ pushdowns,
+ reader_predicate,
+ planning_predicate,
+ filters_consumed,
+ )
+ return _ReadPushdownState(
+ reader_predicate=reader_predicate,
+ planning_predicate=planning_predicate,
+ requested_columns=requested_columns,
+ task_columns=task_columns,
+ read_columns=read_columns,
+ source_limit=source_limit,
+ )
+
+ def _pushdown_filter_state(self, pushdowns: Pushdowns) -> tuple[Predicate
| None, bool]:
+ if self._remaining_filters is not None:
+ return self._paimon_predicate, not self._remaining_filters
+ if pushdowns.filters is None:
+ return None, True
+
+ py_expr = getattr(pushdowns.filters, "_expr", pushdowns.filters)
+ _, remaining_filters, paimon_predicate =
convert_filters_to_paimon(self._table, [py_expr])
+ return paimon_predicate, not remaining_filters
+
+ def _planning_predicate(self, pushdown_predicate: Predicate | None) ->
Predicate | None:
+ if pushdown_predicate is None:
+ return None
+ if not self._can_plan_predicate(pushdown_predicate):
+ return None
+ if self._paimon_predicate is not None or
self._requires_fallback_reader():
+ return pushdown_predicate
+ return None
+
+ @staticmethod
+ def _source_limit(
+ pushdowns: Pushdowns,
+ reader_predicate: Predicate | None,
+ planning_predicate: Predicate | None,
+ filters_consumed: bool,
+ ) -> int | None:
+ if pushdowns.limit is None:
+ return None
+ if pushdowns.partition_filters is not None:
+ return None
+ if not filters_consumed:
+ return None
+ if reader_predicate is not None and planning_predicate is None:
+ return None
+ return pushdowns.limit
+
+ def _requires_fallback_reader(self) -> bool:
+ return not self._is_parquet or self._has_blob_columns or
self._table.is_primary_key_table
+
+ def _can_plan_predicate(self, predicate: Predicate) -> bool:
+ # Missing value null-count stats make isNull unsafe for scan planning.
+ if not self._predicate_contains_is_null(predicate):
+ return True
+ return self._table.is_primary_key_table and not
self._table.options.deletion_vectors_enabled()
+
+ def _predicate_contains_is_null(self, predicate: Predicate) -> bool:
+ if predicate.method == "isNull":
+ return True
+ if predicate.method in ("and", "or"):
+ return any(self._predicate_contains_is_null(child) for child in
predicate.literals or [])
+ return False
diff --git a/paimon-python/pypaimon/tests/daft/daft_data_test.py b/paimon-python/pypaimon/tests/daft/daft_data_test.py
index e9e32efa93..2eee5ba749 100644
--- a/paimon-python/pypaimon/tests/daft/daft_data_test.py
+++ b/paimon-python/pypaimon/tests/daft/daft_data_test.py
@@ -24,6 +24,7 @@ to create and populate test tables, then _read_table() is validated.
from __future__ import annotations
+import asyncio
import decimal
import pyarrow as pa
@@ -57,6 +58,40 @@ def _write_to_paimon(table, arrow_table, mode="append", overwrite_partition=None
table_commit.close()
+async def _read_paimon_source_batches(
+ table,
+ filter_expr=None,
+ columns=None,
+ limit=None,
+ call_push_filters=True,
+):
+ from daft import context, runners
+ from daft.daft import StorageConfig
+ from daft.io.pushdowns import Pushdowns
+
+ from pypaimon.daft.daft_datasource import PaimonDataSource
+
+ io_config = context.get_context().daft_planning_config.default_io_config
+ storage_config = StorageConfig(runners.get_or_create_runner().name !=
"ray", io_config)
+ source = PaimonDataSource(table, storage_config=storage_config,
catalog_options={})
+
+ if filter_expr is not None and call_push_filters:
+ pushed_filters, remaining_filters =
source.push_filters([filter_expr._expr])
+ assert pushed_filters
+ assert not remaining_filters
+
+ batches = []
+ fallback_task_count = 0
+ pushdowns = Pushdowns(filters=filter_expr, columns=columns, limit=limit)
+ async for task in source.get_tasks(pushdowns):
+ if type(task).__name__ == "_PaimonPKSplitTask":
+ fallback_task_count += 1
+ async for batch in task.read():
+ batches.append(batch.to_pydict())
+ assert fallback_task_count > 0
+ return batches
+
+
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@@ -312,6 +347,193 @@ def test_read_paimon_pk_table_deduplication(pk_table):
assert id1_row[0][1] == "new_a"
+def test_read_paimon_pk_fallback_applies_pushed_filter(pk_table):
+ """Fallback PK reads must apply filters pushed into the Paimon source."""
+ table, _ = pk_table
+ batch1 = pa.table(
+ {
+ "id": pa.array([1, 2], pa.int64()),
+ "name": pa.array(["old_a", "old_b"], pa.string()),
+ "dt": pa.array(["2024-01-01", "2024-01-01"], pa.string()),
+ }
+ )
+ _write_to_paimon(table, batch1)
+
+ batch2 = pa.table(
+ {
+ "id": pa.array([1], pa.int64()),
+ "name": pa.array(["new_a"], pa.string()),
+ "dt": pa.array(["2024-01-01"], pa.string()),
+ }
+ )
+ _write_to_paimon(table, batch2)
+
+ batches = asyncio.run(_read_paimon_source_batches(table,
filter_expr=col("id") == 1))
+ ids = [value for batch in batches for value in batch["id"]]
+ names = [value for batch in batches for value in batch["name"]]
+
+ assert ids == [1]
+ assert names == ["new_a"]
+
+
+def test_read_paimon_pk_fallback_filters_before_projection(pk_table):
+ """Fallback reads keep filter columns available for Daft's upper filter."""
+ table, _ = pk_table
+ batch1 = pa.table(
+ {
+ "id": pa.array([1, 2], pa.int64()),
+ "name": pa.array(["old_a", "old_b"], pa.string()),
+ "dt": pa.array(["2024-01-01", "2024-01-01"], pa.string()),
+ }
+ )
+ _write_to_paimon(table, batch1)
+
+ batch2 = pa.table(
+ {
+ "id": pa.array([1], pa.int64()),
+ "name": pa.array(["new_a"], pa.string()),
+ "dt": pa.array(["2024-01-01"], pa.string()),
+ }
+ )
+ _write_to_paimon(table, batch2)
+
+ batches = asyncio.run(
+ _read_paimon_source_batches(
+ table,
+ filter_expr=col("id") == 1,
+ columns=["name"],
+ )
+ )
+
+ assert batches == [{"name": ["new_a"], "id": [1]}]
+
+
+def
test_read_paimon_fallback_plans_pushdown_filter_without_push_filters(local_paimon_catalog):
+ """Fallback planning must use Pushdowns.filters even if push_filters was
not called."""
+ catalog, _ = local_paimon_catalog
+ schema = pypaimon.Schema.from_pyarrow_schema(
+ pa.schema([
+ pa.field("id", pa.int64()),
+ pa.field("name", pa.string()),
+ ]),
+ options={
+ "file.format": "avro",
+ "source.split.target-size": "800b",
+ "source.split.open-file-cost": "600b",
+ },
+ )
+ catalog.create_table("test_db.avro_pushdown_filter", schema,
ignore_if_exists=False)
+ table = catalog.get_table("test_db.avro_pushdown_filter")
+ _write_to_paimon(table, pa.table({"id": [1], "name": ["first"]}))
+ _write_to_paimon(table, pa.table({"id": [999], "name": ["match"]}))
+
+ batches = asyncio.run(
+ _read_paimon_source_batches(
+ table,
+ filter_expr=col("id") == 999,
+ limit=1,
+ call_push_filters=False,
+ )
+ )
+
+ assert batches == [{"id": [999], "name": ["match"]}]
+
+
+def
test_read_paimon_fallback_keeps_limit_above_remaining_filter(local_paimon_catalog):
+ """Fallback reads must not apply limit before Daft evaluates remaining
filters."""
+ catalog, _ = local_paimon_catalog
+ schema = pypaimon.Schema.from_pyarrow_schema(
+ pa.schema([
+ pa.field("id", pa.int64()),
+ pa.field("name", pa.string()),
+ ]),
+ options={"file.format": "avro"},
+ )
+ catalog.create_table("test_db.avro_remaining_filter_limit", schema,
ignore_if_exists=False)
+ table = catalog.get_table("test_db.avro_remaining_filter_limit")
+ _write_to_paimon(table, pa.table({"id": [1, 999], "name": ["first",
"match"]}))
+
+ result = _read_table(table).where(~(col("id") == 1)).limit(1).to_pydict()
+
+ assert result == {"id": [999], "name": ["match"]}
+
+
+def test_read_paimon_keeps_limit_above_partition_filter(append_only_table):
+ """Scan planning must not apply limit before datasource partition
pruning."""
+ table, _ = append_only_table
+ _write_to_paimon(
+ table,
+ pa.table(
+ {
+ "id": pa.array([1], pa.int64()),
+ "name": pa.array(["first"], pa.string()),
+ "value": pa.array([1.0], pa.float64()),
+ "dt": pa.array(["2024-01-01"], pa.string()),
+ }
+ ),
+ )
+ _write_to_paimon(
+ table,
+ pa.table(
+ {
+ "id": pa.array([2], pa.int64()),
+ "name": pa.array(["match"], pa.string()),
+ "value": pa.array([2.0], pa.float64()),
+ "dt": pa.array(["2024-01-02"], pa.string()),
+ }
+ ),
+ )
+
+ result = _read_table(table).where(col("dt") ==
"2024-01-02").limit(1).to_pydict()
+
+ assert result["id"] == [2]
+
+
+def test_read_paimon_pk_fallback_filter_then_project_dataframe(pk_table):
+ """Daft may keep the filter above the source while pushing projection."""
+ table, _ = pk_table
+ batch1 = pa.table(
+ {
+ "id": pa.array([1, 2], pa.int64()),
+ "name": pa.array(["old_a", "old_b"], pa.string()),
+ "dt": pa.array(["2024-01-01", "2024-01-01"], pa.string()),
+ }
+ )
+ _write_to_paimon(table, batch1)
+
+ batch2 = pa.table(
+ {
+ "id": pa.array([1], pa.int64()),
+ "name": pa.array(["new_a"], pa.string()),
+ "dt": pa.array(["2024-01-01"], pa.string()),
+ }
+ )
+ _write_to_paimon(table, batch2)
+
+ result = _read_table(table).where(col("id") ==
1).select("name").to_pydict()
+
+ assert result == {"name": ["new_a"]}
+
+
+def test_read_paimon_pk_fallback_applies_limit(pk_table):
+ """Fallback PK reads must use the same limit as the planning read
builder."""
+ table, _ = pk_table
+ data = pa.table(
+ {
+ "id": pa.array([1, 2, 3], pa.int64()),
+ "name": pa.array(["a", "b", "c"], pa.string()),
+ "dt": pa.array(["2024-01-01", "2024-01-01", "2024-01-01"],
pa.string()),
+ }
+ )
+ _write_to_paimon(table, data)
+ _write_to_paimon(table, data)
+
+ batches = asyncio.run(_read_paimon_source_batches(table, limit=1))
+ ids = [value for batch in batches for value in batch["id"]]
+
+ assert len(ids) == 1
+
+
# ---------------------------------------------------------------------------
# Filter pushdown
# ---------------------------------------------------------------------------