andygrove opened a new issue, #1909:
URL: https://github.com/apache/datafusion-ballista/issues/1909

   ## Describe the bug
   
   On DataFusion 54, TPC-H queries with an **uncorrelated scalar subquery** 
fail in distributed execution: **q11, q15, q22**. The executor cannot 
deserialize the per-stage physical plan it receives:
   
   ```
   DataFusion error: Internal error: ScalarSubqueryExpr can only be deserialized as part
   of a surrounding ScalarSubqueryExec.
   ```
   
   (Correlated / `IN` subqueries that decorrelate to joins — e.g. q2, q17, q20 
— are unaffected.) These queries worked on DataFusion 53, so this is a DF54 
regression for Ballista.
   
   ## To Reproduce
   
   SF10 parquet, 1 scheduler + 1 executor (`-c 8`), release build:
   
   ```
   tpch benchmark ballista --query 15 --partitions 16 -c datafusion.optimizer.prefer_hash_join=false
   ```
   
   The query never completes (see #1908 — the scheduler marks the executor dead 
on the launch failure).
   
   ## Root cause
   
   DataFusion 54 plans an uncorrelated scalar subquery as a physical 
`ScalarSubqueryExec` that wraps a `ScalarSubqueryExpr`. In `datafusion-proto`, 
a `ScalarSubqueryExpr` can only be **deserialized inside its surrounding 
`ScalarSubqueryExec`**, which seeds the per-scope results handle in the decode 
context:
   
   - `datafusion-proto` `physical_plan/from_proto.rs` 
(`ExprType::ScalarSubquery`) requires `ctx.scalar_subquery_results()` to be 
present, else it returns the error above.
   - That handle is only set while decoding the `ScalarSubqueryExec` node 
(`PhysicalPlanDecodeContext::with_scalar_subquery_results`).
   
   A **whole** plan round-trips fine. But Ballista splits the physical plan 
into stages at shuffle boundaries, and the scalar subquery's input contains a 
shuffle (e.g. an aggregate), so the `ScalarSubqueryExec` and the operator that 
consumes its value land in **different stages**. The consuming stage is then 
serialized with a *bare* `ScalarSubqueryExpr`, and the executor decodes it 
without the surrounding `ScalarSubqueryExec`, so the results handle is never 
seeded and decoding fails with the error above.
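
   To make the scoping problem concrete, here is a minimal toy model of the decode behaviour described above. It is a sketch, not the `datafusion-proto` implementation: `Node`, `DecodeCtx`, `decode`, `whole_plan` and `consumer_stage` are hypothetical names, and the plan shape is simplified so that the `ScalarSubqueryExec` directly wraps its consumer.

   ```rust
   /// Toy stand-in for a serialized physical plan node (hypothetical; it does
   /// not mirror the real `datafusion-proto` messages).
   #[derive(Debug, Clone)]
   enum Node {
       /// Seeds the scalar-subquery results handle for everything beneath it.
       ScalarSubqueryExec { input: Box<Node> },
       /// An operator whose predicate may contain a scalar subquery expression.
       Filter { uses_scalar_subquery: bool, input: Box<Node> },
       /// Leaf that reads the output of an earlier stage.
       ShuffleReader,
   }

   /// Toy decode context: the handle is absent unless a surrounding
   /// `ScalarSubqueryExec` has set it.
   #[derive(Debug, Clone, Default)]
   struct DecodeCtx {
       has_results_handle: bool,
   }

   fn decode(node: &Node, ctx: &DecodeCtx) -> Result<(), String> {
       match node {
           Node::ScalarSubqueryExec { input } => {
               // The handle only exists inside this scope.
               let scoped = DecodeCtx { has_results_handle: true };
               decode(input, &scoped)
           }
           Node::Filter { uses_scalar_subquery, input } => {
               if *uses_scalar_subquery && !ctx.has_results_handle {
                   return Err("ScalarSubqueryExpr can only be deserialized as part \
                               of a surrounding ScalarSubqueryExec"
                       .to_string());
               }
               decode(input, ctx)
           }
           Node::ShuffleReader => Ok(()),
       }
   }

   /// The unsplit plan: the consumer sits under its `ScalarSubqueryExec`.
   fn whole_plan() -> Node {
       Node::ScalarSubqueryExec { input: Box::new(consumer_stage()) }
   }

   /// What a consuming stage looks like after splitting at a shuffle boundary:
   /// a bare expression with no surrounding `ScalarSubqueryExec`.
   fn consumer_stage() -> Node {
       Node::Filter {
           uses_scalar_subquery: true,
           input: Box::new(Node::ShuffleReader),
       }
   }
   ```

   In this model `decode(&whole_plan(), &DecodeCtx::default())` succeeds, while `decode(&consumer_stage(), &DecodeCtx::default())` returns the error, which matches the whole-plan versus per-stage behaviour reported here.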
   
   ## Possible directions
   
   - Evaluate the uncorrelated scalar subquery as its own job/stage and inline 
its single value as a literal into the main plan before stage splitting 
(closest to how a distributed engine usually handles a broadcast scalar).
   - Or keep the `ScalarSubqueryExec` and all of its consumers within a single 
stage so the proto round-trip stays whole.
   - Or thread the scalar-subquery results handle through Ballista's per-stage 
decode context so a split stage can reconstruct the expr.
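
   As a rough illustration of the first direction, the sketch below rewrites a toy expression tree so that each scalar-subquery reference becomes a literal before any stage splitting happens. The `Expr` enum and `inline_scalar_subqueries` are hypothetical and are not DataFusion types; the sketch also assumes the subquery values have already been computed by a separate job and are integers.

   ```rust
   /// Toy expression tree (hypothetical; not DataFusion's `Expr`).
   #[derive(Debug, Clone, PartialEq)]
   enum Expr {
       Column(String),
       Literal(i64),
       /// Reference to the result of the uncorrelated scalar subquery `id`.
       ScalarSubquery(usize),
       Gt(Box<Expr>, Box<Expr>),
   }

   /// Replace every `ScalarSubquery(id)` with `Literal(values[id])`.
   /// Returns an error if a referenced subquery has no precomputed value.
   fn inline_scalar_subqueries(expr: &Expr, values: &[i64]) -> Result<Expr, String> {
       match expr {
           Expr::ScalarSubquery(id) => values
               .get(*id)
               .map(|v| Expr::Literal(*v))
               .ok_or_else(|| format!("no value for scalar subquery {}", id)),
           Expr::Gt(left, right) => Ok(Expr::Gt(
               Box::new(inline_scalar_subqueries(left, values)?),
               Box::new(inline_scalar_subqueries(right, values)?),
           )),
           // Columns and literals are left unchanged.
           other => Ok(other.clone()),
       }
   }
   ```

   After this rewrite the consuming stage contains only a literal, so nothing in it depends on a surrounding `ScalarSubqueryExec` at decode time. The cost is an extra job per subquery before the main plan can be scheduled.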
   
   Input from maintainers on the preferred approach would be welcome; this 
likely needs design discussion (and possibly an upstream `datafusion-proto` 
change).
   
   ## Impact
   
   q11, q15, q22 cannot run in distributed mode on DataFusion 54. Blocks #1906.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

