This is an automated email from the ASF dual-hosted git repository.

weijun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new b337fbcf78 feat: Attach `Diagnostic` to more than one column errors in scalar_subquery and in_subquery (#15143)
b337fbcf78 is described below

commit b337fbcf787ed52a3263b8c7c4b21766b13e3f26
Author: Chang <[email protected]>
AuthorDate: Fri Mar 14 17:18:51 2025 +0800

    feat: Attach `Diagnostic` to more than one column errors in scalar_subquery and in_subquery (#15143)
    
    * feat: diagnostic to more than one column error
    
    * test: improve test case
    
    * feat: multiple "in" queries diagnostic
    
    * refactor: reuse code in subquery.rs
    
    * chore: clearer note message
    
    * fix: use plain Spans
    
    * fix: better error messages
    
    * fix: remove the specific subquery type
    
    * fix: use `Span::union_iter` to create full_span
    
    * fix: CI failures
    
    * Update expected error message
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/expr/src/expr_fn.rs                     |  7 +-
 datafusion/expr/src/expr_schema.rs                 |  3 +-
 datafusion/expr/src/logical_plan/plan.rs           | 10 ++-
 datafusion/expr/src/logical_plan/tree_node.rs      |  2 +
 datafusion/optimizer/src/analyzer/type_coercion.rs |  9 +-
 .../optimizer/src/scalar_subquery_to_join.rs       |  8 +-
 datafusion/sql/src/expr/subquery.rs                | 96 ++++++++++++++++++++--
 datafusion/sql/src/relation/mod.rs                 |  4 +-
 datafusion/sql/tests/cases/diagnostic.rs           | 68 +++++++++++++++
 datafusion/sqllogictest/test_files/subquery.slt    |  2 +-
 datafusion/substrait/src/logical_plan/consumer.rs  |  6 +-
 11 files changed, 194 insertions(+), 21 deletions(-)

diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index a8e7fd76d0..c08738c183 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -37,7 +37,7 @@ use arrow::compute::kernels::cast_utils::{
     parse_interval_day_time, parse_interval_month_day_nano, 
parse_interval_year_month,
 };
 use arrow::datatypes::{DataType, Field};
-use datafusion_common::{plan_err, Column, Result, ScalarValue, TableReference};
+use datafusion_common::{plan_err, Column, Result, ScalarValue, Spans, 
TableReference};
 use datafusion_functions_window_common::field::WindowUDFFieldArgs;
 use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
 use sqlparser::ast::NullTreatment;
@@ -252,6 +252,7 @@ pub fn exists(subquery: Arc<LogicalPlan>) -> Expr {
         subquery: Subquery {
             subquery,
             outer_ref_columns,
+            spans: Spans::new(),
         },
         negated: false,
     })
@@ -264,6 +265,7 @@ pub fn not_exists(subquery: Arc<LogicalPlan>) -> Expr {
         subquery: Subquery {
             subquery,
             outer_ref_columns,
+            spans: Spans::new(),
         },
         negated: true,
     })
@@ -277,6 +279,7 @@ pub fn in_subquery(expr: Expr, subquery: Arc<LogicalPlan>) 
-> Expr {
         Subquery {
             subquery,
             outer_ref_columns,
+            spans: Spans::new(),
         },
         false,
     ))
@@ -290,6 +293,7 @@ pub fn not_in_subquery(expr: Expr, subquery: 
Arc<LogicalPlan>) -> Expr {
         Subquery {
             subquery,
             outer_ref_columns,
+            spans: Spans::new(),
         },
         true,
     ))
@@ -301,6 +305,7 @@ pub fn scalar_subquery(subquery: Arc<LogicalPlan>) -> Expr {
     Expr::ScalarSubquery(Subquery {
         subquery,
         outer_ref_columns,
+        spans: Spans::new(),
     })
 }
 
diff --git a/datafusion/expr/src/expr_schema.rs 
b/datafusion/expr/src/expr_schema.rs
index 318640282f..a349c83a49 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -30,7 +30,7 @@ use arrow::compute::can_cast_types;
 use arrow::datatypes::{DataType, Field};
 use datafusion_common::{
     not_impl_err, plan_datafusion_err, plan_err, Column, DataFusionError, 
ExprSchema,
-    Result, TableReference,
+    Result, Spans, TableReference,
 };
 use datafusion_expr_common::type_coercion::binary::BinaryTypeCoercer;
 use datafusion_functions_window_common::field::WindowUDFFieldArgs;
