This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 219de5fba6 impl agg and subquery plans to sql (#9606)
219de5fba6 is described below
commit 219de5fba6e5136b977c48ff8c595aaf5e6829f6
Author: Devin D'Angelo <[email protected]>
AuthorDate: Fri Mar 15 06:49:04 2024 -0400
impl agg and subquery plans to sql (#9606)
---
datafusion/sql/src/unparser/ast.rs | 3 +
datafusion/sql/src/unparser/plan.rs | 102 ++++++++++++++++++++++++++++----
datafusion/sql/tests/sql_integration.rs | 24 ++++++++
3 files changed, 116 insertions(+), 13 deletions(-)
diff --git a/datafusion/sql/src/unparser/ast.rs b/datafusion/sql/src/unparser/ast.rs
index 955aabe74c..0c9dcde989 100644
--- a/datafusion/sql/src/unparser/ast.rs
+++ b/datafusion/sql/src/unparser/ast.rs
@@ -159,6 +159,9 @@ impl SelectBuilder {
new.projection = value;
new
}
+ pub fn already_projected(&self) -> bool {
+ !self.projection.is_empty()
+ }
pub fn into(&mut self, value: Option<ast::SelectInto>) -> &mut Self {
let new = self;
new.into = value;
diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs
index 21e4427c1f..8d9e0b1a6e 100644
--- a/datafusion/sql/src/unparser/plan.rs
+++ b/datafusion/sql/src/unparser/plan.rs
@@ -15,14 +15,14 @@
// specific language governing permissions and limitations
// under the License.
-use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
+use datafusion_common::{internal_err, not_impl_err, plan_err, DataFusionError,
Result};
use datafusion_expr::{expr::Alias, Expr, JoinConstraint, JoinType,
LogicalPlan};
-use sqlparser::ast;
+use sqlparser::ast::{self, Ident, SelectItem};
use super::{
ast::{
- BuilderError, QueryBuilder, RelationBuilder, SelectBuilder,
TableRelationBuilder,
- TableWithJoinsBuilder,
+ BuilderError, DerivedRelationBuilder, QueryBuilder, RelationBuilder,
+ SelectBuilder, TableRelationBuilder, TableWithJoinsBuilder,
},
Unparser,
};
@@ -129,14 +129,88 @@ impl Unparser<'_> {
Ok(())
}
LogicalPlan::Projection(p) => {
- let items = p
- .expr
- .iter()
- .map(|e| self.select_item_to_sql(e))
- .collect::<Result<Vec<_>>>()?;
- select.projection(items);
-
- self.select_to_sql_recursively(p.input.as_ref(), query,
select, relation)
+ // A second projection implies a derived tablefactor
+ if !select.already_projected() {
+ // Special handling when projecting an aggregation plan
+ if let LogicalPlan::Aggregate(agg) = p.input.as_ref() {
+ let mut items = p
+ .expr
+ .iter()
+ .filter(|e| !matches!(e,
Expr::AggregateFunction(_)))
+ .map(|e| self.select_item_to_sql(e))
+ .collect::<Result<Vec<_>>>()?;
+
+ let proj_aggs = p
+ .expr
+ .iter()
+ .filter(|e| matches!(e,
Expr::AggregateFunction(_)))
+ .zip(agg.aggr_expr.iter())
+ .map(|(proj, agg_exp)| {
+ let sql_agg_expr =
self.select_item_to_sql(agg_exp)?;
+ let maybe_aliased =
+ if let Expr::Alias(Alias { name, .. }) =
proj {
+ if let
SelectItem::UnnamedExpr(aggregation_fun) =
+ sql_agg_expr
+ {
+ SelectItem::ExprWithAlias {
+ expr: aggregation_fun,
+ alias: Ident {
+ value: name.to_string(),
+ quote_style: None,
+ },
+ }
+ } else {
+ sql_agg_expr
+ }
+ } else {
+ sql_agg_expr
+ };
+ Ok(maybe_aliased)
+ })
+ .collect::<Result<Vec<_>>>()?;
+ items.extend(proj_aggs);
+ select.projection(items);
+ select.group_by(ast::GroupByExpr::Expressions(
+ agg.group_expr
+ .iter()
+ .map(|expr| self.expr_to_sql(expr))
+ .collect::<Result<Vec<_>>>()?,
+ ));
+ self.select_to_sql_recursively(
+ agg.input.as_ref(),
+ query,
+ select,
+ relation,
+ )
+ } else {
+ let items = p
+ .expr
+ .iter()
+ .map(|e| self.select_item_to_sql(e))
+ .collect::<Result<Vec<_>>>()?;
+ select.projection(items);
+ self.select_to_sql_recursively(
+ p.input.as_ref(),
+ query,
+ select,
+ relation,
+ )
+ }
+ } else {
+ let mut derived_builder =
DerivedRelationBuilder::default();
+ derived_builder.lateral(false).alias(None).subquery({
+ let inner_statment = self.plan_to_sql(plan)?;
+ if let ast::Statement::Query(inner_query) =
inner_statment {
+ inner_query
+ } else {
+ return internal_err!(
+ "Subquery must be a Query, but found
{inner_statment:?}"
+ );
+ }
+ });
+ relation.derived(derived_builder);
+ Ok(())
+ }
}
LogicalPlan::Filter(filter) => {
let filter_expr = self.expr_to_sql(&filter.predicate)?;
@@ -176,7 +250,9 @@ impl Unparser<'_> {
)
}
LogicalPlan::Aggregate(_agg) => {
- not_impl_err!("Unsupported operator: {plan:?}")
+ not_impl_err!(
+ "Unsupported aggregation plan not following a projection:
{plan:?}"
+ )
}
LogicalPlan::Distinct(_distinct) => {
not_impl_err!("Unsupported operator: {plan:?}")
diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs
index a6ea22db96..fffb540d87 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -4560,6 +4560,26 @@ fn roundtrip_statement() {
"select ta.j1_id, tb.j2_string, tc.j3_string from j1 ta join j2 tb
on (ta.j1_id = tb.j2_id) join j3 tc on (ta.j1_id = tc.j3_id);",
r#"SELECT ta.j1_id, tb.j2_string, tc.j3_string FROM j1 AS ta JOIN
j2 AS tb ON (ta.j1_id = tb.j2_id) JOIN j3 AS tc ON (ta.j1_id = tc.j3_id)"#,
),
+ (
+ "select * from (select id, first_name from person)",
+ "SELECT person.id, person.first_name FROM (SELECT person.id,
person.first_name FROM person)"
+ ),
+ (
+ "select * from (select id, first_name from (select * from
person))",
+ "SELECT person.id, person.first_name FROM (SELECT person.id,
person.first_name FROM (SELECT person.id, person.first_name, person.last_name,
person.age, person.state, person.salary, person.birth_date, person.😀 FROM
person))"
+ ),
+ (
+ "select id, count(*) as cnt from (select id from person) group by
id",
+ "SELECT person.id, COUNT(*) AS cnt FROM (SELECT person.id FROM
person) GROUP BY person.id"
+ ),
+ (
+ "select id, count(*) as cnt from (select p1.id as id from person
p1 inner join person p2 on p1.id=p2.id) group by id",
+ "SELECT p1.id, COUNT(*) AS cnt FROM (SELECT p1.id FROM person AS
p1 JOIN person AS p2 ON (p1.id = p2.id)) GROUP BY p1.id"
+ ),
+ (
+ "select id, count(*), first_name from person group by first_name,
id",
+ "SELECT person.id, COUNT(*), person.first_name FROM person GROUP
BY person.first_name, person.id"
+ ),
];
let roundtrip = |sql: &str| -> Result<String> {
@@ -4570,8 +4590,12 @@ fn roundtrip_statement() {
let sql_to_rel = SqlToRel::new(&context);
let plan = sql_to_rel.sql_statement_to_plan(statement)?;
+ println!("{}", plan.display_indent());
+
let ast = plan_to_sql(&plan)?;
+ println!("{ast}");
+
Ok(format!("{}", ast))
};