This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 8fc9681b74 feat: Support planning subqueries with OuterReferenceColumn 
belongs to non-adjacent outer relations  (#19930)
8fc9681b74 is described below

commit 8fc9681b7430c2e0203441177a01225a6aaabe32
Author: Michael Kleen <[email protected]>
AuthorDate: Mon Feb 9 14:51:59 2026 +0100

    feat: Support planning subqueries with OuterReferenceColumn belongs to 
non-adjacent outer relations  (#19930)
    
    ## Which issue does this PR close?
    
    Cleaned-up version of https://github.com/apache/datafusion/pull/18806
    with:
    
    - Removed `outer_queries_schema` from PlannerContext
    - Planning logic only (optimizer modifications removed)
    - sql logic tests moved to sql_integration.rs
    
    - Closes https://github.com/apache/datafusion/issues/19816
    
    
    ## Rationale for this change
    
    See https://github.com/apache/datafusion/pull/18806
    
    ## What changes are included in this PR?
    
    See https://github.com/apache/datafusion/pull/18806
    
    ## Are these changes tested?
    
    Yes
    
    ## Are there any user-facing changes?
    
    `outer_queries_schema` is removed from PlannerContext.
    
    ---------
    
    Co-authored-by: Duong Cong Toai <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/expr/src/logical_plan/invariants.rs     |   6 +-
 datafusion/sql/src/expr/identifier.rs              |  55 ++++-----
 datafusion/sql/src/expr/subquery.rs                |  20 ++-
 datafusion/sql/src/planner.rs                      |  49 ++++++--
 datafusion/sql/src/relation/mod.rs                 |  23 ++--
 datafusion/sql/src/select.rs                       |  19 +--
 datafusion/sql/tests/common/mod.rs                 |  16 +++
 datafusion/sql/tests/sql_integration.rs            |  85 +++++++++++--
 datafusion/sqllogictest/test_files/subquery.slt    | 136 +++++++++++++++++++++
 docs/source/library-user-guide/upgrading/53.0.0.md |  23 ++++
 10 files changed, 353 insertions(+), 79 deletions(-)

diff --git a/datafusion/expr/src/logical_plan/invariants.rs 
b/datafusion/expr/src/logical_plan/invariants.rs
index b39b23e30f..0889afd08f 100644
--- a/datafusion/expr/src/logical_plan/invariants.rs
+++ b/datafusion/expr/src/logical_plan/invariants.rs
@@ -208,14 +208,16 @@ pub fn check_subquery_expr(
                     if group_expr.contains(expr) && !aggr_expr.contains(expr) {
                         // TODO revisit this validation logic
                         plan_err!(
-                            "Correlated scalar subquery in the GROUP BY clause 
must also be in the aggregate expressions"
+                            "Correlated scalar subquery in the GROUP BY clause 
must \
+                            also be in the aggregate expressions"
                         )
                     } else {
                         Ok(())
                     }
                 }
                 _ => plan_err!(
-                    "Correlated scalar subquery can only be used in 
Projection, Filter, Aggregate plan nodes"
+                    "Correlated scalar subquery can only be used in 
Projection, \
+                    Filter, Aggregate plan nodes"
                 ),
             }?;
         }
diff --git a/datafusion/sql/src/expr/identifier.rs 
b/datafusion/sql/src/expr/identifier.rs
index 34fbe2edf8..cca09df0db 100644
--- a/datafusion/sql/src/expr/identifier.rs
+++ b/datafusion/sql/src/expr/identifier.rs
@@ -76,15 +76,16 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
             }
 
             // Check the outer query schema
