alamb commented on code in PR #21240:
URL: https://github.com/apache/datafusion/pull/21240#discussion_r3065415623
##########
datafusion/sqllogictest/test_files/tpch/plans/q11.slt.part:
##########
@@ -49,61 +49,62 @@ limit 10;
logical_plan
01)Sort: value DESC NULLS FIRST, fetch=10
02)--Projection: partsupp.ps_partkey, sum(partsupp.ps_supplycost *
partsupp.ps_availqty) AS value
-03)----Inner Join: Filter: CAST(sum(partsupp.ps_supplycost *
partsupp.ps_availqty) AS Decimal128(38, 15)) >
__scalar_sq_1.sum(partsupp.ps_supplycost * partsupp.ps_availqty) *
Float64(0.0001)
-04)------Aggregate: groupBy=[[partsupp.ps_partkey]],
aggr=[[sum(partsupp.ps_supplycost * CAST(partsupp.ps_availqty AS Decimal128(10,
0)))]]
-05)--------Projection: partsupp.ps_partkey, partsupp.ps_availqty,
partsupp.ps_supplycost
-06)----------Inner Join: supplier.s_nationkey = nation.n_nationkey
-07)------------Projection: partsupp.ps_partkey, partsupp.ps_availqty,
partsupp.ps_supplycost, supplier.s_nationkey
-08)--------------Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
-09)----------------TableScan: partsupp projection=[ps_partkey, ps_suppkey,
ps_availqty, ps_supplycost], partial_filters=[Boolean(true)]
-10)----------------TableScan: supplier projection=[s_suppkey, s_nationkey]
-11)------------Projection: nation.n_nationkey
-12)--------------Filter: nation.n_name = Utf8View("GERMANY")
-13)----------------TableScan: nation projection=[n_nationkey, n_name],
partial_filters=[nation.n_name = Utf8View("GERMANY")]
-14)------SubqueryAlias: __scalar_sq_1
-15)--------Projection: CAST(CAST(sum(partsupp.ps_supplycost *
partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS Decimal128(38, 15))
-16)----------Aggregate: groupBy=[[]], aggr=[[sum(partsupp.ps_supplycost *
CAST(partsupp.ps_availqty AS Decimal128(10, 0)))]]
-17)------------Projection: partsupp.ps_availqty, partsupp.ps_supplycost
-18)--------------Inner Join: supplier.s_nationkey = nation.n_nationkey
-19)----------------Projection: partsupp.ps_availqty, partsupp.ps_supplycost,
supplier.s_nationkey
-20)------------------Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
-21)--------------------TableScan: partsupp projection=[ps_suppkey,
ps_availqty, ps_supplycost]
-22)--------------------TableScan: supplier projection=[s_suppkey, s_nationkey]
-23)----------------Projection: nation.n_nationkey
-24)------------------Filter: nation.n_name = Utf8View("GERMANY")
-25)--------------------TableScan: nation projection=[n_nationkey, n_name],
partial_filters=[nation.n_name = Utf8View("GERMANY")]
+03)----Filter: CAST(sum(partsupp.ps_supplycost * partsupp.ps_availqty) AS
Decimal128(38, 15)) > (<subquery>)
+04)------Subquery:
Review Comment:
the major difference here is that the join is replaced by a subquery, right?
##########
datafusion/physical-expr/src/scalar_subquery.rs:
##########
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Physical expression for uncorrelated scalar subqueries.
+//!
+//! [`ScalarSubqueryExpr`] reads a cached [`ScalarValue`] that is populated
+//! at execution time by `ScalarSubqueryExec`.
+
+use std::any::Any;
+use std::fmt;
+use std::hash::Hash;
+use std::sync::{Arc, OnceLock};
+
+use arrow::datatypes::{DataType, Field, FieldRef, Schema};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{Result, ScalarValue, internal_datafusion_err};
+use datafusion_expr_common::columnar_value::ColumnarValue;
+use datafusion_expr_common::sort_properties::{ExprProperties, SortProperties};
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+
+/// A physical expression whose value is provided by a scalar subquery.
+///
+/// Subquery execution is handled by `ScalarSubqueryExec`, which stores the
+/// result in a shared results container. This expression simply reads from the
+/// shared results container at the appropriate index.
+///
+/// If the same subquery appears multiple times in a query, there will be
+/// multiple `ScalarSubqueryExpr` with the same result index.
+#[derive(Debug)]
+pub struct ScalarSubqueryExpr {
+ data_type: DataType,
+ nullable: bool,
+ /// Index of this subquery in the shared results container.
+ index: usize,
+ /// Shared results container populated by `ScalarSubqueryExec`.
+ results: Arc<Vec<OnceLock<ScalarValue>>>,
Review Comment:
If an execution plan is executed more than once (e.g. after
`ExecutionPlan::reset_state` is called), I think the subquery results also need
to be cleared so the subquery is reset as well.
I think implementing `reset_state` so that it clears the results is probably
enough.
Maybe we should also do the same thing in `with_new_children`.
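A minimal sketch of the reset idea, using hypothetical stand-in types (`SubqueryNode`, `Results`, and `fresh_results` are illustrative names, not DataFusion APIs). Since a `OnceLock` slot cannot be cleared through a shared reference, the sketch swaps in a fresh container instead:

```rust
use std::sync::{Arc, OnceLock};

/// Hypothetical stand-in for the shared results container: one write-once
/// slot per subquery (an `i64` stands in for `ScalarValue`).
type Results = Arc<Vec<OnceLock<i64>>>;

/// Builds a container with `n` empty slots.
fn fresh_results(n: usize) -> Results {
    Arc::new((0..n).map(|_| OnceLock::new()).collect())
}

/// Hypothetical stand-in for `ScalarSubqueryExec`.
struct SubqueryNode {
    results: Results,
}

impl SubqueryNode {
    fn new(num_subqueries: usize) -> Self {
        Self {
            results: fresh_results(num_subqueries),
        }
    }

    /// Returns a copy of this node whose slots are empty again, so a second
    /// execution can populate them without hitting "already populated".
    fn reset_state(&self) -> Self {
        Self::new(self.results.len())
    }
}
```

Any expression that still holds the old `Arc` would keep reading the old values, so a real fix would presumably also have to rewire those expressions or use a container that can be cleared in place.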
##########
datafusion/physical-plan/src/scalar_subquery.rs:
##########
@@ -0,0 +1,514 @@
+
+//! Execution plan for uncorrelated scalar subqueries.
+//!
+//! [`ScalarSubqueryExec`] wraps a main input plan and a set of subquery plans.
+//! At execution time, it runs each subquery exactly once, extracts the scalar
+//! result, and populates the shared results container that
+//! [`ScalarSubqueryExpr`] instances read from by index.
+//!
+//! [`ScalarSubqueryExpr`]:
datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr
+
+use std::fmt;
+use std::sync::Arc;
+
+use datafusion_common::tree_node::TreeNodeRecursion;
+use datafusion_common::{Result, ScalarValue, Statistics, exec_err,
internal_err};
+use datafusion_execution::TaskContext;
+use datafusion_expr::execution_props::ScalarSubqueryResults;
+use datafusion_physical_expr::PhysicalExpr;
+
+use crate::execution_plan::{CardinalityEffect, ExecutionPlan, PlanProperties};
+use crate::joins::utils::{OnceAsync, OnceFut};
+use crate::stream::RecordBatchStreamAdapter;
+use crate::{DisplayAs, DisplayFormatType, SendableRecordBatchStream};
+
+use futures::StreamExt;
+use futures::TryStreamExt;
+
+/// Links a scalar subquery's execution plan to its index in the shared results
+/// container. The [`ScalarSubqueryExec`] that owns these links populates
+/// `results[index]` at execution time, and [`ScalarSubqueryExpr`] instances
+/// with the same index read from it.
+///
+/// [`ScalarSubqueryExpr`]:
datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr
+#[derive(Debug, Clone)]
+pub struct ScalarSubqueryLink {
+ /// The physical plan for the subquery.
+ pub plan: Arc<dyn ExecutionPlan>,
+ /// Index into the shared results container.
+ pub index: usize,
Review Comment:
We could potentially take the Rust approach here and use a type wrapper like
```rust
pub struct SubqueryId(usize);
```
mostly so that any code that computes them needs to use that type explicitly.
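A slightly fuller sketch of how such a wrapper might be used, with a hypothetical `Results` container (an `i64` stands in for `ScalarValue`, and none of these names exist in the PR):

```rust
use std::sync::OnceLock;

/// Hypothetical newtype for a subquery's slot index.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubqueryId(usize);

impl SubqueryId {
    pub fn new(index: usize) -> Self {
        Self(index)
    }

    pub fn index(self) -> usize {
        self.0
    }
}

/// Hypothetical results container that can only be addressed by `SubqueryId`,
/// so a bare `usize` cannot be passed by accident.
pub struct Results {
    slots: Vec<OnceLock<i64>>,
}

impl Results {
    pub fn new(n: usize) -> Self {
        Self {
            slots: (0..n).map(|_| OnceLock::new()).collect(),
        }
    }

    /// Stores `value`; hands it back as `Err` if the slot is already
    /// populated or the id is out of range.
    pub fn set(&self, id: SubqueryId, value: i64) -> Result<(), i64> {
        match self.slots.get(id.index()) {
            Some(slot) => slot.set(value),
            None => Err(value),
        }
    }

    /// Returns the stored value, or `None` while the slot is still empty.
    pub fn get(&self, id: SubqueryId) -> Option<i64> {
        self.slots.get(id.index()).and_then(|slot| slot.get().copied())
    }
}
```

With this shape, `results.get(3)` does not compile; callers have to go through `SubqueryId::new(3)`, which makes the places that compute indexes easy to find.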
##########
datafusion/physical-expr/src/scalar_subquery.rs:
##########
@@ -0,0 +1,222 @@
+
+//! Physical expression for uncorrelated scalar subqueries.
+//!
+//! [`ScalarSubqueryExpr`] reads a cached [`ScalarValue`] that is populated
+//! at execution time by `ScalarSubqueryExec`.
+
+use std::any::Any;
+use std::fmt;
+use std::hash::Hash;
+use std::sync::{Arc, OnceLock};
+
+use arrow::datatypes::{DataType, Field, FieldRef, Schema};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{Result, ScalarValue, internal_datafusion_err};
+use datafusion_expr_common::columnar_value::ColumnarValue;
+use datafusion_expr_common::sort_properties::{ExprProperties, SortProperties};
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+
+/// A physical expression whose value is provided by a scalar subquery.
+///
+/// Subquery execution is handled by `ScalarSubqueryExec`, which stores the
+/// result in a shared results container. This expression simply reads from the
+/// shared results container at the appropriate index.
+///
+/// If the same subquery appears multiple times in a query, there will be
+/// multiple `ScalarSubqueryExpr` with the same result index.
+#[derive(Debug)]
+pub struct ScalarSubqueryExpr {
+ data_type: DataType,
+ nullable: bool,
+ /// Index of this subquery in the shared results container.
+ index: usize,
+ /// Shared results container populated by `ScalarSubqueryExec`.
+ results: Arc<Vec<OnceLock<ScalarValue>>>,
+}
+
+impl ScalarSubqueryExpr {
+ pub fn new(
+ data_type: DataType,
+ nullable: bool,
+ index: usize,
+ results: Arc<Vec<OnceLock<ScalarValue>>>,
+ ) -> Self {
+ Self {
+ data_type,
+ nullable,
+ index,
+ results,
+ }
+ }
+
+ /// Returns the index of this subquery in the shared results container.
+ pub fn index(&self) -> usize {
+ self.index
+ }
+
+ pub fn results(&self) -> &Arc<Vec<OnceLock<ScalarValue>>> {
+ &self.results
+ }
+}
+
+impl fmt::Display for ScalarSubqueryExpr {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self.results.get(self.index).and_then(|slot| slot.get()) {
+ Some(v) => write!(f, "scalar_subquery({v})"),
+ None => write!(f, "scalar_subquery(<pending>)"),
+ }
+ }
+}
+
+// Two ScalarSubqueryExprs are the "same" if they share the same results
+// container and have the same index.
+impl Hash for ScalarSubqueryExpr {
+ fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+ Arc::as_ptr(&self.results).hash(state);
+ self.index.hash(state);
+ }
+}
+
+impl PartialEq for ScalarSubqueryExpr {
+ fn eq(&self, other: &Self) -> bool {
+ Arc::ptr_eq(&self.results, &other.results) && self.index == other.index
+ }
+}
+
+impl Eq for ScalarSubqueryExpr {}
+
+impl PhysicalExpr for ScalarSubqueryExpr {
Review Comment:
I agree we should not mix planning / execution (at least at this stage).
Eagerly executing subqueries could be problematic in its own right, as some
of them might never run if the query has a limit, for example, so eager
execution could waste work 🤔
It may make sense to run some subset of the optimizers (the simplifier, for
example) after executing subqueries.
##########
datafusion/optimizer/src/optimize_projections/mod.rs:
##########
@@ -136,9 +136,11 @@ fn optimize_projections(
// their parents' required indices.
match plan {
LogicalPlan::Projection(proj) => {
- return merge_consecutive_projections(proj)?.transform_data(|proj| {
- rewrite_projection_given_requirements(proj, config, &indices)
- });
+ return merge_consecutive_projections(proj)?
+ .transform_data(|proj| {
+ rewrite_projection_given_requirements(proj, config,
&indices)
+ })?
+ .transform_data(|plan| optimize_subqueries(plan, config));
Review Comment:
This is unfortunate, as it seems like we'll have to add this extra traversal
for all passes that want to recurse into subqueries.
However, I think that is not introduced by this PR, so we can perhaps deal
with it in a follow-on.
##########
datafusion/physical-expr/src/scalar_subquery.rs:
##########
@@ -0,0 +1,230 @@
+
+//! Physical expression for uncorrelated scalar subqueries.
+//!
+//! [`ScalarSubqueryExpr`] reads a cached [`ScalarValue`] that is populated
+//! at execution time by `ScalarSubqueryExec`.
+
+use std::any::Any;
+use std::fmt;
+use std::hash::Hash;
+use std::sync::{Arc, OnceLock};
+
+use arrow::datatypes::{DataType, Field, FieldRef, Schema};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{Result, ScalarValue, internal_datafusion_err};
+use datafusion_expr_common::columnar_value::ColumnarValue;
+use datafusion_expr_common::sort_properties::{ExprProperties, SortProperties};
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+
+/// A physical expression whose value is provided by a scalar subquery.
+///
+/// Subquery execution is handled by `ScalarSubqueryExec`, which stores the
+/// result in a shared results container. This expression simply reads from the
+/// shared results container at the appropriate index.
+///
+/// If the same subquery appears multiple times in a query, there will be
+/// multiple `ScalarSubqueryExpr` with the same result index.
+#[derive(Debug)]
+pub struct ScalarSubqueryExpr {
+ data_type: DataType,
+ nullable: bool,
+ /// Index of this subquery in the shared results container.
+ index: usize,
+ /// Shared results container populated by `ScalarSubqueryExec`.
+ results: Arc<Vec<OnceLock<ScalarValue>>>,
Review Comment:
It seems like we clearly don't have good tests for
ExecutionPlan::reset_state. Maybe we should update `sqllogictest` so it runs
each plan twice and ensures the same results come out 🤔
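A rough sketch of what such a run-twice check could look like, using a toy plan rather than the real `sqllogictest` runner (`ToyPlan` and `run_twice` are hypothetical names). The toy plan fails its second run when `reset_state` forgets to clear the write-once slot:

```rust
use std::sync::OnceLock;

/// Hypothetical toy plan that stores its subquery result in a write-once slot.
struct ToyPlan {
    slot: OnceLock<i64>,
    /// When false, `reset_state` "forgets" to clear the slot (the bug we
    /// want the harness to catch).
    clears_on_reset: bool,
}

impl ToyPlan {
    fn new(clears_on_reset: bool) -> Self {
        Self {
            slot: OnceLock::new(),
            clears_on_reset,
        }
    }

    /// Populates the slot and returns the result; errors if the slot was
    /// already populated by an earlier run.
    fn execute(&self) -> Result<i64, String> {
        self.slot
            .set(42)
            .map_err(|_| "result slot was already populated".to_string())?;
        Ok(42)
    }

    fn reset_state(&mut self) {
        if self.clears_on_reset {
            self.slot = OnceLock::new();
        }
    }
}

/// Runs the plan twice with a reset in between and checks both runs agree.
fn run_twice(plan: &mut ToyPlan) -> Result<i64, String> {
    let first = plan.execute()?;
    plan.reset_state();
    let second = plan.execute()?;
    if first == second {
        Ok(first)
    } else {
        Err(format!("results differ: {first} vs {second}"))
    }
}
```

Here `run_twice(&mut ToyPlan::new(true))` succeeds, while the variant that skips the clear returns an error on the second execution.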
##########
datafusion/physical-plan/src/scalar_subquery.rs:
##########
@@ -0,0 +1,514 @@
+
+//! Execution plan for uncorrelated scalar subqueries.
+//!
+//! [`ScalarSubqueryExec`] wraps a main input plan and a set of subquery plans.
+//! At execution time, it runs each subquery exactly once, extracts the scalar
+//! result, and populates the shared results container that
+//! [`ScalarSubqueryExpr`] instances read from by index.
+//!
+//! [`ScalarSubqueryExpr`]:
datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr
+
+use std::fmt;
+use std::sync::Arc;
+
+use datafusion_common::tree_node::TreeNodeRecursion;
+use datafusion_common::{Result, ScalarValue, Statistics, exec_err,
internal_err};
+use datafusion_execution::TaskContext;
+use datafusion_expr::execution_props::ScalarSubqueryResults;
+use datafusion_physical_expr::PhysicalExpr;
+
+use crate::execution_plan::{CardinalityEffect, ExecutionPlan, PlanProperties};
+use crate::joins::utils::{OnceAsync, OnceFut};
+use crate::stream::RecordBatchStreamAdapter;
+use crate::{DisplayAs, DisplayFormatType, SendableRecordBatchStream};
+
+use futures::StreamExt;
+use futures::TryStreamExt;
+
+/// Links a scalar subquery's execution plan to its index in the shared results
+/// container. The [`ScalarSubqueryExec`] that owns these links populates
+/// `results[index]` at execution time, and [`ScalarSubqueryExpr`] instances
+/// with the same index read from it.
+///
+/// [`ScalarSubqueryExpr`]:
datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr
+#[derive(Debug, Clone)]
+pub struct ScalarSubqueryLink {
+ /// The physical plan for the subquery.
+ pub plan: Arc<dyn ExecutionPlan>,
+ /// Index into the shared results container.
+ pub index: usize,
+}
+
+/// Manages execution of uncorrelated scalar subqueries for a single plan
+/// level.
+///
+/// The first child node is the **main input plan**, whose batches are passed
+/// through unchanged. The remaining children are **subquery plans**, each of
+/// which must produce exactly zero or one row. Before any batches from the
main
+/// input are yielded, all subquery plans are executed and their scalar results
+/// are stored in a shared results container ([`ScalarSubqueryResults`]).
+/// [`ScalarSubqueryExpr`] nodes embedded in the main input's expressions read
+/// from this container by index.
+///
+/// All subqueries are evaluated eagerly when the first output partition is
+/// requested, before any rows from the main input are produced.
+///
+/// TODO: Consider overlapping computation of the subqueries with evaluating
the
Review Comment:
yeah -- a good follow on perhaps
##########
datafusion/proto/src/bytes/mod.rs:
##########
@@ -65,26 +57,21 @@ pub trait Serializeable: Sized {
/// Convert `self` to an opaque byte stream
fn to_bytes(&self) -> Result<Bytes>;
- /// Convert `bytes` (the output of [`to_bytes`]) back into an
- /// object. This will error if the serialized bytes contain any
- /// user defined functions, in which case use
- /// [`from_bytes_with_registry`]
+ /// Convert `bytes` (the output of [`to_bytes`]) back into an object. This
+ /// will error if the serialized bytes contain any user defined functions,
+ /// in which case use [`from_bytes_with_ctx`]
///
/// [`to_bytes`]: Self::to_bytes
- /// [`from_bytes_with_registry`]: Self::from_bytes_with_registry
+ /// [`from_bytes_with_ctx`]: Self::from_bytes_with_ctx
fn from_bytes(bytes: &[u8]) -> Result<Self> {
- Self::from_bytes_with_registry(bytes, &registry::NoRegistry {})
+ Self::from_bytes_with_ctx(bytes, &TaskContext::default())
}
- /// Convert `bytes` (the output of [`to_bytes`]) back into an
- /// object resolving user defined functions with the specified
- /// `registry`
+ /// Convert `bytes` (the output of [`to_bytes`]) back into an object
+ /// resolving user defined functions with the specified `ctx`
///
/// [`to_bytes`]: Self::to_bytes
- fn from_bytes_with_registry(
- bytes: &[u8],
- registry: &dyn FunctionRegistry,
- ) -> Result<Self>;
+ fn from_bytes_with_ctx(bytes: &[u8], ctx: &TaskContext) -> Result<Self>;
Review Comment:
I think this is technically a breaking API change -- maybe we can leave the
old method in there and mark it deprecated? Otherwise we should add a note to
the upgrade guide
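A minimal sketch of the deprecation route, using hypothetical stand-ins for the real types (`Registry`, `Ctx`, and `Name` are illustrative, and building a context from a registry by cloning is an assumption; the real `TaskContext` may not support that directly):

```rust
/// Hypothetical stand-in for a function registry.
#[derive(Clone, Default)]
pub struct Registry {
    pub udf_names: Vec<String>,
}

/// Hypothetical stand-in for `TaskContext`.
#[derive(Default)]
pub struct Ctx {
    pub registry: Registry,
}

pub trait Serializeable: Sized {
    /// New entry point.
    fn from_bytes_with_ctx(bytes: &[u8], ctx: &Ctx) -> Result<Self, String>;

    /// Old entry point, kept as a provided method so existing callers still
    /// compile (with a deprecation warning) and forward to the new one.
    #[deprecated(note = "use `from_bytes_with_ctx` instead")]
    fn from_bytes_with_registry(bytes: &[u8], registry: &Registry) -> Result<Self, String> {
        let ctx = Ctx {
            registry: registry.clone(),
        };
        Self::from_bytes_with_ctx(bytes, &ctx)
    }
}

/// Toy implementor: decodes the bytes as a UTF-8 name.
pub struct Name(pub String);

impl Serializeable for Name {
    fn from_bytes_with_ctx(bytes: &[u8], _ctx: &Ctx) -> Result<Self, String> {
        String::from_utf8(bytes.to_vec())
            .map(Name)
            .map_err(|e| e.to_string())
    }
}
```

Because the old method has a default body, implementors only need to provide `from_bytes_with_ctx`, and callers of the old name get a warning instead of a compile error.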
##########
datafusion/proto/src/bytes/mod.rs:
##########
@@ -326,7 +235,7 @@ pub fn physical_plan_from_json(
let back: protobuf::PhysicalPlanNode = serde_json::from_str(json)
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
let extension_codec = DefaultPhysicalExtensionCodec {};
- let proto_converter = DefaultPhysicalProtoConverter {};
+ let proto_converter = DefaultPhysicalProtoConverter::default();
Review Comment:
I think this is also technically a breaking change (not a big deal, but
one that we should probably also call out in the upgrade guide)
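To illustrate why this can break downstream code, here is a small sketch with a hypothetical `Converter` type. It assumes the converter gained private state, which is my guess at why the call site changed:

```rust
pub mod converter {
    /// Hypothetical converter that has grown a private field.
    #[derive(Debug, Default)]
    pub struct Converter {
        cache: Vec<String>,
    }

    impl Converter {
        /// Number of cached entries (always 0 for a fresh converter).
        pub fn cached_entries(&self) -> usize {
            self.cache.len()
        }
    }
}

/// Downstream-style construction, written outside the defining module.
pub fn make_converter() -> converter::Converter {
    // `converter::Converter {}` would be rejected here, because the private
    // field `cache` cannot be initialized from outside the module.
    converter::Converter::default()
}
```

Callers that previously wrote the struct literal would have to switch to `default()`, which is the kind of change worth a line in the upgrade guide.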
##########
datafusion/physical-plan/src/scalar_subquery.rs:
##########
@@ -0,0 +1,514 @@
+
+//! Execution plan for uncorrelated scalar subqueries.
+//!
+//! [`ScalarSubqueryExec`] wraps a main input plan and a set of subquery plans.
+//! At execution time, it runs each subquery exactly once, extracts the scalar
+//! result, and populates the shared results container that
+//! [`ScalarSubqueryExpr`] instances read from by index.
+//!
+//! [`ScalarSubqueryExpr`]:
datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr
+
+use std::fmt;
+use std::sync::Arc;
+
+use datafusion_common::tree_node::TreeNodeRecursion;
+use datafusion_common::{Result, ScalarValue, Statistics, exec_err,
internal_err};
+use datafusion_execution::TaskContext;
+use datafusion_expr::execution_props::ScalarSubqueryResults;
+use datafusion_physical_expr::PhysicalExpr;
+
+use crate::execution_plan::{CardinalityEffect, ExecutionPlan, PlanProperties};
+use crate::joins::utils::{OnceAsync, OnceFut};
+use crate::stream::RecordBatchStreamAdapter;
+use crate::{DisplayAs, DisplayFormatType, SendableRecordBatchStream};
+
+use futures::StreamExt;
+use futures::TryStreamExt;
+
+/// Links a scalar subquery's execution plan to its index in the shared results
+/// container. The [`ScalarSubqueryExec`] that owns these links populates
Review Comment:
maybe we could link explicitly here to what the "shared results container"
is (the `TaskContext`)?
##########
datafusion/physical-plan/src/scalar_subquery.rs:
##########
@@ -0,0 +1,514 @@
+
+//! Execution plan for uncorrelated scalar subqueries.
+//!
+//! [`ScalarSubqueryExec`] wraps a main input plan and a set of subquery plans.
+//! At execution time, it runs each subquery exactly once, extracts the scalar
+//! result, and populates the shared results container that
+//! [`ScalarSubqueryExpr`] instances read from by index.
+//!
+//! [`ScalarSubqueryExpr`]:
datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr
+
+use std::fmt;
+use std::sync::Arc;
+
+use datafusion_common::tree_node::TreeNodeRecursion;
+use datafusion_common::{Result, ScalarValue, Statistics, exec_err,
internal_err};
+use datafusion_execution::TaskContext;
+use datafusion_expr::execution_props::ScalarSubqueryResults;
+use datafusion_physical_expr::PhysicalExpr;
+
+use crate::execution_plan::{CardinalityEffect, ExecutionPlan, PlanProperties};
+use crate::joins::utils::{OnceAsync, OnceFut};
+use crate::stream::RecordBatchStreamAdapter;
+use crate::{DisplayAs, DisplayFormatType, SendableRecordBatchStream};
+
+use futures::StreamExt;
+use futures::TryStreamExt;
+
+/// Links a scalar subquery's execution plan to its index in the shared results
+/// container. The [`ScalarSubqueryExec`] that owns these links populates
+/// `results[index]` at execution time, and [`ScalarSubqueryExpr`] instances
+/// with the same index read from it.
+///
+/// [`ScalarSubqueryExpr`]:
datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr
+#[derive(Debug, Clone)]
+pub struct ScalarSubqueryLink {
+ /// The physical plan for the subquery.
+ pub plan: Arc<dyn ExecutionPlan>,
+ /// Index into the shared results container.
+ pub index: usize,
+}
+
+/// Manages execution of uncorrelated scalar subqueries for a single plan
+/// level.
+///
+/// The first child node is the **main input plan**, whose batches are passed
+/// through unchanged. The remaining children are **subquery plans**, each of
+/// which must produce exactly zero or one row. Before any batches from the
main
+/// input are yielded, all subquery plans are executed and their scalar results
+/// are stored in a shared results container ([`ScalarSubqueryResults`]).
+/// [`ScalarSubqueryExpr`] nodes embedded in the main input's expressions read
+/// from this container by index.
+///
+/// All subqueries are evaluated eagerly when the first output partition is
+/// requested, before any rows from the main input are produced.
+///
+/// TODO: Consider overlapping computation of the subqueries with evaluating
the
+/// main query.
+///
+/// [`ScalarSubqueryExpr`]:
datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr
+#[derive(Debug)]
+pub struct ScalarSubqueryExec {
+ /// The main input plan whose output is passed through.
+ input: Arc<dyn ExecutionPlan>,
+ /// Subquery plans and their result indexes.
+ subqueries: Vec<ScalarSubqueryLink>,
+ /// Shared one-time async computation of subquery results.
+ subquery_future: Arc<OnceAsync<()>>,
+ /// Shared results container; same instance held by ScalarSubqueryExpr
nodes.
+ results: ScalarSubqueryResults,
+ /// Cached plan properties (copied from input).
+ cache: Arc<PlanProperties>,
+}
+
+impl ScalarSubqueryExec {
+ pub fn new(
+ input: Arc<dyn ExecutionPlan>,
+ subqueries: Vec<ScalarSubqueryLink>,
+ results: ScalarSubqueryResults,
+ ) -> Self {
+ let cache = Arc::clone(input.properties());
+ Self {
+ input,
+ subqueries,
+ subquery_future: Arc::default(),
+ results,
+ cache,
+ }
+ }
+
+ pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+ &self.input
+ }
+
+ pub fn subqueries(&self) -> &[ScalarSubqueryLink] {
+ &self.subqueries
+ }
+
+ pub fn results(&self) -> &ScalarSubqueryResults {
+ &self.results
+ }
+
+ /// Returns a per-child bool vec that is `true` for the main input
+ /// (child 0) and `false` for every subquery child.
+ fn true_for_input_only(&self) -> Vec<bool> {
+ std::iter::once(true)
+ .chain(std::iter::repeat_n(false, self.subqueries.len()))
+ .collect()
+ }
+}
+
+impl DisplayAs for ScalarSubqueryExec {
+ fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) ->
fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(
+ f,
+ "ScalarSubqueryExec: subqueries={}",
+ self.subqueries.len()
+ )
+ }
+ DisplayFormatType::TreeRender => {
+ write!(f, "")
+ }
+ }
+ }
+}
+
+impl ExecutionPlan for ScalarSubqueryExec {
+ fn name(&self) -> &'static str {
+ "ScalarSubqueryExec"
+ }
+
+ fn properties(&self) -> &Arc<PlanProperties> {
+ &self.cache
+ }
+
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ let mut children = vec![&self.input];
+ for sq in &self.subqueries {
+ children.push(&sq.plan);
+ }
+ children
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ // First child is the main input, the rest are subquery plans.
+ let input = children.remove(0);
+ let subqueries = self
+ .subqueries
+ .iter()
+ .zip(children)
+ .map(|(sq, new_plan)| ScalarSubqueryLink {
+ plan: new_plan,
+ index: sq.index,
+ })
+ .collect();
+ Ok(Arc::new(ScalarSubqueryExec::new(
+ input,
+ subqueries,
+ Arc::clone(&self.results),
+ )))
+ }
+
+ fn execute(
+ &self,
+ partition: usize,
+ context: Arc<TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ let subqueries = self.subqueries.clone();
+ let results = Arc::clone(&self.results);
+ let subquery_ctx = Arc::clone(&context);
+ let mut subquery_future = self.subquery_future.try_once(move || {
+ Ok(async move { execute_subqueries(subqueries, results,
subquery_ctx).await })
+ })?;
+ let input = Arc::clone(&self.input);
+ let schema = self.schema();
+
+ Ok(Box::pin(RecordBatchStreamAdapter::new(
+ schema,
+ futures::stream::once(async move {
+ // Execute all subqueries exactly once, even when multiple
+ // partitions call execute() concurrently.
+ wait_for_subqueries(&mut subquery_future).await?;
+
+ // Now that the subqueries have finished execution, we can
+ // safely execute the main input
+ input.execute(partition, context)
+ })
+ .try_flatten(),
+ )))
+ }
+
+ fn apply_expressions(
+ &self,
+ _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
+ ) -> Result<TreeNodeRecursion> {
+ Ok(TreeNodeRecursion::Continue)
+ }
+
+ fn maintains_input_order(&self) -> Vec<bool> {
+ // Only the main input (first child); subquery children don't
+ // contribute to ordering.
+ self.true_for_input_only()
+ }
+
+ fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+ // Only the main input; subquery children produce at most one
+ // row, so repartitioning them adds overhead with no benefit.
+ self.true_for_input_only()
+ }
+
+ fn partition_statistics(&self, partition: Option<usize>) ->
Result<Arc<Statistics>> {
+ self.input.partition_statistics(partition)
+ }
+
+ fn cardinality_effect(&self) -> CardinalityEffect {
+ CardinalityEffect::Equal
+ }
+}
+
+/// Execute all subquery plans, extract their scalar results, and populate
+/// the shared results container.
+async fn wait_for_subqueries(fut: &mut OnceFut<()>) -> Result<()> {
+ std::future::poll_fn(|cx| fut.get_shared(cx)).await?;
+ Ok(())
+}
+
+async fn execute_subqueries(
+ subqueries: Vec<ScalarSubqueryLink>,
+ results: ScalarSubqueryResults,
+ context: Arc<TaskContext>,
+) -> Result<()> {
+ // Evaluate subqueries in parallel; wait for them all to finish evaluation
+ // before returning.
+ let futures = subqueries.iter().map(|sq| {
+ let plan = Arc::clone(&sq.plan);
+ let ctx = Arc::clone(&context);
+ let results = Arc::clone(&results);
+ let index = sq.index;
+ async move {
+ let value = execute_scalar_subquery(plan, ctx).await?;
+ if results[index].set(value).is_err() {
+ return internal_err!(
+ "ScalarSubqueryExec: result for index {index} was already
populated"
+ );
+ }
+ Ok(()) as Result<()>
+ }
+ });
+ futures::future::try_join_all(futures).await?;
+ Ok(())
+}
+
+/// Execute a single subquery plan and extract the scalar value.
+/// Returns NULL for 0 rows, the scalar value for exactly 1 row,
+/// or an error for >1 rows.
+async fn execute_scalar_subquery(
+ plan: Arc<dyn ExecutionPlan>,
+ context: Arc<TaskContext>,
+) -> Result<ScalarValue> {
+ let schema = plan.schema();
+ if schema.fields().len() != 1 {
+ // Should be enforced by the physical planner.
+ return internal_err!(
+ "Scalar subquery must return exactly one column, got {}",
+ schema.fields().len()
+ );
+ }
+
+ let mut stream = crate::execute_stream(plan, context)?;
+ let mut result: Option<ScalarValue> = None;
+
+ while let Some(batch) = stream.next().await.transpose()? {
+ if batch.num_rows() == 0 {
+ continue;
+ }
+ if result.is_some() || batch.num_rows() > 1 {
+ return exec_err!("Scalar subquery returned more than one row");
+ }
+ result = Some(ScalarValue::try_from_array(batch.column(0), 0)?);
+ }
+
+ // 0 rows → typed NULL per SQL semantics
+ match result {
+ Some(v) => Ok(v),
+ None => ScalarValue::try_from(schema.field(0).data_type()),
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::test::{self, TestMemoryExec};
+
+ use std::sync::OnceLock;
+ use std::sync::atomic::{AtomicUsize, Ordering};
+
+ use crate::test::exec::ErrorExec;
+ use arrow::array::{Int32Array, Int64Array};
+ use arrow::datatypes::{DataType, Field, Schema};
+ use arrow::record_batch::RecordBatch;
+
+ #[derive(Debug)]
+ struct CountingExec {
+ inner: Arc<dyn ExecutionPlan>,
+ execute_calls: Arc<AtomicUsize>,
+ }
+
+ impl CountingExec {
+ fn new(inner: Arc<dyn ExecutionPlan>, execute_calls: Arc<AtomicUsize>) -> Self {
+ Self {
+ inner,
+ execute_calls,
+ }
+ }
+ }
+
+ impl DisplayAs for CountingExec {
+ fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "CountingExec")
+ }
+ DisplayFormatType::TreeRender => write!(f, ""),
+ }
+ }
+ }
+
+ impl ExecutionPlan for CountingExec {
+ fn name(&self) -> &'static str {
+ "CountingExec"
+ }
+
+ fn properties(&self) -> &Arc<PlanProperties> {
+ self.inner.properties()
+ }
+
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![&self.inner]
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Ok(Arc::new(Self::new(
+ children.remove(0),
+ Arc::clone(&self.execute_calls),
+ )))
+ }
+
+ fn apply_expressions(
+ &self,
+ _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
+ ) -> Result<TreeNodeRecursion> {
+ Ok(TreeNodeRecursion::Continue)
+ }
+
+ fn execute(
+ &self,
+ partition: usize,
+ context: Arc<TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ self.execute_calls.fetch_add(1, Ordering::SeqCst);
+ self.inner.execute(partition, context)
+ }
+ }
+
+ fn make_subquery_plan(batches: Vec<RecordBatch>) -> Arc<dyn ExecutionPlan> {
+ let schema = batches[0].schema();
+ TestMemoryExec::try_new_exec(&[batches], schema, None).unwrap()
+ }
+
+ fn make_results(n: usize) -> ScalarSubqueryResults {
+ Arc::new((0..n).map(|_| OnceLock::new()).collect())
+ }
+
+ #[tokio::test]
+ async fn test_single_row_subquery() -> Result<()> {
Review Comment:
A lot of these tests seem to be boilerplate / the same.
Refactoring the common code into helper functions could make it easier to see
what is important for each test rather than the common setup code.
##########
datafusion/sqllogictest/test_files/metadata.slt:
##########
@@ -47,16 +47,12 @@ select id, name from table_with_metadata;
NULL bar
3 baz
-query I rowsort
+query error DataFusion error: Execution error: Scalar subquery returned more than one row
Review Comment:
I double-checked that this is consistent with Postgres:
```sql
postgres=# create table foo(id int, name varchar);
CREATE TABLE
postgres=# insert into foo values (1, 'f');
INSERT 0 1
postgres=# insert into foo values (2, 'b');
INSERT 0 1
postgres=# select (select id from foo) UNION (select id from foo);
ERROR: more than one row returned by a subquery used as an expression
postgres=#
```
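For illustration, here is a minimal std-only sketch of the row-count rule this test exercises (0 rows gives NULL, exactly 1 row gives the value, more than 1 row is an error). `Batch` and `scalar_from_batches` are hypothetical stand-ins invented for this sketch, not DataFusion types or the PR's code:

```rust
/// Toy stand-in for a record batch: a single column of nullable integers.
/// (Assumption for this sketch; the real code works on Arrow `RecordBatch`es.)
type Batch = Vec<Option<i64>>;

/// Reduce a sequence of batches to one scalar:
/// 0 rows -> NULL (`None`), exactly 1 row -> that value, >1 rows -> error.
fn scalar_from_batches(
    batches: impl IntoIterator<Item = Batch>,
) -> Result<Option<i64>, String> {
    // Outer Option: "has a row been seen yet?"; inner Option: SQL NULL.
    let mut result: Option<Option<i64>> = None;
    for batch in batches {
        if batch.is_empty() {
            continue; // empty batches contribute no rows
        }
        if result.is_some() || batch.len() > 1 {
            return Err("Scalar subquery returned more than one row".to_string());
        }
        result = Some(batch[0]);
    }
    // No rows at all collapses to NULL.
    Ok(result.flatten())
}
```

With this model, two rows arriving in one batch or spread across two batches both produce the error, which mirrors the Postgres behavior shown above.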
##########
datafusion/physical-expr/src/planner.rs:
##########
@@ -409,6 +410,29 @@ pub fn create_physical_expr(
expressions::in_list(value_expr, list_exprs, negated, input_schema)
}
},
+ Expr::ScalarSubquery(sq) => {
+ match execution_props.subquery_indexes.get(sq) {
+ Some(&index) => {
+ let schema = sq.subquery.schema();
+ let dt = schema.field(0).data_type().clone();
Review Comment:
Is it worth checking here that the schema actually has exactly one field? Maybe
something like
```rust
assert_or_internal_error(schema.len(), 1, "Subquery output expected to be a single field");
```
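To make the intent concrete, a small std-only sketch of such a guard is below. `Field` and `single_field_type` are hypothetical names for this illustration only; the real check would go through DataFusion's schema type and its internal-error helpers:

```rust
/// Hypothetical, simplified field: only the data type name is modeled.
struct Field {
    data_type: &'static str,
}

/// Return the type of the only field, or an internal-style error when the
/// subquery schema does not have exactly one column.
fn single_field_type(fields: &[Field]) -> Result<&'static str, String> {
    match fields {
        // Exactly one field: safe to read its type.
        [only] => Ok(only.data_type),
        // Zero or several fields: report instead of panicking on `field(0)`.
        other => Err(format!(
            "Internal error: subquery output expected to be a single field, got {}",
            other.len()
        )),
    }
}
```

The point of the guard is that an empty schema turns into a descriptive error rather than an out-of-bounds panic when indexing the first field.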
##########
datafusion/physical-plan/src/scalar_subquery.rs:
##########
@@ -0,0 +1,514 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for uncorrelated scalar subqueries.
+//!
+//! [`ScalarSubqueryExec`] wraps a main input plan and a set of subquery plans.
+//! At execution time, it runs each subquery exactly once, extracts the scalar
+//! result, and populates the shared results container that
+//! [`ScalarSubqueryExpr`] instances read from by index.
+//!
+//! [`ScalarSubqueryExpr`]: datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr
+
+use std::fmt;
+use std::sync::Arc;
+
+use datafusion_common::tree_node::TreeNodeRecursion;
+use datafusion_common::{Result, ScalarValue, Statistics, exec_err, internal_err};
+use datafusion_execution::TaskContext;
+use datafusion_expr::execution_props::ScalarSubqueryResults;
+use datafusion_physical_expr::PhysicalExpr;
+
+use crate::execution_plan::{CardinalityEffect, ExecutionPlan, PlanProperties};
+use crate::joins::utils::{OnceAsync, OnceFut};
+use crate::stream::RecordBatchStreamAdapter;
+use crate::{DisplayAs, DisplayFormatType, SendableRecordBatchStream};
+
+use futures::StreamExt;
+use futures::TryStreamExt;
+
+/// Links a scalar subquery's execution plan to its index in the shared results
Review Comment:
It might also point out that from a results perspective this is a "NoOp" (it
just passes the inputs through), but that it has the side effect of calculating
the scalar subquery values.
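As a rough picture of "pass-through with a side effect", here is a std-only sketch using a plain iterator instead of a record batch stream. `PassThrough` and its fields are hypothetical and only model the idea; the PR's operator is async and works on `ExecutionPlan` children:

```rust
use std::sync::{Arc, OnceLock};

/// Forwards every item of `input` unchanged, but evaluates `pending` once
/// (on the first poll) and publishes the value into a shared cell.
struct PassThrough<I, F> {
    input: I,
    pending: Option<F>,
    result: Arc<OnceLock<i64>>,
}

impl<I, F> PassThrough<I, F>
where
    I: Iterator,
    F: FnOnce() -> i64,
{
    fn new(input: I, subquery: F, result: Arc<OnceLock<i64>>) -> Self {
        Self { input, pending: Some(subquery), result }
    }
}

impl<I, F> Iterator for PassThrough<I, F>
where
    I: Iterator,
    F: FnOnce() -> i64,
{
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        if let Some(subquery) = self.pending.take() {
            // Side effect: compute the scalar once and make it visible
            // to readers of the shared cell.
            let _ = self.result.set(subquery());
        }
        // Result-wise a no-op: the input is returned as-is.
        self.input.next()
    }
}
```

Readers holding a clone of the `Arc` see the value only after the first item has been requested, which is the ordering the doc comment could spell out.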
##########
datafusion/expr/src/execution_props.rs:
##########
@@ -42,6 +55,12 @@ pub struct ExecutionProps {
pub config_options: Option<Arc<ConfigOptions>>,
/// Providers for scalar variables
 pub var_providers: Option<HashMap<VarType, Arc<dyn VarProvider + Send + Sync>>>,
+ /// Maps each logical `Subquery` to its index in `subquery_results`.
+ /// Populated by the physical planner before calling `create_physical_expr`.
+ pub subquery_indexes: HashMap<crate::logical_plan::Subquery, usize>,
Review Comment:
Codex points out that two logically equivalent subqueries (i.e. with the same
SQL text) will actually be treated as different because their spans are
different.
I think this is OK, and we could potentially detect and optimize away
duplicated scalar subqueries in a follow-on PR (we would also have to detect
volatile functions such as `random`, etc.).
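A toy model of the situation described, assuming (as the comment states, not verified here) that the map key includes source-span information. `ToySubquery` and `assign_indexes` are invented for this sketch and are not the PR's types:

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical key: SQL text plus the byte span where it appeared.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ToySubquery {
    sql: String,
    span: (usize, usize),
}

/// Give each distinct key the next free index, in order of first appearance.
fn assign_indexes<K: Eq + Hash>(keys: impl IntoIterator<Item = K>) -> HashMap<K, usize> {
    let mut indexes = HashMap::new();
    for key in keys {
        let next = indexes.len();
        // Only the first occurrence of a key claims a new index.
        indexes.entry(key).or_insert(next);
    }
    indexes
}
```

Keyed on the whole struct, two textually identical subqueries at different spans get two slots; keyed on the text alone they share one. Sharing would only be safe when the subquery contains no volatile functions, which is the extra detection mentioned above.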
##########
.github/workflows/large_files.yml:
##########
@@ -34,9 +34,9 @@ jobs:
fetch-depth: 0
- name: Check size of new Git objects
env:
- # 1 MB ought to be enough for anybody.
Review Comment:
Do we really need to up the limit? This repo gets checked out a lot.
What is so large that it required increasing the limit to 2MB?
##########
datafusion/expr/src/execution_props.rs:
##########
@@ -18,9 +18,22 @@
use crate::var_provider::{VarProvider, VarType};
use chrono::{DateTime, Utc};
use datafusion_common::HashMap;
+use datafusion_common::ScalarValue;
use datafusion_common::alias::AliasGenerator;
use datafusion_common::config::ConfigOptions;
-use std::sync::Arc;
+use std::sync::{Arc, OnceLock};
+
+/// Shared results container for uncorrelated scalar subqueries.
+///
+/// Each entry corresponds to one scalar subquery, identified by its index.
Review Comment:
Since this is part of the public API, I recommend wrapping this in its own
struct so we can evolve it more easily without breaking API changes.
Something like
```rust
struct ScalarSubqueryResults {
    // details
}
impl ScalarSubqueryResults {
    fn new(n: usize) -> Self {
        ...
    }
}
```
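A slightly fuller std-only sketch of what such a wrapper might look like follows. The `Value` alias stands in for `ScalarValue`, and the `set` / `get` methods are assumptions about a plausible API, not something taken from the PR:

```rust
use std::sync::{Arc, OnceLock};

/// Stand-in for `ScalarValue` so the sketch has no external dependencies.
type Value = Option<i64>;

/// Opaque, cheaply cloneable container with one write-once slot per subquery.
#[derive(Debug, Clone)]
pub struct ScalarSubqueryResults {
    slots: Arc<Vec<OnceLock<Value>>>,
}

impl ScalarSubqueryResults {
    /// Create `n` empty slots.
    pub fn new(n: usize) -> Self {
        Self { slots: Arc::new((0..n).map(|_| OnceLock::new()).collect()) }
    }

    /// Store the result for `index`; errors if out of range or already set.
    pub fn set(&self, index: usize, value: Value) -> Result<(), String> {
        let slot = self
            .slots
            .get(index)
            .ok_or_else(|| format!("no subquery slot at index {index}"))?;
        slot.set(value)
            .map_err(|_| format!("result for index {index} was already populated"))
    }

    /// Read the result for `index`, if it has been populated.
    pub fn get(&self, index: usize) -> Option<&Value> {
        self.slots.get(index)?.get()
    }
}
```

Keeping the `Arc<Vec<OnceLock<..>>>` private means the storage could later change (for example to a different synchronization primitive) without touching callers.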
##########
datafusion/sqllogictest/test_files/subquery.slt:
##########
@@ -1672,6 +1726,197 @@ drop table employees;
statement count 0
drop table project_assignments;
+#############
Review Comment:
I am curious how this PR changes the behavior.
Can you perhaps pull these new tests into a new PR? That way we can
evaluate the change in behavior introduced in this PR by reviewing the
differences in the tests.
##########
datafusion/core/src/physical_planner.rs:
##########
@@ -379,8 +385,95 @@ impl DefaultPhysicalPlanner {
Ok(())
}
- /// Create a physical plan from a logical plan
- async fn create_initial_plan(
+ /// Collect uncorrelated scalar subqueries. We don't descend into nested
+ /// subqueries here: each call to `create_initial_plan` handles subqueries
+ /// at its level and then recurses in order to handle nested subqueries.
+ fn collect_scalar_subqueries(plan: &LogicalPlan) -> Vec<Subquery> {
+ let mut subqueries = Vec::new();
+ let mut seen = HashSet::new();
+ plan.apply(|node| {
+ for expr in node.expressions() {
+ expr.apply(|e| {
+ if let Expr::ScalarSubquery(sq) = e
+ && sq.outer_ref_columns.is_empty()
+ && seen.insert(sq.clone())
+ {
+ subqueries.push(sq.clone());
Review Comment:
Does it need to clone them? Maybe it could return `Vec<&Subquery>`.
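A std-only sketch of the borrowing variant, over a toy expression tree. `Expr`, `Subquery`, and the `correlated` flag here are simplified stand-ins invented for this sketch, not the DataFusion types:

```rust
use std::collections::HashSet;

/// Toy subquery: SQL text plus whether it references outer columns.
#[derive(Debug, PartialEq, Eq, Hash)]
struct Subquery {
    sql: String,
    correlated: bool,
}

/// Toy expression tree.
#[derive(Debug)]
enum Expr {
    Leaf,
    ScalarSubquery(Subquery),
    Binary(Box<Expr>, Box<Expr>),
}

/// Collect distinct uncorrelated scalar subqueries by reference, no clones.
fn collect_scalar_subqueries(expr: &Expr) -> Vec<&Subquery> {
    fn walk<'a>(
        e: &'a Expr,
        seen: &mut HashSet<&'a Subquery>,
        out: &mut Vec<&'a Subquery>,
    ) {
        match e {
            Expr::Leaf => {}
            Expr::ScalarSubquery(sq) => {
                // `HashSet<&T>` hashes and compares by value, so duplicates
                // are still detected without cloning.
                if !sq.correlated && seen.insert(sq) {
                    out.push(sq);
                }
            }
            Expr::Binary(l, r) => {
                walk(l, seen, out);
                walk(r, seen, out);
            }
        }
    }
    let mut seen = HashSet::new();
    let mut out = Vec::new();
    walk(expr, &mut seen, &mut out);
    out
}
```

The trade-off is that the returned references borrow from the plan, so this only works if the caller does not need to outlive or mutate the plan while holding them.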
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]