timsaucer opened a new issue, #22367:
URL: https://github.com/apache/datafusion/issues/22367

   ## Summary
   
   Since #18916 introduced `FFI_PhysicalExpr` to pass physical expressions 
across the FFI boundary without going through the protobuf physical codec 
(avoiding the codec / `TaskContext` circular dependency described in the PR), 
physical expressions originating in one shared library and consumed in another 
arrive as `ForeignPhysicalExpr` opaque wrappers. Any code in `datafusion` core 
that relies on `Any::downcast_ref::<T>()` / `as_any().is::<T>()` to identify 
concrete physical expression types silently mis-classifies these expressions, 
because the wrapper's `TypeId` is `ForeignPhysicalExpr`, not the wrapped 
concrete type.
   
   The most visible symptom today: `simplify_const_expr_immediate` (in 
`datafusion/physical-expr/src/simplifier/const_evaluator.rs`) checks 
`expr.as_any().is::<Column>()` to short-circuit. For a foreign-wrapped 
`Column`, that check is `false`. The function then walks the (empty) children 
list, vacuously concludes "all children are literals," and attempts 
`evaluate(dummy_batch)` on what is effectively a column reference. The result 
is either a spurious literal substitution (replacing the column with a wrong / 
null scalar) or a runtime error that is swallowed by the `Err(_) => 
Transformed::no` arm. Either way, predicate semantics are broken downstream of 
the simplifier. This affects row-group pruning and `open()` paths in Parquet 
scans for any `TableProvider` consumed via FFI (concrete repro: 
`datafusion-python` with a third-party Rust `TableProvider` crate).
   
   ## Reproduction
   
   Minimal repro available on request. Query `SELECT * FROM dummy_table WHERE a 
< 5` against a parquet-backed `TableProvider` whose `scan()` builds its 
predicate via `state.create_physical_expr(...)` where `state` is the 
FFI-foreign `Session`. The predicate arrives at row-group filtering as 
`ForeignPhysicalExpr(Column { name: "a", index: 0 })` and is silently rewritten 
by the simplifier.
   
   A user-side workaround: bypass `state.create_physical_expr` and build the 
`PhysicalExpr` locally:
   
   ```rust
   datafusion::physical_expr::create_physical_expr(&predicate, &df_schema, 
&ExecutionProps::new())
   ```
   
   That returns a local-typed `Column` / `Literal` / `BinaryExpr`, and the 
simplifier downcasts succeed. This works for typical filter pushdown because 
the relevant `ExecutionProps` fields (var providers, query start time, alias 
generator) are rarely used in pushdown predicates, and `ScalarUDF` instances 
ride inside the logical `Expr` rather than being looked up via a registry. It 
is brittle — it ignores anything the foreign session would have provided via 
`ExecutionProps` — and it does not solve the underlying class of bug.
   
   ## Root cause
   
   The downcast pattern `as_any().is::<T>()` / `as_any().downcast_ref::<T>()` 