-            if let Some(outer) = planner_context.outer_query_schema()
-                && let Ok((qualifier, field)) =
+            for outer in planner_context.outer_schemas_iter() {
+                if let Ok((qualifier, field)) =
                     
outer.qualified_field_with_unqualified_name(normalize_ident.as_str())
-            {
-                // Found an exact match on a qualified name in the outer plan 
schema, so this is an outer reference column
-                return Ok(Expr::OuterReferenceColumn(
-                    Arc::clone(field),
-                    Column::from((qualifier, field)),
-                ));
+                {
+                    // Found an exact match on a qualified name in the outer 
plan schema, so this is an outer reference column
+                    return Ok(Expr::OuterReferenceColumn(
+                        Arc::clone(field),
+                        Column::from((qualifier, field)),
+                    ));
+                }
             }
 
             // Default case
@@ -172,14 +173,14 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
                         not_impl_err!("compound identifier: {ids:?}")
                     } else {
                         // Check the outer_query_schema and try to find a match
-                        if let Some(outer) = 
planner_context.outer_query_schema() {
+                        for outer in planner_context.outer_schemas_iter() {
                             let search_result = search_dfschema(&ids, outer);
-                            match search_result {
+                            let result = match search_result {
                                 // Found matching field with spare 
identifier(s) for nested field(s) in structure
                                 Some((field, qualifier, nested_names))
                                     if !nested_names.is_empty() =>
                                 {
-                                    // TODO: remove when can support nested 
identifiers for OuterReferenceColumn
+                                    // TODO: remove this when we have support 
for nested identifiers for OuterReferenceColumn
                                     not_impl_err!(
                                         "Nested identifiers are not yet 
supported for OuterReferenceColumn {}",
                                         Column::from((qualifier, field))
@@ -195,26 +196,20 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
                                     ))
                                 }
                                 // Found no matching field, will return a 
default
-                                None => {
-                                    let s = &ids[0..ids.len()];
-                                    // safe unwrap as s can never be empty or 
exceed the bounds
-                                    let (relation, column_name) =
-                                        form_identifier(s).unwrap();
-                                    Ok(Expr::Column(Column::new(relation, 
column_name)))
-                                }
-                            }
-                        } else {
-                            let s = &ids[0..ids.len()];
-                            // Safe unwrap as s can never be empty or exceed 
the bounds
-                            let (relation, column_name) = 
form_identifier(s).unwrap();
-                            let mut column = Column::new(relation, 
column_name);
-                            if self.options.collect_spans
-                                && let Some(span) = ids_span
-                            {
-                                column.spans_mut().add_span(span);
-                            }
-                            Ok(Expr::Column(column))
+                                None => continue,
+                            };
+                            return result;
+                        }
+                        // Safe unwrap as column name can never be empty or 
exceed the bounds
+                        let (relation, column_name) =
+                            form_identifier(&ids[0..ids.len()]).unwrap();
+                        let mut column = Column::new(relation, column_name);
+                        if self.options.collect_spans
+                            && let Some(span) = ids_span
+                        {
+                            column.spans_mut().add_span(span);
                         }
+                        Ok(Expr::Column(column))
                     }
                 }
             }
