This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 51d9441273 feat: support LogicalPlan to SQL String (#9596)
51d9441273 is described below
commit 51d9441273d7e58e30cb44071bd45d1d2ee20a16
Author: Michiel De Backker <[email protected]>
AuthorDate: Wed Mar 13 18:36:05 2024 +0100
feat: support LogicalPlan to SQL String (#9596)
Relates to #8661
---
datafusion/sql/src/unparser/ast.rs | 585 ++++++++++++++++++++++++++++++++
datafusion/sql/src/unparser/expr.rs | 25 +-
datafusion/sql/src/unparser/mod.rs | 3 +
datafusion/sql/src/unparser/plan.rs | 361 ++++++++++++++++++++
datafusion/sql/tests/sql_integration.rs | 86 ++++-
5 files changed, 1046 insertions(+), 14 deletions(-)
diff --git a/datafusion/sql/src/unparser/ast.rs
b/datafusion/sql/src/unparser/ast.rs
new file mode 100644
index 0000000000..955aabe74c
--- /dev/null
+++ b/datafusion/sql/src/unparser/ast.rs
@@ -0,0 +1,585 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This file contains builders to create SQL ASTs. They are purposefully
+//! not exported as they will eventually be moved to the SQLparser package.
+//!
+//!
+//! See <https://github.com/apache/arrow-datafusion/issues/8661>
+
+use core::fmt;
+
+use sqlparser::ast;
+
+/// Mutable builder that accumulates the parts of an [`ast::Query`].
+///
+/// `body` is the only field [`QueryBuilder::build`] requires; every other
+/// field is copied into the result exactly as stored (empty by default).
+#[derive(Clone)]
+pub(super) struct QueryBuilder {
+    with: Option<ast::With>,
+    body: Option<Box<ast::SetExpr>>,
+    order_by: Vec<ast::OrderByExpr>,
+    limit: Option<ast::Expr>,
+    limit_by: Vec<ast::Expr>,
+    offset: Option<ast::Offset>,
+    fetch: Option<ast::Fetch>,
+    locks: Vec<ast::LockClause>,
+    for_clause: Option<ast::ForClause>,
+}
+
+// Every setter overwrites its field and returns `&mut Self` so calls can be chained.
+#[allow(dead_code)]
+impl QueryBuilder {
+    /// Replaces the stored `with` value.
+    pub fn with(&mut self, value: Option<ast::With>) -> &mut Self {
+        self.with = value;
+        self
+    }
+    /// Stores the query body (the one required field).
+    pub fn body(&mut self, value: Box<ast::SetExpr>) -> &mut Self {
+        self.body = Some(value);
+        self
+    }
+    /// Replaces the stored `order_by` expressions.
+    pub fn order_by(&mut self, value: Vec<ast::OrderByExpr>) -> &mut Self {
+        self.order_by = value;
+        self
+    }
+    /// Replaces the stored `limit` expression.
+    pub fn limit(&mut self, value: Option<ast::Expr>) -> &mut Self {
+        self.limit = value;
+        self
+    }
+    /// Replaces the stored `limit_by` expressions.
+    pub fn limit_by(&mut self, value: Vec<ast::Expr>) -> &mut Self {
+        self.limit_by = value;
+        self
+    }
+    /// Replaces the stored `offset` value.
+    pub fn offset(&mut self, value: Option<ast::Offset>) -> &mut Self {
+        self.offset = value;
+        self
+    }
+    /// Replaces the stored `fetch` value.
+    pub fn fetch(&mut self, value: Option<ast::Fetch>) -> &mut Self {
+        self.fetch = value;
+        self
+    }
+    /// Replaces the stored `locks` list.
+    pub fn locks(&mut self, value: Vec<ast::LockClause>) -> &mut Self {
+        self.locks = value;
+        self
+    }
+    /// Replaces the stored `for_clause` value.
+    pub fn for_clause(&mut self, value: Option<ast::ForClause>) -> &mut Self {
+        self.for_clause = value;
+        self
+    }
+    /// Assembles an [`ast::Query`] from clones of the stored fields.
+    ///
+    /// # Errors
+    /// Returns [`BuilderError::UninitializedField`] naming `body` when no body was set.
+    pub fn build(&self) -> Result<ast::Query, BuilderError> {
+        let body = self
+            .body
+            .clone()
+            .ok_or_else(|| BuilderError::from(UninitializedFieldError::from("body")))?;
+        Ok(ast::Query {
+            with: self.with.clone(),
+            body,
+            order_by: self.order_by.clone(),
+            limit: self.limit.clone(),
+            limit_by: self.limit_by.clone(),
+            offset: self.offset.clone(),
+            fetch: self.fetch.clone(),
+            locks: self.locks.clone(),
+            for_clause: self.for_clause.clone(),
+        })
+    }
+    /// Returns a builder with every field unset / empty.
+    fn create_empty() -> Self {
+        Self {
+            with: None,
+            body: None,
+            order_by: Vec::new(),
+            limit: None,
+            limit_by: Vec::new(),
+            offset: None,
+            fetch: None,
+            locks: Vec::new(),
+            for_clause: None,
+        }
+    }
+}
+impl Default for QueryBuilder {
+    /// Same as [`QueryBuilder::create_empty`].
+    fn default() -> Self {
+        Self::create_empty()
+    }
+}
+
+/// Mutable builder that accumulates the parts of an [`ast::Select`].
+///
+/// `from` holds nested [`TableWithJoinsBuilder`]s that are only built when
+/// [`SelectBuilder::build`] runs. `group_by` is stored as an `Option` but is
+/// pre-populated with an empty expression list by `create_empty`.
+#[derive(Clone)]
+pub(super) struct SelectBuilder {
+    distinct: Option<ast::Distinct>,
+    top: Option<ast::Top>,
+    projection: Vec<ast::SelectItem>,
+    into: Option<ast::SelectInto>,
+    from: Vec<TableWithJoinsBuilder>,
+    lateral_views: Vec<ast::LateralView>,
+    selection: Option<ast::Expr>,
+    group_by: Option<ast::GroupByExpr>,
+    cluster_by: Vec<ast::Expr>,
+    distribute_by: Vec<ast::Expr>,
+    sort_by: Vec<ast::Expr>,
+    having: Option<ast::Expr>,
+    named_window: Vec<ast::NamedWindowDefinition>,
+    qualify: Option<ast::Expr>,
+    value_table_mode: Option<ast::ValueTableMode>,
+}
+
+// Unless noted, a setter overwrites its field and returns `&mut Self` for chaining.
+#[allow(dead_code)]
+impl SelectBuilder {
+    /// Replaces the stored `distinct` value.
+    pub fn distinct(&mut self, value: Option<ast::Distinct>) -> &mut Self {
+        self.distinct = value;
+        self
+    }
+    /// Replaces the stored `top` value.
+    pub fn top(&mut self, value: Option<ast::Top>) -> &mut Self {
+        self.top = value;
+        self
+    }
+    /// Replaces the stored projection list.
+    pub fn projection(&mut self, value: Vec<ast::SelectItem>) -> &mut Self {
+        self.projection = value;
+        self
+    }
+    /// Replaces the stored `into` value.
+    pub fn into(&mut self, value: Option<ast::SelectInto>) -> &mut Self {
+        self.into = value;
+        self
+    }
+    /// Replaces the whole list of `from` builders.
+    pub fn from(&mut self, value: Vec<TableWithJoinsBuilder>) -> &mut Self {
+        self.from = value;
+        self
+    }
+    /// Appends one builder to the end of the `from` list.
+    pub fn push_from(&mut self, value: TableWithJoinsBuilder) -> &mut Self {
+        self.from.push(value);
+        self
+    }
+    /// Removes and returns the last `from` builder, or `None` if the list is empty.
+    pub fn pop_from(&mut self) -> Option<TableWithJoinsBuilder> {
+        self.from.pop()
+    }
+    /// Replaces the stored `lateral_views` list.
+    pub fn lateral_views(&mut self, value: Vec<ast::LateralView>) -> &mut Self {
+        self.lateral_views = value;
+        self
+    }
+    /// Replaces the stored `selection` expression.
+    pub fn selection(&mut self, value: Option<ast::Expr>) -> &mut Self {
+        self.selection = value;
+        self
+    }
+    /// Stores the `group_by` value.
+    pub fn group_by(&mut self, value: ast::GroupByExpr) -> &mut Self {
+        self.group_by = Some(value);
+        self
+    }
+    /// Replaces the stored `cluster_by` expressions.
+    pub fn cluster_by(&mut self, value: Vec<ast::Expr>) -> &mut Self {
+        self.cluster_by = value;
+        self
+    }
+    /// Replaces the stored `distribute_by` expressions.
+    pub fn distribute_by(&mut self, value: Vec<ast::Expr>) -> &mut Self {
+        self.distribute_by = value;
+        self
+    }
+    /// Replaces the stored `sort_by` expressions.
+    pub fn sort_by(&mut self, value: Vec<ast::Expr>) -> &mut Self {
+        self.sort_by = value;
+        self
+    }
+    /// Replaces the stored `having` expression.
+    pub fn having(&mut self, value: Option<ast::Expr>) -> &mut Self {
+        self.having = value;
+        self
+    }
+    /// Replaces the stored `named_window` definitions.
+    pub fn named_window(&mut self, value: Vec<ast::NamedWindowDefinition>) -> &mut Self {
+        self.named_window = value;
+        self
+    }
+    /// Replaces the stored `qualify` expression.
+    pub fn qualify(&mut self, value: Option<ast::Expr>) -> &mut Self {
+        self.qualify = value;
+        self
+    }
+    /// Replaces the stored `value_table_mode`.
+    pub fn value_table_mode(&mut self, value: Option<ast::ValueTableMode>) -> &mut Self {
+        self.value_table_mode = value;
+        self
+    }
+    /// Assembles an [`ast::Select`] from the stored fields.
+    ///
+    /// # Errors
+    /// Propagates the first error from building a `from` entry; otherwise
+    /// returns [`BuilderError::UninitializedField`] naming `group_by` if that
+    /// field is `None`.
+    pub fn build(&self) -> Result<ast::Select, BuilderError> {
+        // Build the nested FROM entries first so their errors take precedence.
+        let mut from = Vec::with_capacity(self.from.len());
+        for table in &self.from {
+            from.push(table.build()?);
+        }
+        let group_by = self.group_by.clone().ok_or_else(|| {
+            BuilderError::from(UninitializedFieldError::from("group_by"))
+        })?;
+        Ok(ast::Select {
+            distinct: self.distinct.clone(),
+            top: self.top.clone(),
+            projection: self.projection.clone(),
+            into: self.into.clone(),
+            from,
+            lateral_views: self.lateral_views.clone(),
+            selection: self.selection.clone(),
+            group_by,
+            cluster_by: self.cluster_by.clone(),
+            distribute_by: self.distribute_by.clone(),
+            sort_by: self.sort_by.clone(),
+            having: self.having.clone(),
+            named_window: self.named_window.clone(),
+            qualify: self.qualify.clone(),
+            value_table_mode: self.value_table_mode,
+        })
+    }
+    /// Returns a builder with everything empty except `group_by`, which starts
+    /// as an empty expression list.
+    fn create_empty() -> Self {
+        Self {
+            distinct: None,
+            top: None,
+            projection: Vec::new(),
+            into: None,
+            from: Vec::new(),
+            lateral_views: Vec::new(),
+            selection: None,
+            group_by: Some(ast::GroupByExpr::Expressions(Vec::new())),
+            cluster_by: Vec::new(),
+            distribute_by: Vec::new(),
+            sort_by: Vec::new(),
+            having: None,
+            named_window: Vec::new(),
+            qualify: None,
+            value_table_mode: None,
+        }
+    }
+}
+impl Default for SelectBuilder {
+    /// Same as [`SelectBuilder::create_empty`].
+    fn default() -> Self {
+        Self::create_empty()
+    }
+}
+
+/// Mutable builder for an [`ast::TableWithJoins`]: one relation (required at
+/// build time) plus a list of joins.
+#[derive(Clone)]
+pub(super) struct TableWithJoinsBuilder {
+    relation: Option<RelationBuilder>,
+    joins: Vec<ast::Join>,
+}
+
+#[allow(dead_code)]
+impl TableWithJoinsBuilder {
+    /// Stores the relation builder, replacing any previous one.
+    pub fn relation(&mut self, value: RelationBuilder) -> &mut Self {
+        self.relation = Some(value);
+        self
+    }
+
+    /// Replaces the whole join list.
+    pub fn joins(&mut self, value: Vec<ast::Join>) -> &mut Self {
+        self.joins = value;
+        self
+    }
+    /// Appends one join to the end of the join list.
+    pub fn push_join(&mut self, value: ast::Join) -> &mut Self {
+        self.joins.push(value);
+        self
+    }
+
+    /// Builds the relation and pairs it with a clone of the joins.
+    ///
+    /// # Errors
+    /// Returns [`BuilderError::UninitializedField`] naming `relation` when no
+    /// relation was set, or whatever error the relation builder itself reports.
+    pub fn build(&self) -> Result<ast::TableWithJoins, BuilderError> {
+        let relation = match &self.relation {
+            Some(builder) => builder.build()?,
+            None => return Err(UninitializedFieldError::from("relation").into()),
+        };
+        Ok(ast::TableWithJoins {
+            relation,
+            joins: self.joins.clone(),
+        })
+    }
+    /// Returns a builder with no relation and no joins.
+    fn create_empty() -> Self {
+        Self {
+            relation: None,
+            joins: Vec::new(),
+        }
+    }
+}
+impl Default for TableWithJoinsBuilder {
+    /// Same as [`TableWithJoinsBuilder::create_empty`].
+    fn default() -> Self {
+        Self::create_empty()
+    }
+}
+
+/// Mutable builder for an [`ast::TableFactor`], backed by either a table or a
+/// derived-relation builder.
+#[derive(Clone)]
+pub(super) struct RelationBuilder {
+    relation: Option<TableFactorBuilder>,
+}
+
+/// The concrete kind of relation a [`RelationBuilder`] currently wraps.
+#[allow(dead_code)]
+#[derive(Clone)]
+enum TableFactorBuilder {
+    Table(TableRelationBuilder),
+    Derived(DerivedRelationBuilder),
+}
+
+#[allow(dead_code)]
+impl RelationBuilder {
+    /// Reports whether a table or derived relation has been set.
+    pub fn has_relation(&self) -> bool {
+        self.relation.is_some()
+    }
+    /// Makes this a table relation, replacing whatever was stored before.
+    pub fn table(&mut self, value: TableRelationBuilder) -> &mut Self {
+        self.relation = Some(TableFactorBuilder::Table(value));
+        self
+    }
+    /// Makes this a derived relation, replacing whatever was stored before.
+    pub fn derived(&mut self, value: DerivedRelationBuilder) -> &mut Self {
+        self.relation = Some(TableFactorBuilder::Derived(value));
+        self
+    }
+    /// Sets the alias on the wrapped relation builder.
+    ///
+    /// When no relation has been set yet the alias is discarded.
+    pub fn alias(&mut self, value: Option<ast::TableAlias>) -> &mut Self {
+        match self.relation.as_mut() {
+            Some(TableFactorBuilder::Table(table)) => table.alias = value,
+            Some(TableFactorBuilder::Derived(derived)) => derived.alias = value,
+            None => {}
+        }
+        self
+    }
+    /// Builds the wrapped relation.
+    ///
+    /// # Errors
+    /// Returns [`BuilderError::UninitializedField`] naming `relation` when
+    /// nothing was set, or the wrapped builder's own error.
+    pub fn build(&self) -> Result<ast::TableFactor, BuilderError> {
+        match &self.relation {
+            Some(TableFactorBuilder::Table(table)) => table.build(),
+            Some(TableFactorBuilder::Derived(derived)) => derived.build(),
+            None => Err(UninitializedFieldError::from("relation").into()),
+        }
+    }
+    /// Returns a builder with no relation set.
+    fn create_empty() -> Self {
+        Self { relation: None }
+    }
+}
+impl Default for RelationBuilder {
+    /// Same as [`RelationBuilder::create_empty`].
+    fn default() -> Self {
+        Self::create_empty()
+    }
+}
+
+/// Mutable builder for the [`ast::TableFactor::Table`] variant.
+///
+/// `name` is required at build time; the remaining fields default to empty.
+#[derive(Clone)]
+pub(super) struct TableRelationBuilder {
+    name: Option<ast::ObjectName>,
+    alias: Option<ast::TableAlias>,
+    args: Option<Vec<ast::FunctionArg>>,
+    with_hints: Vec<ast::Expr>,
+    version: Option<ast::TableVersion>,
+    partitions: Vec<ast::Ident>,
+}
+
+// Every setter overwrites its field and returns `&mut Self` so calls can be chained.
+#[allow(dead_code)]
+impl TableRelationBuilder {
+    /// Stores the table name (the one required field).
+    pub fn name(&mut self, value: ast::ObjectName) -> &mut Self {
+        self.name = Some(value);
+        self
+    }
+    /// Replaces the stored alias.
+    pub fn alias(&mut self, value: Option<ast::TableAlias>) -> &mut Self {
+        self.alias = value;
+        self
+    }
+    /// Replaces the stored `args` value.
+    pub fn args(&mut self, value: Option<Vec<ast::FunctionArg>>) -> &mut Self {
+        self.args = value;
+        self
+    }
+    /// Replaces the stored `with_hints` expressions.
+    pub fn with_hints(&mut self, value: Vec<ast::Expr>) -> &mut Self {
+        self.with_hints = value;
+        self
+    }
+    /// Replaces the stored `version` value.
+    pub fn version(&mut self, value: Option<ast::TableVersion>) -> &mut Self {
+        self.version = value;
+        self
+    }
+    /// Replaces the stored `partitions` list.
+    pub fn partitions(&mut self, value: Vec<ast::Ident>) -> &mut Self {
+        self.partitions = value;
+        self
+    }
+    /// Assembles an [`ast::TableFactor::Table`] from clones of the stored fields.
+    ///
+    /// # Errors
+    /// Returns [`BuilderError::UninitializedField`] naming `name` when no name was set.
+    pub fn build(&self) -> Result<ast::TableFactor, BuilderError> {
+        let name = self
+            .name
+            .clone()
+            .ok_or_else(|| BuilderError::from(UninitializedFieldError::from("name")))?;
+        Ok(ast::TableFactor::Table {
+            name,
+            alias: self.alias.clone(),
+            args: self.args.clone(),
+            with_hints: self.with_hints.clone(),
+            version: self.version.clone(),
+            partitions: self.partitions.clone(),
+        })
+    }
+    /// Returns a builder with every field unset / empty.
+    fn create_empty() -> Self {
+        Self {
+            name: None,
+            alias: None,
+            args: None,
+            with_hints: Vec::new(),
+            version: None,
+            partitions: Vec::new(),
+        }
+    }
+}
+impl Default for TableRelationBuilder {
+    /// Same as [`TableRelationBuilder::create_empty`].
+    fn default() -> Self {
+        Self::create_empty()
+    }
+}
+/// Mutable builder for the [`ast::TableFactor::Derived`] variant.
+///
+/// Both `lateral` and `subquery` must be set before building; `alias` is optional.
+#[derive(Clone)]
+pub(super) struct DerivedRelationBuilder {
+    lateral: Option<bool>,
+    subquery: Option<Box<ast::Query>>,
+    alias: Option<ast::TableAlias>,
+}
+
+#[allow(dead_code)]
+impl DerivedRelationBuilder {
+    /// Stores the `lateral` flag.
+    pub fn lateral(&mut self, value: bool) -> &mut Self {
+        self.lateral = Some(value);
+        self
+    }
+    /// Stores the subquery.
+    pub fn subquery(&mut self, value: Box<ast::Query>) -> &mut Self {
+        self.subquery = Some(value);
+        self
+    }
+    /// Replaces the stored alias.
+    pub fn alias(&mut self, value: Option<ast::TableAlias>) -> &mut Self {
+        self.alias = value;
+        self
+    }
+    /// Assembles an [`ast::TableFactor::Derived`] from the stored fields.
+    ///
+    /// # Errors
+    /// Returns [`BuilderError::UninitializedField`] for the first missing
+    /// field, checking `lateral` before `subquery`.
+    fn build(&self) -> Result<ast::TableFactor, BuilderError> {
+        let lateral = self.lateral.ok_or_else(|| {
+            BuilderError::from(UninitializedFieldError::from("lateral"))
+        })?;
+        let subquery = self.subquery.clone().ok_or_else(|| {
+            BuilderError::from(UninitializedFieldError::from("subquery"))
+        })?;
+        Ok(ast::TableFactor::Derived {
+            lateral,
+            subquery,
+            alias: self.alias.clone(),
+        })
+    }
+    /// Returns a builder with every field unset.
+    fn create_empty() -> Self {
+        Self {
+            lateral: None,
+            subquery: None,
+            alias: None,
+        }
+    }
+}
+impl Default for DerivedRelationBuilder {
+    /// Same as [`DerivedRelationBuilder::create_empty`].
+    fn default() -> Self {
+        Self::create_empty()
+    }
+}
+
+/// Runtime error when a `build()` method is called and one or more required
+/// fields do not have a value.
+///
+/// Wraps the name of the offending field; `Display` renders it as
+/// `Field not initialized: <name>`.
+#[derive(Debug, Clone)]
+pub(super) struct UninitializedFieldError(&'static str);
+
+impl UninitializedFieldError {
+    /// Create a new `UninitializedFieldError` for the specified field name.
+    pub fn new(field_name: &'static str) -> Self {
+        UninitializedFieldError(field_name)
+    }
+
+    /// Get the name of the first-declared field that wasn't initialized
+    pub fn field_name(&self) -> &'static str {
+        self.0
+    }
+}
+
+impl fmt::Display for UninitializedFieldError {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "Field not initialized: {}", self.0)
+    }
+}
+
+impl From<&'static str> for UninitializedFieldError {
+    /// Treats the string as the name of the uninitialized field.
+    fn from(field_name: &'static str) -> Self {
+        Self::new(field_name)
+    }
+}
+impl std::error::Error for UninitializedFieldError {}
+
+#[derive(Debug)]
+pub enum BuilderError {
+ UninitializedField(&'static str),
+ ValidationError(String),
+}
+impl From<UninitializedFieldError> for BuilderError {
+ fn from(s: UninitializedFieldError) -> Self {
+ Self::UninitializedField(s.field_name())
+ }
+}
+impl From<String> for BuilderError {
+ fn from(s: String) -> Self {
+ Self::ValidationError(s)
+ }
+}
+impl fmt::Display for BuilderError {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match self {
+ Self::UninitializedField(ref field) => {
+ write!(f, "`{}` must be initialized", field)
+ }
+ Self::ValidationError(ref error) => write!(f, "{}", error),
+ }
+ }
+}
+impl std::error::Error for BuilderError {}
diff --git a/datafusion/sql/src/unparser/expr.rs
b/datafusion/sql/src/unparser/expr.rs
index bb14c8a707..2a9fdd47ad 100644
--- a/datafusion/sql/src/unparser/expr.rs
+++ b/datafusion/sql/src/unparser/expr.rs
@@ -118,14 +118,14 @@ impl Unparser<'_> {
Ok(ast::Expr::Identifier(self.new_ident(col.name.to_string())))
}
- fn new_ident(&self, str: String) -> ast::Ident {
+ pub(super) fn new_ident(&self, str: String) -> ast::Ident {
ast::Ident {
value: str,
quote_style: self.dialect.identifier_quote_style(),
}
}
- fn binary_op_to_sql(
+ pub(super) fn binary_op_to_sql(
&self,
lhs: ast::Expr,
rhs: ast::Expr,
@@ -312,19 +312,18 @@ mod tests {
use super::*;
+ // See sql::tests for E2E tests.
+
#[test]
fn expr_to_sql_ok() -> Result<()> {
- let tests: Vec<(Expr, &str)> = vec![
- (col("a").gt(lit(4)), r#"a > 4"#),
- (
- Expr::Column(Column {
- relation: Some(TableReference::partial("a", "b")),
- name: "c".to_string(),
- })
- .gt(lit(4)),
- r#"a.b.c > 4"#,
- ),
- ];
+ let tests: Vec<(Expr, &str)> = vec![(
+ Expr::Column(Column {
+ relation: Some(TableReference::partial("a", "b")),
+ name: "c".to_string(),
+ })
+ .gt(lit(4)),
+ r#"a.b.c > 4"#,
+ )];
for (expr, expected) in tests {
let ast = expr_to_sql(&expr)?;
diff --git a/datafusion/sql/src/unparser/mod.rs
b/datafusion/sql/src/unparser/mod.rs
index 77a9de0975..e67ebc1980 100644
--- a/datafusion/sql/src/unparser/mod.rs
+++ b/datafusion/sql/src/unparser/mod.rs
@@ -15,9 +15,12 @@
// specific language governing permissions and limitations
// under the License.
+mod ast;
mod expr;
+mod plan;
pub use expr::expr_to_sql;
+pub use plan::plan_to_sql;
use self::dialect::{DefaultDialect, Dialect};
pub mod dialect;
diff --git a/datafusion/sql/src/unparser/plan.rs
b/datafusion/sql/src/unparser/plan.rs
new file mode 100644
index 0000000000..21e4427c1f
--- /dev/null
+++ b/datafusion/sql/src/unparser/plan.rs
@@ -0,0 +1,361 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
+use datafusion_expr::{expr::Alias, Expr, JoinConstraint, JoinType,
LogicalPlan};
+use sqlparser::ast;
+
+use super::{
+ ast::{
+ BuilderError, QueryBuilder, RelationBuilder, SelectBuilder,
TableRelationBuilder,
+ TableWithJoinsBuilder,
+ },
+ Unparser,
+};
+
+/// Unparse a DataFusion [`LogicalPlan`] into a `sqlparser::ast::Statement`.
+///
+/// This goes in the opposite direction of `SqlToRel::sql_statement_to_plan`.
+/// The work is delegated to `Unparser::plan_to_sql` on a default-constructed
+/// `Unparser`; formatting the returned statement yields a SQL string.
+///
+/// # Example
+/// ```
+/// use arrow::datatypes::{DataType, Field, Schema};
+/// use datafusion_expr::{col, logical_plan::table_scan};
+/// use datafusion_sql::unparser::plan_to_sql;
+/// let schema = Schema::new(vec![
+///     Field::new("id", DataType::Utf8, false),
+///     Field::new("value", DataType::Utf8, false),
+/// ]);
+/// let plan = table_scan(Some("table"), &schema, None)
+///     .unwrap()
+///     .project(vec![col("id"), col("value")])
+///     .unwrap()
+///     .build()
+///     .unwrap();
+/// let sql = plan_to_sql(&plan).unwrap();
+///
+/// assert_eq!(format!("{}", sql), "SELECT table.id, table.value FROM table")
+/// ```
+pub fn plan_to_sql(plan: &LogicalPlan) -> Result<ast::Statement> {
+    Unparser::default().plan_to_sql(plan)
+}
+
+impl Unparser<'_> {
+    /// Converts `plan` into a SQL statement AST.
+    ///
+    /// Query-shaped plans are routed to `select_to_sql` and DML plans to
+    /// `dml_to_sql`; every other plan kind yields a "not implemented" error.
+    pub fn plan_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
+        match plan {
+            LogicalPlan::Projection(_)
+            | LogicalPlan::Filter(_)
+            | LogicalPlan::Window(_)
+            | LogicalPlan::Aggregate(_)
+            | LogicalPlan::Sort(_)
+            | LogicalPlan::Join(_)
+            | LogicalPlan::CrossJoin(_)
+            | LogicalPlan::Repartition(_)
+            | LogicalPlan::Union(_)
+            | LogicalPlan::TableScan(_)
+            | LogicalPlan::EmptyRelation(_)
+            | LogicalPlan::Subquery(_)
+            | LogicalPlan::SubqueryAlias(_)
+            | LogicalPlan::Limit(_)
+            | LogicalPlan::Statement(_)
+            | LogicalPlan::Values(_)
+            | LogicalPlan::Distinct(_) => self.select_to_sql(plan),
+            LogicalPlan::Dml(_) => self.dml_to_sql(plan),
+            LogicalPlan::Explain(_)
+            | LogicalPlan::Analyze(_)
+            | LogicalPlan::Extension(_)
+            | LogicalPlan::Prepare(_)
+            | LogicalPlan::Ddl(_)
+            | LogicalPlan::Copy(_)
+            | LogicalPlan::DescribeTable(_)
+            | LogicalPlan::RecursiveQuery(_)
+            | LogicalPlan::Unnest(_) => not_impl_err!("Unsupported plan: {plan:?}"),
+        }
+    }
+
+    /// Builds a `SELECT` query statement for `plan`.
+    ///
+    /// Creates fresh query/select/relation builders, lets
+    /// `select_to_sql_recursively` fill them in, then assembles the result.
+    fn select_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
+        let mut query_builder = QueryBuilder::default();
+        let mut select_builder = SelectBuilder::default();
+        // Seed a single FROM entry; joins found during the walk attach to it.
+        select_builder.push_from(TableWithJoinsBuilder::default());
+        let mut relation_builder = RelationBuilder::default();
+        self.select_to_sql_recursively(
+            plan,
+            &mut query_builder,
+            &mut select_builder,
+            &mut relation_builder,
+        )?;
+
+        // The base relation is only complete after the walk, so install it now.
+        let mut twj = Self::pop_from(&mut select_builder)?;
+        twj.relation(relation_builder);
+        select_builder.push_from(twj);
+
+        let body = ast::SetExpr::Select(Box::new(select_builder.build()?));
+        let query = query_builder.body(Box::new(body)).build()?;
+
+        Ok(ast::Statement::Query(Box::new(query)))
+    }
+
+    /// Walks `plan` from the root towards its inputs, recording each supported
+    /// operator on the matching builder.
+    ///
+    /// * `query` receives `ORDER BY`, `LIMIT` and `OFFSET`.
+    /// * `select` receives the projection, the `WHERE` predicate and joins.
+    /// * `relation` receives the table scan and any table alias.
+    ///
+    /// Operators without an implementation return a "not implemented" error.
+    fn select_to_sql_recursively(
+        &self,
+        plan: &LogicalPlan,
+        query: &mut QueryBuilder,
+        select: &mut SelectBuilder,
+        relation: &mut RelationBuilder,
+    ) -> Result<()> {
+        match plan {
+            LogicalPlan::TableScan(scan) => {
+                let mut builder = TableRelationBuilder::default();
+                builder.name(ast::ObjectName(vec![
+                    self.new_ident(scan.table_name.table().to_string())
+                ]));
+                relation.table(builder);
+
+                Ok(())
+            }
+            LogicalPlan::Projection(p) => {
+                let items = p
+                    .expr
+                    .iter()
+                    .map(|e| self.select_item_to_sql(e))
+                    .collect::<Result<Vec<_>>>()?;
+                select.projection(items);
+
+                self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
+            }
+            LogicalPlan::Filter(filter) => {
+                let filter_expr = self.expr_to_sql(&filter.predicate)?;
+
+                select.selection(Some(filter_expr));
+
+                self.select_to_sql_recursively(
+                    filter.input.as_ref(),
+                    query,
+                    select,
+                    relation,
+                )
+            }
+            LogicalPlan::Limit(limit) => {
+                if let Some(fetch) = limit.fetch {
+                    query.limit(Some(Self::usize_literal(fetch)));
+                }
+                // A non-zero skip must become OFFSET; dropping it would
+                // silently change which rows the generated SQL returns.
+                if limit.skip > 0 {
+                    query.offset(Some(ast::Offset {
+                        value: Self::usize_literal(limit.skip),
+                        rows: ast::OffsetRows::None,
+                    }));
+                }
+
+                self.select_to_sql_recursively(
+                    limit.input.as_ref(),
+                    query,
+                    select,
+                    relation,
+                )
+            }
+            LogicalPlan::Sort(sort) => {
+                query.order_by(self.sort_to_sql(&sort.expr)?);
+
+                self.select_to_sql_recursively(
+                    sort.input.as_ref(),
+                    query,
+                    select,
+                    relation,
+                )
+            }
+            LogicalPlan::Join(join) => {
+                match join.join_constraint {
+                    JoinConstraint::On => {}
+                    JoinConstraint::Using => {
+                        return not_impl_err!(
+                            "Unsupported join constraint: {:?}",
+                            join.join_constraint
+                        )
+                    }
+                }
+
+                // parse filter if exists
+                let join_filter = join
+                    .filter
+                    .as_ref()
+                    .map(|filter| self.expr_to_sql(filter))
+                    .transpose()?;
+
+                // map join.on to `l.a = r.a AND l.b = r.b AND ...`
+                let eq_op = ast::BinaryOperator::Eq;
+                let join_on = self.join_conditions_to_sql(&join.on, eq_op)?;
+
+                // Merge `join_on` and `join_filter`
+                let join_expr = match (join_filter, join_on) {
+                    (Some(filter), Some(on)) => Some(self.and_op_to_sql(filter, on)),
+                    (Some(filter), None) => Some(filter),
+                    (None, Some(on)) => Some(on),
+                    (None, None) => None,
+                };
+                let join_constraint = match join_expr {
+                    Some(expr) => ast::JoinConstraint::On(expr),
+                    None => ast::JoinConstraint::None,
+                };
+
+                // The left input keeps filling the caller's relation; the
+                // right input gets its own relation that becomes the join target.
+                let mut right_relation = RelationBuilder::default();
+
+                self.select_to_sql_recursively(
+                    join.left.as_ref(),
+                    query,
+                    select,
+                    relation,
+                )?;
+                self.select_to_sql_recursively(
+                    join.right.as_ref(),
+                    query,
+                    select,
+                    &mut right_relation,
+                )?;
+
+                let ast_join = ast::Join {
+                    relation: right_relation.build()?,
+                    join_operator: self
+                        .join_operator_to_sql(join.join_type, join_constraint),
+                };
+                let mut from = Self::pop_from(select)?;
+                from.push_join(ast_join);
+                select.push_from(from);
+
+                Ok(())
+            }
+            LogicalPlan::SubqueryAlias(plan_alias) => {
+                // Handle bottom-up to allocate relation
+                self.select_to_sql_recursively(
+                    plan_alias.input.as_ref(),
+                    query,
+                    select,
+                    relation,
+                )?;
+
+                relation.alias(Some(
+                    self.new_table_alias(plan_alias.alias.table().to_string()),
+                ));
+
+                Ok(())
+            }
+            // Known query operators that are not implemented yet.
+            LogicalPlan::Aggregate(_)
+            | LogicalPlan::Distinct(_)
+            | LogicalPlan::Union(_)
+            | LogicalPlan::Window(_)
+            | LogicalPlan::Extension(_) => {
+                not_impl_err!("Unsupported operator: {plan:?}")
+            }
+            _ => not_impl_err!("Unsupported operator: {plan:?}"),
+        }
+    }
+
+    /// Converts one projection expression into a select item, emitting
+    /// `expr AS alias` when the expression is an [`Expr::Alias`].
+    fn select_item_to_sql(&self, expr: &Expr) -> Result<ast::SelectItem> {
+        match expr {
+            Expr::Alias(Alias { expr, name, .. }) => {
+                let inner = self.expr_to_sql(expr)?;
+
+                Ok(ast::SelectItem::ExprWithAlias {
+                    expr: inner,
+                    alias: self.new_ident(name.to_string()),
+                })
+            }
+            _ => {
+                let inner = self.expr_to_sql(expr)?;
+
+                Ok(ast::SelectItem::UnnamedExpr(inner))
+            }
+        }
+    }
+
+    /// Converts sort expressions into `ORDER BY` items.
+    ///
+    /// # Errors
+    /// Returns a plan error if any element is not an [`Expr::Sort`].
+    fn sort_to_sql(&self, sort_exprs: &[Expr]) -> Result<Vec<ast::OrderByExpr>> {
+        sort_exprs
+            .iter()
+            .map(|expr: &Expr| match expr {
+                Expr::Sort(sort_expr) => {
+                    let col = self.expr_to_sql(&sort_expr.expr)?;
+                    Ok(ast::OrderByExpr {
+                        asc: Some(sort_expr.asc),
+                        expr: col,
+                        nulls_first: Some(sort_expr.nulls_first),
+                    })
+                }
+                _ => plan_err!("Expecting Sort expr"),
+            })
+            .collect::<Result<Vec<_>>>()
+    }
+
+    /// Maps a DataFusion join type onto the matching `sqlparser` join operator,
+    /// carrying `constraint` along.
+    fn join_operator_to_sql(
+        &self,
+        join_type: JoinType,
+        constraint: ast::JoinConstraint,
+    ) -> ast::JoinOperator {
+        match join_type {
+            JoinType::Inner => ast::JoinOperator::Inner(constraint),
+            JoinType::Left => ast::JoinOperator::LeftOuter(constraint),
+            JoinType::Right => ast::JoinOperator::RightOuter(constraint),
+            JoinType::Full => ast::JoinOperator::FullOuter(constraint),
+            JoinType::LeftAnti => ast::JoinOperator::LeftAnti(constraint),
+            JoinType::LeftSemi => ast::JoinOperator::LeftSemi(constraint),
+            JoinType::RightAnti => ast::JoinOperator::RightAnti(constraint),
+            JoinType::RightSemi => ast::JoinOperator::RightSemi(constraint),
+        }
+    }
+
+    /// Turns `(left, right)` key pairs into `l1 <op> r1 AND l2 <op> r2 AND ...`.
+    ///
+    /// Returns `Ok(None)` when `join_conditions` is empty.
+    fn join_conditions_to_sql(
+        &self,
+        join_conditions: &[(Expr, Expr)],
+        eq_op: ast::BinaryOperator,
+    ) -> Result<Option<ast::Expr>> {
+        // Only support AND conjunction for each binary expression in join conditions
+        let exprs = join_conditions
+            .iter()
+            .map(|(left, right)| -> Result<ast::Expr> {
+                let l = self.expr_to_sql(left)?;
+                let r = self.expr_to_sql(right)?;
+                Ok(self.binary_op_to_sql(l, r, eq_op.clone()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+        // Fold the individual comparisons left-to-right with AND.
+        Ok(exprs
+            .into_iter()
+            .reduce(|acc, next| self.and_op_to_sql(acc, next)))
+    }
+
+    /// Builds `lhs AND rhs`.
+    fn and_op_to_sql(&self, lhs: ast::Expr, rhs: ast::Expr) -> ast::Expr {
+        self.binary_op_to_sql(lhs, rhs, ast::BinaryOperator::And)
+    }
+
+    /// Builds a table alias with the given name and no column list.
+    fn new_table_alias(&self, alias: String) -> ast::TableAlias {
+        ast::TableAlias {
+            name: self.new_ident(alias),
+            columns: Vec::new(),
+        }
+    }
+
+    /// DML unparsing is not implemented; always returns a "not implemented" error.
+    fn dml_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
+        not_impl_err!("Unsupported plan: {plan:?}")
+    }
+
+    /// Renders `n` as an unquoted numeric literal expression.
+    fn usize_literal(n: usize) -> ast::Expr {
+        ast::Expr::Value(ast::Value::Number(n.to_string(), false))
+    }
+
+    /// Pops the FROM entry that `select_to_sql` seeded, returning an internal
+    /// error instead of panicking if it is unexpectedly missing.
+    fn pop_from(select: &mut SelectBuilder) -> Result<TableWithJoinsBuilder> {
+        select.pop_from().ok_or_else(|| {
+            DataFusionError::Internal(
+                "SelectBuilder has no FROM entry to attach the relation to".to_string(),
+            )
+        })
+    }
+}
+
+/// Lets `?` turn a [`BuilderError`] into a [`DataFusionError`] by boxing it
+/// into the `External` variant.
+impl From<BuilderError> for DataFusionError {
+    fn from(err: BuilderError) -> Self {
+        Self::External(Box::new(err))
+    }
+}
diff --git a/datafusion/sql/tests/sql_integration.rs
b/datafusion/sql/tests/sql_integration.rs
index 6681c3d025..fdf7ab8c3d 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -22,12 +22,14 @@ use std::{sync::Arc, vec};
use arrow_schema::TimeUnit::Nanosecond;
use arrow_schema::*;
+use datafusion_sql::planner::PlannerContext;
+use datafusion_sql::unparser::{expr_to_sql, plan_to_sql};
use sqlparser::dialect::{Dialect, GenericDialect, HiveDialect, MySqlDialect};
use datafusion_common::{
config::ConfigOptions, DataFusionError, Result, ScalarValue,
TableReference,
};
-use datafusion_common::{plan_err, ParamValues};
+use datafusion_common::{plan_err, DFSchema, ParamValues};
use datafusion_expr::{
logical_plan::{LogicalPlan, Prepare},
AggregateUDF, ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature,
TableSource,
@@ -39,6 +41,7 @@ use datafusion_sql::{
};
use rstest::rstest;
+use sqlparser::parser::Parser;
#[test]
fn parse_decimals() {
@@ -4487,6 +4490,87 @@ impl TableSource for EmptyTable {
}
}
+/// Parses a SQL expression, plans it against the given table's schema,
+/// unparses it with `expr_to_sql`, and checks the rendered text.
+#[test]
+fn roundtrip_expr() {
+    // (table whose schema resolves the columns, input SQL, expected output)
+    let cases: Vec<(TableReference, &str, &str)> = vec![
+        (TableReference::bare("person"), "age > 35", "age > 35"),
+        (TableReference::bare("person"), "id = '10'", "id = '10'"),
+    ];
+
+    let roundtrip = |table, sql: &str| -> Result<String> {
+        let dialect = GenericDialect {};
+        let mut parser = Parser::new(&dialect).try_with_sql(sql)?;
+        let sql_expr = parser.parse_expr()?;
+
+        let context = MockContextProvider::default();
+        let schema = context.get_table_source(table)?.schema();
+        let df_schema = DFSchema::try_from(schema.as_ref().clone())?;
+        let sql_to_rel = SqlToRel::new(&context);
+        let mut planner_context = PlannerContext::new();
+        let expr = sql_to_rel.sql_to_expr(sql_expr, &df_schema, &mut planner_context)?;
+
+        let ast = expr_to_sql(&expr)?;
+
+        Ok(ast.to_string())
+    };
+
+    for (table, query, expected) in cases {
+        let actual = roundtrip(table, query).unwrap();
+        assert_eq!(actual, expected);
+    }
+}
+
+/// Parses each SQL statement, plans it, unparses the plan with `plan_to_sql`,
+/// and checks the rendered SQL text.
+#[test]
+fn roundtrip_statement() {
+    // (input SQL, expected SQL after the plan round trip)
+    let tests: Vec<(&str, &str)> = vec![
+        (
+            "select ta.j1_id from j1 ta;",
+            r#"SELECT ta.j1_id FROM j1 AS ta"#,
+        ),
+        (
+            "select ta.j1_id from j1 ta order by ta.j1_id;",
+            r#"SELECT ta.j1_id FROM j1 AS ta ORDER BY ta.j1_id ASC NULLS LAST"#,
+        ),
+        (
+            "select * from j1 ta order by ta.j1_id, ta.j1_string desc;",
+            r#"SELECT ta.j1_id, ta.j1_string FROM j1 AS ta ORDER BY ta.j1_id ASC NULLS LAST, ta.j1_string DESC NULLS FIRST"#,
+        ),
+        (
+            "select * from j1 limit 10;",
+            r#"SELECT j1.j1_id, j1.j1_string FROM j1 LIMIT 10"#,
+        ),
+        (
+            "select ta.j1_id from j1 ta where ta.j1_id > 1;",
+            r#"SELECT ta.j1_id FROM j1 AS ta WHERE ta.j1_id > 1"#,
+        ),
+        (
+            "select ta.j1_id, tb.j2_string from j1 ta join j2 tb on ta.j1_id = tb.j2_id;",
+            r#"SELECT ta.j1_id, tb.j2_string FROM j1 AS ta JOIN j2 AS tb ON ta.j1_id = tb.j2_id"#,
+        ),
+        (
+            "select ta.j1_id, tb.j2_string, tc.j3_string from j1 ta join j2 tb on ta.j1_id = tb.j2_id join j3 tc on ta.j1_id = tc.j3_id;",
+            r#"SELECT ta.j1_id, tb.j2_string, tc.j3_string FROM j1 AS ta JOIN j2 AS tb ON ta.j1_id = tb.j2_id JOIN j3 AS tc ON ta.j1_id = tc.j3_id"#,
+        ),
+    ];
+
+    let roundtrip = |sql: &str| -> Result<String> {
+        let dialect = GenericDialect {};
+        let statement = Parser::new(&dialect).try_with_sql(sql)?.parse_statement()?;
+
+        let context = MockContextProvider::default();
+        let sql_to_rel = SqlToRel::new(&context);
+        let plan = sql_to_rel.sql_statement_to_plan(statement)?;
+
+        let ast = plan_to_sql(&plan)?;
+
+        Ok(ast.to_string())
+    };
+
+    for (query, expected) in tests {
+        let actual = roundtrip(query).unwrap();
+        assert_eq!(actual, expected);
+    }
+}
+
#[cfg(test)]
#[ctor::ctor]
fn init() {