This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new f52c56b1c7 Support unparsing implicit lateral `UNNEST` plan to SQL 
text (#13824)
f52c56b1c7 is described below

commit f52c56b1c7fd251fd360cb3cefa21d3803bdd733
Author: Jax Liu <[email protected]>
AuthorDate: Wed Dec 25 20:16:47 2024 +0800

    Support unparsing implicit lateral `UNNEST` plan to SQL text (#13824)
    
    * support unparsing the implicit lateral unnest plan
    
    * cargo clippy and fmt
    
    * refactor for `check_unnest_placeholder_with_outer_ref`
    
    * add const for the prefix string of unnest and outer reference column
---
 datafusion/expr/src/expr.rs               |  9 +++-
 datafusion/sql/src/unparser/plan.rs       | 83 ++++++++++++++++++++++++++-----
 datafusion/sql/src/unparser/rewrite.rs    | 58 +++++++++++++++++++--
 datafusion/sql/src/unparser/utils.rs      | 25 ++++++++++
 datafusion/sql/tests/cases/plan_to_sql.rs | 24 +++++++++
 5 files changed, 181 insertions(+), 18 deletions(-)

diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 79e846e7af..b8e495ee7a 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -2536,6 +2536,9 @@ pub fn schema_name_from_sorts(sorts: &[Sort]) -> 
Result<String, fmt::Error> {
     Ok(s)
 }
 
+pub const OUTER_REFERENCE_COLUMN_PREFIX: &str = "outer_ref";
+pub const UNNEST_COLUMN_PREFIX: &str = "UNNEST";
+
 /// Format expressions for display as part of a logical plan. In many cases, 
this will produce
 /// similar output to `Expr.name()` except that column names will be prefixed 
with '#'.
 impl Display for Expr {
@@ -2543,7 +2546,9 @@ impl Display for Expr {
         match self {
             Expr::Alias(Alias { expr, name, .. }) => write!(f, "{expr} AS 
{name}"),
             Expr::Column(c) => write!(f, "{c}"),
-            Expr::OuterReferenceColumn(_, c) => write!(f, "outer_ref({c})"),
+            Expr::OuterReferenceColumn(_, c) => {
+                write!(f, "{OUTER_REFERENCE_COLUMN_PREFIX}({c})")
+            }
             Expr::ScalarVariable(_, var_names) => write!(f, "{}", 
var_names.join(".")),
             Expr::Literal(v) => write!(f, "{v:?}"),
             Expr::Case(case) => {
@@ -2736,7 +2741,7 @@ impl Display for Expr {
             },
             Expr::Placeholder(Placeholder { id, .. }) => write!(f, "{id}"),
             Expr::Unnest(Unnest { expr }) => {
-                write!(f, "UNNEST({expr})")
+                write!(f, "{UNNEST_COLUMN_PREFIX}({expr})")
             }
         }
     }
diff --git a/datafusion/sql/src/unparser/plan.rs 
b/datafusion/sql/src/unparser/plan.rs
index f2d46a9f4c..2574ae5d52 100644
--- a/datafusion/sql/src/unparser/plan.rs
+++ b/datafusion/sql/src/unparser/plan.rs
@@ -33,13 +33,14 @@ use super::{
     Unparser,
 };
 use crate::unparser::ast::UnnestRelationBuilder;
-use crate::unparser::utils::unproject_agg_exprs;
+use crate::unparser::utils::{find_unnest_node_until_relation, 
unproject_agg_exprs};
 use crate::utils::UNNEST_PLACEHOLDER;
 use datafusion_common::{
     internal_err, not_impl_err,
     tree_node::{TransformedResult, TreeNode},
     Column, DataFusionError, Result, TableReference,
 };
+use datafusion_expr::expr::OUTER_REFERENCE_COLUMN_PREFIX;
 use datafusion_expr::{
     expr::Alias, BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, 
LogicalPlan,
     LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
@@ -235,9 +236,10 @@ impl Unparser<'_> {
         plan: &LogicalPlan,
         relation: &mut RelationBuilder,
         alias: Option<ast::TableAlias>,
+        lateral: bool,
     ) -> Result<()> {
         let mut derived_builder = DerivedRelationBuilder::default();
-        derived_builder.lateral(false).alias(alias).subquery({
+        derived_builder.lateral(lateral).alias(alias).subquery({
             let inner_statement = self.plan_to_sql(plan)?;
             if let ast::Statement::Query(inner_query) = inner_statement {
                 inner_query
@@ -257,15 +259,17 @@ impl Unparser<'_> {
         alias: &str,
         plan: &LogicalPlan,
         relation: &mut RelationBuilder,
+        lateral: bool,
     ) -> Result<()> {
         if self.dialect.requires_derived_table_alias() {
             self.derive(
                 plan,
                 relation,
                 Some(self.new_table_alias(alias.to_string(), vec![])),
+                lateral,
             )
         } else {
-            self.derive(plan, relation, None)
+            self.derive(plan, relation, None, lateral)
         }
     }
 
@@ -317,10 +321,12 @@ impl Unparser<'_> {
                 // Projection can be top-level plan for unnest relation
                 // The projection generated by the `RecursiveUnnestRewriter` 
from a UNNEST relation will have
                 // only one expression, which is the placeholder column 
generated by the rewriter.
-                if self.dialect.unnest_as_table_factor()
-                    && p.expr.len() == 1
-                    && Self::is_unnest_placeholder(&p.expr[0])
-                {
+                let unnest_input_type = if p.expr.len() == 1 {
+                    Self::check_unnest_placeholder_with_outer_ref(&p.expr[0])
+                } else {
+                    None
+                };
+                if self.dialect.unnest_as_table_factor() && 
unnest_input_type.is_some() {
                     if let LogicalPlan::Unnest(unnest) = &p.input.as_ref() {
                         return self
                             .unnest_to_table_factor_sql(unnest, query, select, 
relation);
@@ -333,6 +339,9 @@ impl Unparser<'_> {
                         "derived_projection",
                         plan,
                         relation,
+                        unnest_input_type
+                            .filter(|t| matches!(t, 
UnnestInputType::OuterReference))
+                            .is_some(),
                     );
                 }
                 self.reconstruct_select_statement(plan, p, select)?;
@@ -365,6 +374,7 @@ impl Unparser<'_> {
                         "derived_limit",
                         plan,
                         relation,
+                        false,
                     );
                 }
                 if let Some(fetch) = &limit.fetch {
@@ -402,6 +412,7 @@ impl Unparser<'_> {
                         "derived_sort",
                         plan,
                         relation,
+                        false,
                     );
                 }
                 let Some(query_ref) = query else {
@@ -472,6 +483,7 @@ impl Unparser<'_> {
                         "derived_distinct",
                         plan,
                         relation,
+                        false,
                     );
                 }
                 let (select_distinct, input) = match distinct {
@@ -658,6 +670,7 @@ impl Unparser<'_> {
                         "derived_union",
                         plan,
                         relation,
+                        false,
                     );
                 }
 
@@ -723,19 +736,54 @@ impl Unparser<'_> {
                     internal_err!("Unnest input is not a Projection: 
{unnest:?}")
                 }
             }
-            _ => not_impl_err!("Unsupported operator: {plan:?}"),
+            LogicalPlan::Subquery(subquery)
+                if find_unnest_node_until_relation(subquery.subquery.as_ref())
+                    .is_some() =>
+            {
+                if self.dialect.unnest_as_table_factor() {
+                    self.select_to_sql_recursively(
+                        subquery.subquery.as_ref(),
+                        query,
+                        select,
+                        relation,
+                    )
+                } else {
+                    self.derive_with_dialect_alias(
+                        "derived_unnest",
+                        subquery.subquery.as_ref(),
+                        relation,
+                        true,
+                    )
+                }
+            }
+            _ => {
+                not_impl_err!("Unsupported operator: {plan:?}")
+            }
         }
     }
 
-    /// Try to find the placeholder column name generated by 
`RecursiveUnnestRewriter`
-    /// Only match the pattern 
`Expr::Alias(Expr::Column("__unnest_placeholder(...)"))`
-    fn is_unnest_placeholder(expr: &Expr) -> bool {
+    /// Try to find the placeholder column name generated by 
`RecursiveUnnestRewriter`.
+    ///
+    /// - If the expression matches the pattern 
`Expr::Alias(Expr::Column("__unnest_placeholder(...)"))` but not the outer-reference pattern below,
+    ///     the unnest input is a scalar value; return [UnnestInputType::Scalar].
+    /// - If the expression matches the pattern 
`Expr::Alias(Expr::Column("__unnest_placeholder(outer_ref(...))"))`,
+    ///     the unnest input is an outer reference column; return 
[UnnestInputType::OuterReference].
+    /// - If the expression is not an aliased placeholder column, return [None].
+    ///
+    /// `outer_ref` is the display result of [Expr::OuterReferenceColumn]
+    fn check_unnest_placeholder_with_outer_ref(expr: &Expr) -> 
Option<UnnestInputType> {
         if let Expr::Alias(Alias { expr, .. }) = expr {
             if let Expr::Column(Column { name, .. }) = expr.as_ref() {
-                return name.starts_with(UNNEST_PLACEHOLDER);
+                if let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) {
+                    if prefix.starts_with(&format!("({}(", 
OUTER_REFERENCE_COLUMN_PREFIX))
+                    {
+                        return Some(UnnestInputType::OuterReference);
+                    }
+                    return Some(UnnestInputType::Scalar);
+                }
             }
         }
-        false
+        None
     }
 
     fn unnest_to_table_factor_sql(
@@ -1092,3 +1140,12 @@ impl From<BuilderError> for DataFusionError {
         DataFusionError::External(Box::new(e))
     }
 }
+
+/// The type of the input to the UNNEST table factor.
+#[derive(Debug)]
+enum UnnestInputType {
+    /// The input is a column of the outer query. It will be presented like 
`outer_ref(column_name)`.
+    OuterReference,
+    /// The input is a scalar value. It will be presented like a scalar array 
or struct.
+    Scalar,
+}
diff --git a/datafusion/sql/src/unparser/rewrite.rs 
b/datafusion/sql/src/unparser/rewrite.rs
index 68af121a41..db98374831 100644
--- a/datafusion/sql/src/unparser/rewrite.rs
+++ b/datafusion/sql/src/unparser/rewrite.rs
@@ -23,7 +23,7 @@ use datafusion_common::{
     tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRewriter},
     Column, HashMap, Result, TableReference,
 };
-use datafusion_expr::expr::Alias;
+use datafusion_expr::expr::{Alias, UNNEST_COLUMN_PREFIX};
 use datafusion_expr::{Expr, LogicalPlan, Projection, Sort, SortExpr};
 use sqlparser::ast::Ident;
 
@@ -190,10 +190,11 @@ pub(super) fn 
rewrite_plan_for_sort_on_non_projected_fields(
     }
 }
 
-/// This logic is to work out the columns and inner query for SubqueryAlias 
plan for both types of
-/// subquery
+/// This logic is to work out the columns and inner query for SubqueryAlias 
plan for some types of
+/// subquery or unnest
 /// - `(SELECT column_a as a from table) AS A`
 /// - `(SELECT column_a from table) AS A (a)`
+/// - `SELECT * FROM t1 CROSS JOIN UNNEST(t1.c1) AS u(c1)` (see 
[find_unnest_column_alias])
 ///
 /// A roundtrip example for table alias with columns
 ///