diff --git a/datafusion/sql/src/expr/subquery.rs 
b/datafusion/sql/src/expr/subquery.rs
index 6837b2671c..662c44f6f2 100644
--- a/datafusion/sql/src/expr/subquery.rs
+++ b/datafusion/sql/src/expr/subquery.rs
@@ -31,11 +31,10 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
         input_schema: &DFSchema,
         planner_context: &mut PlannerContext,
     ) -> Result<Expr> {
-        let old_outer_query_schema =
-            
planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
+        planner_context.append_outer_query_schema(input_schema.clone().into());
         let sub_plan = self.query_to_plan(subquery, planner_context)?;
         let outer_ref_columns = sub_plan.all_out_ref_exprs();
-        planner_context.set_outer_query_schema(old_outer_query_schema);
+        planner_context.pop_outer_query_schema();
         Ok(Expr::Exists(Exists {
             subquery: Subquery {
                 subquery: Arc::new(sub_plan),
@@ -54,8 +53,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
         input_schema: &DFSchema,
         planner_context: &mut PlannerContext,
     ) -> Result<Expr> {
-        let old_outer_query_schema =
-            
planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
+        
planner_context.append_outer_query_schema(Arc::new(input_schema.clone()));
 
         let mut spans = Spans::new();
         if let SetExpr::Select(select) = &subquery.body.as_ref() {
@@ -70,7 +68,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
 
         let sub_plan = self.query_to_plan(subquery, planner_context)?;
         let outer_ref_columns = sub_plan.all_out_ref_exprs();
-        planner_context.set_outer_query_schema(old_outer_query_schema);
+        planner_context.pop_outer_query_schema();
 
         self.validate_single_column(
             &sub_plan,
@@ -98,8 +96,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
         input_schema: &DFSchema,
         planner_context: &mut PlannerContext,
     ) -> Result<Expr> {
-        let old_outer_query_schema =
-            
planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
+        
planner_context.append_outer_query_schema(Arc::new(input_schema.clone()));
         let mut spans = Spans::new();
         if let SetExpr::Select(select) = subquery.body.as_ref() {
             for item in &select.projection {
@@ -112,7 +109,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
         }
         let sub_plan = self.query_to_plan(subquery, planner_context)?;
         let outer_ref_columns = sub_plan.all_out_ref_exprs();
-        planner_context.set_outer_query_schema(old_outer_query_schema);
+        planner_context.pop_outer_query_schema();
 
         self.validate_single_column(
             &sub_plan,
@@ -172,8 +169,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
         input_schema: &DFSchema,
         planner_context: &mut PlannerContext,
     ) -> Result<Expr> {
-        let old_outer_query_schema =
-            
planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
+        
planner_context.append_outer_query_schema(Arc::new(input_schema.clone()));
 
         let mut spans = Spans::new();
         if let SetExpr::Select(select) = subquery.body.as_ref() {
@@ -188,7 +184,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
 
         let sub_plan = self.query_to_plan(subquery, planner_context)?;
         let outer_ref_columns = sub_plan.all_out_ref_exprs();
-        planner_context.set_outer_query_schema(old_outer_query_schema);
+        planner_context.pop_outer_query_schema();
 
         self.validate_single_column(
             &sub_plan,
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 520a2d55ef..dd63cfce5e 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -261,8 +261,10 @@ pub struct PlannerContext {
     /// Map of CTE name to logical plan of the WITH clause.
     /// Use `Arc<LogicalPlan>` to allow cheap cloning
     ctes: HashMap<String, Arc<LogicalPlan>>,
-    /// The query schema of the outer query plan, used to resolve the columns 
in subquery
-    outer_query_schema: Option<DFSchemaRef>,
+
+    /// The queries schemas of outer query relations, used to resolve the 
outer referenced
+    /// columns in subquery (recursive aware)
+    outer_queries_schemas_stack: Vec<DFSchemaRef>,
     /// The joined schemas of all FROM clauses planned so far. When planning 
LATERAL
     /// FROM clauses, this should become a suffix of the `outer_query_schema`.
     outer_from_schema: Option<DFSchemaRef>,
@@ -282,7 +284,7 @@ impl PlannerContext {
         Self {
             prepare_param_data_types: Arc::new(vec![]),
             ctes: HashMap::new(),
-            outer_query_schema: None,
+            outer_queries_schemas_stack: vec![],
             outer_from_schema: None,
             create_table_schema: None,
         }
@@ -297,19 +299,42 @@ impl PlannerContext {
         self
     }
 
-    // Return a reference to the outer query's schema
-    pub fn outer_query_schema(&self) -> Option<&DFSchema> {
-        self.outer_query_schema.as_ref().map(|s| s.as_ref())
+    /// Return the stack of outer relations' schemas, the outer most
+    /// relation are at the first entry
+    pub fn outer_queries_schemas(&self) -> &[DFSchemaRef] {
+        &self.outer_queries_schemas_stack
+    }
+
+    /// Return an iterator of the subquery relations' schemas, innermost
+    /// relation is returned first.
+    ///
+    /// This order corresponds to the order of resolution when looking up 
column
+    /// references in subqueries, which start from the innermost relation and
+    /// then look up the outer relations one by one until a match is found or 
no
+    /// more outer relation exist.
+    ///
+    /// NOTE this is *REVERSED* order of [`Self::outer_queries_schemas`]
+    ///
+    /// This is useful to resolve the column reference in the subquery by
+    /// looking up the outer query schemas one by one.
+    pub fn outer_schemas_iter(&self) -> impl Iterator<Item = &DFSchemaRef> {
+        self.outer_queries_schemas_stack.iter().rev()
     }
 
     /// Sets the outer query schema, returning the existing one, if
     /// any
-    pub fn set_outer_query_schema(
-        &mut self,
-        mut schema: Option<DFSchemaRef>,
-    ) -> Option<DFSchemaRef> {
-        std::mem::swap(&mut self.outer_query_schema, &mut schema);
-        schema
+    pub fn append_outer_query_schema(&mut self, schema: DFSchemaRef) {
+        self.outer_queries_schemas_stack.push(schema);
+    }
+
+    /// The schema of the adjacent outer relation
+    pub fn latest_outer_query_schema(&self) -> Option<&DFSchemaRef> {
+        self.outer_queries_schemas_stack.last()
+    }
+
+    /// Remove the schema of the adjacent outer relation
+    pub fn pop_outer_query_schema(&mut self) -> Option<DFSchemaRef> {
+        self.outer_queries_schemas_stack.pop()
     }
 
     pub fn set_table_schema(
diff --git a/datafusion/sql/src/relation/mod.rs 
b/datafusion/sql/src/relation/mod.rs
index cef3726c62..6558763ca4 100644
--- a/datafusion/sql/src/relation/mod.rs
+++ b/datafusion/sql/src/relation/mod.rs
@@ -262,9 +262,10 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
             } => {
                 let tbl_func_ref = self.object_name_to_table_reference(name)?;
                 let schema = planner_context
-                    .outer_query_schema()
+                    .outer_queries_schemas()
+                    .last()
                     .cloned()
-                    .unwrap_or_else(DFSchema::empty);
+                    .unwrap_or_else(|| Arc::new(DFSchema::empty()));
                 let func_args = args
                     .into_iter()
                     .map(|arg| match arg {
@@ -310,20 +311,24 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
         let old_from_schema = planner_context
             .set_outer_from_schema(None)
             .unwrap_or_else(|| Arc::new(DFSchema::empty()));
-        let new_query_schema = match planner_context.outer_query_schema() {
-            Some(old_query_schema) => {
+        let outer_query_schema = planner_context.pop_outer_query_schema();
+        let new_query_schema = match outer_query_schema {
+            Some(ref old_query_schema) => {
                 let mut new_query_schema = old_from_schema.as_ref().clone();
-                new_query_schema.merge(old_query_schema);
-                Some(Arc::new(new_query_schema))
+                new_query_schema.merge(old_query_schema.as_ref());
+                Arc::new(new_query_schema)
             }
-            None => Some(Arc::clone(&old_from_schema)),
+            None => Arc::clone(&old_from_schema),
         };
-        let old_query_schema = 
planner_context.set_outer_query_schema(new_query_schema);
+        planner_context.append_outer_query_schema(new_query_schema);
 
         let plan = self.create_relation(subquery, planner_context)?;
         let outer_ref_columns = plan.all_out_ref_exprs();
 
-        planner_context.set_outer_query_schema(old_query_schema);
+        planner_context.pop_outer_query_schema();
+        if let Some(schema) = outer_query_schema {
+            planner_context.append_outer_query_schema(schema);
+        }
         planner_context.set_outer_from_schema(Some(old_from_schema));
 
         // We can omit the subquery wrapper if there are no columns
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index 1d6ccde6be..28e7ac2f20 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -29,7 +29,7 @@ use crate::utils::{
 
 use datafusion_common::error::DataFusionErrorBuilder;
 use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
-use datafusion_common::{Column, Result, not_impl_err, plan_err};
+use datafusion_common::{Column, DFSchema, Result, not_impl_err, plan_err};
 use datafusion_common::{RecursionUnnestOption, UnnestOptions};
 use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions};
 use datafusion_expr::expr_rewriter::{
@@ -637,11 +637,6 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
         match selection {
             Some(predicate_expr) => {
                 let fallback_schemas = plan.fallback_normalize_schemas();
-                let outer_query_schema = 
planner_context.outer_query_schema().cloned();
-                let outer_query_schema_vec = outer_query_schema
-                    .as_ref()
-                    .map(|schema| vec![schema])
-                    .unwrap_or_else(Vec::new);
 
                 let filter_expr =
                     self.sql_to_expr(predicate_expr, plan.schema(), 
planner_context)?;
@@ -657,9 +652,19 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
 
                 let mut using_columns = HashSet::new();
                 expr_to_columns(&filter_expr, &mut using_columns)?;
+                let mut schema_stack: Vec<Vec<&DFSchema>> =
+                    vec![vec![plan.schema()], fallback_schemas];
+                for sc in planner_context.outer_schemas_iter() {
+                    schema_stack.push(vec![sc.as_ref()]);
+                }
+
                 let filter_expr = 
normalize_col_with_schemas_and_ambiguity_check(
                     filter_expr,
-                    &[&[plan.schema()], &fallback_schemas, 
&outer_query_schema_vec],
+                    schema_stack
+                        .iter()
+                        .map(|sc| sc.as_slice())
+                        .collect::<Vec<&[&DFSchema]>>()
+                        .as_slice(),
                     &[using_columns],
                 )?;
 
diff --git a/datafusion/sql/tests/common/mod.rs 
b/datafusion/sql/tests/common/mod.rs
index 9dc6b895e4..4b8667c3c0 100644
--- a/datafusion/sql/tests/common/mod.rs
+++ b/datafusion/sql/tests/common/mod.rs
@@ -161,12 +161,26 @@ impl ContextProvider for MockContextProvider {
             ])),
             "orders" => Ok(Schema::new(vec![
                 Field::new("order_id", DataType::UInt32, false),
+                Field::new("o_orderkey", DataType::UInt32, false),
+                Field::new("o_custkey", DataType::UInt32, false),
+                Field::new("o_orderstatus", DataType::Utf8, false),
                 Field::new("customer_id", DataType::UInt32, false),
+                Field::new("o_totalprice", DataType::Decimal128(15, 2), false),
                 Field::new("o_item_id", DataType::Utf8, false),
                 Field::new("qty", DataType::Int32, false),
                 Field::new("price", DataType::Float64, false),
                 Field::new("delivered", DataType::Boolean, false),
             ])),
+            "customer" => Ok(Schema::new(vec![
+                Field::new("c_custkey", DataType::UInt32, false),
+                Field::new("c_name", DataType::Utf8, false),
+                Field::new("c_address", DataType::Utf8, false),
+                Field::new("c_nationkey", DataType::UInt32, false),
+                Field::new("c_phone", DataType::Utf8, false),
+                Field::new("c_acctbal", DataType::Float64, false),
+                Field::new("c_mktsegment", DataType::Utf8, false),
+                Field::new("c_comment", DataType::Utf8, false),
+            ])),
             "array" => Ok(Schema::new(vec![
                 Field::new(
                     "left",
@@ -186,8 +200,10 @@ impl ContextProvider for MockContextProvider {
                 ),
             ])),
             "lineitem" => Ok(Schema::new(vec![
+                Field::new("l_orderkey", DataType::UInt32, false),
                 Field::new("l_item_id", DataType::UInt32, false),
                 Field::new("l_description", DataType::Utf8, false),
+                Field::new("l_extendedprice", DataType::Decimal128(15, 2), 
false),
                 Field::new("price", DataType::Float64, false),
             ])),
             "aggregate_test_100" => Ok(Schema::new(vec![
diff --git a/datafusion/sql/tests/sql_integration.rs 
b/datafusion/sql/tests/sql_integration.rs
index aaf0b0ae30..444bdae73a 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -995,15 +995,15 @@ fn select_nested_with_filters() {
 
 #[test]
 fn table_with_column_alias() {
-    let sql = "SELECT a, b, c
-                   FROM lineitem l (a, b, c)";
+    let sql = "SELECT a, b, c, d, e
+                   FROM lineitem l (a, b, c, d, e)";
     let plan = logical_plan(sql).unwrap();
     assert_snapshot!(
         plan,
         @r"
-    Projection: l.a, l.b, l.c
+    Projection: l.a, l.b, l.c, l.d, l.e
       SubqueryAlias: l
-        Projection: lineitem.l_item_id AS a, lineitem.l_description AS b, 
lineitem.price AS c
+        Projection: lineitem.l_orderkey AS a, lineitem.l_item_id AS b, 
lineitem.l_description AS c, lineitem.l_extendedprice AS d, lineitem.price AS e
           TableScan: lineitem
     "
     );
@@ -1017,7 +1017,7 @@ fn table_with_column_alias_number_cols() {
 
     assert_snapshot!(
         err.strip_backtrace(),
-        @"Error during planning: Source table contains 3 columns but only 2 
names given as column alias"
+        @"Error during planning: Source table contains 5 columns but only 2 
names given as column alias"
     );
 }
 
@@ -1058,7 +1058,7 @@ fn natural_left_join() {
         plan,
         @r"
     Projection: a.l_item_id
-      Left Join: Using a.l_item_id = b.l_item_id, a.l_description = 
b.l_description, a.price = b.price
+      Left Join: Using a.l_orderkey = b.l_orderkey, a.l_item_id = b.l_item_id, 
a.l_description = b.l_description, a.l_extendedprice = b.l_extendedprice, 
a.price = b.price
         SubqueryAlias: a
           TableScan: lineitem
         SubqueryAlias: b
@@ -1075,7 +1075,7 @@ fn natural_right_join() {
         plan,
         @r"
     Projection: a.l_item_id
-      Right Join: Using a.l_item_id = b.l_item_id, a.l_description = 
b.l_description, a.price = b.price
+      Right Join: Using a.l_orderkey = b.l_orderkey, a.l_item_id = 
b.l_item_id, a.l_description = b.l_description, a.l_extendedprice = 
b.l_extendedprice, a.price = b.price
         SubqueryAlias: a
           TableScan: lineitem
         SubqueryAlias: b
@@ -4801,7 +4801,11 @@ fn test_using_join_wildcard_schema() {
     // Only columns from one join side should be present
     let expected_fields = vec![
         "o1.order_id".to_string(),
+        "o1.o_orderkey".to_string(),
+        "o1.o_custkey".to_string(),
+        "o1.o_orderstatus".to_string(),
         "o1.customer_id".to_string(),
+        "o1.o_totalprice".to_string(),
         "o1.o_item_id".to_string(),
         "o1.qty".to_string(),
         "o1.price".to_string(),
@@ -4855,3 +4859,70 @@ fn test_using_join_wildcard_schema() {
         ]
     );
 }
+
+#[test]
+fn 
test_2_nested_lateral_join_with_the_deepest_join_referencing_the_outer_most_relation()
+{
+    let sql = "SELECT * FROM j1 j1_outer, LATERAL (
+    SELECT * FROM j1 j1_inner, LATERAL (
+        SELECT * FROM j2 WHERE j1_inner.j1_id = j2_id and j1_outer.j1_id=j2_id
+    ) as j2
+) as j2";
+
+    let plan = logical_plan(sql).unwrap();
+    assert_snapshot!(
+         plan,
+         @r#"
+Projection: j1_outer.j1_id, j1_outer.j1_string, j2.j1_id, j2.j1_string, 
j2.j2_id, j2.j2_string
+  Cross Join:
+    SubqueryAlias: j1_outer
+      TableScan: j1
+    SubqueryAlias: j2
+      Subquery:
+        Projection: j1_inner.j1_id, j1_inner.j1_string, j2.j2_id, j2.j2_string
+          Cross Join:
+            SubqueryAlias: j1_inner
+              TableScan: j1
+            SubqueryAlias: j2
+              Subquery:
+                Projection: j2.j2_id, j2.j2_string
+                  Filter: outer_ref(j1_inner.j1_id) = j2.j2_id AND 
outer_ref(j1_outer.j1_id) = j2.j2_id
+                    TableScan: j2
+"#
+    );
+}
+
+#[test]
+fn 
test_correlated_recursive_scalar_subquery_with_level_3_scalar_subquery_referencing_level1_relation()
+ {
+    let sql = "select c_custkey from customer
+            where c_acctbal < (
+            select sum(o_totalprice) from orders
+            where o_custkey = c_custkey
+            and o_totalprice < (
+            select sum(l_extendedprice) as price from lineitem where 
l_orderkey = o_orderkey
+            and l_extendedprice < c_acctbal
+        )
+        ) order by c_custkey";
+
+    let plan = logical_plan(sql).unwrap();
+    assert_snapshot!(
+         plan,
+         @r#"
+Sort: customer.c_custkey ASC NULLS LAST
+  Projection: customer.c_custkey
+    Filter: customer.c_acctbal < (<subquery>)
+      Subquery:
+        Projection: sum(orders.o_totalprice)
+          Aggregate: groupBy=[[]], aggr=[[sum(orders.o_totalprice)]]
+            Filter: orders.o_custkey = outer_ref(customer.c_custkey) AND 
orders.o_totalprice < (<subquery>)
+              Subquery:
+                Projection: sum(lineitem.l_extendedprice) AS price
+                  Aggregate: groupBy=[[]], 
aggr=[[sum(lineitem.l_extendedprice)]]
+                    Filter: lineitem.l_orderkey = outer_ref(orders.o_orderkey) 
AND lineitem.l_extendedprice < outer_ref(customer.c_acctbal)
+                      TableScan: lineitem
+              TableScan: orders
+      TableScan: customer
+"#
+    );
+}
diff --git a/datafusion/sqllogictest/test_files/subquery.slt 
b/datafusion/sqllogictest/test_files/subquery.slt
index e73f4ec3e3..9c7c2ddb5d 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -1528,3 +1528,139 @@ logical_plan
 20)--------SubqueryAlias: set_cmp_s
 21)----------Projection: column1 AS v
 22)------------Values: (Int64(5)), (Int64(NULL))
+
+# 
correlated_recursive_scalar_subquery_with_level_3_exists_subquery_referencing_level1_relation
+query TT
+explain select c_custkey from customer
+where c_acctbal < (
+    select sum(o_totalprice) from orders
+    where o_custkey = c_custkey
+    and exists (
+        select * from lineitem where l_orderkey = o_orderkey
+        and l_extendedprice < c_acctbal
+    )
+) order by c_custkey;
+----
+logical_plan
+01)Sort: customer.c_custkey ASC NULLS LAST
+02)--Projection: customer.c_custkey
+03)----Inner Join: customer.c_custkey = __scalar_sq_2.o_custkey Filter: 
CAST(customer.c_acctbal AS Decimal128(25, 2)) < 
__scalar_sq_2.sum(orders.o_totalprice)
+04)------TableScan: customer projection=[c_custkey, c_acctbal]
+05)------SubqueryAlias: __scalar_sq_2
+06)--------Projection: sum(orders.o_totalprice), orders.o_custkey
+07)----------Aggregate: groupBy=[[orders.o_custkey]], 
aggr=[[sum(orders.o_totalprice)]]
+08)------------Projection: orders.o_custkey, orders.o_totalprice
+09)--------------LeftSemi Join: orders.o_orderkey = 
__correlated_sq_1.l_orderkey Filter: __correlated_sq_1.l_extendedprice < 
customer.c_acctbal
+10)----------------TableScan: orders projection=[o_orderkey, o_custkey, 
o_totalprice]
+11)----------------SubqueryAlias: __correlated_sq_1
+12)------------------TableScan: lineitem projection=[l_orderkey, 
l_extendedprice]
+
+# 
correlated_recursive_scalar_subquery_with_level_3_in_subquery_referencing_level1_relation
+query TT
+explain select c_custkey from customer
+where c_acctbal < (
+    select sum(o_totalprice) from orders
+    where o_custkey = c_custkey
+    and o_totalprice in (
+        select l_extendedprice as price from lineitem where l_orderkey = 
o_orderkey
+        and l_extendedprice < c_acctbal
+    )
+) order by c_custkey;
+----
+logical_plan
+01)Sort: customer.c_custkey ASC NULLS LAST
+02)--Projection: customer.c_custkey
+03)----Inner Join: customer.c_custkey = __scalar_sq_2.o_custkey Filter: 
CAST(customer.c_acctbal AS Decimal128(25, 2)) < 
__scalar_sq_2.sum(orders.o_totalprice)
+04)------TableScan: customer projection=[c_custkey, c_acctbal]
+05)------SubqueryAlias: __scalar_sq_2
+06)--------Projection: sum(orders.o_totalprice), orders.o_custkey
+07)----------Aggregate: groupBy=[[orders.o_custkey]], 
aggr=[[sum(orders.o_totalprice)]]
+08)------------Projection: orders.o_custkey, orders.o_totalprice
+09)--------------LeftSemi Join: orders.o_totalprice = __correlated_sq_1.price, 
orders.o_orderkey = __correlated_sq_1.l_orderkey Filter: 
__correlated_sq_1.l_extendedprice < customer.c_acctbal
+10)----------------TableScan: orders projection=[o_orderkey, o_custkey, 
o_totalprice]
+11)----------------SubqueryAlias: __correlated_sq_1
+12)------------------Projection: lineitem.l_extendedprice AS price, 
lineitem.l_extendedprice, lineitem.l_orderkey
+13)--------------------TableScan: lineitem projection=[l_orderkey, 
l_extendedprice]
+
+# Setup tables for recursive correlation tests
+statement ok
+CREATE TABLE employees (
+    employee_id INTEGER,
+    employee_name VARCHAR,
+    dept_id INTEGER,
+    salary DECIMAL
+);
+
+statement ok
+CREATE TABLE project_assignments (
+    project_id INTEGER,
+    employee_id INTEGER,
+    priority INTEGER
+);
+
+# Provided recursive scalar subquery explain case
+query TT
+EXPLAIN SELECT e1.employee_name, e1.salary
+FROM employees e1
+WHERE e1.salary > (
+    SELECT AVG(e2.salary)
+    FROM employees e2
+    WHERE e2.dept_id = e1.dept_id
+    AND e2.salary > (
+        SELECT AVG(e3.salary)
+        FROM employees e3
+        WHERE e3.dept_id = e1.dept_id
+    )
+);
+----
+logical_plan
+01)Projection: e1.employee_name, e1.salary
+02)--Inner Join: e1.dept_id = __scalar_sq_1.dept_id Filter: CAST(e1.salary AS 
Decimal128(38, 14)) > __scalar_sq_1.avg(e2.salary)
+03)----SubqueryAlias: e1
+04)------TableScan: employees projection=[employee_name, dept_id, salary]
+05)----SubqueryAlias: __scalar_sq_1
+06)------Projection: avg(e2.salary), e2.dept_id
+07)--------Aggregate: groupBy=[[e2.dept_id]], aggr=[[avg(e2.salary)]]
+08)----------Projection: e2.dept_id, e2.salary
+09)------------Inner Join:  Filter: CAST(e2.salary AS Decimal128(38, 14)) > 
__scalar_sq_2.avg(e3.salary) AND __scalar_sq_2.dept_id = e1.dept_id
+10)--------------SubqueryAlias: e2
+11)----------------TableScan: employees projection=[dept_id, salary]
+12)--------------SubqueryAlias: __scalar_sq_2
+13)----------------Projection: avg(e3.salary), e3.dept_id
+14)------------------Aggregate: groupBy=[[e3.dept_id]], aggr=[[avg(e3.salary)]]
+15)--------------------SubqueryAlias: e3
+16)----------------------TableScan: employees projection=[dept_id, salary]
+
+# Check shadowing: `dept_id` should resolve to the nearest outer relation 
(`e2`)
+# in the innermost subquery rather than the outermost
+query TT
+EXPLAIN SELECT e1.employee_id
+FROM employees e1
+WHERE EXISTS (
+    SELECT 1
+    FROM employees e2
+    WHERE EXISTS (
+        SELECT 1
+        FROM project_assignments p
+        WHERE p.project_id = dept_id
+    )
+);
+----
+logical_plan
+01)LeftSemi Join:
+02)--SubqueryAlias: e1
+03)----TableScan: employees projection=[employee_id]
+04)--SubqueryAlias: __correlated_sq_2
+05)----Projection:
+06)------LeftSemi Join: e2.dept_id = __correlated_sq_1.project_id
+07)--------SubqueryAlias: e2
+08)----------TableScan: employees projection=[dept_id]
+09)--------SubqueryAlias: __correlated_sq_1
+10)----------SubqueryAlias: p
+11)------------TableScan: project_assignments projection=[project_id]
+
+statement count 0
+drop table employees;
+
+statement count 0
+drop table project_assignments;
diff --git a/docs/source/library-user-guide/upgrading/53.0.0.md 
b/docs/source/library-user-guide/upgrading/53.0.0.md
index 4716a5734a..06c917b2ab 100644
--- a/docs/source/library-user-guide/upgrading/53.0.0.md
+++ b/docs/source/library-user-guide/upgrading/53.0.0.md
@@ -28,6 +28,29 @@
 
 [#19692]: https://github.com/apache/datafusion/issues/19692
 
+### `PlannerContext` outer query schema API now uses a stack
+
+`PlannerContext` no longer stores a single `outer_query_schema`. It now tracks 
a
+stack of outer relation schemas so nested subqueries can access non-adjacent
+outer relations.
+
+**Before:**
+
+```rust,ignore
+let old_outer_query_schema =
+    planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
+let sub_plan = self.query_to_plan(subquery, planner_context)?;
+planner_context.set_outer_query_schema(old_outer_query_schema);
+```
+
+**After:**
+
+```rust,ignore
+planner_context.append_outer_query_schema(input_schema.clone().into());
+let sub_plan = self.query_to_plan(subquery, planner_context)?;
+planner_context.pop_outer_query_schema();
+```
+
 ### `FileSinkConfig` adds `file_output_mode`
 
 `FileSinkConfig` now includes a `file_output_mode: FileOutputMode` field to 
control


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to