neilconway commented on code in PR #21240: URL: https://github.com/apache/datafusion/pull/21240#discussion_r3011533897
########## datafusion/physical-plan/src/scalar_subquery.rs: ########## @@ -0,0 +1,511 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Execution plan for uncorrelated scalar subqueries. +//! +//! [`ScalarSubqueryExec`] wraps a main input plan and a set of subquery plans. +//! At execution time, it runs each subquery exactly once, extracts the scalar +//! result, and populates the shared results container that +//! [`ScalarSubqueryExpr`] instances read from by index. +//! +//! 
[`ScalarSubqueryExpr`]: datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr + +use std::any::Any; +use std::fmt; +use std::sync::Arc; + +use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::{Result, ScalarValue, exec_err, internal_err}; +use datafusion_execution::TaskContext; +use datafusion_expr::execution_props::ScalarSubqueryResults; +use datafusion_physical_expr::PhysicalExpr; + +use crate::execution_plan::{CardinalityEffect, ExecutionPlan, PlanProperties}; +use crate::joins::utils::{OnceAsync, OnceFut}; +use crate::stream::RecordBatchStreamAdapter; +use crate::{DisplayAs, DisplayFormatType, SendableRecordBatchStream}; + +use futures::StreamExt; +use futures::TryStreamExt; + +/// Links a scalar subquery's execution plan to its index in the shared results +/// container. The [`ScalarSubqueryExec`] that owns these links populates +/// `results[index]` at execution time, and [`ScalarSubqueryExpr`] instances +/// with the same index read from it. +/// +/// [`ScalarSubqueryExpr`]: datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr +#[derive(Debug, Clone)] +pub struct ScalarSubqueryLink { + /// The physical plan for the subquery. + pub plan: Arc<dyn ExecutionPlan>, + /// Index into the shared results container. + pub index: usize, +} + +/// Manages execution of uncorrelated scalar subqueries for a single plan +/// level. +/// +/// This node has an asymmetric set of children: the first child is the +/// **main input plan**, whose batches are passed through unchanged. The +/// remaining children are **subquery plans**, each of which must produce +/// exactly zero or one row. Before any batches from the main input are +/// yielded, all subquery plans are executed and their scalar results are +/// stored in a shared results container ([`ScalarSubqueryResults`]). +/// [`ScalarSubqueryExpr`] nodes embedded in the main input's expressions +/// read from this container by index. 
+/// +/// All subqueries are evaluated eagerly when the first output partition is +/// requested, before any rows from the main input are produced. +/// +/// TODO: Consider overlapping computation of the subqueries with evaluating the +/// main query. +/// +/// TODO: Subqueries are evaluated sequentially. Consider parallel evaluation in +/// the future. +/// +/// [`ScalarSubqueryExpr`]: datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr +#[derive(Debug)] +pub struct ScalarSubqueryExec { + /// The main input plan whose output is passed through. + input: Arc<dyn ExecutionPlan>, + /// Subquery plans and their result indexes. + subqueries: Vec<ScalarSubqueryLink>, + /// Shared one-time async computation of subquery results. + subquery_future: Arc<OnceAsync<()>>, + /// Shared results container; same instance held by ScalarSubqueryExpr nodes. + results: ScalarSubqueryResults, + /// Cached plan properties (copied from input). + cache: Arc<PlanProperties>, +} + +impl ScalarSubqueryExec { + pub fn new( + input: Arc<dyn ExecutionPlan>, + subqueries: Vec<ScalarSubqueryLink>, + results: ScalarSubqueryResults, + ) -> Self { + let cache = Arc::clone(input.properties()); + Self { + input, + subqueries, + subquery_future: Arc::default(), + results, + cache, + } + } + + pub fn input(&self) -> &Arc<dyn ExecutionPlan> { + &self.input + } + + pub fn subqueries(&self) -> &[ScalarSubqueryLink] { + &self.subqueries + } + + pub fn results(&self) -> &ScalarSubqueryResults { + &self.results + } + + /// Returns a per-child bool vec that is `true` for the main input + /// (child 0) and `false` for every subquery child. + fn true_for_input_only(&self) -> Vec<bool> { + std::iter::once(true) Review Comment: Sorry, can you elaborate on what you're suggesting? We want `[true, false, false, ...]`; we could use ```rust let mut v = vec![false; 1 + self.subqueries.len()]; v[0] = true; v ``` Although it's debatable whether that's cleaner than the current code. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