@@ -222,6 +223,15 @@ pub(super) fn subquery_alias_inner_query_and_columns(
 ) -> (&LogicalPlan, Vec<Ident>) {
     let plan: &LogicalPlan = subquery_alias.input.as_ref();
 
+    if let LogicalPlan::Subquery(subquery) = plan {
+        let (inner_projection, Some(column)) =
+            find_unnest_column_alias(subquery.subquery.as_ref())
+        else {
+            return (plan, vec![]);
+        };
+        return (inner_projection, vec![Ident::new(column)]);
+    }
+
     let LogicalPlan::Projection(outer_projections) = plan else {
         return (plan, vec![]);
     };
@@ -257,6 +267,48 @@ pub(super) fn subquery_alias_inner_query_and_columns(
     (outer_projections.input.as_ref(), columns)
 }
 
+/// Try to find the column alias for UNNEST in the inner projection.
+/// For example:
+/// ```sql
+///     SELECT * FROM t1 CROSS JOIN UNNEST(t1.c1) AS u(c1)
+/// ```
+/// The above query will be parsed into the following plan:
+/// ```text
+/// Projection: *
+///   Cross Join:
+///     SubqueryAlias: t1
+///       TableScan: t
+///     SubqueryAlias: u
+///       Subquery:
+///         Projection: UNNEST(outer_ref(t1.c1)) AS c1
+///           Projection: __unnest_placeholder(outer_ref(t1.c1),depth=1) AS 
UNNEST(outer_ref(t1.c1))
+///             Unnest: lists[__unnest_placeholder(outer_ref(t1.c1))|depth=1] 
structs[]
+///               Projection: outer_ref(t1.c1) AS 
__unnest_placeholder(outer_ref(t1.c1))
+///                 EmptyRelation
+/// ```
+/// The function will return the inner projection and the column alias `c1` if 
the column name
+/// starts with `UNNEST(` (the `Display` result of [Expr::Unnest]) in the 
inner projection.
+pub(super) fn find_unnest_column_alias(
+    plan: &LogicalPlan,
+) -> (&LogicalPlan, Option<String>) {
+    if let LogicalPlan::Projection(projection) = plan {
+        if projection.expr.len() != 1 {
+            return (plan, None);
+        }
+        if let Some(Expr::Alias(alias)) = projection.expr.first() {
+            if alias
+                .expr
+                .schema_name()
+                .to_string()
+                .starts_with(&format!("{UNNEST_COLUMN_PREFIX}("))
+            {
+                return (projection.input.as_ref(), Some(alias.name.clone()));
+            }
+        }
+    }
+    (plan, None)
+}
+
 /// Injects column aliases into a subquery's logical plan. The function 
searches for a `Projection`
 /// within the given plan, which may be wrapped by other operators (e.g., 
LIMIT, SORT).
 /// If the top-level plan is a `Projection`, it directly injects the column 