assumes producer and consumer share a compilation unit. Across the 
`datafusion-ffi` boundary they do not: each shared library has its own 
monomorphization of every core type, so `TypeId::of::<Column>()` in the 
consumer differs from `TypeId::of::<Column>()` in the producer. 
`FFI_PhysicalExpr` (introduced by #18916) was designed to carry **behavior** 
across the boundary via a vtable; **identity** (the `TypeId`) is intentionally 
not carried. Core code that uses `TypeId` to dispatch — the simplifier, 
pruning, parts of the optimizer, stats extraction — therefore mis-classifies 
any FFI-wrapped expression as "not a `Column`," "not a `Literal`," etc., and 
falls through to whatever branch handles "unknown node." In 
`simplify_const_expr_immediate` that branch evaluates the expression against a 
dummy batch, which is incorrect for a column reference and silently corrupts 
the predicate.
   
   ## Alternatives considered
   
   ### Codec round-trip
   
   Serialize the `PhysicalExpr` via `PhysicalExtensionCodec` on the producer 
side and deserialize it on the consumer side. Rejected by #18916 because 
`PhysicalExtensionCodec::try_decode` requires a `FunctionRegistry` / 
`TaskContext`, which itself comes from the session being crossed — the 
round-trip dependency the PR was created to break. Any new proposal needs to 
avoid reintroducing this.
   
   ### `name()`-based dispatch
   
   Some prior discussions around analogous downcast-by-type fragility at the 
`ExecutionPlan` layer have proposed dispatching on `ExecutionPlan::name()` (the 
display name string) instead of `Any::downcast_ref`. It works mechanically — a 
foreign-wrapped `FilterExec` reports `"FilterExec"` through its vtable — but it 
is brittle:
   
   - `name()` is user-settable. Any third-party `ExecutionPlan` can return 
`"FilterExec"`, and dispatch will treat it as the core `FilterExec`.
   - It conflates display with identity. `name()` is intended for `EXPLAIN` 
output, not type discrimination. Coupling them locks in display strings as de 
facto type tags and discourages renaming for readability.
   - It does not generalize to `PhysicalExpr`, which has no equivalent stable 
display contract for every node.
   
   ### Adding identity methods to core traits (`is_column()`, `node_kind()`, 
accessors)
   
   Considered and rejected. The downcast pattern is widespread enough across 
the simplifier, pruning, and optimizer that providing a parallel method-based 
API in `PhysicalExpr` (and a corresponding one in `ExecutionPlan`) would mean a 
sustained boilerplate tax on every implementor — in core and downstream — for 
the benefit of an FFI code path that is not the common case. The cost falls on 
the wrong population.
   
   ## Proposed resolution: tiered reconstruction inside `datafusion-ffi`
   
   Two-tier model at the FFI boundary, no changes required to the 
`PhysicalExpr` or `ExecutionPlan` traits in core:
   
   - **Tier 1 — Known structural built-ins are reconstructed as local-typed 
instances on the consumer side.** At wrap time on the producer side, 
`datafusion-ffi` attempts `Any::downcast_ref` against a closed set of 
well-known core types and, on match, transports the minimal field data needed 
to rebuild:
     - `Column` → `(name, index)`
     - `Literal` → `ScalarValue` (+ optional `FieldMetadata`)
     - `BinaryExpr` → `(op, left, right)` with recursive reconstruction of 
children
     - (extend as concrete downcast call sites in core demand: `IsNullExpr`, 
`NotExpr`, `CastExpr`, `InListExpr`, `LikeExpr`, etc.)
   
     On the consumer side, FFI rebuilds these as the consumer's local `Column` 
/ `Literal` / ... so `as_any().is::<Column>()` and similar checks throughout 
core continue to work.
   
   - **Tier 2 — Unknown or third-party types stay opaque.** Wrapped via the 
existing `ForeignPhysicalExpr` vtable. Callers must not rely on `TypeId` for 
these.
   
   ### Rationale
   
   - **No core trait pollution.** Adding `is_column()` / `node_kind()` / 
accessor methods to `PhysicalExpr` (or `ExecutionPlan`) for FFI's sake would 
impose ongoing boilerplate on every implementor for the benefit of an exotic 
code path. FFI is not the common case in DataFusion; the boundary should bear 
its own cost.
   - **No codec / registry / `TaskContext` dependency.** Field-level copying of 
well-known shapes is independent of `PhysicalExtensionCodec` and the 
`FunctionRegistry`, avoiding the circular dependency #18916 was designed to 
break.
   - **Fixes the bug class, not one site.** Every `as_any().is::<Column>()` / 
`downcast_ref::<Literal>()` in core (simplifier, pruning, optimizer rules, 
stats extraction, partition pruning) starts working again for FFI-wrapped 
expressions, with no per-call-site migration.
   - **Not name-based.** Identity is established by `TypeId` on the producer 
