This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ad151c0aff [python][daft] Fix predicate conversion for unsupported expressions (#7980)
ad151c0aff is described below

commit ad151c0aff45ccd3bbeace36011a4bf32f93e83c
Author: QuakeWang <[email protected]>
AuthorDate: Wed May 27 10:37:15 2026 +0800

    [python][daft] Fix predicate conversion for unsupported expressions (#7980)
    
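    Daft predicate conversion returned `None` for unsupported expressions
    and returned literal values unwrapped, so an unsupported operand was
    indistinguishable from a `None` literal. Because comparison pushdown
    accepted any non-column operand as a literal, it could treat
    unsupported expressions as literals, for example converting
    `col("id") == lit(1).cast("int64")` into `id = None`.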
    
    String predicates also called `str(...)` on visited arguments, so a
    column reference or an unsupported argument was stringified (an
    unsupported argument became the string "None") and pushed as a literal
    pattern. Pushed filters are removed from Daft's remaining filters, so
    these incorrect predicates silently changed query results.
    
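    This patch adds explicit internal markers for literals and unsupported
    expressions. Comparison and BETWEEN predicates are only pushed when the
    non-column side is a non-null scalar literal (not a list, tuple, dict,
    or set), IN lists must contain only such literals, string predicates
    now require real string literals, and a filter counts as pushed only if
    it converts to an actual Paimon `Predicate`. Unsupported or mixed
    expressions stay in Daft remaining filters.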
---
 .../pypaimon/daft/daft_predicate_visitor.py        | 155 ++++++++++++++-------
 .../pypaimon/tests/daft/daft_data_test.py          |  47 ++++++-
 2 files changed, 148 insertions(+), 54 deletions(-)

diff --git a/paimon-python/pypaimon/daft/daft_predicate_visitor.py b/paimon-python/pypaimon/daft/daft_predicate_visitor.py
index 0a3d03d5cf..5dfe4bca3d 100644
--- a/paimon-python/pypaimon/daft/daft_predicate_visitor.py
+++ b/paimon-python/pypaimon/daft/daft_predicate_visitor.py
@@ -49,11 +49,26 @@ class _ColRef:
     name: str
 
 
+@dataclass(frozen=True, slots=True)
+class _Literal:
+    """Literal marker to keep real literal values separate from unsupported 
expressions."""
+
+    value: Any
+
+
+class _Unsupported:
+    pass
+
+
+_UNSUPPORTED = _Unsupported()
+
+
 class PaimonPredicateVisitor(PredicateVisitor[Any]):
     """Tree fold visitor that converts Daft expressions to Paimon predicates.
 
-    Leaf nodes return their values (_ColRef for columns, raw values for 
literals).
-    Predicate nodes return Paimon Predicate objects, or None if unsupported.
+    Leaf nodes return marker objects (_ColRef for columns, _Literal for literal
+    values). Predicate nodes return Paimon Predicate objects, or _UNSUPPORTED 
if
+    they cannot be converted safely.
 
     Supported operations:
     - Comparison: ==, !=, <, <=, >, >=
@@ -72,43 +87,44 @@ class PaimonPredicateVisitor(PredicateVisitor[Any]):
     def visit_col(self, name: str) -> _ColRef:
         return _ColRef(name)
 
-    def visit_lit(self, value: Any) -> Any:
-        return value
+    def visit_lit(self, value: Any) -> _Literal:
+        return _Literal(value)
 
     def visit_alias(self, expr: Expression, alias: str) -> Any:
         return self.visit(expr)
 
-    def visit_cast(self, expr: Expression, dtype: Any) -> None:
-        return None
+    def visit_cast(self, expr: Expression, dtype: Any) -> _Unsupported:
+        return _UNSUPPORTED
 
-    def visit_coalesce(self, args: list[Expression]) -> None:
-        return None
+    def visit_coalesce(self, args: list[Expression]) -> _Unsupported:
+        return _UNSUPPORTED
 
-    def visit_function(self, name: str, args: list[Expression]) -> None:
+    def visit_function(self, name: str, args: list[Expression]) -> 
_Unsupported:
         logger.debug("Function '%s' is not supported for Paimon pushdown", 
name)
+        return _UNSUPPORTED
 
     # -- Logical operators --
 
-    def visit_and(self, left: Expression, right: Expression) -> Predicate | 
None:
+    def visit_and(self, left: Expression, right: Expression) -> Predicate | 
_Unsupported:
         left_pred = self.visit(left)
         right_pred = self.visit(right)
-        if left_pred is not None and right_pred is not None:
+        if self._is_predicate(left_pred) and self._is_predicate(right_pred):
             return self._builder.and_predicates([left_pred, right_pred])
-        return None
+        return _UNSUPPORTED
 
-    def visit_or(self, left: Expression, right: Expression) -> Predicate | 
None:
+    def visit_or(self, left: Expression, right: Expression) -> Predicate | 
_Unsupported:
         left_pred = self.visit(left)
         right_pred = self.visit(right)
-        if left_pred is not None and right_pred is not None:
+        if self._is_predicate(left_pred) and self._is_predicate(right_pred):
             return self._builder.or_predicates([left_pred, right_pred])
-        return None
+        return _UNSUPPORTED
 
-    def visit_not(self, expr: Expression) -> None:
-        return None
+    def visit_not(self, expr: Expression) -> _Unsupported:
+        return _UNSUPPORTED
 
     # -- Comparison operators --
 
-    def _cmp(self, left: Expression, right: Expression, fn: Any, fn_swapped: 
Any) -> Predicate | None:
+    def _cmp(self, left: Expression, right: Expression, fn: Any, fn_swapped: 
Any) -> Predicate | _Unsupported:
         """Fold a binary comparison: extract col ref and literal value, then 
apply fn.
 
         If the column is on the right side (e.g. ``3 < col``), apply 
``fn_swapped``
@@ -116,80 +132,112 @@ class PaimonPredicateVisitor(PredicateVisitor[Any]):
         operators (==, !=), ``fn`` and ``fn_swapped`` are the same.
         """
         lhs, rhs = self.visit(left), self.visit(right)
-        if isinstance(lhs, _ColRef) and not isinstance(rhs, _ColRef):
-            return fn(lhs.name, rhs)
-        if isinstance(rhs, _ColRef) and not isinstance(lhs, _ColRef):
-            return fn_swapped(rhs.name, lhs)
-        return None
+        if isinstance(lhs, _ColRef) and self._is_pushable_literal(rhs):
+            return fn(lhs.name, rhs.value)
+        if isinstance(rhs, _ColRef) and self._is_pushable_literal(lhs):
+            return fn_swapped(rhs.name, lhs.value)
+        return _UNSUPPORTED
 
-    def visit_equal(self, left: Expression, right: Expression) -> Predicate | 
None:
+    def visit_equal(self, left: Expression, right: Expression) -> Predicate | 
_Unsupported:
         return self._cmp(left, right, self._builder.equal, self._builder.equal)
 
-    def visit_not_equal(self, left: Expression, right: Expression) -> 
Predicate | None:
+    def visit_not_equal(self, left: Expression, right: Expression) -> 
Predicate | _Unsupported:
         return self._cmp(left, right, self._builder.not_equal, 
self._builder.not_equal)
 
-    def visit_less_than(self, left: Expression, right: Expression) -> 
Predicate | None:
+    def visit_less_than(self, left: Expression, right: Expression) -> 
Predicate | _Unsupported:
         return self._cmp(left, right, self._builder.less_than, 
self._builder.greater_than)
 
-    def visit_less_than_or_equal(self, left: Expression, right: Expression) -> 
Predicate | None:
+    def visit_less_than_or_equal(self, left: Expression, right: Expression) -> 
Predicate | _Unsupported:
         return self._cmp(left, right, self._builder.less_or_equal, 
self._builder.greater_or_equal)
 
-    def visit_greater_than(self, left: Expression, right: Expression) -> 
Predicate | None:
+    def visit_greater_than(self, left: Expression, right: Expression) -> 
Predicate | _Unsupported:
         return self._cmp(left, right, self._builder.greater_than, 
self._builder.less_than)
 
-    def visit_greater_than_or_equal(self, left: Expression, right: Expression) 
-> Predicate | None:
+    def visit_greater_than_or_equal(self, left: Expression, right: Expression) 
-> Predicate | _Unsupported:
         return self._cmp(left, right, self._builder.greater_or_equal, 
self._builder.less_or_equal)
 
     # -- Set/range predicates --
 
-    def visit_between(self, expr: Expression, lower: Expression, upper: 
Expression) -> Predicate | None:
+    def visit_between(self, expr: Expression, lower: Expression, upper: 
Expression) -> Predicate | _Unsupported:
         col = self.visit(expr)
         if not isinstance(col, _ColRef):
-            return None
+            return _UNSUPPORTED
         lower_val = self.visit(lower)
         upper_val = self.visit(upper)
-        if lower_val is None or upper_val is None:
-            return None
-        return self._builder.between(col.name, lower_val, upper_val)
+        if not self._is_pushable_literal(lower_val) or not 
self._is_pushable_literal(upper_val):
+            return _UNSUPPORTED
+        return self._builder.between(col.name, lower_val.value, 
upper_val.value)
 
-    def visit_is_in(self, expr: Expression, items: list[Expression]) -> 
Predicate | None:
+    def visit_is_in(self, expr: Expression, items: list[Expression]) -> 
Predicate | _Unsupported:
         col = self.visit(expr)
         if not isinstance(col, _ColRef):
-            return None
+            return _UNSUPPORTED
         values = [self.visit(item) for item in items]
-        if any(v is None or isinstance(v, _ColRef) for v in values):
-            return None
-        return self._builder.is_in(col.name, values)
+        if not all(isinstance(v, _Literal) for v in values):
+            return _UNSUPPORTED
+        literal_values = [v.value for v in values]
+        if any(not self._is_pushable_scalar(v) for v in literal_values):
+            return _UNSUPPORTED
+        return self._builder.is_in(col.name, literal_values)
 
     # -- Null predicates --
 
-    def visit_is_null(self, expr: Expression) -> Predicate | None:
+    def visit_is_null(self, expr: Expression) -> Predicate | _Unsupported:
         col = self.visit(expr)
-        return self._builder.is_null(col.name) if isinstance(col, _ColRef) 
else None
+        return self._builder.is_null(col.name) if isinstance(col, _ColRef) 
else _UNSUPPORTED
 
-    def visit_not_null(self, expr: Expression) -> Predicate | None:
+    def visit_not_null(self, expr: Expression) -> Predicate | _Unsupported:
         col = self.visit(expr)
-        return self._builder.is_not_null(col.name) if isinstance(col, _ColRef) 
else None
+        return self._builder.is_not_null(col.name) if isinstance(col, _ColRef) 
else _UNSUPPORTED
 
     # -- String predicates --
 
-    def visit_starts_with(self, input: Expression, prefix: Expression) -> 
Predicate | None:
+    def visit_starts_with(self, input: Expression, prefix: Expression) -> 
Predicate | _Unsupported:
         col = self.visit(input)
         if not isinstance(col, _ColRef):
-            return None
-        return self._builder.startswith(col.name, str(self.visit(prefix)))
+            return _UNSUPPORTED
+        prefix_value = self._string_literal(prefix)
+        if prefix_value is None:
+            return _UNSUPPORTED
+        return self._builder.startswith(col.name, prefix_value)
 
-    def visit_ends_with(self, input: Expression, suffix: Expression) -> 
Predicate | None:
+    def visit_ends_with(self, input: Expression, suffix: Expression) -> 
Predicate | _Unsupported:
         col = self.visit(input)
         if not isinstance(col, _ColRef):
-            return None
-        return self._builder.endswith(col.name, str(self.visit(suffix)))
+            return _UNSUPPORTED
+        suffix_value = self._string_literal(suffix)
+        if suffix_value is None:
+            return _UNSUPPORTED
+        return self._builder.endswith(col.name, suffix_value)
 
-    def visit_contains(self, input: Expression, substring: Expression) -> 
Predicate | None:
+    def visit_contains(self, input: Expression, substring: Expression) -> 
Predicate | _Unsupported:
         col = self.visit(input)
         if not isinstance(col, _ColRef):
-            return None
-        return self._builder.contains(col.name, str(self.visit(substring)))
+            return _UNSUPPORTED
+        substring_value = self._string_literal(substring)
+        if substring_value is None:
+            return _UNSUPPORTED
+        return self._builder.contains(col.name, substring_value)
+
+    @staticmethod
+    def _is_predicate(value: Any) -> bool:
+        from pypaimon.common.predicate import Predicate
+
+        return isinstance(value, Predicate)
+
+    def _string_literal(self, expr: Expression) -> str | None:
+        value = self.visit(expr)
+        if isinstance(value, _Literal) and isinstance(value.value, str):
+            return value.value
+        return None
+
+    @classmethod
+    def _is_pushable_literal(cls, value: Any) -> bool:
+        return isinstance(value, _Literal) and 
cls._is_pushable_scalar(value.value)
+
+    @staticmethod
+    def _is_pushable_scalar(value: Any) -> bool:
+        return value is not None and not isinstance(value, (list, tuple, dict, 
set))
 
 
 def convert_filters_to_paimon(
@@ -206,6 +254,7 @@ def convert_filters_to_paimon(
         Tuple of (pushed_filters, remaining_filters, combined_predicate)
     """
     from daft.expressions import Expression
+    from pypaimon.common.predicate import Predicate
 
     if not isinstance(py_filters, list):
         py_filters = [py_filters]
@@ -225,7 +274,7 @@ def convert_filters_to_paimon(
         expr = Expression._from_pyexpr(py_expr)
         predicate = converter.visit(expr)
 
-        if predicate is not None:
+        if isinstance(predicate, Predicate):
             pushed_filters.append(py_expr)
             predicates.append(predicate)
         else:
diff --git a/paimon-python/pypaimon/tests/daft/daft_data_test.py b/paimon-python/pypaimon/tests/daft/daft_data_test.py
index 2eee5ba749..9d7795cb97 100644
--- a/paimon-python/pypaimon/tests/daft/daft_data_test.py
+++ b/paimon-python/pypaimon/tests/daft/daft_data_test.py
@@ -33,9 +33,10 @@ import pytest
 pypaimon = pytest.importorskip("pypaimon")
 daft = pytest.importorskip("daft")
 
-from daft import col
+from daft import col, lit
 
 from pypaimon.daft.daft_paimon import _read_table
+from pypaimon.daft.daft_predicate_visitor import convert_filters_to_paimon
 
 
 # ---------------------------------------------------------------------------
@@ -673,6 +674,50 @@ class TestFilterPushdown:
         result = df.to_pydict()
         assert result["id"] == [2, 3, 4]
 
+    def test_unsupported_expression_remains_in_daft(self, filter_table):
+        expressions = [
+            col("id") == lit(1).cast("int64"),
+            col("id") == lit(None),
+            col("value").contains(col("id")),
+            col("value").startswith(col("id")),
+            col("value").endswith(col("id")),
+            col("value").contains(lit("a").cast("string")),
+            col("id").between(col("id"), 3),
+            col("id").is_in([1, None]),
+        ]
+
+        for expr in expressions:
+            pushed_filters, remaining_filters, predicate = 
convert_filters_to_paimon(filter_table, expr._expr)
+
+            assert pushed_filters == []
+            assert remaining_filters == [expr._expr]
+            assert predicate is None
+
+    def test_mixed_string_expression_is_filtered_by_daft(self, 
local_paimon_catalog):
+        catalog, tmp_path = local_paimon_catalog
+        pa_schema = pa.schema([
+            ("id", pa.int64()),
+            ("value", pa.string()),
+            ("pattern", pa.string()),
+        ])
+        paimon_schema = pypaimon.Schema.from_pyarrow_schema(pa_schema)
+        catalog.create_table("test_db.filter_mixed_string", paimon_schema, 
ignore_if_exists=True)
+        table = catalog.get_table("test_db.filter_mixed_string")
+
+        data = pa.table(
+            {
+                "id": [1, 2, 3],
+                "value": ["alpha", "bravo", "charlie"],
+                "pattern": ["lp", "zz", "lie"],
+            }
+        )
+        _write_to_paimon(table, data)
+
+        df = _read_table(table).where(col("value").contains(col("pattern")))
+        result = df.sort("id").to_pydict()
+
+        assert result["id"] == [1, 3]
+
 
 # ---------------------------------------------------------------------------
 # Advanced data types

Reply via email to