timsaucer opened a new issue, #22367:
URL: https://github.com/apache/datafusion/issues/22367
## Summary
Since #18916 introduced `FFI_PhysicalExpr` to pass physical expressions
across the FFI boundary without going through the protobuf physical codec
(avoiding the codec / `TaskContext` circular dependency described in the PR),
physical expressions originating in one shared library and consumed in another
arrive as `ForeignPhysicalExpr` opaque wrappers. Any code in `datafusion` core
that relies on `Any::downcast_ref::<T>()` / `as_any().is::<T>()` to identify
concrete physical expression types silently mis-classifies these expressions,
because the wrapper's `TypeId` is `ForeignPhysicalExpr`, not the wrapped
concrete type.
The most visible symptom today: `simplify_const_expr_immediate` (in
`datafusion/physical-expr/src/simplifier/const_evaluator.rs`) checks
`expr.as_any().is::<Column>()` to short-circuit. For a foreign-wrapped
`Column`, that check is `false`. The function then walks the (empty) children
list, vacuously concludes "all children are literals," and attempts
`evaluate(dummy_batch)` on what is effectively a column reference. The result
is either a spurious literal substitution (replacing the column with a wrong /
null scalar) or a runtime error that is swallowed by the `Err(_) =>
Transformed::no` arm. Either way, predicate semantics are broken downstream of
the simplifier. This affects row-group pruning and `open()` paths in Parquet
scans for any `TableProvider` consumed via FFI (concrete repro:
`datafusion-python` with a third-party Rust `TableProvider` crate).
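The mechanism can be reproduced without DataFusion. The std-only sketch below is a hypothetical model, not DataFusion code. `Expr`, `Column`, `Literal`, `ForeignWrapper`, and `simplify_naive` are illustrative stand-ins that mimic the check described above. The wrapper forwards `children()` but returns itself from `as_any()`. As a result, the `is::<Column>()` short-circuit misses, and the vacuously true `all` over zero children sends a column reference down the evaluation path.

```rust
use std::any::Any;
use std::sync::Arc;

/// Hypothetical stand-in for `PhysicalExpr`; not the real DataFusion trait.
pub trait Expr {
    fn as_any(&self) -> &dyn Any;
    fn children(&self) -> Vec<&Arc<dyn Expr>>;
}

pub struct Column {
    pub name: String,
    pub index: usize,
}

pub struct Literal(pub i64);

/// Hypothetical opaque wrapper playing the role of `ForeignPhysicalExpr`:
/// behavior is forwarded, identity is not.
pub struct ForeignWrapper {
    pub inner: Arc<dyn Expr>,
}

impl Expr for Column {
    fn as_any(&self) -> &dyn Any { self }
    fn children(&self) -> Vec<&Arc<dyn Expr>> { vec![] }
}

impl Expr for Literal {
    fn as_any(&self) -> &dyn Any { self }
    fn children(&self) -> Vec<&Arc<dyn Expr>> { vec![] }
}

impl Expr for ForeignWrapper {
    // `&dyn Any` cannot cross a C ABI, so the wrapper can only expose itself.
    fn as_any(&self) -> &dyn Any { self }
    // Children are forwarded (through the vtable, in the real FFI layer).
    fn children(&self) -> Vec<&Arc<dyn Expr>> { self.inner.children() }
}

/// What the toy simplifier decides to do with a node.
#[derive(Debug, PartialEq)]
pub enum Decision {
    Keep,
    EvaluateAgainstDummyBatch,
}

/// Mimics the control flow described above for `simplify_const_expr_immediate`.
pub fn simplify_naive(expr: &Arc<dyn Expr>) -> Decision {
    // Short-circuit on known leaf types, identified by `TypeId`.
    if expr.as_any().is::<Column>() || expr.as_any().is::<Literal>() {
        return Decision::Keep;
    }
    // `all` over an empty iterator returns `true`: the vacuous case.
    if expr.children().iter().all(|c| c.as_any().is::<Literal>()) {
        Decision::EvaluateAgainstDummyBatch
    } else {
        Decision::Keep
    }
}
```

With these definitions, `simplify_naive` keeps a local `Column`. It returns `EvaluateAgainstDummyBatch` for the same `Column` placed behind `ForeignWrapper`.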
## Reproduction
Minimal repro available on request. Run `SELECT * FROM dummy_table WHERE a < 5`
against a parquet-backed `TableProvider` whose `scan()` builds its
predicate via `state.create_physical_expr(...)`, where `state` is the
FFI-foreign `Session`. The predicate arrives at row-group filtering as
`ForeignPhysicalExpr(Column { name: "a", index: 0 })` and is silently rewritten
by the simplifier.
A user-side workaround: bypass `state.create_physical_expr` and build the
`PhysicalExpr` locally:
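```rust
use datafusion::logical_expr::execution_props::ExecutionProps;
use datafusion::physical_expr::create_physical_expr;

// Plan the filter with this crate's own types instead of the foreign session.
let physical_predicate =
    create_physical_expr(&predicate, &df_schema, &ExecutionProps::new())?;
```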
That returns a local-typed `Column` / `Literal` / `BinaryExpr`, and the
simplifier downcasts succeed. This works for typical filter pushdown because
the relevant `ExecutionProps` fields (var providers, query start time, alias
generator) are rarely used in pushdown predicates, and `ScalarUDF` instances
ride inside the logical `Expr` rather than being looked up via a registry. It
is brittle — it ignores anything the foreign session would have provided via
`ExecutionProps` — and it does not solve the underlying class of bug.
## Root cause
The downcast pattern `as_any().is::<T>()` / `as_any().downcast_ref::<T>()`
assumes producer and consumer share a compilation unit. Across the
`datafusion-ffi` boundary they do not. Each shared library is compiled
separately and carries its own copy of every core type. `TypeId` values are not
guaranteed to agree between separately compiled binaries. In addition,
`&dyn Any` is not ABI-stable, so the foreign wrapper cannot forward `as_any()`
to the value it wraps. The consumer only ever sees `ForeignPhysicalExpr`.
`FFI_PhysicalExpr` (introduced by #18916) was designed to carry **behavior**
across the boundary via a vtable; **identity** (the `TypeId`) is intentionally
not carried. Core code that uses `TypeId` to dispatch — the simplifier,
pruning, parts of the optimizer, stats extraction — therefore mis-classifies
any FFI-wrapped expression as "not a `Column`," "not a `Literal`," etc., and
falls through to whatever branch handles "unknown node." In
`simplify_const_expr_immediate` that branch evaluates the expression against a
dummy batch, which is incorrect for a column reference and silently corrupts
the predicate.
## Alternatives considered
### Codec round-trip
Serialize the `PhysicalExpr` via `PhysicalExtensionCodec` on the producer
side and deserialize it on the consumer side. Rejected by #18916 because
`PhysicalExtensionCodec::try_decode` requires a `FunctionRegistry` /
`TaskContext`, which itself comes from the session being crossed — the
round-trip dependency the PR was created to break. Any new proposal needs to
avoid reintroducing this.
### `name()`-based dispatch
Some prior discussions around analogous downcast-by-type fragility at the
`ExecutionPlan` layer have proposed dispatching on `ExecutionPlan::name()` (the
display name string) instead of `Any::downcast_ref`. It works mechanically — a
foreign-wrapped `FilterExec` reports `"FilterExec"` through its vtable — but it
is brittle:
- `name()` is user-settable. Any third-party `ExecutionPlan` can return
`"FilterExec"`, and dispatch will treat it as the core `FilterExec`.
- It conflates display with identity. `name()` is intended for `EXPLAIN`
output, not type discrimination. Coupling them locks in display strings as de
facto type tags and discourages renaming for readability.
- It does not generalize to `PhysicalExpr`, which has no equivalent stable
display contract for every node.
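The first objection is easy to demonstrate. The hypothetical std-only sketch below uses `Plan`, `FilterExec`, and `ThirdPartyPlan` as simplified stand-ins, not DataFusion's `ExecutionPlan` API. Dispatch on the display string accepts an impostor. A `TypeId` check within a single compilation unit does not.

```rust
use std::any::Any;

/// Hypothetical, heavily simplified stand-in for `ExecutionPlan`.
pub trait Plan {
    /// Display name, as used in `EXPLAIN` output.
    fn name(&self) -> &str;
    fn as_any(&self) -> &dyn Any;
}

/// The "real" built-in.
pub struct FilterExec;

/// A third-party plan that reuses the display name.
pub struct ThirdPartyPlan;

impl Plan for FilterExec {
    fn name(&self) -> &str { "FilterExec" }
    fn as_any(&self) -> &dyn Any { self }
}

impl Plan for ThirdPartyPlan {
    fn name(&self) -> &str { "FilterExec" }
    fn as_any(&self) -> &dyn Any { self }
}

/// Name-based dispatch: trusts a user-settable display string.
pub fn is_filter_by_name(plan: &dyn Plan) -> bool {
    plan.name() == "FilterExec"
}

/// Type-based dispatch: exact, but only within one compilation unit.
pub fn is_filter_by_type(plan: &dyn Plan) -> bool {
    plan.as_any().is::<FilterExec>()
}
```

Across an FFI boundary, neither check is adequate on its own. The `TypeId` check is exact but fails for a wrapped built-in, while the string check succeeds for any plan that uses the name.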
### Adding identity methods to core traits (`is_column()`, `node_kind()`, accessors)
Considered and rejected. The downcast pattern is widespread enough across
the simplifier, pruning, and optimizer that providing a parallel method-based
API in `PhysicalExpr` (and a corresponding one in `ExecutionPlan`) would mean a
sustained boilerplate tax on every implementor — in core and downstream — for
the benefit of an FFI code path that is not the common case. The cost falls on
the wrong population.
## Proposed resolution: tiered reconstruction inside `datafusion-ffi`
Two-tier model at the FFI boundary, no changes required to the
`PhysicalExpr` or `ExecutionPlan` traits in core:
- **Tier 1: known structural built-ins are reconstructed as local-typed
  instances on the consumer side.** At wrap time on the producer side,
  `datafusion-ffi` attempts `Any::downcast_ref` against a closed set of
  well-known core types. On a match, it transports the minimal field data
  needed to rebuild the node:
  - `Column` → `(name, index)`
  - `Literal` → `ScalarValue` (+ optional `FieldMetadata`)
  - `BinaryExpr` → `(op, left, right)` with recursive reconstruction of
    children
  - (extend as concrete downcast call sites in core demand: `IsNullExpr`,
    `NotExpr`, `CastExpr`, `InListExpr`, `LikeExpr`, etc.)

  On the consumer side, FFI rebuilds these as the consumer's local `Column`
  / `Literal` / ... so `as_any().is::<Column>()` and similar checks throughout
  core continue to work.
- **Tier 2: unknown or third-party types stay opaque.** They are wrapped via the
  existing `ForeignPhysicalExpr` vtable. Callers must not rely on `TypeId` for
  these.
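Here is a minimal sketch of the two tiers, using only std. It is hypothetical throughout. The transport enum `ExprRepr` stands in for the FFI-safe structs a real implementation would need. `ScalarValue` is reduced to `i64`. `Expr`, `encode`, and `decode` are illustrative names, not existing `datafusion-ffi` APIs. Two modules model the two shared libraries, so `producer::Column` and `consumer::Column` are distinct types with distinct `TypeId`s. The producer matches on `TypeId` where it shares a compilation unit with its own types. The consumer rebuilds its own local nodes, and everything else stays behind an opaque handle.

```rust
use std::any::Any;
use std::sync::Arc;

/// Hypothetical stand-in for `PhysicalExpr`, visible to both sides.
pub trait Expr {
    fn as_any(&self) -> &dyn Any;
}

/// Concrete built-ins as compiled into the *producer* library.
pub mod producer {
    use super::*;
    pub struct Column { pub name: String, pub index: usize }
    pub struct Literal(pub i64);
    pub struct BinaryExpr { pub op: String, pub left: Arc<dyn Expr>, pub right: Arc<dyn Expr> }
    impl Expr for Column { fn as_any(&self) -> &dyn Any { self } }
    impl Expr for Literal { fn as_any(&self) -> &dyn Any { self } }
    impl Expr for BinaryExpr { fn as_any(&self) -> &dyn Any { self } }
}

/// The *consumer's* own copies of the same built-ins, plus the opaque wrapper.
pub mod consumer {
    use super::*;
    pub struct Column { pub name: String, pub index: usize }
    pub struct Literal(pub i64);
    pub struct BinaryExpr { pub op: String, pub left: Arc<dyn Expr>, pub right: Arc<dyn Expr> }
    /// Tier 2 wrapper, analogous to `ForeignPhysicalExpr`.
    pub struct ForeignExpr { pub inner: Arc<dyn Expr> }
    impl Expr for Column { fn as_any(&self) -> &dyn Any { self } }
    impl Expr for Literal { fn as_any(&self) -> &dyn Any { self } }
    impl Expr for BinaryExpr { fn as_any(&self) -> &dyn Any { self } }
    impl Expr for ForeignExpr { fn as_any(&self) -> &dyn Any { self } }
}

/// Hypothetical transport form carried across the boundary.
pub enum ExprRepr {
    Column { name: String, index: usize },
    Literal(i64), // stands in for `ScalarValue` (+ optional metadata)
    Binary { op: String, left: Box<ExprRepr>, right: Box<ExprRepr> },
    /// Tier 2: everything else keeps travelling behind the opaque vtable.
    Opaque(Arc<dyn Expr>),
}

/// Producer side: identify Tier 1 types by `TypeId`, where that is reliable.
pub fn encode(expr: &Arc<dyn Expr>) -> ExprRepr {
    let any = expr.as_any();
    if let Some(c) = any.downcast_ref::<producer::Column>() {
        ExprRepr::Column { name: c.name.clone(), index: c.index }
    } else if let Some(l) = any.downcast_ref::<producer::Literal>() {
        ExprRepr::Literal(l.0)
    } else if let Some(b) = any.downcast_ref::<producer::BinaryExpr>() {
        // Children are encoded recursively, so nested Tier 1 nodes survive.
        ExprRepr::Binary {
            op: b.op.clone(),
            left: Box::new(encode(&b.left)),
            right: Box::new(encode(&b.right)),
        }
    } else {
        ExprRepr::Opaque(Arc::clone(expr))
    }
}

/// Consumer side: rebuild Tier 1 nodes as local types, wrap the rest.
pub fn decode(repr: ExprRepr) -> Arc<dyn Expr> {
    match repr {
        ExprRepr::Column { name, index } => Arc::new(consumer::Column { name, index }),
        ExprRepr::Literal(v) => Arc::new(consumer::Literal(v)),
        ExprRepr::Binary { op, left, right } => Arc::new(consumer::BinaryExpr {
            op,
            left: decode(*left),
            right: decode(*right),
        }),
        ExprRepr::Opaque(inner) => Arc::new(consumer::ForeignExpr { inner }),
    }
}
```

Round-tripping `a < 5` this way yields a consumer-local `BinaryExpr` whose children pass `is::<consumer::Column>()` and `is::<consumer::Literal>()`. A third-party node comes back as the opaque `ForeignExpr`.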
### Rationale
- **No core trait pollution.** Adding `is_column()` / `node_kind()` /
accessor methods to `PhysicalExpr` (or `ExecutionPlan`) for FFI's sake would
impose ongoing boilerplate on every implementor for the benefit of an exotic
code path. FFI is not the common case in DataFusion; the boundary should bear
its own cost.
- **No codec / registry / `TaskContext` dependency.** Field-level copying of
well-known shapes is independent of `PhysicalExtensionCodec` and the
`FunctionRegistry`, avoiding the circular dependency #18916 was designed to
break.
- **Fixes the bug class, not one site.** Every `as_any().is::<Column>()` /
`downcast_ref::<Literal>()` in core (simplifier, pruning, optimizer rules,
stats extraction, partition pruning) starts working again for FFI-wrapped
expressions of Tier 1 types, with no per-call-site migration.
- **Not name-based.** Identity is established by `TypeId` on the producer
side at wrap time, where producer and core *are* in the same compilation unit.
Consumers receive concrete local types they own.
- **Stable surface.** The shapes of `Column { name, index }` and `Literal {
value, metadata }` are de facto stable public API. Future shape changes are
explicit FFI-version events, analogous to Arrow C Data Interface evolution.
### Extending to `ExecutionPlan`
The same pattern applies and addresses related downcast-by-type fragility
observed in plan-level code. Initial Tier 1 candidates are "structural" plans:
`FilterExec`, `ProjectionExec`, `CoalesceBatchesExec`, `RepartitionExec`,
`SortExec`, `GlobalLimitExec` / `LocalLimitExec`, `UnionExec`, `EmptyExec`,
`PlaceholderRowExec`. Source plans (`DataSourceExec`, `ParquetExec`, custom
sources) remain Tier 2. Optimizer rules rarely downcast them in a way that
requires concrete identity.
`PlanProperties` (schema, partitioning, equivalence) should be recomputed on
the consumer side after reconstruction rather than transported, since
equivalence groups are themselves physical expressions.
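The same idea applies at the plan level. This is again a hypothetical std-only sketch. `PlanRepr`, `LocalPlan`, and `decode_plan` are illustrative names, not DataFusion types. A `Vec<String>` of column names stands in for `PlanProperties`. Only the structural fields cross the boundary. The derived property is recomputed by the consumer from the rebuilt children instead of being read from the wire.

```rust
/// Hypothetical transport form for a few structural plans.
pub enum PlanRepr {
    Empty { schema: Vec<String> },
    Filter { predicate: String, input: Box<PlanRepr> },
    Projection { columns: Vec<usize>, input: Box<PlanRepr> },
}

/// Consumer-local plan nodes rebuilt from the transport form.
pub enum LocalPlan {
    Empty { schema: Vec<String> },
    Filter { predicate: String, input: Box<LocalPlan> },
    Projection { columns: Vec<usize>, input: Box<LocalPlan> },
}

/// Rebuild structural fields only; nothing derived is read from the wire.
pub fn decode_plan(repr: PlanRepr) -> LocalPlan {
    match repr {
        PlanRepr::Empty { schema } => LocalPlan::Empty { schema },
        PlanRepr::Filter { predicate, input } => LocalPlan::Filter {
            predicate,
            input: Box::new(decode_plan(*input)),
        },
        PlanRepr::Projection { columns, input } => LocalPlan::Projection {
            columns,
            input: Box::new(decode_plan(*input)),
        },
    }
}

impl LocalPlan {
    /// A derived property (here, output column names), recomputed locally
    /// from the rebuilt children instead of being transported.
    pub fn schema(&self) -> Vec<String> {
        match self {
            LocalPlan::Empty { schema } => schema.clone(),
            LocalPlan::Filter { input, .. } => input.schema(),
            LocalPlan::Projection { columns, input } => {
                let child = input.schema();
                columns.iter().map(|&i| child[i].clone()).collect()
            }
        }
    }
}
```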
### A documentable rule
If this pattern is adopted, it is worth codifying in the `datafusion-ffi`
README:
> Across the FFI boundary, **structural** types (data carriers, no `dyn
Trait` extensibility) round-trip by value and survive `TypeId` checks.
**Behavioral** types (extensible via `dyn Trait`) round-trip by vtable and do
not survive `TypeId` checks. Code in core that downcasts a structural type is
fine; code that downcasts a behavioral type is a latent FFI bug.
This rule also makes core auditable: a `TypeId` check against a structural
type is safe; against a behavioral or extension type, it is a bug.
## Trade-offs
Cons of the proposed approach:
- Boilerplate moves to `datafusion-ffi`. Adding a new built-in physical
expression or execution plan requires updating the FFI encode / decode tables.
- `datafusion-ffi` becomes coupled to the internal field shape of the
well-known core types. Acceptable, but a real coupling.
- `ScalarFunction` cannot be fully Tier 1 without addressing UDF identity
across the boundary (lookup is by name and needs a registry on the consumer).
Recommend leaving `ScalarFunction` in Tier 2 for now.
- Recursive reconstruction of composite expressions costs O(n) FFI calls per
filter at scan-plan time, where n is the number of nodes in the expression
tree. The cost amortizes well because pushdown happens once per query plan.
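To make the cost concrete, assume one boundary crossing per node. Under that assumption, `(a < 5) AND (b > 1)`, which has three binary nodes and four leaves, costs seven calls. The counter below is a hypothetical model of that walk (`Node` and `rebuild` are illustrative), not the `datafusion-ffi` API.

```rust
/// Hypothetical expression tree: leaves (columns, literals) and binary nodes.
pub enum Node {
    Leaf(String),
    Binary(String, Box<Node>, Box<Node>),
}

/// Rebuild `node` on the consumer side, counting one simulated FFI call per
/// node visited (fetching that node's tag and fields across the boundary).
pub fn rebuild(node: &Node, ffi_calls: &mut usize) -> Node {
    *ffi_calls += 1;
    match node {
        Node::Leaf(name) => Node::Leaf(name.clone()),
        Node::Binary(op, l, r) => {
            let left = rebuild(l, ffi_calls);
            let right = rebuild(r, ffi_calls);
            Node::Binary(op.clone(), Box::new(left), Box::new(right))
        }
    }
}
```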
## Recommended path
1. Land a narrow defensive fix in `simplify_const_expr_immediate` so that an
   expression with zero children is never treated as "all children literal" and
   never falls through to `evaluate(dummy_batch)`. This small, isolated patch
   prevents incorrect predicate rewriting today, regardless of FFI. Suggested
   diff:
   ```rust
   let children = expr.children();
   // A leaf (zero children) is never "all children literal": `all` on an
   // empty iterator is vacuously true, so reject that case explicitly.
   if children.is_empty()
       || !children.iter().all(|c| c.as_any().is::<Literal>())
   {
       return Ok(Transformed::no(expr));
   }
   ```
2. Implement Tier 1 reconstruction in `datafusion-ffi`, starting with
   `Column` and `Literal`, then `BinaryExpr` (recursive). These three are expected
   to cover most pushdown-filter shapes and most of the `TypeId` call sites in the
   simplifier / pruning code paths.
3. Extend Tier 1 to additional physical expressions as concrete downcast
bugs surface (`CastExpr`, `IsNullExpr`, `NotExpr`, `InListExpr`, etc.).
4. Apply the same pattern to a small `ExecutionPlan` Tier 1 (`FilterExec`,
`ProjectionExec`) once expression-level reconstruction is stable. Drive that
work from concrete optimizer / simplifier bugs rather than speculatively
covering every plan.
5. Document the structural-vs-behavioral rule in `datafusion-ffi` to set
   caller expectations and to keep `name()`-based dispatch hacks from proliferating.
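The guard in step 1 can be checked in isolation with a hypothetical std-only model. `Expr`, `Literal`, `OpaqueLeaf`, `Add`, and `would_evaluate` are illustrative names, not DataFusion's. With the guard, an opaque leaf is no longer sent to evaluation. A node whose children are all literals still is.

```rust
use std::any::Any;
use std::sync::Arc;

/// Hypothetical stand-in for `PhysicalExpr`; not the real DataFusion trait.
pub trait Expr {
    fn as_any(&self) -> &dyn Any;
    fn children(&self) -> Vec<&Arc<dyn Expr>>;
}

pub struct Literal(pub i64);
/// A leaf whose concrete type is hidden, like a foreign-wrapped `Column`.
pub struct OpaqueLeaf;
/// A binary node with two children.
pub struct Add(pub Arc<dyn Expr>, pub Arc<dyn Expr>);

impl Expr for Literal {
    fn as_any(&self) -> &dyn Any { self }
    fn children(&self) -> Vec<&Arc<dyn Expr>> { vec![] }
}

impl Expr for OpaqueLeaf {
    fn as_any(&self) -> &dyn Any { self }
    fn children(&self) -> Vec<&Arc<dyn Expr>> { vec![] }
}

impl Expr for Add {
    fn as_any(&self) -> &dyn Any { self }
    fn children(&self) -> Vec<&Arc<dyn Expr>> { vec![&self.0, &self.1] }
}

/// `true` when the guarded simplifier would go on to evaluate `expr`:
/// the negation of the early-return condition in the suggested fix.
pub fn would_evaluate(expr: &Arc<dyn Expr>) -> bool {
    let children = expr.children();
    !children.is_empty() && children.iter().all(|c| c.as_any().is::<Literal>())
}
```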
## Related
- #18916 (`FFI_PhysicalExpr` introduction; the source of the foreign-wrapper
behavior)
- #18671 (the issue #18916 was solving; the codec / `TaskContext`
circular-dep context)