This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new cf8f2f829a Introduce LogicalPlan invariants, begin automatically
checking them (#13651)
cf8f2f829a is described below
commit cf8f2f829a5c8c108f934a4471e1e47accc5e393
Author: wiedld <[email protected]>
AuthorDate: Thu Dec 26 15:46:30 2024 -0500
Introduce LogicalPlan invariants, begin automatically checking them (#13651)
* minor(13525): perform LP validation before and after each possible
mutation
* minor(13525): validate unique field names on query and subquery schemas,
after each optimizer pass
* minor(13525): validate union after each optimizer pass
* refactor: make explicit what is an invariant of the logical plan, versus
assertions made after a given analyzer or optimizer pass
* chore: add link to invariant docs
* fix: add new invariants module
* refactor: move all LP invariant checking into LP, delineate executable
(valid semantic plan) vs basic LP invariants
* test: update test for slight error message change
* fix: push_down_filter optimization pass can push a IN(<subquery>) into a
TableScan's filter clause
* refactor: move collect_subquery_cols() to common utils crate
* refactor: clarify the purpose of assert_valid_optimization(), runs after
all optimizer passes, except in debug mode it runs after each pass.
* refactor: based upon performance tests, run the maximum number of checks
without impact:
* assert_valid_optimization can run each optimizer pass
* remove the recursive check_fields, which caused the performance regression
* the full LP Invariants::Executable can only run in debug
* chore: update error naming and terminology used in code comments
* refactor: use proper error methods
* chore: more cleanup of error messages
* chore: handle option trailer to error message
* test: update sqllogictests tests to not use multiline
---
.../src/logical_plan/invariants.rs} | 105 +++++++++++++++++++--
datafusion/expr/src/logical_plan/mod.rs | 2 +
datafusion/expr/src/logical_plan/plan.rs | 11 +++
datafusion/expr/src/utils.rs | 20 +++-
datafusion/optimizer/src/analyzer/mod.rs | 60 +++++-------
datafusion/optimizer/src/decorrelate.rs | 5 +-
.../src/decorrelate_predicate_subquery.rs | 4 +-
datafusion/optimizer/src/optimizer.rs | 73 ++++++++------
.../optimizer/src/scalar_subquery_to_join.rs | 4 +-
datafusion/optimizer/src/utils.rs | 17 ----
datafusion/sqllogictest/test_files/subquery.slt | 12 +--
11 files changed, 209 insertions(+), 104 deletions(-)
diff --git a/datafusion/optimizer/src/analyzer/subquery.rs
b/datafusion/expr/src/logical_plan/invariants.rs
similarity index 78%
rename from datafusion/optimizer/src/analyzer/subquery.rs
rename to datafusion/expr/src/logical_plan/invariants.rs
index 7129da85f3..bde4acaae5 100644
--- a/datafusion/optimizer/src/analyzer/subquery.rs
+++ b/datafusion/expr/src/logical_plan/invariants.rs
@@ -15,14 +15,98 @@
// specific language governing permissions and limitations
// under the License.
-use crate::analyzer::check_plan;
-use crate::utils::collect_subquery_cols;
+use datafusion_common::{
+ internal_err, plan_err,
+ tree_node::{TreeNode, TreeNodeRecursion},
+ DFSchemaRef, Result,
+};
-use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
-use datafusion_common::{plan_err, Result};
-use datafusion_expr::expr_rewriter::strip_outer_reference;
-use datafusion_expr::utils::split_conjunction;
-use datafusion_expr::{Aggregate, Expr, Filter, Join, JoinType, LogicalPlan,
Window};
+use crate::{
+ expr::{Exists, InSubquery},
+ expr_rewriter::strip_outer_reference,
+ utils::{collect_subquery_cols, split_conjunction},
+ Aggregate, Expr, Filter, Join, JoinType, LogicalPlan, Window,
+};
+
+pub enum InvariantLevel {
+ /// Invariants that are always true in DataFusion `LogicalPlan`s
+ /// such as the number of expected children and no duplicated output fields
+ Always,
+ /// Invariants that must hold true for the plan to be "executable"
+ /// such as the type and number of function arguments are correct and
+ /// that wildcards have been expanded
+ ///
+ /// To ensure a LogicalPlan satisfies the `Executable` invariants, run the
+ /// `Analyzer`
+ Executable,
+}
+
+pub fn assert_always_invariants(plan: &LogicalPlan) -> Result<()> {
+ // Refer to
<https://datafusion.apache.org/contributor-guide/specification/invariants.html#relation-name-tuples-in-logical-fields-and-logical-columns-are-unique>
+ assert_unique_field_names(plan)?;
+
+ Ok(())
+}
+
+pub fn assert_executable_invariants(plan: &LogicalPlan) -> Result<()> {
+ assert_always_invariants(plan)?;
+ assert_valid_semantic_plan(plan)?;
+ Ok(())
+}
+
+/// Returns an error if plan, and subplans, do not have unique fields.
+///
+/// This invariant is subject to change.
+/// refer:
<https://github.com/apache/datafusion/issues/13525#issuecomment-2494046463>
+fn assert_unique_field_names(plan: &LogicalPlan) -> Result<()> {
+ plan.schema().check_names()
+}
+
+/// Returns an error if the plan is not semantically valid.
+fn assert_valid_semantic_plan(plan: &LogicalPlan) -> Result<()> {
+ assert_subqueries_are_valid(plan)?;
+
+ Ok(())
+}
+
+/// Returns an error if the plan does not have the expected schema.
+/// Ignores metadata and nullability.
+pub fn assert_expected_schema(schema: &DFSchemaRef, plan: &LogicalPlan) ->
Result<()> {
+ let equivalent = plan.schema().equivalent_names_and_types(schema);
+
+ if !equivalent {
+ internal_err!(
+ "Failed due to a difference in schemas, original schema: {:?}, new
schema: {:?}",
+ schema,
+ plan.schema()
+ )
+ } else {
+ Ok(())
+ }
+}
+
+/// Asserts that the subqueries are structured properly with valid node
placement.
+///
+/// Refer to [`check_subquery_expr`] for more details.
+fn assert_subqueries_are_valid(plan: &LogicalPlan) -> Result<()> {
+ plan.apply_with_subqueries(|plan: &LogicalPlan| {
+ plan.apply_expressions(|expr| {
+ // recursively look for subqueries
+ expr.apply(|expr| {
+ match expr {
+ Expr::Exists(Exists { subquery, .. })
+ | Expr::InSubquery(InSubquery { subquery, .. })
+ | Expr::ScalarSubquery(subquery) => {
+ check_subquery_expr(plan, &subquery.subquery, expr)?;
+ }
+ _ => {}
+ };
+ Ok(TreeNodeRecursion::Continue)
+ })
+ })
+ })
+ .map(|_| ())
+}
/// Do necessary check on subquery expressions and fail the invalid plan
/// 1) Check whether the outer plan is in the allowed outer plans list to use
subquery expressions,
@@ -36,7 +120,7 @@ pub fn check_subquery_expr(
inner_plan: &LogicalPlan,
expr: &Expr,
) -> Result<()> {
- check_plan(inner_plan)?;
+ assert_subqueries_are_valid(inner_plan)?;
if let Expr::ScalarSubquery(subquery) = expr {
// Scalar subquery should only return one column
if subquery.subquery.schema().fields().len() > 1 {
@@ -108,12 +192,13 @@ pub fn check_subquery_expr(
match outer_plan {
LogicalPlan::Projection(_)
| LogicalPlan::Filter(_)
+ | LogicalPlan::TableScan(_)
| LogicalPlan::Window(_)
| LogicalPlan::Aggregate(_)
| LogicalPlan::Join(_) => Ok(()),
_ => plan_err!(
"In/Exist subquery can only be used in \
- Projection, Filter, Window functions, Aggregate and Join plan
nodes, \
+ Projection, Filter, TableScan, Window functions, Aggregate and
Join plan nodes, \
but was used in [{}]",
outer_plan.display()
),
@@ -285,8 +370,8 @@ mod test {
use std::cmp::Ordering;
use std::sync::Arc;
+ use crate::{Extension, UserDefinedLogicalNodeCore};
use datafusion_common::{DFSchema, DFSchemaRef};
- use datafusion_expr::{Extension, UserDefinedLogicalNodeCore};
use super::*;
diff --git a/datafusion/expr/src/logical_plan/mod.rs
b/datafusion/expr/src/logical_plan/mod.rs
index 5d613d4e80..4049413786 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -20,6 +20,8 @@ mod ddl;
pub mod display;
pub mod dml;
mod extension;
+pub(crate) mod invariants;
+pub use invariants::{assert_expected_schema, check_subquery_expr,
InvariantLevel};
mod plan;
mod statement;
pub mod tree_node;
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index 47d9aac3ca..cc922709c8 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -24,6 +24,9 @@ use std::hash::{Hash, Hasher};
use std::sync::{Arc, LazyLock};
use super::dml::CopyTo;
+use super::invariants::{
+ assert_always_invariants, assert_executable_invariants, InvariantLevel,
+};
use super::DdlStatement;
use crate::builder::{change_redundant_column, unnest_with_options};
use crate::expr::{Placeholder, Sort as SortExpr, WindowFunction};
@@ -1127,6 +1130,14 @@ impl LogicalPlan {
}
}
+ /// checks that the plan conforms to the listed invariant level, returning
an Error if not
+ pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
+ match check {
+ InvariantLevel::Always => assert_always_invariants(self),
+ InvariantLevel::Executable => assert_executable_invariants(self),
+ }
+ }
+
/// Helper for [Self::with_new_exprs] to use when no expressions are
expected.
#[inline]
#[allow(clippy::needless_pass_by_value)] // expr is moved intentionally to
ensure it's not used again
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 9d0a2b5b95..b1e36e0292 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -18,7 +18,7 @@
//! Expression utilities
use std::cmp::Ordering;
-use std::collections::HashSet;
+use std::collections::{BTreeSet, HashSet};
use std::ops::Deref;
use std::sync::Arc;
@@ -1402,6 +1402,24 @@ pub fn format_state_name(name: &str, state_name: &str)
-> String {
format!("{name}[{state_name}]")
}
+/// Determine the set of [`Column`]s produced by the subquery.
+pub fn collect_subquery_cols(
+ exprs: &[Expr],
+ subquery_schema: &DFSchema,
+) -> Result<BTreeSet<Column>> {
+ exprs.iter().try_fold(BTreeSet::new(), |mut cols, expr| {
+ let mut using_cols: Vec<Column> = vec![];
+ for col in expr.column_refs().into_iter() {
+ if subquery_schema.has_column(col) {
+ using_cols.push(col.clone());
+ }
+ }
+
+ cols.extend(using_cols);
+ Result::<_>::Ok(cols)
+ })
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/datafusion/optimizer/src/analyzer/mod.rs
b/datafusion/optimizer/src/analyzer/mod.rs
index f2fd61dfa8..9d0ac6b54c 100644
--- a/datafusion/optimizer/src/analyzer/mod.rs
+++ b/datafusion/optimizer/src/analyzer/mod.rs
@@ -24,18 +24,14 @@ use log::debug;
use datafusion_common::config::ConfigOptions;
use datafusion_common::instant::Instant;
-use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
-use datafusion_common::{DataFusionError, Result};
-use datafusion_expr::expr::Exists;
-use datafusion_expr::expr::InSubquery;
+use datafusion_common::Result;
use datafusion_expr::expr_rewriter::FunctionRewrite;
-use datafusion_expr::{Expr, LogicalPlan};
+use datafusion_expr::{InvariantLevel, LogicalPlan};
use crate::analyzer::count_wildcard_rule::CountWildcardRule;
use crate::analyzer::expand_wildcard_rule::ExpandWildcardRule;
use crate::analyzer::inline_table_scan::InlineTableScan;
use crate::analyzer::resolve_grouping_function::ResolveGroupingFunction;
-use crate::analyzer::subquery::check_subquery_expr;
use crate::analyzer::type_coercion::TypeCoercion;
use crate::utils::log_plan;
@@ -46,9 +42,16 @@ pub mod expand_wildcard_rule;
pub mod function_rewrite;
pub mod inline_table_scan;
pub mod resolve_grouping_function;
-pub mod subquery;
pub mod type_coercion;
+pub mod subquery {
+ #[deprecated(
+ since = "44.0.0",
+ note = "please use `datafusion_expr::check_subquery_expr` instead"
+ )]
+ pub use datafusion_expr::check_subquery_expr;
+}
+
/// [`AnalyzerRule`]s transform [`LogicalPlan`]s in some way to make
/// the plan valid prior to the rest of the DataFusion optimization process.
///
@@ -56,7 +59,7 @@ pub mod type_coercion;
/// which must preserve the semantics of the `LogicalPlan`, while computing
/// results in a more optimal way.
///
-/// For example, an `AnalyzerRule` may resolve [`Expr`]s into more specific
+/// For example, an `AnalyzerRule` may resolve
[`Expr`](datafusion_expr::Expr)s into more specific
/// forms such as a subquery reference, or do type coercion to ensure the types
/// of operands are correct.
///
@@ -140,6 +143,10 @@ impl Analyzer {
where
F: FnMut(&LogicalPlan, &dyn AnalyzerRule),
{
+ // verify the logical plan required invariants at the start, before
analyzer
+ plan.check_invariants(InvariantLevel::Always)
+ .map_err(|e| e.context("Invalid input plan passed to Analyzer"))?;
+
let start_time = Instant::now();
let mut new_plan = plan;
@@ -161,39 +168,20 @@ impl Analyzer {
// TODO add common rule executor for Analyzer and Optimizer
for rule in rules {
- new_plan = rule.analyze(new_plan, config).map_err(|e| {
- DataFusionError::Context(rule.name().to_string(), Box::new(e))
- })?;
+ new_plan = rule
+ .analyze(new_plan, config)
+ .map_err(|e| e.context(rule.name()))?;
log_plan(rule.name(), &new_plan);
observer(&new_plan, rule.as_ref());
}
- // for easier display in explain output
- check_plan(&new_plan).map_err(|e| {
- DataFusionError::Context("check_analyzed_plan".to_string(),
Box::new(e))
- })?;
+
+ // verify at the end, after the last LP analyzer pass, that the plan
is executable.
+ new_plan
+ .check_invariants(InvariantLevel::Executable)
+ .map_err(|e| e.context("Invalid (non-executable) plan after
Analyzer"))?;
+
log_plan("Final analyzed plan", &new_plan);
debug!("Analyzer took {} ms", start_time.elapsed().as_millis());
Ok(new_plan)
}
}
-
-/// Do necessary check and fail the invalid plan
-fn check_plan(plan: &LogicalPlan) -> Result<()> {
- plan.apply_with_subqueries(|plan: &LogicalPlan| {
- plan.apply_expressions(|expr| {
- // recursively look for subqueries
- expr.apply(|expr| {
- match expr {
- Expr::Exists(Exists { subquery, .. })
- | Expr::InSubquery(InSubquery { subquery, .. })
- | Expr::ScalarSubquery(subquery) => {
- check_subquery_expr(plan, &subquery.subquery, expr)?;
- }
- _ => {}
- };
- Ok(TreeNodeRecursion::Continue)
- })
- })
- })
- .map(|_| ())
-}
diff --git a/datafusion/optimizer/src/decorrelate.rs
b/datafusion/optimizer/src/decorrelate.rs
index b5726d9991..ee6ea08b43 100644
--- a/datafusion/optimizer/src/decorrelate.rs
+++ b/datafusion/optimizer/src/decorrelate.rs
@@ -22,7 +22,6 @@ use std::ops::Deref;
use std::sync::Arc;
use crate::simplify_expressions::ExprSimplifier;
-use crate::utils::collect_subquery_cols;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
TreeNodeRewriter,
@@ -30,7 +29,9 @@ use datafusion_common::tree_node::{
use datafusion_common::{plan_err, Column, DFSchemaRef, HashMap, Result,
ScalarValue};
use datafusion_expr::expr::Alias;
use datafusion_expr::simplify::SimplifyContext;
-use datafusion_expr::utils::{conjunction, find_join_exprs, split_conjunction};
+use datafusion_expr::utils::{
+ collect_subquery_cols, conjunction, find_join_exprs, split_conjunction,
+};
use datafusion_expr::{
expr, lit, BinaryExpr, Cast, EmptyRelation, Expr, FetchType, LogicalPlan,
LogicalPlanBuilder, Operator,
diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
index 3e5a85ea02..a87688c1a3 100644
--- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
+++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
@@ -835,7 +835,7 @@ mod tests {
.build()?;
// Maybe okay if the table only has a single column?
- let expected = "check_analyzed_plan\
+ let expected = "Invalid (non-executable) plan after Analyzer\
\ncaused by\
\nError during planning: InSubquery should only return one column, but
found 4";
assert_analyzer_check_err(vec![], plan, expected);
@@ -930,7 +930,7 @@ mod tests {
.project(vec![col("customer.c_custkey")])?
.build()?;
- let expected = "check_analyzed_plan\
+ let expected = "Invalid (non-executable) plan after Analyzer\
\ncaused by\
\nError during planning: InSubquery should only return one column";
assert_analyzer_check_err(vec![], plan, expected);
diff --git a/datafusion/optimizer/src/optimizer.rs
b/datafusion/optimizer/src/optimizer.rs
index dfdd0c110c..49bce3c1ce 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -22,6 +22,7 @@ use std::sync::Arc;
use chrono::{DateTime, Utc};
use datafusion_expr::registry::FunctionRegistry;
+use datafusion_expr::{assert_expected_schema, InvariantLevel};
use log::{debug, warn};
use datafusion_common::alias::AliasGenerator;
@@ -355,6 +356,10 @@ impl Optimizer {
where
F: FnMut(&LogicalPlan, &dyn OptimizerRule),
{
+ // verify LP is valid, before the first LP optimizer pass.
+ plan.check_invariants(InvariantLevel::Executable)
+ .map_err(|e| e.context("Invalid input plan before LP
Optimizers"))?;
+
let start_time = Instant::now();
let options = config.options();
let mut new_plan = plan;
@@ -362,6 +367,8 @@ impl Optimizer {
let mut previous_plans = HashSet::with_capacity(16);
previous_plans.insert(LogicalPlanSignature::new(&new_plan));
+ let starting_schema = Arc::clone(new_plan.schema());
+
let mut i = 0;
while i < options.optimizer.max_passes {
log_plan(&format!("Optimizer input (pass {i})"), &new_plan);
@@ -384,9 +391,16 @@ impl Optimizer {
// rule handles recursion itself
None => optimize_plan_node(new_plan, rule.as_ref(),
config),
}
- // verify the rule didn't change the schema
.and_then(|tnr| {
- assert_schema_is_the_same(rule.name(), &starting_schema,
&tnr.data)?;
+ // run optimizer invariant checks, per optimizer
rule applied
+ assert_valid_optimization(&tnr.data, &starting_schema)
+ .map_err(|e| e.context(format!("Check
optimizer-specific invariants after optimizer rule: {}", rule.name())))?;
+
+ // run LP invariant checks only in debug mode for
performance reasons
+ #[cfg(debug_assertions)]
+ tnr.data.check_invariants(InvariantLevel::Executable)
+ .map_err(|e| e.context(format!("Invalid
(non-executable) plan after Optimizer rule: {}", rule.name())))?;
+
Ok(tnr)
});
@@ -445,35 +459,38 @@ impl Optimizer {
}
i += 1;
}
+
+ // verify that the optimizer passes only mutated what was permitted.
+ assert_valid_optimization(&new_plan, &starting_schema).map_err(|e| {
+ e.context("Check optimizer-specific invariants after all passes")
+ })?;
+
+ // verify LP is valid, after the last optimizer pass.
+ new_plan
+ .check_invariants(InvariantLevel::Executable)
+ .map_err(|e| {
+ e.context("Invalid (non-executable) plan after LP Optimizers")
+ })?;
+
log_plan("Final optimized plan", &new_plan);
debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
Ok(new_plan)
}
}
-/// Returns an error if `new_plan`'s schema is different than `prev_schema`
+/// These are invariants which should hold true before and after
[`LogicalPlan`] optimization.
///
-/// It ignores metadata and nullability.
-pub(crate) fn assert_schema_is_the_same(
- rule_name: &str,
- prev_schema: &DFSchema,
- new_plan: &LogicalPlan,
+/// This differs from [`LogicalPlan::check_invariants`], which addresses if a
singular
+/// LogicalPlan is valid. Instead this addresses whether the optimization was valid
based upon permitted changes.
+fn assert_valid_optimization(
+ plan: &LogicalPlan,
+ prev_schema: &Arc<DFSchema>,
) -> Result<()> {
- let equivalent = new_plan.schema().equivalent_names_and_types(prev_schema);
+ // verify invariant: optimizer passes should not change the schema
+ // Refer to
<https://datafusion.apache.org/contributor-guide/specification/invariants.html#logical-schema-is-invariant-under-logical-optimization>
+ assert_expected_schema(prev_schema, plan)?;
- if !equivalent {
- let e = DataFusionError::Internal(format!(
- "Failed due to a difference in schemas, original schema: {:?}, new
schema: {:?}",
- prev_schema,
- new_plan.schema()
- ));
- Err(DataFusionError::Context(
- String::from(rule_name),
- Box::new(e),
- ))
- } else {
- Ok(())
- }
+ Ok(())
}
#[cfg(test)]
@@ -527,9 +544,11 @@ mod tests {
schema: Arc::new(DFSchema::empty()),
});
let err = opt.optimize(plan, &config, &observe).unwrap_err();
- assert_eq!(
+ assert!(err.strip_backtrace().starts_with(
"Optimizer rule 'get table_scan rule' failed\n\
- caused by\nget table_scan rule\ncaused by\n\
+ caused by\n\
+ Check optimizer-specific invariants after optimizer rule: get
table_scan rule\n\
+ caused by\n\
Internal error: Failed due to a difference in schemas, \
original schema: DFSchema { inner: Schema { \
fields: [], \
@@ -545,10 +564,8 @@ mod tests {
], \
metadata: {} }, \
field_qualifiers: [Some(Bare { table: \"test\" }), Some(Bare {
table: \"test\" }), Some(Bare { table: \"test\" })], \
- functional_dependencies: FunctionalDependencies { deps: [] } }.\n\
- This was likely caused by a bug in DataFusion's code and we would
welcome that you file an bug report in our issue tracker",
- err.strip_backtrace()
- );
+ functional_dependencies: FunctionalDependencies { deps: [] } }",
+ ));
}
#[test]
diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs
b/datafusion/optimizer/src/scalar_subquery_to_join.rs
index 9e7f8eed8a..3a8aef267b 100644
--- a/datafusion/optimizer/src/scalar_subquery_to_join.rs
+++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs
@@ -731,7 +731,7 @@ mod tests {
.project(vec![col("customer.c_custkey")])?
.build()?;
- let expected = "check_analyzed_plan\
+ let expected = "Invalid (non-executable) plan after Analyzer\
\ncaused by\
\nError during planning: Scalar subquery should only return one
column";
assert_analyzer_check_err(vec![], plan, expected);
@@ -793,7 +793,7 @@ mod tests {
.project(vec![col("customer.c_custkey")])?
.build()?;
- let expected = "check_analyzed_plan\
+ let expected = "Invalid (non-executable) plan after Analyzer\
\ncaused by\
\nError during planning: Scalar subquery should only return one
column";
assert_analyzer_check_err(vec![], plan, expected);
diff --git a/datafusion/optimizer/src/utils.rs
b/datafusion/optimizer/src/utils.rs
index 9f325bc01b..39f8cf285d 100644
--- a/datafusion/optimizer/src/utils.rs
+++ b/datafusion/optimizer/src/utils.rs
@@ -87,23 +87,6 @@ pub(crate) fn has_all_column_refs(expr: &Expr, schema_cols:
&HashSet<Column>) ->
== column_refs.len()
}
-pub(crate) fn collect_subquery_cols(
- exprs: &[Expr],
- subquery_schema: &DFSchema,
-) -> Result<BTreeSet<Column>> {
- exprs.iter().try_fold(BTreeSet::new(), |mut cols, expr| {
- let mut using_cols: Vec<Column> = vec![];
- for col in expr.column_refs().into_iter() {
- if subquery_schema.has_column(col) {
- using_cols.push(col.clone());
- }
- }
-
- cols.extend(using_cols);
- Result::<_>::Ok(cols)
- })
-}
-
pub(crate) fn replace_qualified_name(
expr: Expr,
cols: &BTreeSet<Column>,
diff --git a/datafusion/sqllogictest/test_files/subquery.slt
b/datafusion/sqllogictest/test_files/subquery.slt
index 027b5ca8dc..25fe4c7b03 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -433,16 +433,16 @@ logical_plan
08)----------TableScan: t1 projection=[t1_int]
#invalid_scalar_subquery
-statement error DataFusion error: check_analyzed_plan\ncaused by\nError during
planning: Scalar subquery should only return one column, but found 2: t2.t2_id,
t2.t2_name
+statement error DataFusion error: Invalid \(non-executable\) plan after
Analyzer\ncaused by\nError during planning: Scalar subquery should only return
one column, but found 2: t2.t2_id, t2.t2_name
SELECT t1_id, t1_name, t1_int, (select t2_id, t2_name FROM t2 WHERE t2.t2_id =
t1.t1_int) FROM t1
#subquery_not_allowed
#In/Exist Subquery is not allowed in ORDER BY clause.
-statement error DataFusion error: check_analyzed_plan\ncaused by\nError during
planning: In/Exist subquery can only be used in Projection, Filter, Window
functions, Aggregate and Join plan nodes, but was used in \[Sort: t1.t1_int IN
\(<subquery>\) ASC NULLS LAST\]
+statement error DataFusion error: Invalid \(non-executable\) plan after
Analyzer\ncaused by\nError during planning: In/Exist subquery can only be used
in Projection, Filter, TableScan, Window functions, Aggregate and Join plan
nodes, but was used in \[Sort: t1.t1_int IN \(<subquery>\) ASC NULLS LAST\]
SELECT t1_id, t1_name, t1_int FROM t1 order by t1_int in (SELECT t2_int FROM
t2 WHERE t1.t1_id > t1.t1_int)
#non_aggregated_correlated_scalar_subquery
-statement error DataFusion error: check_analyzed_plan\ncaused by\nError during
planning: Correlated scalar subquery must be aggregated to return at most one
row
+statement error DataFusion error: Invalid \(non-executable\) plan after
Analyzer\ncaused by\nError during planning: Correlated scalar subquery must be
aggregated to return at most one row
SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int) as t2_int
from t1
#non_aggregated_correlated_scalar_subquery_unique
@@ -456,11 +456,11 @@ SELECT t1_id, (SELECT t3_int FROM t3 WHERE t3.t3_id =
t1.t1_id) as t3_int from t
#non_aggregated_correlated_scalar_subquery
-statement error DataFusion error: check_analyzed_plan\ncaused by\nError during
planning: Correlated scalar subquery must be aggregated to return at most one
row
+statement error DataFusion error: Invalid \(non-executable\) plan after
Analyzer\ncaused by\nError during planning: Correlated scalar subquery must be
aggregated to return at most one row
SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1_int group by t2_int)
as t2_int from t1
#non_aggregated_correlated_scalar_subquery_with_limit
-statement error DataFusion error: check_analyzed_plan\ncaused by\nError during
planning: Correlated scalar subquery must be aggregated to return at most one
row
+statement error DataFusion error: Invalid \(non-executable\) plan after
Analyzer\ncaused by\nError during planning: Correlated scalar subquery must be
aggregated to return at most one row
SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int limit 2) as
t2_int from t1
#non_aggregated_correlated_scalar_subquery_with_single_row
@@ -523,7 +523,7 @@ logical_plan
07)--TableScan: t1 projection=[t1_id]
#aggregated_correlated_scalar_subquery_with_extra_group_by_columns
-statement error DataFusion error: check_analyzed_plan\ncaused by\nError during
planning: A GROUP BY clause in a scalar correlated subquery cannot contain
non-correlated columns
+statement error DataFusion error: Invalid \(non-executable\) plan after
Analyzer\ncaused by\nError during planning: A GROUP BY clause in a scalar
correlated subquery cannot contain non-correlated columns
SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id group by
t2_name) as t2_sum from t1
#support_agg_correlated_columns
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]