side at wrap time, where producer and core *are* in the same compilation unit. 
Consumers receive concrete local types they own.
   - **Stable surface.** The shapes of `Column { name, index }` and `Literal { 
value, metadata }` are de facto stable public API. Future shape changes are 
explicit FFI-version events, analogous to Arrow C Data Interface evolution.
   
   ### Extending to `ExecutionPlan`
   
   The same pattern applies and addresses related downcast-by-type fragility 
observed in plan-level code. Initial Tier 1 candidates are "structural" plans: 
`FilterExec`, `ProjectionExec`, `CoalesceBatchesExec`, `RepartitionExec`, 
`SortExec`, `LimitExec`, `UnionExec`, `EmptyExec`, `PlaceholderRowExec`. Source 
plans (`DataSourceExec`, `ParquetExec`, custom sources) remain Tier 2 — they 
are rarely downcast by optimizer rules in a way that requires concrete identity.
   
   `PlanProperties` (schema, partitioning, equivalence) should be recomputed on 
the consumer side after reconstruction rather than transported, since 
equivalence groups are themselves physical expressions.
   
   ### A documentable rule
   
   If this pattern is adopted, it is worth codifying in the `datafusion-ffi` 
README:
   
   > Across the FFI boundary, **structural** types (data carriers, no `dyn 
Trait` extensibility) round-trip by value and survive `TypeId` checks. 
**Behavioral** types (extensible via `dyn Trait`) round-trip by vtable and do 
not survive `TypeId` checks. Code in core that downcasts a structural type is 
fine; code that downcasts a behavioral type is a latent FFI bug.
   
   This rule also makes core auditable: a `TypeId` check against a structural 
type is safe; against a behavioral or extension type, it is a bug.
   
   ## Trade-offs
   
   Cons of the proposed approach:
   
   - Boilerplate moves to `datafusion-ffi`. Adding a new built-in physical 
expression or execution plan requires updating the FFI encode / decode tables.
   - `datafusion-ffi` becomes coupled to the internal field shape of the 
well-known core types. Acceptable, but a real coupling.
   - `ScalarFunction` cannot be fully Tier 1 without addressing UDF identity 
across the boundary (lookup is by name and needs a registry on the consumer). 
Recommend leaving `ScalarFunction` in Tier 2 for now.
   - Recursive reconstruction of composite expressions costs O(n) FFI calls per 
filter at scan-plan time. Amortizes well; pushdown happens once per query plan.
   
   ## Recommended path
   
   1. Land a narrow defensive fix in `simplify_const_expr_immediate` so that an 
expression with zero children is never treated as "all children literal" and 
never falls through to `evaluate(dummy_batch)`. This is a small, isolated patch 
that prevents incorrect predicate rewriting today regardless of FFI. Suggested 
diff:
   
      ```rust
      let children = expr.children();
      if children.is_empty()
          || !children.iter().all(|c| c.as_any().is::<Literal>())
      {
          return Ok(Transformed::no(expr));
      }
      ```
   
   2. Implement Tier 1 reconstruction in `datafusion-ffi` starting with 
`Column` and `Literal`, then `BinaryExpr` (recursive). These three cover the 
great majority of pushdown-filter shapes and the great majority of `TypeId` 
call sites in the simplifier / pruning code paths.
   
   3. Extend Tier 1 to additional physical expressions as concrete downcast 
bugs surface (`CastExpr`, `IsNullExpr`, `NotExpr`, `InListExpr`, etc.).
   
   4. Apply the same pattern to a small `ExecutionPlan` Tier 1 (`FilterExec`, 
`ProjectionExec`) once expression-level reconstruction is stable. Drive that 
work from concrete optimizer / simplifier bugs rather than speculatively 
covering every plan.
   
   5. Document the structural-vs-behavioral rule in `datafusion-ffi` to set 
caller expectations and prevent `name()`-based dispatch hacks proliferating.
   
   ## Related
   
   - #18916 (`FFI_PhysicalExpr` introduction; the source of the foreign-wrapper 
behavior)
   - #18671 (the issue #18916 was solving; the codec / `TaskContext` 
circular-dep context)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to