This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 219de5fba6 impl agg and subquery plans to sql (#9606)
219de5fba6 is described below

commit 219de5fba6e5136b977c48ff8c595aaf5e6829f6
Author: Devin D'Angelo <[email protected]>
AuthorDate: Fri Mar 15 06:49:04 2024 -0400

    impl agg and subquery plans to sql (#9606)
---
 datafusion/sql/src/unparser/ast.rs      |   3 +
 datafusion/sql/src/unparser/plan.rs     | 102 ++++++++++++++++++++++++++++----
 datafusion/sql/tests/sql_integration.rs |  24 ++++++++
 3 files changed, 116 insertions(+), 13 deletions(-)

diff --git a/datafusion/sql/src/unparser/ast.rs 
b/datafusion/sql/src/unparser/ast.rs
index 955aabe74c..0c9dcde989 100644
--- a/datafusion/sql/src/unparser/ast.rs
+++ b/datafusion/sql/src/unparser/ast.rs
@@ -159,6 +159,9 @@ impl SelectBuilder {
         new.projection = value;
         new
     }
+    pub fn already_projected(&self) -> bool {
+        !self.projection.is_empty()
+    }
     pub fn into(&mut self, value: Option<ast::SelectInto>) -> &mut Self {
         let new = self;
         new.into = value;
diff --git a/datafusion/sql/src/unparser/plan.rs 
b/datafusion/sql/src/unparser/plan.rs
index 21e4427c1f..8d9e0b1a6e 100644
--- a/datafusion/sql/src/unparser/plan.rs
+++ b/datafusion/sql/src/unparser/plan.rs
@@ -15,14 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
+use datafusion_common::{internal_err, not_impl_err, plan_err, DataFusionError, 
Result};
 use datafusion_expr::{expr::Alias, Expr, JoinConstraint, JoinType, 
LogicalPlan};
-use sqlparser::ast;
+use sqlparser::ast::{self, Ident, SelectItem};
 
 use super::{
     ast::{
-        BuilderError, QueryBuilder, RelationBuilder, SelectBuilder, 
TableRelationBuilder,
-        TableWithJoinsBuilder,
+        BuilderError, DerivedRelationBuilder, QueryBuilder, RelationBuilder,
+        SelectBuilder, TableRelationBuilder, TableWithJoinsBuilder,
     },
     Unparser,
 };
@@ -129,14 +129,88 @@ impl Unparser<'_> {
                 Ok(())
             }
             LogicalPlan::Projection(p) => {
-                let items = p
-                    .expr
-                    .iter()
-                    .map(|e| self.select_item_to_sql(e))
-                    .collect::<Result<Vec<_>>>()?;
-                select.projection(items);
-
-                self.select_to_sql_recursively(p.input.as_ref(), query, 
select, relation)
+                // A second projection implies a derived tablefactor
+                if !select.already_projected() {
+                    // Special handling when projecting an aggregation plan
+                    if let LogicalPlan::Aggregate(agg) = p.input.as_ref() {
+                        let mut items = p
+                            .expr
+                            .iter()
+                            .filter(|e| !matches!(e, 
Expr::AggregateFunction(_)))
+                            .map(|e| self.select_item_to_sql(e))
+                            .collect::<Result<Vec<_>>>()?;
+
+                        let proj_aggs = p
+                            .expr
+                            .iter()
+                            .filter(|e| matches!(e, 
Expr::AggregateFunction(_)))
+                            .zip(agg.aggr_expr.iter())
+                            .map(|(proj, agg_exp)| {
+                                let sql_agg_expr = 
self.select_item_to_sql(agg_exp)?;
+                                let maybe_aliased =
+                                    if let Expr::Alias(Alias { name, .. }) = 
proj {
+                                        if let 
SelectItem::UnnamedExpr(aggregation_fun) =
+                                            sql_agg_expr
+                                        {
+                                            SelectItem::ExprWithAlias {
+                                                expr: aggregation_fun,
+                                                alias: Ident {
+                                                    value: name.to_string(),
+                                                    quote_style: None,
+                                                },
+                                            }
+                                        } else {
+                                            sql_agg_expr
+                                        }
+                                    } else {
+                                        sql_agg_expr
+                                    };
+                                Ok(maybe_aliased)
+                            })
+                            .collect::<Result<Vec<_>>>()?;
+                        items.extend(proj_aggs);
+                        select.projection(items);
+                        select.group_by(ast::GroupByExpr::Expressions(
+                            agg.group_expr
+                                .iter()
+                                .map(|expr| self.expr_to_sql(expr))
+                                .collect::<Result<Vec<_>>>()?,
+                        ));
+                        self.select_to_sql_recursively(
+                            agg.input.as_ref(),
+                            query,
+                            select,
+                            relation,
+                        )
+                    } else {
+                        let items = p
+                            .expr
+                            .iter()
+                            .map(|e| self.select_item_to_sql(e))
+                            .collect::<Result<Vec<_>>>()?;
+                        select.projection(items);
+                        self.select_to_sql_recursively(
+                            p.input.as_ref(),
+                            query,
+                            select,
+                            relation,
+                        )
+                    }
+                } else {
+                    let mut derived_builder = 
DerivedRelationBuilder::default();
+                    derived_builder.lateral(false).alias(None).subquery({
+                        let inner_statment = self.plan_to_sql(plan)?;
+                        if let ast::Statement::Query(inner_query) = 
inner_statment {
+                            inner_query
+                        } else {
+                            return internal_err!(
+                                "Subquery must be a Query, but found 
{inner_statment:?}"
+                            );
+                        }
+                    });
+                    relation.derived(derived_builder);
+                    Ok(())
+                }
             }
             LogicalPlan::Filter(filter) => {
                 let filter_expr = self.expr_to_sql(&filter.predicate)?;
@@ -176,7 +250,9 @@ impl Unparser<'_> {
                 )
             }
             LogicalPlan::Aggregate(_agg) => {
-                not_impl_err!("Unsupported operator: {plan:?}")
+                not_impl_err!(
+                    "Unsupported aggregation plan not following a projection: 
{plan:?}"
+                )
             }
             LogicalPlan::Distinct(_distinct) => {
                 not_impl_err!("Unsupported operator: {plan:?}")
diff --git a/datafusion/sql/tests/sql_integration.rs 
b/datafusion/sql/tests/sql_integration.rs
index a6ea22db96..fffb540d87 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -4560,6 +4560,26 @@ fn roundtrip_statement() {
             "select ta.j1_id, tb.j2_string, tc.j3_string from j1 ta join j2 tb 
on (ta.j1_id = tb.j2_id) join j3 tc on (ta.j1_id = tc.j3_id);",
             r#"SELECT ta.j1_id, tb.j2_string, tc.j3_string FROM j1 AS ta JOIN 
j2 AS tb ON (ta.j1_id = tb.j2_id) JOIN j3 AS tc ON (ta.j1_id = tc.j3_id)"#,
         ),
+        (
+            "select * from (select id, first_name from person)",
+            "SELECT person.id, person.first_name FROM (SELECT person.id, 
person.first_name FROM person)"
+        ),
+        (
+            "select * from (select id, first_name from (select * from 
person))",
+            "SELECT person.id, person.first_name FROM (SELECT person.id, 
person.first_name FROM (SELECT person.id, person.first_name, person.last_name, 
person.age, person.state, person.salary, person.birth_date, person.😀 FROM 
person))"
+        ),
+        (
+            "select id, count(*) as cnt from (select id from person) group by 
id",
+            "SELECT person.id, COUNT(*) AS cnt FROM (SELECT person.id FROM 
person) GROUP BY person.id"
+        ),
+        (
+            "select id, count(*) as cnt from (select p1.id as id from person 
p1 inner join person p2 on p1.id=p2.id) group by id",
+            "SELECT p1.id, COUNT(*) AS cnt FROM (SELECT p1.id FROM person AS 
p1 JOIN person AS p2 ON (p1.id = p2.id)) GROUP BY p1.id"
+        ),
+        (
+            "select id, count(*), first_name from person group by first_name, 
id",
+            "SELECT person.id, COUNT(*), person.first_name FROM person GROUP 
BY person.first_name, person.id"
+        ),
     ];
 
     let roundtrip = |sql: &str| -> Result<String> {
@@ -4570,8 +4590,12 @@ fn roundtrip_statement() {
         let sql_to_rel = SqlToRel::new(&context);
         let plan = sql_to_rel.sql_statement_to_plan(statement)?;
 
+        println!("{}", plan.display_indent());
+
         let ast = plan_to_sql(&plan)?;
 
+        println!("{ast}");
+
         Ok(format!("{}", ast))
     };
 

Reply via email to