This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 58ed52a42c [python][daft] Preserve pushed filters across source
serialization (#8061)
58ed52a42c is described below
commit 58ed52a42c5b43d1ca6f9c23f3b4a5c6b5328949
Author: QuakeWang <[email protected]>
AuthorDate: Mon Jun 1 22:04:06 2026 +0800
[python][daft] Preserve pushed filters across source serialization (#8061)
`PaimonDataSource.__setstate__()` restores pushed filter state before
reopening the table, but the `_init_table()` call that follows then reset
`_pushed_filters`, `_paimon_predicate`, and `_remaining_filters` to `None`,
discarding the state that had just been restored.
This made serialized Daft sources lose filters already accepted by
`push_filters()`. In fallback reads, `get_tasks(Pushdowns(filters=None,
limit=1))` could then plan an unfiltered limited read and return the
wrong row.
This PR moves pushdown state initialization to `__init__()`, keeps
`_init_table()` limited to table-derived metadata, and includes
`_pushed_filters` in the serialized state for consistent explain/debug
output.
---
paimon-python/pypaimon/daft/daft_datasource.py | 7 +--
.../pypaimon/tests/daft/daft_data_test.py | 66 +++++++++++++++++++---
2 files changed, 60 insertions(+), 13 deletions(-)
diff --git a/paimon-python/pypaimon/daft/daft_datasource.py
b/paimon-python/pypaimon/daft/daft_datasource.py
index e7c8c547d1..5308ff0178 100644
--- a/paimon-python/pypaimon/daft/daft_datasource.py
+++ b/paimon-python/pypaimon/daft/daft_datasource.py
@@ -283,6 +283,7 @@ class PaimonDataSource(DataSource):
table_path = getattr(table, "table_path", None)
self._table_path = str(table_path) if table_path is not None else None
self._table_options = _extract_table_options(table)
+ self._pushed_filters: list[PyExpr] | None = None
self._paimon_predicate: Predicate | None = None
self._remaining_filters: list[PyExpr] | None = None
self._init_table(table)
@@ -295,6 +296,7 @@ class PaimonDataSource(DataSource):
"_table_identifier": self._table_identifier,
"_table_path": self._table_path,
"_table_options": self._table_options,
+ "_pushed_filters": self._pushed_filters,
"_paimon_predicate": self._paimon_predicate,
"_remaining_filters": self._remaining_filters,
}
@@ -305,6 +307,7 @@ class PaimonDataSource(DataSource):
self._table_identifier = state["_table_identifier"]
self._table_path = state["_table_path"]
self._table_options = state["_table_options"]
+ self._pushed_filters = state.get("_pushed_filters")
self._paimon_predicate = state["_paimon_predicate"]
self._remaining_filters = state["_remaining_filters"]
self._storage_config = _build_storage_config(
@@ -361,10 +364,6 @@ class PaimonDataSource(DataSource):
else {}
)
- self._pushed_filters: list[PyExpr] | None = None
- self._paimon_predicate: Predicate | None = None
- self._remaining_filters: list[PyExpr] | None = None
-
@property
def name(self) -> str:
table_path = getattr(self._table, "table_path", None)
diff --git a/paimon-python/pypaimon/tests/daft/daft_data_test.py
b/paimon-python/pypaimon/tests/daft/daft_data_test.py
index 2a959e290a..5520b37100 100644
--- a/paimon-python/pypaimon/tests/daft/daft_data_test.py
+++ b/paimon-python/pypaimon/tests/daft/daft_data_test.py
@@ -89,6 +89,18 @@ def _write_to_paimon(table, arrow_table, mode="append",
overwrite_partition=None
table_commit.close()
+async def _collect_paimon_source_batches(source, pushdowns):
+ batches = []
+ fallback_task_count = 0
+ async for task in source.get_tasks(pushdowns):
+ if type(task).__name__ == "_PaimonPKSplitTask":
+ fallback_task_count += 1
+ async for batch in task.read():
+ batches.append(batch.to_pydict())
+ assert fallback_task_count > 0
+ return batches
+
+
async def _read_paimon_source_batches(
table,
filter_expr=None,
@@ -111,16 +123,8 @@ async def _read_paimon_source_batches(
assert pushed_filters
assert not remaining_filters
- batches = []
- fallback_task_count = 0
pushdowns = Pushdowns(filters=filter_expr, columns=columns, limit=limit)
- async for task in source.get_tasks(pushdowns):
- if type(task).__name__ == "_PaimonPKSplitTask":
- fallback_task_count += 1
- async for batch in task.read():
- batches.append(batch.to_pydict())
- assert fallback_task_count > 0
- return batches
+ return await _collect_paimon_source_batches(source, pushdowns)
# ---------------------------------------------------------------------------
@@ -245,6 +249,50 @@ def
test_read_paimon_source_is_serializable(append_only_table):
assert restored._storage_config.multithreaded_io is False
+def
test_read_paimon_source_serialization_preserves_pushed_filter_for_fallback(local_paimon_catalog):
+ """A serialized source must keep filters accepted by
SupportsPushdownFilters."""
+ from daft import context, runners
+ from daft.daft import StorageConfig
+ from daft.io.pushdowns import Pushdowns
+ from daft.pickle import dumps, loads
+
+ from pypaimon.daft.daft_datasource import PaimonDataSource
+
+ catalog, _ = local_paimon_catalog
+ schema = pypaimon.Schema.from_pyarrow_schema(
+ pa.schema([
+ pa.field("id", pa.int64()),
+ pa.field("name", pa.string()),
+ ]),
+ options={
+ "file.format": "avro",
+ "source.split.target-size": "800b",
+ "source.split.open-file-cost": "600b",
+ },
+ )
+ catalog.create_table("test_db.avro_serialized_pushdown_filter", schema,
ignore_if_exists=False)
+ table = catalog.get_table("test_db.avro_serialized_pushdown_filter")
+ _write_to_paimon(table, pa.table({"id": [1], "name": ["first"]}))
+ _write_to_paimon(table, pa.table({"id": [999], "name": ["match"]}))
+
+ io_config = context.get_context().daft_planning_config.default_io_config
+ storage_config = StorageConfig(runners.get_or_create_runner().name !=
"ray", io_config)
+ source = PaimonDataSource(table, storage_config=storage_config,
catalog_options={})
+ pushed_filters, remaining_filters = source.push_filters([(col("id") ==
999)._expr])
+ assert pushed_filters
+ assert not remaining_filters
+
+ restored = loads(dumps(source))
+ batches = asyncio.run(
+ _collect_paimon_source_batches(
+ restored,
+ Pushdowns(filters=None, limit=1),
+ )
+ )
+
+ assert batches == [{"id": [999], "name": ["match"]}]
+
+
def test_read_paimon_remote_ray_task_is_serializable(pk_table, monkeypatch):
"""A fallback PK split task must reopen the table from metadata on Ray
workers.