aliases.
diff --git a/datafusion/sql/src/unparser/utils.rs 
b/datafusion/sql/src/unparser/utils.rs
index 3a7fa5ddca..f21fb2fcb4 100644
--- a/datafusion/sql/src/unparser/utils.rs
+++ b/datafusion/sql/src/unparser/utils.rs
@@ -89,6 +89,31 @@ pub(crate) fn find_unnest_node_within_select(plan: 
&LogicalPlan) -> Option<&Unne
     }
 }
 
+/// Looks for an Unnest node at or below the single input of [LogicalPlan], returning `None` if the
+/// plan does not have exactly one input or that input is a TableScan, Subquery or SubqueryAlias
+pub(crate) fn find_unnest_node_until_relation(plan: &LogicalPlan) -> 
Option<&Unnest> {
+    // Only a plan with exactly one input (e.g. Projection / Filter) is searched;
+    // a plan with zero or multiple inputs yields `None`.
+    let input = plan.inputs();
+    let input = if input.len() > 1 {
+        return None;
+    } else {
+        input.first()?
+    };
+
+    if let LogicalPlan::Unnest(unnest) = input {
+        Some(unnest)
+    } else if let LogicalPlan::TableScan(_) = input {
+        None
+    } else if let LogicalPlan::Subquery(_) = input {
+        None
+    } else if let LogicalPlan::SubqueryAlias(_) = input {
+        None
+    } else {
+        find_unnest_node_within_select(input)
+    }
+}
+
 /// Recursively searches children of [LogicalPlan] to find Window nodes if 
