This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 58ed52a42c [python][daft] Preserve pushed filters across source serialization (#8061)
58ed52a42c is described below

commit 58ed52a42c5b43d1ca6f9c23f3b4a5c6b5328949
Author: QuakeWang <[email protected]>
AuthorDate: Mon Jun 1 22:04:06 2026 +0800

    [python][daft] Preserve pushed filters across source serialization (#8061)
    
    `PaimonDataSource.__setstate__()` restored `_paimon_predicate` and
    `_remaining_filters` before reopening the table, but `_init_table()` then
    reset them, along with `_pushed_filters`, to `None`.
    
    This made serialized Daft sources lose filters already accepted by
    `push_filters()`. In fallback reads, `get_tasks(Pushdowns(filters=None,
    limit=1))` could then plan an unfiltered, limited read and return a row
    that does not match the pushed filter.
    
    This PR moves pushdown state initialization to `__init__()`, keeps
    `_init_table()` limited to table-derived metadata, and includes
    `_pushed_filters` in the serialized state for consistent explain/debug
    output.
---
 paimon-python/pypaimon/daft/daft_datasource.py     |  7 +--
 .../pypaimon/tests/daft/daft_data_test.py          | 66 +++++++++++++++++++---
 2 files changed, 60 insertions(+), 13 deletions(-)

diff --git a/paimon-python/pypaimon/daft/daft_datasource.py 
b/paimon-python/pypaimon/daft/daft_datasource.py
index e7c8c547d1..5308ff0178 100644
--- a/paimon-python/pypaimon/daft/daft_datasource.py
+++ b/paimon-python/pypaimon/daft/daft_datasource.py
@@ -283,6 +283,7 @@ class PaimonDataSource(DataSource):
         table_path = getattr(table, "table_path", None)
         self._table_path = str(table_path) if table_path is not None else None
         self._table_options = _extract_table_options(table)
+        self._pushed_filters: list[PyExpr] | None = None
         self._paimon_predicate: Predicate | None = None
         self._remaining_filters: list[PyExpr] | None = None
         self._init_table(table)
@@ -295,6 +296,7 @@ class PaimonDataSource(DataSource):
             "_table_identifier": self._table_identifier,
             "_table_path": self._table_path,
             "_table_options": self._table_options,
+            "_pushed_filters": self._pushed_filters,
             "_paimon_predicate": self._paimon_predicate,
             "_remaining_filters": self._remaining_filters,
         }
@@ -305,6 +307,7 @@ class PaimonDataSource(DataSource):
         self._table_identifier = state["_table_identifier"]
         self._table_path = state["_table_path"]
         self._table_options = state["_table_options"]
+        self._pushed_filters = state.get("_pushed_filters")
         self._paimon_predicate = state["_paimon_predicate"]
         self._remaining_filters = state["_remaining_filters"]
         self._storage_config = _build_storage_config(
@@ -361,10 +364,6 @@ class PaimonDataSource(DataSource):
             else {}
         )
 
-        self._pushed_filters: list[PyExpr] | None = None
-        self._paimon_predicate: Predicate | None = None
-        self._remaining_filters: list[PyExpr] | None = None
-
     @property
     def name(self) -> str:
         table_path = getattr(self._table, "table_path", None)
diff --git a/paimon-python/pypaimon/tests/daft/daft_data_test.py 
b/paimon-python/pypaimon/tests/daft/daft_data_test.py
index 2a959e290a..5520b37100 100644
--- a/paimon-python/pypaimon/tests/daft/daft_data_test.py
+++ b/paimon-python/pypaimon/tests/daft/daft_data_test.py
@@ -89,6 +89,18 @@ def _write_to_paimon(table, arrow_table, mode="append", 
overwrite_partition=None
         table_commit.close()
 
 
+async def _collect_paimon_source_batches(source, pushdowns):
+    batches = []
+    fallback_task_count = 0
+    async for task in source.get_tasks(pushdowns):
+        if type(task).__name__ == "_PaimonPKSplitTask":
+            fallback_task_count += 1
+        async for batch in task.read():
+            batches.append(batch.to_pydict())
+    assert fallback_task_count > 0
+    return batches
+
+
 async def _read_paimon_source_batches(
     table,
     filter_expr=None,
@@ -111,16 +123,8 @@ async def _read_paimon_source_batches(
         assert pushed_filters
         assert not remaining_filters
 
-    batches = []
-    fallback_task_count = 0
     pushdowns = Pushdowns(filters=filter_expr, columns=columns, limit=limit)
-    async for task in source.get_tasks(pushdowns):
-        if type(task).__name__ == "_PaimonPKSplitTask":
-            fallback_task_count += 1
-        async for batch in task.read():
-            batches.append(batch.to_pydict())
-    assert fallback_task_count > 0
-    return batches
+    return await _collect_paimon_source_batches(source, pushdowns)
 
 
 # ---------------------------------------------------------------------------
@@ -245,6 +249,50 @@ def 
test_read_paimon_source_is_serializable(append_only_table):
     assert restored._storage_config.multithreaded_io is False
 
 
+def 
test_read_paimon_source_serialization_preserves_pushed_filter_for_fallback(local_paimon_catalog):
+    """A serialized source must keep filters accepted by 
SupportsPushdownFilters."""
+    from daft import context, runners
+    from daft.daft import StorageConfig
+    from daft.io.pushdowns import Pushdowns
+    from daft.pickle import dumps, loads
+
+    from pypaimon.daft.daft_datasource import PaimonDataSource
+
+    catalog, _ = local_paimon_catalog
+    schema = pypaimon.Schema.from_pyarrow_schema(
+        pa.schema([
+            pa.field("id", pa.int64()),
+            pa.field("name", pa.string()),
+        ]),
+        options={
+            "file.format": "avro",
+            "source.split.target-size": "800b",
+            "source.split.open-file-cost": "600b",
+        },
+    )
+    catalog.create_table("test_db.avro_serialized_pushdown_filter", schema, 
ignore_if_exists=False)
+    table = catalog.get_table("test_db.avro_serialized_pushdown_filter")
+    _write_to_paimon(table, pa.table({"id": [1], "name": ["first"]}))
+    _write_to_paimon(table, pa.table({"id": [999], "name": ["match"]}))
+
+    io_config = context.get_context().daft_planning_config.default_io_config
+    storage_config = StorageConfig(runners.get_or_create_runner().name != 
"ray", io_config)
+    source = PaimonDataSource(table, storage_config=storage_config, 
catalog_options={})
+    pushed_filters, remaining_filters = source.push_filters([(col("id") == 
999)._expr])
+    assert pushed_filters
+    assert not remaining_filters
+
+    restored = loads(dumps(source))
+    batches = asyncio.run(
+        _collect_paimon_source_batches(
+            restored,
+            Pushdowns(filters=None, limit=1),
+        )
+    )
+
+    assert batches == [{"id": [999], "name": ["match"]}]
+
+
 def test_read_paimon_remote_ray_task_is_serializable(pk_table, monkeypatch):
     """A fallback PK split task must reopen the table from metadata on Ray 
workers.
 

Reply via email to