alamb commented on code in PR #9596: URL: https://github.com/apache/arrow-datafusion/pull/9596#discussion_r1523528403
########## datafusion/sql/src/unparser/plan.rs: ########## @@ -0,0 +1,364 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion_common::{not_impl_err, DataFusionError, Result}; +use datafusion_expr::{expr::Alias, Expr, JoinConstraint, JoinType, LogicalPlan}; +use sqlparser::ast; + +use super::{ + ast::{ + BuilderError, QueryBuilder, RelationBuilder, SelectBuilder, TableRelationBuilder, + TableWithJoinsBuilder, + }, + Unparser, +}; + +/// Convert a DataFusion [`LogicalPlan`] to `sqlparser::ast::Statement` +/// +/// This function is the opposite of `SqlToRel::sql_statement_to_plan` and can +/// be used to, among other things, convert `LogicalPlan`s to strings. 
+/// +/// # Example +/// ``` +/// use arrow::datatypes::{DataType, Field, Schema}; +/// use datafusion_expr::{col, logical_plan::table_scan}; +/// use datafusion_sql::unparser::plan_to_sql; +/// let schema = Schema::new(vec![ +/// Field::new("id", DataType::Utf8, false), +/// Field::new("value", DataType::Utf8, false), +/// ]); +/// let plan = table_scan(Some("table"), &schema, None) +/// .unwrap() +/// .project(vec![col("id"), col("value")]) +/// .unwrap() +/// .build() +/// .unwrap(); +/// let sql = plan_to_sql(&plan).unwrap(); +/// +/// assert_eq!(format!("{}", sql), "SELECT table.id, table.value FROM table") +/// ``` +pub fn plan_to_sql(plan: &LogicalPlan) -> Result<ast::Statement> { + let unparser = Unparser::default(); + unparser.plan_to_sql(plan) +} + +impl Unparser<'_> { + pub fn plan_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> { + match plan { + LogicalPlan::Projection(_) + | LogicalPlan::Filter(_) + | LogicalPlan::Window(_) + | LogicalPlan::Aggregate(_) + | LogicalPlan::Sort(_) + | LogicalPlan::Join(_) + | LogicalPlan::CrossJoin(_) + | LogicalPlan::Repartition(_) + | LogicalPlan::Union(_) + | LogicalPlan::TableScan(_) + | LogicalPlan::EmptyRelation(_) + | LogicalPlan::Subquery(_) + | LogicalPlan::SubqueryAlias(_) + | LogicalPlan::Limit(_) + | LogicalPlan::Statement(_) + | LogicalPlan::Values(_) + | LogicalPlan::Distinct(_) => self.select_to_sql(plan), + LogicalPlan::Dml(_) => self.dml_to_sql(plan), + LogicalPlan::Explain(_) + | LogicalPlan::Analyze(_) + | LogicalPlan::Extension(_) + | LogicalPlan::Prepare(_) + | LogicalPlan::Ddl(_) + | LogicalPlan::Copy(_) + | LogicalPlan::DescribeTable(_) + | LogicalPlan::RecursiveQuery(_) + | LogicalPlan::Unnest(_) => not_impl_err!("Unsupported plan: {plan:?}"), + } + } + + fn select_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> { + let mut query_builder = QueryBuilder::default(); + let mut select_builder = SelectBuilder::default(); + 
select_builder.push_from(TableWithJoinsBuilder::default()); + let mut relation_builder = RelationBuilder::default(); + self.select_to_sql_recursively( + plan, + &mut query_builder, + &mut select_builder, + &mut relation_builder, + )?; + + let mut twj = select_builder.pop_from().unwrap(); + twj.relation(relation_builder); + select_builder.push_from(twj); + + let body = ast::SetExpr::Select(Box::new( + select_builder.build().map_err(builder_error_to_df)?, + )); + let query = query_builder + .body(Box::new(body)) + .build() + .map_err(builder_error_to_df)?; + + Ok(ast::Statement::Query(Box::new(query))) + } + + fn select_to_sql_recursively( + &self, + plan: &LogicalPlan, + query: &mut QueryBuilder, + select: &mut SelectBuilder, + relation: &mut RelationBuilder, + ) -> Result<()> { + match plan { + LogicalPlan::TableScan(scan) => { + let mut builder = TableRelationBuilder::default(); + builder.name(ast::ObjectName(vec![ + self.new_ident(scan.table_name.table().to_string()) + ])); + relation.table(builder); + + Ok(()) + } + LogicalPlan::Projection(p) => { + let items = p + .expr + .iter() + .map(|e| self.select_item_to_sql(e)) + .collect::<Result<Vec<_>>>()?; + select.projection(items); + + self.select_to_sql_recursively(p.input.as_ref(), query, select, relation) + } + LogicalPlan::Filter(filter) => { + let filter_expr = self.expr_to_sql(&filter.predicate)?; + + select.selection(Some(filter_expr)); + + self.select_to_sql_recursively( + filter.input.as_ref(), + query, + select, + relation, + ) + } + LogicalPlan::Limit(limit) => { + if let Some(fetch) = limit.fetch { + query.limit(Some(ast::Expr::Value(ast::Value::Number( + fetch.to_string(), + false, + )))); + } + + self.select_to_sql_recursively( + limit.input.as_ref(), + query, + select, + relation, + ) + } + LogicalPlan::Sort(sort) => { + query.order_by(self.sort_to_sql(sort.expr.clone())?); + + self.select_to_sql_recursively( + sort.input.as_ref(), + query, + select, + relation, + ) + } + LogicalPlan::Aggregate(_agg) 
=> { + not_impl_err!("Unsupported operator: {plan:?}") + } + LogicalPlan::Distinct(_distinct) => { + not_impl_err!("Unsupported operator: {plan:?}") + } + LogicalPlan::Join(join) => { + match join.join_constraint { + JoinConstraint::On => {} + JoinConstraint::Using => { + return not_impl_err!( + "Unsupported join constraint: {:?}", + join.join_constraint + ) + } + } + + // parse filter if exists + let join_filter = match &join.filter { + Some(filter) => Some(self.expr_to_sql(filter)?), + None => None, + }; + + // map join.on to `l.a = r.a AND l.b = r.b AND ...` + let eq_op = ast::BinaryOperator::Eq; + let join_on = self.join_conditions_to_sql(&join.on, eq_op)?; + + // Merge `join_on` and `join_filter` + let join_expr = match (join_filter, join_on) { + (Some(filter), Some(on)) => Some(self.and_op_to_sql(filter, on)), + (Some(filter), None) => Some(filter), + (None, Some(on)) => Some(on), + (None, None) => None, + }; + let join_constraint = match join_expr { + Some(expr) => ast::JoinConstraint::On(expr), + None => ast::JoinConstraint::None, + }; + + let mut right_relation = RelationBuilder::default(); + + self.select_to_sql_recursively( + join.left.as_ref(), + query, + select, + relation, + )?; + self.select_to_sql_recursively( + join.right.as_ref(), + query, + select, + &mut right_relation, + )?; + + let ast_join = ast::Join { + relation: right_relation.build().map_err(builder_error_to_df)?, + join_operator: self + .join_operator_to_sql(join.join_type, join_constraint), + }; + let mut from = select.pop_from().unwrap(); + from.push_join(ast_join); + select.push_from(from); + + Ok(()) + } + LogicalPlan::SubqueryAlias(plan_alias) => { + // Handle bottom-up to allocate relation + self.select_to_sql_recursively( + plan_alias.input.as_ref(), + query, + select, + relation, + )?; + + relation.alias(Some( + self.new_table_alias(plan_alias.alias.table().to_string()), + )); + + Ok(()) + } + LogicalPlan::Union(_union) => { + not_impl_err!("Unsupported operator: {plan:?}") + } + 
LogicalPlan::Window(_window) => { + not_impl_err!("Unsupported operator: {plan:?}") + } + LogicalPlan::Extension(_) => not_impl_err!("Unsupported operator: {plan:?}"), + _ => not_impl_err!("Unsupported operator: {plan:?}"), + } + } + + fn select_item_to_sql(&self, expr: &Expr) -> Result<ast::SelectItem> { + match expr { + Expr::Alias(Alias { expr, name, .. }) => { + let inner = self.expr_to_sql(expr)?; + + Ok(ast::SelectItem::ExprWithAlias { + expr: inner, + alias: self.new_ident(name.to_string()), + }) + } + _ => { + let inner = self.expr_to_sql(expr)?; + + Ok(ast::SelectItem::UnnamedExpr(inner)) + } + } + } + + fn sort_to_sql(&self, sort_exprs: Vec<Expr>) -> Result<Vec<ast::OrderByExpr>> { + sort_exprs + .iter() + .map(|expr: &Expr| match expr { + Expr::Sort(sort_expr) => { + let col = self.expr_to_sql(&sort_expr.expr)?; + Ok(ast::OrderByExpr { + asc: Some(sort_expr.asc), + expr: col, + nulls_first: Some(sort_expr.nulls_first), + }) + } + _ => Err(DataFusionError::Plan("Expecting Sort expr".to_string())), Review Comment: ```suggestion _ => plan_err!("Expecting Sort expr"), ``` ########## datafusion/sql/src/unparser/plan.rs: ########## @@ -0,0 +1,364 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use datafusion_common::{not_impl_err, DataFusionError, Result}; +use datafusion_expr::{expr::Alias, Expr, JoinConstraint, JoinType, LogicalPlan}; +use sqlparser::ast; + +use super::{ + ast::{ + BuilderError, QueryBuilder, RelationBuilder, SelectBuilder, TableRelationBuilder, + TableWithJoinsBuilder, + }, + Unparser, +}; + +/// Convert a DataFusion [`LogicalPlan`] to `sqlparser::ast::Statement` +/// +/// This function is the opposite of `SqlToRel::sql_statement_to_plan` and can +/// be used to, among other things, convert `LogicalPlan`s to strings. +/// +/// # Example +/// ``` +/// use arrow::datatypes::{DataType, Field, Schema}; +/// use datafusion_expr::{col, logical_plan::table_scan}; +/// use datafusion_sql::unparser::plan_to_sql; +/// let schema = Schema::new(vec![ +/// Field::new("id", DataType::Utf8, false), +/// Field::new("value", DataType::Utf8, false), +/// ]); +/// let plan = table_scan(Some("table"), &schema, None) +/// .unwrap() +/// .project(vec![col("id"), col("value")]) +/// .unwrap() +/// .build() +/// .unwrap(); +/// let sql = plan_to_sql(&plan).unwrap(); +/// +/// assert_eq!(format!("{}", sql), "SELECT table.id, table.value FROM table") +/// ``` +pub fn plan_to_sql(plan: &LogicalPlan) -> Result<ast::Statement> { + let unparser = Unparser::default(); + unparser.plan_to_sql(plan) +} + +impl Unparser<'_> { + pub fn plan_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> { + match plan { + LogicalPlan::Projection(_) + | LogicalPlan::Filter(_) + | LogicalPlan::Window(_) + | LogicalPlan::Aggregate(_) + | LogicalPlan::Sort(_) + | LogicalPlan::Join(_) + | LogicalPlan::CrossJoin(_) + | LogicalPlan::Repartition(_) + | LogicalPlan::Union(_) + | LogicalPlan::TableScan(_) + | LogicalPlan::EmptyRelation(_) + | LogicalPlan::Subquery(_) + | LogicalPlan::SubqueryAlias(_) + | LogicalPlan::Limit(_) + | LogicalPlan::Statement(_) + | LogicalPlan::Values(_) + | LogicalPlan::Distinct(_) => self.select_to_sql(plan), + LogicalPlan::Dml(_) => 
self.dml_to_sql(plan), + LogicalPlan::Explain(_) + | LogicalPlan::Analyze(_) + | LogicalPlan::Extension(_) + | LogicalPlan::Prepare(_) + | LogicalPlan::Ddl(_) + | LogicalPlan::Copy(_) + | LogicalPlan::DescribeTable(_) + | LogicalPlan::RecursiveQuery(_) + | LogicalPlan::Unnest(_) => not_impl_err!("Unsupported plan: {plan:?}"), + } + } + + fn select_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> { + let mut query_builder = QueryBuilder::default(); + let mut select_builder = SelectBuilder::default(); + select_builder.push_from(TableWithJoinsBuilder::default()); + let mut relation_builder = RelationBuilder::default(); + self.select_to_sql_recursively( + plan, + &mut query_builder, + &mut select_builder, + &mut relation_builder, + )?; + + let mut twj = select_builder.pop_from().unwrap(); + twj.relation(relation_builder); + select_builder.push_from(twj); + + let body = ast::SetExpr::Select(Box::new( + select_builder.build().map_err(builder_error_to_df)?, + )); + let query = query_builder + .body(Box::new(body)) + .build() + .map_err(builder_error_to_df)?; + + Ok(ast::Statement::Query(Box::new(query))) + } + + fn select_to_sql_recursively( + &self, + plan: &LogicalPlan, + query: &mut QueryBuilder, + select: &mut SelectBuilder, + relation: &mut RelationBuilder, + ) -> Result<()> { + match plan { + LogicalPlan::TableScan(scan) => { + let mut builder = TableRelationBuilder::default(); + builder.name(ast::ObjectName(vec![ + self.new_ident(scan.table_name.table().to_string()) + ])); + relation.table(builder); + + Ok(()) + } + LogicalPlan::Projection(p) => { + let items = p + .expr + .iter() + .map(|e| self.select_item_to_sql(e)) + .collect::<Result<Vec<_>>>()?; + select.projection(items); + + self.select_to_sql_recursively(p.input.as_ref(), query, select, relation) + } + LogicalPlan::Filter(filter) => { + let filter_expr = self.expr_to_sql(&filter.predicate)?; + + select.selection(Some(filter_expr)); + + self.select_to_sql_recursively( + filter.input.as_ref(), + 
query, + select, + relation, + ) + } + LogicalPlan::Limit(limit) => { + if let Some(fetch) = limit.fetch { + query.limit(Some(ast::Expr::Value(ast::Value::Number( + fetch.to_string(), + false, + )))); + } + + self.select_to_sql_recursively( + limit.input.as_ref(), + query, + select, + relation, + ) + } + LogicalPlan::Sort(sort) => { + query.order_by(self.sort_to_sql(sort.expr.clone())?); + + self.select_to_sql_recursively( + sort.input.as_ref(), + query, + select, + relation, + ) + } + LogicalPlan::Aggregate(_agg) => { + not_impl_err!("Unsupported operator: {plan:?}") + } + LogicalPlan::Distinct(_distinct) => { + not_impl_err!("Unsupported operator: {plan:?}") + } + LogicalPlan::Join(join) => { + match join.join_constraint { + JoinConstraint::On => {} + JoinConstraint::Using => { + return not_impl_err!( + "Unsupported join constraint: {:?}", + join.join_constraint + ) + } + } + + // parse filter if exists + let join_filter = match &join.filter { + Some(filter) => Some(self.expr_to_sql(filter)?), + None => None, + }; + + // map join.on to `l.a = r.a AND l.b = r.b AND ...` + let eq_op = ast::BinaryOperator::Eq; + let join_on = self.join_conditions_to_sql(&join.on, eq_op)?; + + // Merge `join_on` and `join_filter` + let join_expr = match (join_filter, join_on) { + (Some(filter), Some(on)) => Some(self.and_op_to_sql(filter, on)), + (Some(filter), None) => Some(filter), + (None, Some(on)) => Some(on), + (None, None) => None, + }; + let join_constraint = match join_expr { + Some(expr) => ast::JoinConstraint::On(expr), + None => ast::JoinConstraint::None, + }; + + let mut right_relation = RelationBuilder::default(); + + self.select_to_sql_recursively( + join.left.as_ref(), + query, + select, + relation, + )?; + self.select_to_sql_recursively( + join.right.as_ref(), + query, + select, + &mut right_relation, + )?; + + let ast_join = ast::Join { + relation: right_relation.build().map_err(builder_error_to_df)?, + join_operator: self + .join_operator_to_sql(join.join_type, 
join_constraint), + }; + let mut from = select.pop_from().unwrap(); + from.push_join(ast_join); + select.push_from(from); + + Ok(()) + } + LogicalPlan::SubqueryAlias(plan_alias) => { + // Handle bottom-up to allocate relation + self.select_to_sql_recursively( + plan_alias.input.as_ref(), + query, + select, + relation, + )?; + + relation.alias(Some( + self.new_table_alias(plan_alias.alias.table().to_string()), + )); + + Ok(()) + } + LogicalPlan::Union(_union) => { + not_impl_err!("Unsupported operator: {plan:?}") + } + LogicalPlan::Window(_window) => { + not_impl_err!("Unsupported operator: {plan:?}") + } + LogicalPlan::Extension(_) => not_impl_err!("Unsupported operator: {plan:?}"), + _ => not_impl_err!("Unsupported operator: {plan:?}"), + } + } + + fn select_item_to_sql(&self, expr: &Expr) -> Result<ast::SelectItem> { + match expr { + Expr::Alias(Alias { expr, name, .. }) => { + let inner = self.expr_to_sql(expr)?; + + Ok(ast::SelectItem::ExprWithAlias { + expr: inner, + alias: self.new_ident(name.to_string()), + }) + } + _ => { + let inner = self.expr_to_sql(expr)?; + + Ok(ast::SelectItem::UnnamedExpr(inner)) + } + } + } + + fn sort_to_sql(&self, sort_exprs: Vec<Expr>) -> Result<Vec<ast::OrderByExpr>> { + sort_exprs + .iter() + .map(|expr: &Expr| match expr { + Expr::Sort(sort_expr) => { + let col = self.expr_to_sql(&sort_expr.expr)?; + Ok(ast::OrderByExpr { + asc: Some(sort_expr.asc), + expr: col, + nulls_first: Some(sort_expr.nulls_first), + }) + } + _ => Err(DataFusionError::Plan("Expecting Sort expr".to_string())), + }) + .collect::<Result<Vec<_>>>() + } + + fn join_operator_to_sql( + &self, + join_type: JoinType, + constraint: ast::JoinConstraint, + ) -> ast::JoinOperator { + match join_type { + JoinType::Inner => ast::JoinOperator::Inner(constraint), + JoinType::Left => ast::JoinOperator::LeftOuter(constraint), + JoinType::Right => ast::JoinOperator::RightOuter(constraint), + JoinType::Full => ast::JoinOperator::FullOuter(constraint), + JoinType::LeftAnti 
=> ast::JoinOperator::LeftAnti(constraint), + JoinType::LeftSemi => ast::JoinOperator::LeftSemi(constraint), + JoinType::RightAnti => ast::JoinOperator::RightAnti(constraint), + JoinType::RightSemi => ast::JoinOperator::RightSemi(constraint), + } + } + + fn join_conditions_to_sql( + &self, + join_conditions: &Vec<(Expr, Expr)>, + eq_op: ast::BinaryOperator, + ) -> Result<Option<ast::Expr>> { + // Only support AND conjunction for each binary expression in join conditions + let mut exprs: Vec<ast::Expr> = vec![]; + for (left, right) in join_conditions { + // Parse left + let l = self.expr_to_sql(left)?; + // Parse right + let r = self.expr_to_sql(right)?; + // AND with existing expression + exprs.push(self.binary_op_to_sql(l, r, eq_op.clone())); + } + let join_expr: Option<ast::Expr> = + exprs.into_iter().reduce(|r, l| self.and_op_to_sql(r, l)); + Ok(join_expr) + } + + fn and_op_to_sql(&self, lhs: ast::Expr, rhs: ast::Expr) -> ast::Expr { + self.binary_op_to_sql(lhs, rhs, ast::BinaryOperator::And) + } + + fn new_table_alias(&self, alias: String) -> ast::TableAlias { + ast::TableAlias { + name: self.new_ident(alias), + columns: Vec::new(), + } + } + + fn dml_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> { + not_impl_err!("Unsupported plan: {plan:?}") + } +} + +fn builder_error_to_df(e: BuilderError) -> DataFusionError { + DataFusionError::External(format!("{e}").into()) +} Review Comment: I think you can use an `impl From` here to get nicer code: ```suggestion impl From<BuilderError> for DataFusionError { fn from(e: BuilderError) -> Self { DataFusionError::External(Box::new(e)) } } ``` Then above you can use `?`: ```diff diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 360bef4c8..941ec6713 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -106,12 +106,11 @@ impl Unparser<'_> { select_builder.push_from(twj); let body = ast::SetExpr::Select(Box::new( -
select_builder.build().map_err(builder_error_to_df)?, + select_builder.build()? )); let query = query_builder .body(Box::new(body)) - .build() - .map_err(builder_error_to_df)?; + .build()?; Ok(ast::Statement::Query(Box::new(query))) } @@ -235,7 +234,7 @@ impl Unparser<'_> { )?; let ast_join = ast::Join { - relation: right_relation.build().map_err(builder_error_to_df)?, + relation: right_relation.build()?, join_operator: self .join_operator_to_sql(join.join_type, join_constraint), }; ``` ########## datafusion/sql/tests/sql_integration.rs: ########## @@ -4487,6 +4490,87 @@ impl TableSource for EmptyTable { } } +#[test] +fn roundtrip_expr() { + let tests: Vec<(TableReference, &str, &str)> = vec![ Review Comment: FYI @devinjdangelo -- maybe we can consolidate the round trip expr tests from https://github.com/apache/arrow-datafusion/pull/9578 here too ########## datafusion/sql/src/unparser/ast.rs: ########## @@ -0,0 +1,585 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use core::fmt; + +use sqlparser::ast; + +/// This file contains builders to create SQL ASTs. They are purposefully +/// not exported as they will eventually be move to the SQLparser package. 
+/// +/// +/// See <https://github.com/apache/arrow-datafusion/issues/8661> Review Comment: If you use `///`, I think the comment will be attached to `QueryBuilder`. I think the intention here is to attach these comments to the module, which requires `//!`: ```suggestion //! This file contains builders to create SQL ASTs. They are purposefully //! not exported as they will eventually be moved to the SQLparser package. //! //! //! See <https://github.com/apache/arrow-datafusion/issues/8661> ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