exist
 /// prior to encountering a Join, TableScan, or a nested subquery (derived 
table factor).
 /// If Window node is not found prior to this or at all before reaching the end
diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs 
b/datafusion/sql/tests/cases/plan_to_sql.rs
index 236b59432a..2905ba104c 100644
--- a/datafusion/sql/tests/cases/plan_to_sql.rs
+++ b/datafusion/sql/tests/cases/plan_to_sql.rs
@@ -615,6 +615,30 @@ fn roundtrip_statement_with_dialect() -> Result<()> {
             parser_dialect: Box::new(GenericDialect {}),
             unparser_dialect: 
Box::new(CustomDialectBuilder::default().with_unnest_as_table_factor(true).build()),
         },
+        TestStatementWithDialect {
+            sql: "SELECT * FROM unnest_table u, UNNEST(u.array_col)",
+            expected: r#"SELECT * FROM unnest_table AS u CROSS JOIN 
UNNEST(u.array_col)"#,
+            parser_dialect: Box::new(GenericDialect {}),
+            unparser_dialect: 
Box::new(CustomDialectBuilder::default().with_unnest_as_table_factor(true).build()),
+        },
+        TestStatementWithDialect {
+            sql: "SELECT * FROM unnest_table u, UNNEST(u.array_col) AS t1 
(c1)",
+            expected: r#"SELECT * FROM unnest_table AS u CROSS JOIN 
UNNEST(u.array_col) AS t1 (c1)"#,
+            parser_dialect: Box::new(GenericDialect {}),
+            unparser_dialect: 
Box::new(CustomDialectBuilder::default().with_unnest_as_table_factor(true).build()),
+        },
+        TestStatementWithDialect {
+            sql: "SELECT * FROM unnest_table u, UNNEST(u.array_col)",
+            expected: r#"SELECT * FROM unnest_table AS u CROSS JOIN LATERAL 
(SELECT UNNEST(u.array_col) AS "UNNEST(outer_ref(u.array_col))")"#,
+            parser_dialect: Box::new(GenericDialect {}),
+            unparser_dialect: Box::new(UnparserDefaultDialect {}),
+        },
+        TestStatementWithDialect {
+            sql: "SELECT * FROM unnest_table u, UNNEST(u.array_col) AS t1 
(c1)",
+            expected: r#"SELECT * FROM unnest_table AS u CROSS JOIN LATERAL 
(SELECT UNNEST(u.array_col) AS "UNNEST(outer_ref(u.array_col))") AS t1 (c1)"#,
+            parser_dialect: Box::new(GenericDialect {}),
+            unparser_dialect: Box::new(UnparserDefaultDialect {}),
+        },
     ];
 
     for query in tests {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to