This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new cf8f2f829a Introduce LogicalPlan invariants, begin automatically 
checking them (#13651)
cf8f2f829a is described below

commit cf8f2f829a5c8c108f934a4471e1e47accc5e393
Author: wiedld <[email protected]>
AuthorDate: Thu Dec 26 15:46:30 2024 -0500

    Introduce LogicalPlan invariants, begin automatically checking them (#13651)
    
    * minor(13525): perform LP validation before and after each possible 
mutation
    
    * minor(13525): validate unique field names on query and subquery schemas, 
after each optimizer pass
    
    * minor(13525): validate union after each optimizer passes
    
    * refactor: make explicit what is an invariant of the logical plan, versus 
assertions made after a given analyzer or optimizer pass
    
    * chore: add link to invariant docs
    
    * fix: add new invariants module
    
    * refactor: move all LP invariant checking into LP, delineate executable 
(valid semantic plan) vs basic LP invariants
    
    * test: update test for slight error message change
    
    * fix: push_down_filter optimization pass can push a IN(<subquery>) into a 
TableScan's filter clause
    
    * refactor: move collect_subquery_cols() to common utils crate
    
    * refactor: clarify the purpose of assert_valid_optimization(), runs after 
all optimizer passes, except in debug mode it runs after each pass.
    
    * refactor: based upon performance tests, run the maximum number of checks 
without impa ct:
    * assert_valid_optimization can run each optimizer pass
    * remove the recursive cehck_fields, which caused the performance regression
    * the full LP Invariants::Executable can only run in debug
    
    * chore: update error naming and terminology used in code comments
    
    * refactor: use proper error methods
    
    * chore: more cleanup of error messages
    
    * chore: handle option trailer to error message
    
    * test: update sqllogictests tests to not use multiline
---
 .../src/logical_plan/invariants.rs}                | 105 +++++++++++++++++++--
 datafusion/expr/src/logical_plan/mod.rs            |   2 +
 datafusion/expr/src/logical_plan/plan.rs           |  11 +++
 datafusion/expr/src/utils.rs                       |  20 +++-
 datafusion/optimizer/src/analyzer/mod.rs           |  60 +++++-------
 datafusion/optimizer/src/decorrelate.rs            |   5 +-
 .../src/decorrelate_predicate_subquery.rs          |   4 +-
 datafusion/optimizer/src/optimizer.rs              |  73 ++++++++------
 .../optimizer/src/scalar_subquery_to_join.rs       |   4 +-
 datafusion/optimizer/src/utils.rs                  |  17 ----
 datafusion/sqllogictest/test_files/subquery.slt    |  12 +--
 11 files changed, 209 insertions(+), 104 deletions(-)

diff --git a/datafusion/optimizer/src/analyzer/subquery.rs 
b/datafusion/expr/src/logical_plan/invariants.rs
similarity index 78%
rename from datafusion/optimizer/src/analyzer/subquery.rs
rename to datafusion/expr/src/logical_plan/invariants.rs
index 7129da85f3..bde4acaae5 100644
--- a/datafusion/optimizer/src/analyzer/subquery.rs
+++ b/datafusion/expr/src/logical_plan/invariants.rs
@@ -15,14 +15,98 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::analyzer::check_plan;
-use crate::utils::collect_subquery_cols;
+use datafusion_common::{
+    internal_err, plan_err,
+    tree_node::{TreeNode, TreeNodeRecursion},
+    DFSchemaRef, Result,
+};
 
-use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
-use datafusion_common::{plan_err, Result};
-use datafusion_expr::expr_rewriter::strip_outer_reference;
-use datafusion_expr::utils::split_conjunction;
-use datafusion_expr::{Aggregate, Expr, Filter, Join, JoinType, LogicalPlan, 
Window};
+use crate::{
+    expr::{Exists, InSubquery},
+    expr_rewriter::strip_outer_reference,
+    utils::{collect_subquery_cols, split_conjunction},
+    Aggregate, Expr, Filter, Join, JoinType, LogicalPlan, Window,
+};
+
+pub enum InvariantLevel {
+    /// Invariants that are always true in DataFusion `LogicalPlan`s
+    /// such as the number of expected children and no duplicated output fields
+    Always,
+    /// Invariants that must hold true for the plan to be "executable"
+    /// such as the type and number of function arguments are correct and
+    /// that wildcards have been expanded
+    ///
+    /// To ensure a LogicalPlan satisfies the `Executable` invariants, run the
+    /// `Analyzer`
+    Executable,
+}
+
+pub fn assert_always_invariants(plan: &LogicalPlan) -> Result<()> {
+    // Refer to 
<https://datafusion.apache.org/contributor-guide/specification/invariants.html#relation-name-tuples-in-logical-fields-and-logical-columns-are-unique>
+    assert_unique_field_names(plan)?;
+
+    Ok(())
+}
+
+pub fn assert_executable_invariants(plan: &LogicalPlan) -> Result<()> {
+    assert_always_invariants(plan)?;
+    assert_valid_semantic_plan(plan)?;
+    Ok(())
+}
+
+/// Returns an error if plan, and subplans, do not have unique fields.
+///
+/// This invariant is subject to change.
+/// refer: 
<https://github.com/apache/datafusion/issues/13525#issuecomment-2494046463>
+fn assert_unique_field_names(plan: &LogicalPlan) -> Result<()> {
+    plan.schema().check_names()
+}
+
+/// Returns an error if the plan is not sematically valid.
+fn assert_valid_semantic_plan(plan: &LogicalPlan) -> Result<()> {
+    assert_subqueries_are_valid(plan)?;
+
+    Ok(())
+}
+
+/// Returns an error if the plan does not have the expected schema.
+/// Ignores metadata and nullability.
+pub fn assert_expected_schema(schema: &DFSchemaRef, plan: &LogicalPlan) -> 
Result<()> {
+    let equivalent = plan.schema().equivalent_names_and_types(schema);
+
+    if !equivalent {
+        internal_err!(
+            "Failed due to a difference in schemas, original schema: {:?}, new 
schema: {:?}",
+            schema,
+            plan.schema()
+        )
+    } else {
+        Ok(())
+    }
+}
+
+/// Asserts that the subqueries are structured properly with valid node 
placement.
+///
+/// Refer to [`check_subquery_expr`] for more details.
+fn assert_subqueries_are_valid(plan: &LogicalPlan) -> Result<()> {
+    plan.apply_with_subqueries(|plan: &LogicalPlan| {
+        plan.apply_expressions(|expr| {
+            // recursively look for subqueries
+            expr.apply(|expr| {
+                match expr {
+                    Expr::Exists(Exists { subquery, .. })
+                    | Expr::InSubquery(InSubquery { subquery, .. })
+                    | Expr::ScalarSubquery(subquery) => {
+                        check_subquery_expr(plan, &subquery.subquery, expr)?;
+                    }
+                    _ => {}
+                };
+                Ok(TreeNodeRecursion::Continue)
+            })
+        })
+    })
+    .map(|_| ())
+}
 
 /// Do necessary check on subquery expressions and fail the invalid plan
 /// 1) Check whether the outer plan is in the allowed outer plans list to use 
subquery expressions,
@@ -36,7 +120,7 @@ pub fn check_subquery_expr(
     inner_plan: &LogicalPlan,
     expr: &Expr,
 ) -> Result<()> {
-    check_plan(inner_plan)?;
+    assert_subqueries_are_valid(inner_plan)?;
     if let Expr::ScalarSubquery(subquery) = expr {
         // Scalar subquery should only return one column
         if subquery.subquery.schema().fields().len() > 1 {
@@ -108,12 +192,13 @@ pub fn check_subquery_expr(
         match outer_plan {
             LogicalPlan::Projection(_)
             | LogicalPlan::Filter(_)
+            | LogicalPlan::TableScan(_)
             | LogicalPlan::Window(_)
             | LogicalPlan::Aggregate(_)
             | LogicalPlan::Join(_) => Ok(()),
             _ => plan_err!(
                 "In/Exist subquery can only be used in \
-                Projection, Filter, Window functions, Aggregate and Join plan 
nodes, \
+                Projection, Filter, TableScan, Window functions, Aggregate and 
Join plan nodes, \
                 but was used in [{}]",
                 outer_plan.display()
             ),
@@ -285,8 +370,8 @@ mod test {
     use std::cmp::Ordering;
     use std::sync::Arc;
 
+    use crate::{Extension, UserDefinedLogicalNodeCore};
     use datafusion_common::{DFSchema, DFSchemaRef};
-    use datafusion_expr::{Extension, UserDefinedLogicalNodeCore};
 
     use super::*;
 
diff --git a/datafusion/expr/src/logical_plan/mod.rs 
b/datafusion/expr/src/logical_plan/mod.rs
index 5d613d4e80..4049413786 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -20,6 +20,8 @@ mod ddl;
 pub mod display;
 pub mod dml;
 mod extension;
+pub(crate) mod invariants;
+pub use invariants::{assert_expected_schema, check_subquery_expr, 
InvariantLevel};
 mod plan;
 mod statement;
 pub mod tree_node;
diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index 47d9aac3ca..cc922709c8 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -24,6 +24,9 @@ use std::hash::{Hash, Hasher};
 use std::sync::{Arc, LazyLock};
 
 use super::dml::CopyTo;
+use super::invariants::{
+    assert_always_invariants, assert_executable_invariants, InvariantLevel,
+};
 use super::DdlStatement;
 use crate::builder::{change_redundant_column, unnest_with_options};
 use crate::expr::{Placeholder, Sort as SortExpr, WindowFunction};
@@ -1127,6 +1130,14 @@ impl LogicalPlan {
         }
     }
 
+    /// checks that the plan conforms to the listed invariant level, returning 
an Error if not
+    pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
+        match check {
+            InvariantLevel::Always => assert_always_invariants(self),
+            InvariantLevel::Executable => assert_executable_invariants(self),
+        }
+    }
+
     /// Helper for [Self::with_new_exprs] to use when no expressions are 
expected.
     #[inline]
     #[allow(clippy::needless_pass_by_value)] // expr is moved intentionally to 
ensure it's not used again
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 9d0a2b5b95..b1e36e0292 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -18,7 +18,7 @@
 //! Expression utilities
 
 use std::cmp::Ordering;
-use std::collections::HashSet;
+use std::collections::{BTreeSet, HashSet};
 use std::ops::Deref;
 use std::sync::Arc;
 
@@ -1402,6 +1402,24 @@ pub fn format_state_name(name: &str, state_name: &str) 
-> String {
     format!("{name}[{state_name}]")
 }
 
+/// Determine the set of [`Column`]s produced by the subquery.
+pub fn collect_subquery_cols(
+    exprs: &[Expr],
+    subquery_schema: &DFSchema,
+) -> Result<BTreeSet<Column>> {
+    exprs.iter().try_fold(BTreeSet::new(), |mut cols, expr| {
+        let mut using_cols: Vec<Column> = vec![];
+        for col in expr.column_refs().into_iter() {
+            if subquery_schema.has_column(col) {
+                using_cols.push(col.clone());
+            }
+        }
+
+        cols.extend(using_cols);
+        Result::<_>::Ok(cols)
+    })
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/optimizer/src/analyzer/mod.rs 
b/datafusion/optimizer/src/analyzer/mod.rs
index f2fd61dfa8..9d0ac6b54c 100644
--- a/datafusion/optimizer/src/analyzer/mod.rs
+++ b/datafusion/optimizer/src/analyzer/mod.rs
@@ -24,18 +24,14 @@ use log::debug;
 
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::instant::Instant;
-use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
-use datafusion_common::{DataFusionError, Result};
-use datafusion_expr::expr::Exists;
-use datafusion_expr::expr::InSubquery;
+use datafusion_common::Result;
 use datafusion_expr::expr_rewriter::FunctionRewrite;
-use datafusion_expr::{Expr, LogicalPlan};
+use datafusion_expr::{InvariantLevel, LogicalPlan};
 
 use crate::analyzer::count_wildcard_rule::CountWildcardRule;
 use crate::analyzer::expand_wildcard_rule::ExpandWildcardRule;
 use crate::analyzer::inline_table_scan::InlineTableScan;
 use crate::analyzer::resolve_grouping_function::ResolveGroupingFunction;
-use crate::analyzer::subquery::check_subquery_expr;
 use crate::analyzer::type_coercion::TypeCoercion;
 use crate::utils::log_plan;
 
@@ -46,9 +42,16 @@ pub mod expand_wildcard_rule;
 pub mod function_rewrite;
 pub mod inline_table_scan;
 pub mod resolve_grouping_function;
-pub mod subquery;
 pub mod type_coercion;
 
+pub mod subquery {
+    #[deprecated(
+        since = "44.0.0",
+        note = "please use `datafusion_expr::check_subquery_expr` instead"
+    )]
+    pub use datafusion_expr::check_subquery_expr;
+}
+
 /// [`AnalyzerRule`]s transform [`LogicalPlan`]s in some way to make
 /// the plan valid prior to the rest of the DataFusion optimization process.
 ///
@@ -56,7 +59,7 @@ pub mod type_coercion;
 /// which must preserve the semantics of the `LogicalPlan`, while computing
 /// results in a more optimal way.
 ///
-/// For example, an `AnalyzerRule` may resolve [`Expr`]s into more specific
+/// For example, an `AnalyzerRule` may resolve 
[`Expr`](datafusion_expr::Expr)s into more specific
 /// forms such as a subquery reference, or do type coercion to ensure the types
 /// of operands are correct.
 ///
@@ -140,6 +143,10 @@ impl Analyzer {
     where
         F: FnMut(&LogicalPlan, &dyn AnalyzerRule),
     {
+        // verify the logical plan required invariants at the start, before 
analyzer
+        plan.check_invariants(InvariantLevel::Always)
+            .map_err(|e| e.context("Invalid input plan passed to Analyzer"))?;
+
         let start_time = Instant::now();
         let mut new_plan = plan;
 
@@ -161,39 +168,20 @@ impl Analyzer {
 
         // TODO add common rule executor for Analyzer and Optimizer
         for rule in rules {
-            new_plan = rule.analyze(new_plan, config).map_err(|e| {
-                DataFusionError::Context(rule.name().to_string(), Box::new(e))
-            })?;
+            new_plan = rule
+                .analyze(new_plan, config)
+                .map_err(|e| e.context(rule.name()))?;
             log_plan(rule.name(), &new_plan);
             observer(&new_plan, rule.as_ref());
         }
-        // for easier display in explain output
-        check_plan(&new_plan).map_err(|e| {
-            DataFusionError::Context("check_analyzed_plan".to_string(), 
Box::new(e))
-        })?;
+
+        // verify at the end, after the last LP analyzer pass, that the plan 
is executable.
+        new_plan
+            .check_invariants(InvariantLevel::Executable)
+            .map_err(|e| e.context("Invalid (non-executable) plan after 
Analyzer"))?;
+
         log_plan("Final analyzed plan", &new_plan);
         debug!("Analyzer took {} ms", start_time.elapsed().as_millis());
         Ok(new_plan)
     }
 }
-
-/// Do necessary check and fail the invalid plan
-fn check_plan(plan: &LogicalPlan) -> Result<()> {
-    plan.apply_with_subqueries(|plan: &LogicalPlan| {
-        plan.apply_expressions(|expr| {
-            // recursively look for subqueries
-            expr.apply(|expr| {
-                match expr {
-                    Expr::Exists(Exists { subquery, .. })
-                    | Expr::InSubquery(InSubquery { subquery, .. })
-                    | Expr::ScalarSubquery(subquery) => {
-                        check_subquery_expr(plan, &subquery.subquery, expr)?;
-                    }
-                    _ => {}
-                };
-                Ok(TreeNodeRecursion::Continue)
-            })
-        })
-    })
-    .map(|_| ())
-}
diff --git a/datafusion/optimizer/src/decorrelate.rs 
b/datafusion/optimizer/src/decorrelate.rs
index b5726d9991..ee6ea08b43 100644
--- a/datafusion/optimizer/src/decorrelate.rs
+++ b/datafusion/optimizer/src/decorrelate.rs
@@ -22,7 +22,6 @@ use std::ops::Deref;
 use std::sync::Arc;
 
 use crate::simplify_expressions::ExprSimplifier;
-use crate::utils::collect_subquery_cols;
 
 use datafusion_common::tree_node::{
     Transformed, TransformedResult, TreeNode, TreeNodeRecursion, 
TreeNodeRewriter,
@@ -30,7 +29,9 @@ use datafusion_common::tree_node::{
 use datafusion_common::{plan_err, Column, DFSchemaRef, HashMap, Result, 
ScalarValue};
 use datafusion_expr::expr::Alias;
 use datafusion_expr::simplify::SimplifyContext;
-use datafusion_expr::utils::{conjunction, find_join_exprs, split_conjunction};
+use datafusion_expr::utils::{
+    collect_subquery_cols, conjunction, find_join_exprs, split_conjunction,
+};
 use datafusion_expr::{
     expr, lit, BinaryExpr, Cast, EmptyRelation, Expr, FetchType, LogicalPlan,
     LogicalPlanBuilder, Operator,
diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs 
b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
index 3e5a85ea02..a87688c1a3 100644
--- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
+++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
@@ -835,7 +835,7 @@ mod tests {
             .build()?;
 
         // Maybe okay if the table only has a single column?
-        let expected = "check_analyzed_plan\
+        let expected = "Invalid (non-executable) plan after Analyzer\
         \ncaused by\
         \nError during planning: InSubquery should only return one column, but 
found 4";
         assert_analyzer_check_err(vec![], plan, expected);
@@ -930,7 +930,7 @@ mod tests {
             .project(vec![col("customer.c_custkey")])?
             .build()?;
 
-        let expected = "check_analyzed_plan\
+        let expected = "Invalid (non-executable) plan after Analyzer\
         \ncaused by\
         \nError during planning: InSubquery should only return one column";
         assert_analyzer_check_err(vec![], plan, expected);
diff --git a/datafusion/optimizer/src/optimizer.rs 
b/datafusion/optimizer/src/optimizer.rs
index dfdd0c110c..49bce3c1ce 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -22,6 +22,7 @@ use std::sync::Arc;
 
 use chrono::{DateTime, Utc};
 use datafusion_expr::registry::FunctionRegistry;
+use datafusion_expr::{assert_expected_schema, InvariantLevel};
 use log::{debug, warn};
 
 use datafusion_common::alias::AliasGenerator;
@@ -355,6 +356,10 @@ impl Optimizer {
     where
         F: FnMut(&LogicalPlan, &dyn OptimizerRule),
     {
+        // verify LP is valid, before the first LP optimizer pass.
+        plan.check_invariants(InvariantLevel::Executable)
+            .map_err(|e| e.context("Invalid input plan before LP 
Optimizers"))?;
+
         let start_time = Instant::now();
         let options = config.options();
         let mut new_plan = plan;
@@ -362,6 +367,8 @@ impl Optimizer {
         let mut previous_plans = HashSet::with_capacity(16);
         previous_plans.insert(LogicalPlanSignature::new(&new_plan));
 
+        let starting_schema = Arc::clone(new_plan.schema());
+
         let mut i = 0;
         while i < options.optimizer.max_passes {
             log_plan(&format!("Optimizer input (pass {i})"), &new_plan);
@@ -384,9 +391,16 @@ impl Optimizer {
                     // rule handles recursion itself
                     None => optimize_plan_node(new_plan, rule.as_ref(), 
config),
                 }
-                // verify the rule didn't change the schema
                 .and_then(|tnr| {
-                    assert_schema_is_the_same(rule.name(), &starting_schema, 
&tnr.data)?;
+                    // run checks optimizer invariant checks, per optimizer 
rule applied
+                    assert_valid_optimization(&tnr.data, &starting_schema)
+                        .map_err(|e| e.context(format!("Check 
optimizer-specific invariants after optimizer rule: {}", rule.name())))?;
+
+                    // run LP invariant checks only in debug mode for 
performance reasons
+                    #[cfg(debug_assertions)]
+                    tnr.data.check_invariants(InvariantLevel::Executable)
+                        .map_err(|e| e.context(format!("Invalid 
(non-executable) plan after Optimizer rule: {}", rule.name())))?;
+
                     Ok(tnr)
                 });
 
@@ -445,35 +459,38 @@ impl Optimizer {
             }
             i += 1;
         }
+
+        // verify that the optimizer passes only mutated what was permitted.
+        assert_valid_optimization(&new_plan, &starting_schema).map_err(|e| {
+            e.context("Check optimizer-specific invariants after all passes")
+        })?;
+
+        // verify LP is valid, after the last optimizer pass.
+        new_plan
+            .check_invariants(InvariantLevel::Executable)
+            .map_err(|e| {
+                e.context("Invalid (non-executable) plan after LP Optimizers")
+            })?;
+
         log_plan("Final optimized plan", &new_plan);
         debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
         Ok(new_plan)
     }
 }
 
-/// Returns an error if `new_plan`'s schema is different than `prev_schema`
+/// These are invariants which should hold true before and after 
[`LogicalPlan`] optimization.
 ///
-/// It ignores metadata and nullability.
-pub(crate) fn assert_schema_is_the_same(
-    rule_name: &str,
-    prev_schema: &DFSchema,
-    new_plan: &LogicalPlan,
+/// This differs from [`LogicalPlan::check_invariants`], which addresses if a 
singular
+/// LogicalPlan is valid. Instead this address if the optimization was valid 
based upon permitted changes.
+fn assert_valid_optimization(
+    plan: &LogicalPlan,
+    prev_schema: &Arc<DFSchema>,
 ) -> Result<()> {
-    let equivalent = new_plan.schema().equivalent_names_and_types(prev_schema);
+    // verify invariant: optimizer passes should not change the schema
+    // Refer to 
<https://datafusion.apache.org/contributor-guide/specification/invariants.html#logical-schema-is-invariant-under-logical-optimization>
+    assert_expected_schema(prev_schema, plan)?;
 
-    if !equivalent {
-        let e = DataFusionError::Internal(format!(
-            "Failed due to a difference in schemas, original schema: {:?}, new 
schema: {:?}",
-            prev_schema,
-            new_plan.schema()
-        ));
-        Err(DataFusionError::Context(
-            String::from(rule_name),
-            Box::new(e),
-        ))
-    } else {
-        Ok(())
-    }
+    Ok(())
 }
 
 #[cfg(test)]
@@ -527,9 +544,11 @@ mod tests {
             schema: Arc::new(DFSchema::empty()),
         });
         let err = opt.optimize(plan, &config, &observe).unwrap_err();
-        assert_eq!(
+        assert!(err.strip_backtrace().starts_with(
             "Optimizer rule 'get table_scan rule' failed\n\
-            caused by\nget table_scan rule\ncaused by\n\
+            caused by\n\
+            Check optimizer-specific invariants after optimizer rule: get 
table_scan rule\n\
+            caused by\n\
             Internal error: Failed due to a difference in schemas, \
             original schema: DFSchema { inner: Schema { \
             fields: [], \
@@ -545,10 +564,8 @@ mod tests {
             ], \
             metadata: {} }, \
             field_qualifiers: [Some(Bare { table: \"test\" }), Some(Bare { 
table: \"test\" }), Some(Bare { table: \"test\" })], \
-            functional_dependencies: FunctionalDependencies { deps: [] } }.\n\
-            This was likely caused by a bug in DataFusion's code and we would 
welcome that you file an bug report in our issue tracker",
-            err.strip_backtrace()
-        );
+            functional_dependencies: FunctionalDependencies { deps: [] } }",
+        ));
     }
 
     #[test]
diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs 
b/datafusion/optimizer/src/scalar_subquery_to_join.rs
index 9e7f8eed8a..3a8aef267b 100644
--- a/datafusion/optimizer/src/scalar_subquery_to_join.rs
+++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs
@@ -731,7 +731,7 @@ mod tests {
             .project(vec![col("customer.c_custkey")])?
             .build()?;
 
-        let expected = "check_analyzed_plan\
+        let expected = "Invalid (non-executable) plan after Analyzer\
         \ncaused by\
         \nError during planning: Scalar subquery should only return one 
column";
         assert_analyzer_check_err(vec![], plan, expected);
@@ -793,7 +793,7 @@ mod tests {
             .project(vec![col("customer.c_custkey")])?
             .build()?;
 
-        let expected = "check_analyzed_plan\
+        let expected = "Invalid (non-executable) plan after Analyzer\
         \ncaused by\
         \nError during planning: Scalar subquery should only return one 
column";
         assert_analyzer_check_err(vec![], plan, expected);
diff --git a/datafusion/optimizer/src/utils.rs 
b/datafusion/optimizer/src/utils.rs
index 9f325bc01b..39f8cf285d 100644
--- a/datafusion/optimizer/src/utils.rs
+++ b/datafusion/optimizer/src/utils.rs
@@ -87,23 +87,6 @@ pub(crate) fn has_all_column_refs(expr: &Expr, schema_cols: 
&HashSet<Column>) ->
         == column_refs.len()
 }
 
-pub(crate) fn collect_subquery_cols(
-    exprs: &[Expr],
-    subquery_schema: &DFSchema,
-) -> Result<BTreeSet<Column>> {
-    exprs.iter().try_fold(BTreeSet::new(), |mut cols, expr| {
-        let mut using_cols: Vec<Column> = vec![];
-        for col in expr.column_refs().into_iter() {
-            if subquery_schema.has_column(col) {
-                using_cols.push(col.clone());
-            }
-        }
-
-        cols.extend(using_cols);
-        Result::<_>::Ok(cols)
-    })
-}
-
 pub(crate) fn replace_qualified_name(
     expr: Expr,
     cols: &BTreeSet<Column>,
diff --git a/datafusion/sqllogictest/test_files/subquery.slt 
b/datafusion/sqllogictest/test_files/subquery.slt
index 027b5ca8dc..25fe4c7b03 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -433,16 +433,16 @@ logical_plan
 08)----------TableScan: t1 projection=[t1_int]
 
 #invalid_scalar_subquery
-statement error DataFusion error: check_analyzed_plan\ncaused by\nError during 
planning: Scalar subquery should only return one column, but found 2: t2.t2_id, 
t2.t2_name
+statement error DataFusion error: Invalid \(non-executable\) plan after 
Analyzer\ncaused by\nError during planning: Scalar subquery should only return 
one column, but found 2: t2.t2_id, t2.t2_name
 SELECT t1_id, t1_name, t1_int, (select t2_id, t2_name FROM t2 WHERE t2.t2_id = 
t1.t1_int) FROM t1
 
 #subquery_not_allowed
 #In/Exist Subquery is not allowed in ORDER BY clause.
-statement error DataFusion error: check_analyzed_plan\ncaused by\nError during 
planning: In/Exist subquery can only be used in Projection, Filter, Window 
functions, Aggregate and Join plan nodes, but was used in \[Sort: t1.t1_int IN 
\(<subquery>\) ASC NULLS LAST\]
+statement error DataFusion error: Invalid \(non-executable\) plan after 
Analyzer\ncaused by\nError during planning: In/Exist subquery can only be used 
in Projection, Filter, TableScan, Window functions, Aggregate and Join plan 
nodes, but was used in \[Sort: t1.t1_int IN \(<subquery>\) ASC NULLS LAST\]
 SELECT t1_id, t1_name, t1_int FROM t1 order by t1_int in (SELECT t2_int FROM 
t2 WHERE t1.t1_id > t1.t1_int)
 
 #non_aggregated_correlated_scalar_subquery
-statement error DataFusion error: check_analyzed_plan\ncaused by\nError during 
planning: Correlated scalar subquery must be aggregated to return at most one 
row
+statement error DataFusion error: Invalid \(non-executable\) plan after 
Analyzer\ncaused by\nError during planning: Correlated scalar subquery must be 
aggregated to return at most one row
 SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int) as t2_int 
from t1
 
 #non_aggregated_correlated_scalar_subquery_unique
@@ -456,11 +456,11 @@ SELECT t1_id, (SELECT t3_int FROM t3 WHERE t3.t3_id = 
t1.t1_id) as t3_int from t
 
 
 #non_aggregated_correlated_scalar_subquery
-statement error DataFusion error: check_analyzed_plan\ncaused by\nError during 
planning: Correlated scalar subquery must be aggregated to return at most one 
row
+statement error DataFusion error: Invalid \(non-executable\) plan after 
Analyzer\ncaused by\nError during planning: Correlated scalar subquery must be 
aggregated to return at most one row
 SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1_int group by t2_int) 
as t2_int from t1
 
 #non_aggregated_correlated_scalar_subquery_with_limit
-statement error DataFusion error: check_analyzed_plan\ncaused by\nError during 
planning: Correlated scalar subquery must be aggregated to return at most one 
row
+statement error DataFusion error: Invalid \(non-executable\) plan after 
Analyzer\ncaused by\nError during planning: Correlated scalar subquery must be 
aggregated to return at most one row
 SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int limit 2) as 
t2_int from t1
 
 #non_aggregated_correlated_scalar_subquery_with_single_row
@@ -523,7 +523,7 @@ logical_plan
 07)--TableScan: t1 projection=[t1_id]
 
 #aggregated_correlated_scalar_subquery_with_extra_group_by_columns
-statement error DataFusion error: check_analyzed_plan\ncaused by\nError during 
planning: A GROUP BY clause in a scalar correlated subquery cannot contain 
non-correlated columns
+statement error DataFusion error: Invalid \(non-executable\) plan after 
Analyzer\ncaused by\nError during planning: A GROUP BY clause in a scalar 
correlated subquery cannot contain non-correlated columns
 SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id group by 
t2_name) as t2_sum from t1
 
 #support_agg_correlated_columns


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to