@@ -617,6 +617,7 @@ pub fn cast_subquery(subquery: Subquery, cast_to_type: 
&DataType) -> Result<Subq
     Ok(Subquery {
         subquery: Arc::new(new_plan),
         outer_ref_columns: subquery.outer_ref_columns,
+        spans: Spans::new(),
     })
 }
 
diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index 0534b04f5d..0dbce941a8 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -55,7 +55,7 @@ use datafusion_common::tree_node::{
 use datafusion_common::{
     aggregate_functional_dependencies, internal_err, plan_err, Column, 
Constraints,
     DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence,
-    FunctionalDependencies, ParamValues, Result, ScalarValue, TableReference,
+    FunctionalDependencies, ParamValues, Result, ScalarValue, Spans, 
TableReference,
     UnnestOptions,
 };
 use indexmap::IndexSet;
@@ -939,7 +939,9 @@ impl LogicalPlan {
                 }))
             }
             LogicalPlan::Subquery(Subquery {
-                outer_ref_columns, ..
+                outer_ref_columns,
+                spans,
+                ..
             }) => {
                 self.assert_no_expressions(expr)?;
                 let input = self.only_input(inputs)?;
@@ -947,6 +949,7 @@ impl LogicalPlan {
                 Ok(LogicalPlan::Subquery(Subquery {
                     subquery: Arc::new(subquery),
                     outer_ref_columns: outer_ref_columns.clone(),
+                    spans: spans.clone(),
                 }))
             }
             LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
@@ -3615,6 +3618,8 @@ pub struct Subquery {
     pub subquery: Arc<LogicalPlan>,
     /// The outer references used in the subquery
     pub outer_ref_columns: Vec<Expr>,
+    /// Span information for subquery projection columns
+    pub spans: Spans,
 }
 
 impl Normalizeable for Subquery {
@@ -3649,6 +3654,7 @@ impl Subquery {
         Subquery {
             subquery: plan,
             outer_ref_columns: self.outer_ref_columns.clone(),
+            spans: Spans::new(),
         }
     }
 }
diff --git a/datafusion/expr/src/logical_plan/tree_node.rs 
b/datafusion/expr/src/logical_plan/tree_node.rs
index dfc18c74c7..87b49a3a2e 100644
--- a/datafusion/expr/src/logical_plan/tree_node.rs
+++ b/datafusion/expr/src/logical_plan/tree_node.rs
@@ -159,10 +159,12 @@ impl TreeNode for LogicalPlan {
             LogicalPlan::Subquery(Subquery {
                 subquery,
                 outer_ref_columns,
+                spans,
             }) => subquery.map_elements(f)?.update_data(|subquery| {
                 LogicalPlan::Subquery(Subquery {
                     subquery,
                     outer_ref_columns,
+                    spans,
                 })
             }),
             LogicalPlan::SubqueryAlias(SubqueryAlias {
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs 
b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 538ef98ac7..c9c0b7a3b7 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -311,12 +311,14 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> {
             Expr::ScalarSubquery(Subquery {
                 subquery,
                 outer_ref_columns,
+                spans,
             }) => {
                 let new_plan =
                     analyze_internal(self.schema, 
Arc::unwrap_or_clone(subquery))?.data;
                 Ok(Transformed::yes(Expr::ScalarSubquery(Subquery {
                     subquery: Arc::new(new_plan),
                     outer_ref_columns,
+                    spans,
                 })))
             }
             Expr::Exists(Exists { subquery, negated }) => {
@@ -329,6 +331,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> {
                     subquery: Subquery {
                         subquery: Arc::new(new_plan),
                         outer_ref_columns: subquery.outer_ref_columns,
+                        spans: subquery.spans,
                     },
                     negated,
                 })))
@@ -352,6 +355,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> {
                 let new_subquery = Subquery {
                     subquery: Arc::new(new_plan),
                     outer_ref_columns: subquery.outer_ref_columns,
+                    spans: subquery.spans,
                 };
                 Ok(Transformed::yes(Expr::InSubquery(InSubquery::new(
                     Box::new(expr.cast_to(&common_type, self.schema)?),
@@ -1049,7 +1053,7 @@ mod test {
     use crate::test::{assert_analyzed_plan_eq, 
assert_analyzed_plan_with_config_eq};
     use datafusion_common::config::ConfigOptions;
     use datafusion_common::tree_node::{TransformedResult, TreeNode};
-    use datafusion_common::{DFSchema, DFSchemaRef, Result, ScalarValue};
+    use datafusion_common::{DFSchema, DFSchemaRef, Result, ScalarValue, Spans};
     use datafusion_expr::expr::{self, InSubquery, Like, ScalarFunction};
     use datafusion_expr::logical_plan::{EmptyRelation, Projection, Sort};
     use datafusion_expr::test::function_stub::avg_udaf;
@@ -2089,6 +2093,7 @@ mod test {
             Subquery {
                 subquery: empty_int32,
                 outer_ref_columns: vec![],
+                spans: Spans::new(),
             },
             false,
         ));
@@ -2114,6 +2119,7 @@ mod test {
             Subquery {
                 subquery: empty_int64,
                 outer_ref_columns: vec![],
+                spans: Spans::new(),
             },
             false,
         ));
@@ -2138,6 +2144,7 @@ mod test {
             Subquery {
                 subquery: empty_inside,
                 outer_ref_columns: vec![],
+                spans: Spans::new(),
             },
             false,
         ));
diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs 
b/datafusion/optimizer/src/scalar_subquery_to_join.rs
index 3a8aef267b..33f10400d3 100644
--- a/datafusion/optimizer/src/scalar_subquery_to_join.rs
+++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs
@@ -731,9 +731,7 @@ mod tests {
             .project(vec![col("customer.c_custkey")])?
             .build()?;
 
-        let expected = "Invalid (non-executable) plan after Analyzer\
-        \ncaused by\
-        \nError during planning: Scalar subquery should only return one 
column";
+        let expected = "Error during planning: Scalar subquery should only 
return one column, but found 4: orders.o_orderkey, orders.o_custkey, 
orders.o_orderstatus, orders.o_totalprice";
         assert_analyzer_check_err(vec![], plan, expected);
         Ok(())
     }
@@ -793,9 +791,7 @@ mod tests {
             .project(vec![col("customer.c_custkey")])?
             .build()?;
 
-        let expected = "Invalid (non-executable) plan after Analyzer\
-        \ncaused by\
-        \nError during planning: Scalar subquery should only return one 
column";
+        let expected = "Error during planning: Scalar subquery should only 
return one column, but found 2: orders.o_custkey, orders.o_orderkey";
         assert_analyzer_check_err(vec![], plan, expected);
         Ok(())
     }
diff --git a/datafusion/sql/src/expr/subquery.rs 
b/datafusion/sql/src/expr/subquery.rs
index 481f024787..225c5d74c2 100644
--- a/datafusion/sql/src/expr/subquery.rs
+++ b/datafusion/sql/src/expr/subquery.rs
@@ -16,12 +16,11 @@
 // under the License.
 
 use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
-use datafusion_common::{DFSchema, Result};
-use datafusion_expr::expr::Exists;
-use datafusion_expr::expr::InSubquery;
-use datafusion_expr::{Expr, Subquery};
+use datafusion_common::{plan_err, DFSchema, Diagnostic, Result, Span, Spans};
+use datafusion_expr::expr::{Exists, InSubquery};
+use datafusion_expr::{Expr, LogicalPlan, Subquery};
 use sqlparser::ast::Expr as SQLExpr;
-use sqlparser::ast::Query;
+use sqlparser::ast::{Query, SelectItem, SetExpr};
 use std::sync::Arc;
 
 impl<S: ContextProvider> SqlToRel<'_, S> {
@@ -41,6 +40,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
             subquery: Subquery {
                 subquery: Arc::new(sub_plan),
                 outer_ref_columns,
+                spans: Spans::new(),
             },
             negated,
         }))
@@ -56,15 +56,37 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
     ) -> Result<Expr> {
         let old_outer_query_schema =
             
planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
+
+        let mut spans = Spans::new();
+        if let SetExpr::Select(select) = subquery.body.as_ref() {
+            for item in &select.projection {
+                if let SelectItem::UnnamedExpr(SQLExpr::Identifier(ident)) = 
item {
+                    if let Some(span) = 
Span::try_from_sqlparser_span(ident.span) {
+                        spans.add_span(span);
+                    }
+                }
+            }
+        }
+
         let sub_plan = self.query_to_plan(subquery, planner_context)?;
         let outer_ref_columns = sub_plan.all_out_ref_exprs();
         planner_context.set_outer_query_schema(old_outer_query_schema);
-        let expr = Box::new(self.sql_to_expr(expr, input_schema, 
planner_context)?);
+
+        self.validate_single_column(
+            &sub_plan,
+            spans.clone(),
+            "Too many columns! The subquery should only return one column",
+            "Select only one column in the subquery",
+        )?;
+
+        let expr_obj = self.sql_to_expr(expr, input_schema, planner_context)?;
+
         Ok(Expr::InSubquery(InSubquery::new(
-            expr,
+            Box::new(expr_obj),
             Subquery {
                 subquery: Arc::new(sub_plan),
                 outer_ref_columns,
+                spans,
             },
             negated,
         )))
@@ -78,12 +100,72 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
     ) -> Result<Expr> {
         let old_outer_query_schema =
             
planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
+        let mut spans = Spans::new();
+        if let SetExpr::Select(select) = subquery.body.as_ref() {
+            for item in &select.projection {
+                if let SelectItem::ExprWithAlias { alias, .. } = item {
+                    if let Some(span) = 
Span::try_from_sqlparser_span(alias.span) {
+                        spans.add_span(span);
+                    }
+                }
+            }
+        }
         let sub_plan = self.query_to_plan(subquery, planner_context)?;
         let outer_ref_columns = sub_plan.all_out_ref_exprs();
         planner_context.set_outer_query_schema(old_outer_query_schema);
+
+        self.validate_single_column(
+            &sub_plan,
+            spans.clone(),
+            "Too many columns! The subquery should only return one column",
+            "Select only one column in the subquery",
+        )?;
+
         Ok(Expr::ScalarSubquery(Subquery {
             subquery: Arc::new(sub_plan),
             outer_ref_columns,
+            spans,
         }))
     }
+
+    fn validate_single_column(
+        &self,
+        sub_plan: &LogicalPlan,
+        spans: Spans,
+        error_message: &str,
+        help_message: &str,
+    ) -> Result<()> {
+        if sub_plan.schema().fields().len() > 1 {
+            let sub_schema = sub_plan.schema();
+            let field_names = sub_schema.field_names();
+
+            plan_err!("{}: {}", error_message, field_names.join(", 
")).map_err(|err| {
+                let diagnostic = self.build_multi_column_diagnostic(
+                    spans,
+                    error_message,
+                    help_message,
+                );
+                err.with_diagnostic(diagnostic)
+            })
+        } else {
+            Ok(())
+        }
+    }
+
+    fn build_multi_column_diagnostic(
+        &self,
+        spans: Spans,
+        error_message: &str,
+        help_message: &str,
+    ) -> Diagnostic {
+        let full_span = Span::union_iter(spans.0.iter().cloned());
+        let mut diagnostic = Diagnostic::new_error(error_message, full_span);
+
+        for (i, span) in spans.iter().skip(1).enumerate() {
+            diagnostic.add_note(format!("Extra column {}", i + 1), 
Some(*span));
+        }
+
+        diagnostic.add_help(help_message, None);
+        diagnostic
+    }
 }
diff --git a/datafusion/sql/src/relation/mod.rs 
b/datafusion/sql/src/relation/mod.rs
index 800dd151a1..8078261d91 100644
--- a/datafusion/sql/src/relation/mod.rs
+++ b/datafusion/sql/src/relation/mod.rs
@@ -21,7 +21,7 @@ use crate::planner::{ContextProvider, PlannerContext, 
SqlToRel};
 
 use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::{
-    not_impl_err, plan_err, DFSchema, Diagnostic, Result, Span, TableReference,
+    not_impl_err, plan_err, DFSchema, Diagnostic, Result, Span, Spans, 
TableReference,
 };
 use datafusion_expr::builder::subquery_alias;
 use datafusion_expr::{expr::Unnest, Expr, LogicalPlan, LogicalPlanBuilder};
@@ -211,6 +211,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
                     LogicalPlan::Subquery(Subquery {
                         subquery: input,
                         outer_ref_columns,
+                        spans: Spans::new(),
                     }),
                     alias,
                 )
