neilconway commented on code in PR #21240:
URL: https://github.com/apache/datafusion/pull/21240#discussion_r3006629700
##########
datafusion/core/src/physical_planner.rs:
##########
@@ -379,8 +382,98 @@ impl DefaultPhysicalPlanner {
Ok(())
}
- /// Create a physical plan from a logical plan
- async fn create_initial_plan(
+ /// Collect uncorrelated scalar subqueries. We don't descend into nested
+ /// subqueries here: each call to `create_initial_plan` handles subqueries
+ /// at its level and then recurses in order to handle nested subqueries.
+ fn collect_scalar_subqueries(plan: &LogicalPlan) -> Vec<Subquery> {
+ let mut subqueries = Vec::new();
+ let mut seen = HashSet::new();
+ plan.apply(|node| {
+ for expr in node.expressions() {
+ expr.apply(|e| {
+ if let Expr::ScalarSubquery(sq) = e
+ && sq.outer_ref_columns.is_empty()
+ && seen.insert(sq.clone())
+ {
+ subqueries.push(sq.clone());
+ }
+ Ok(TreeNodeRecursion::Continue)
+ })
+ .expect("infallible");
+ }
+ Ok(TreeNodeRecursion::Continue)
+ })
+ .expect("infallible");
+ subqueries
+ }
+
+ /// Create a physical plan from a logical plan.
+ ///
+ /// Uncorrelated scalar subqueries in the plan's own expressions are
+ /// collected, planned as separate physical plans, and each assigned a
+ /// shared [`OnceLock`] slot that will hold its result at execution time.
+ /// These slots are registered in [`ExecutionProps`] so that
+ /// [`create_physical_expr`] can convert `Expr::ScalarSubquery` into
+ /// [`ScalarSubqueryExpr`] nodes that read from the slots.
+ ///
+ /// The resulting physical plan is wrapped in a [`ScalarSubqueryExec`]
+ /// that owns and executes those subquery plans before any data flows
+ /// through the main plan. If a subquery itself contains nested
+ /// uncorrelated subqueries, the recursive call produces its own
+ /// [`ScalarSubqueryExec`] inside the subquery plan — each level
+ /// manages only its own subqueries.
+ ///
+ /// Returns a [`BoxFuture`] rather than using `async fn` because of
+ /// this recursion.
+ ///
+ /// [`ScalarSubqueryExpr`]: datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr
+ /// [`BoxFuture`]: futures::future::BoxFuture
+ fn create_initial_plan<'a>(
+ &'a self,
+ logical_plan: &'a LogicalPlan,
+ session_state: &'a SessionState,
+ ) -> futures::future::BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
+ Box::pin(async move {
+ let all_subqueries = Self::collect_scalar_subqueries(logical_plan);
+ let all_sq_refs: Vec<&_> = all_subqueries.iter().collect();
+ let (links, index_map) = self
+ .plan_scalar_subqueries(&all_sq_refs, session_state)
+ .await?;
+
+ // Create the shared results container and register it (along with
+ // the index map) in ExecutionProps so that `create_physical_expr`
+ // can resolve `Expr::ScalarSubquery` into `ScalarSubqueryExpr`
+ // nodes. We clone the SessionState so these are available
+ // throughout physical planning without mutating the caller's
+ // state.
+ //
+ // Ideally, the subquery state would live in a dedicated planning
+ // context rather than on ExecutionProps (which is meant for
+ // session-level configuration). It's here because
+ // `create_physical_expr` only receives `&ExecutionProps`, and
+ // changing that signature would be a breaking public API change.
+ let results: Arc<Vec<OnceLock<ScalarValue>>> =
+ Arc::new((0..links.len()).map(|_| OnceLock::new()).collect());
+ let session_state = if links.is_empty() {
+ Cow::Borrowed(session_state)
+ } else {
+ let mut owned = session_state.clone();
+ owned.execution_props_mut().subquery_indexes = index_map;
+ owned.execution_props_mut().subquery_results = Arc::clone(&results);
+ Cow::Owned(owned)
+ };
Review Comment:
This seemed a bit kludgy but I couldn't think of a better way to do it;
feedback/suggestions welcome.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]