@@ -218,6 +219,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
             plan => Ok(LogicalPlan::Subquery(Subquery {
                 subquery: Arc::new(plan),
                 outer_ref_columns,
+                spans: Spans::new(),
             })),
         }
     }
diff --git a/datafusion/sql/tests/cases/diagnostic.rs 
b/datafusion/sql/tests/cases/diagnostic.rs
index d70484f718..5481f046e7 100644
--- a/datafusion/sql/tests/cases/diagnostic.rs
+++ b/datafusion/sql/tests/cases/diagnostic.rs
@@ -286,3 +286,71 @@ fn test_invalid_function() -> Result<()> {
     assert_eq!(diag.span, Some(spans["whole"]));
     Ok(())
 }
+#[test]
+fn test_scalar_subquery_multiple_columns() -> Result<(), Box<dyn 
std::error::Error>> {
+    let query = "SELECT (SELECT 1 AS /*x*/x/*x*/, 2 AS /*y*/y/*y*/) AS col";
+    let spans = get_spans(query);
+    let diag = do_query(query);
+
+    assert_eq!(
+        diag.message,
+        "Too many columns! The subquery should only return one column"
+    );
+
+    let expected_span = Some(Span {
+        start: spans["x"].start,
+        end: spans["y"].end,
+    });
+    assert_eq!(diag.span, expected_span);
+    assert_eq!(
+        diag.notes
+            .iter()
+            .map(|n| (n.message.as_str(), n.span))
+            .collect::<Vec<_>>(),
+        vec![("Extra column 1", Some(spans["y"]))]
+    );
+    assert_eq!(
+        diag.helps
+            .iter()
+            .map(|h| h.message.as_str())
+            .collect::<Vec<_>>(),
+        vec!["Select only one column in the subquery"]
+    );
+
+    Ok(())
+}
+
+#[test]
+fn test_in_subquery_multiple_columns() -> Result<(), Box<dyn 
std::error::Error>> {
+    // This query uses an IN subquery with multiple columns - this should 
trigger an error
+    let query = "SELECT * FROM person WHERE id IN (SELECT /*id*/id/*id*/, 
/*first*/first_name/*first*/ FROM person)";
+    let spans = get_spans(query);
+    let diag = do_query(query);
+
+    assert_eq!(
+        diag.message,
+        "Too many columns! The subquery should only return one column"
+    );
+
+    let expected_span = Some(Span {
+        start: spans["id"].start,
+        end: spans["first"].end,
+    });
+    assert_eq!(diag.span, expected_span);
+    assert_eq!(
+        diag.notes
+            .iter()
+            .map(|n| (n.message.as_str(), n.span))
+            .collect::<Vec<_>>(),
+        vec![("Extra column 1", Some(spans["first"]))]
+    );
+    assert_eq!(
+        diag.helps
+            .iter()
+            .map(|h| h.message.as_str())
+            .collect::<Vec<_>>(),
+        vec!["Select only one column in the subquery"]
+    );
+
+    Ok(())
+}
diff --git a/datafusion/sqllogictest/test_files/subquery.slt 
b/datafusion/sqllogictest/test_files/subquery.slt
index 5a722c2288..4c1565c7f0 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -438,7 +438,7 @@ logical_plan
 08)----------TableScan: t1 projection=[t1_int]
 
 #invalid_scalar_subquery
-statement error DataFusion error: Invalid \(non-executable\) plan after 
Analyzer\ncaused by\nError during planning: Scalar subquery should only return 
one column, but found 2: t2.t2_id, t2.t2_name
+statement error DataFusion error: Error during planning: Too many columns! The 
subquery should only return one column: t2.t2_id, t2.t2_name
 SELECT t1_id, t1_name, t1_int, (select t2_id, t2_name FROM t2 WHERE t2.t2_id = 
t1.t1_int) FROM t1
 
 #subquery_not_allowed
diff --git a/datafusion/substrait/src/logical_plan/consumer.rs 
b/datafusion/substrait/src/logical_plan/consumer.rs
index 60e71ca39d..24eb23ded5 100644
--- a/datafusion/substrait/src/logical_plan/consumer.rs
+++ b/datafusion/substrait/src/logical_plan/consumer.rs
@@ -24,7 +24,8 @@ use datafusion::arrow::datatypes::{
 };
 use datafusion::common::{
     not_impl_datafusion_err, not_impl_err, plan_datafusion_err, plan_err,
-    substrait_datafusion_err, substrait_err, DFSchema, DFSchemaRef, 
TableReference,
+    substrait_datafusion_err, substrait_err, DFSchema, DFSchemaRef, Spans,
+    TableReference,
 };
 use datafusion::datasource::provider_as_source;
 use datafusion::logical_expr::expr::{Exists, InSubquery, Sort, 
WindowFunctionParams};
@@ -2280,6 +2281,7 @@ pub async fn from_subquery(
                             subquery: Subquery {
                                 subquery: Arc::new(haystack_expr),
                                 outer_ref_columns: outer_refs,
+                                spans: Spans::new(),
                             },
                             negated: false,
                         }))
@@ -2298,6 +2300,7 @@ pub async fn from_subquery(
                 Ok(Expr::ScalarSubquery(Subquery {
                     subquery: Arc::new(plan),
                     outer_ref_columns,
+                    spans: Spans::new(),
                 }))
             }
             SubqueryType::SetPredicate(predicate) => {
@@ -2313,6 +2316,7 @@ pub async fn from_subquery(
                             Subquery {
                                 subquery: Arc::new(plan),
                                 outer_ref_columns,
+                                spans: Spans::new(),
                             },
                             false,
                         )))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to