This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 045e8fcaea Implement Unparser for UNION ALL (#10603)
045e8fcaea is described below

commit 045e8fcaeac2f2c29afe5017e5efe5a3a2560080
Author: Phillip LeBlanc <[email protected]>
AuthorDate: Wed May 22 04:33:57 2024 +0900

    Implement Unparser for UNION ALL (#10603)
---
 datafusion/sql/src/unparser/ast.rs      |  3 ++
 datafusion/sql/src/unparser/plan.rs     | 77 +++++++++++++++++++++++++++------
 datafusion/sql/tests/sql_integration.rs |  8 ++++
 3 files changed, 75 insertions(+), 13 deletions(-)

diff --git a/datafusion/sql/src/unparser/ast.rs 
b/datafusion/sql/src/unparser/ast.rs
index 0a76aee2e0..c8f53cd7ba 100644
--- a/datafusion/sql/src/unparser/ast.rs
+++ b/datafusion/sql/src/unparser/ast.rs
@@ -50,6 +50,9 @@ impl QueryBuilder {
         new.body = Option::Some(value);
         new
     }
+    pub fn take_body(&mut self) -> Option<Box<ast::SetExpr>> {
+        self.body.take()
+    }
     pub fn order_by(&mut self, value: Vec<ast::OrderByExpr>) -> &mut Self {
         let new = self;
         new.order_by = value;
diff --git a/datafusion/sql/src/unparser/plan.rs 
b/datafusion/sql/src/unparser/plan.rs
index de36fb0371..3373220a84 100644
--- a/datafusion/sql/src/unparser/plan.rs
+++ b/datafusion/sql/src/unparser/plan.rs
@@ -17,7 +17,7 @@
 
 use datafusion_common::{internal_err, not_impl_err, plan_err, DataFusionError, 
Result};
 use datafusion_expr::{expr::Alias, Expr, JoinConstraint, JoinType, 
LogicalPlan};
-use sqlparser::ast::{self};
+use sqlparser::ast::{self, SetExpr};
 
 use crate::unparser::utils::unproject_agg_exprs;
 
@@ -78,7 +78,7 @@ impl Unparser<'_> {
             | LogicalPlan::Limit(_)
             | LogicalPlan::Statement(_)
             | LogicalPlan::Values(_)
-            | LogicalPlan::Distinct(_) => self.select_to_sql(plan),
+            | LogicalPlan::Distinct(_) => self.select_to_sql_statement(plan),
             LogicalPlan::Dml(_) => self.dml_to_sql(plan),
             LogicalPlan::Explain(_)
             | LogicalPlan::Analyze(_)
@@ -92,32 +92,47 @@ impl Unparser<'_> {
         }
     }
 
-    fn select_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
-        let mut query_builder = QueryBuilder::default();
+    fn select_to_sql_statement(&self, plan: &LogicalPlan) -> 
Result<ast::Statement> {
+        let mut query_builder = Some(QueryBuilder::default());
+
+        let body = self.select_to_sql_expr(plan, &mut query_builder)?;
+
+        let query = query_builder.unwrap().body(Box::new(body)).build()?;
+
+        Ok(ast::Statement::Query(Box::new(query)))
+    }
+
+    fn select_to_sql_expr(
+        &self,
+        plan: &LogicalPlan,
+        query: &mut Option<QueryBuilder>,
+    ) -> Result<ast::SetExpr> {
         let mut select_builder = SelectBuilder::default();
         select_builder.push_from(TableWithJoinsBuilder::default());
         let mut relation_builder = RelationBuilder::default();
         self.select_to_sql_recursively(
             plan,
-            &mut query_builder,
+            query,
             &mut select_builder,
             &mut relation_builder,
         )?;
 
+        // If we were able to construct a full body (i.e. UNION ALL), return it
+        if let Some(body) = query.as_mut().and_then(|q| q.take_body()) {
+            return Ok(*body);
+        }
+
         let mut twj = select_builder.pop_from().unwrap();
         twj.relation(relation_builder);
         select_builder.push_from(twj);
 
-        let body = ast::SetExpr::Select(Box::new(select_builder.build()?));
-        let query = query_builder.body(Box::new(body)).build()?;
-
-        Ok(ast::Statement::Query(Box::new(query)))
+        Ok(ast::SetExpr::Select(Box::new(select_builder.build()?)))
     }
 
     fn select_to_sql_recursively(
         &self,
         plan: &LogicalPlan,
-        query: &mut QueryBuilder,
+        query: &mut Option<QueryBuilder>,
         select: &mut SelectBuilder,
         relation: &mut RelationBuilder,
     ) -> Result<()> {
@@ -206,6 +221,11 @@ impl Unparser<'_> {
             }
             LogicalPlan::Limit(limit) => {
                 if let Some(fetch) = limit.fetch {
+                    let Some(query) = query.as_mut() else {
+                        return internal_err!(
+                            "Limit operator only valid in a statement context."
+                        );
+                    };
                     query.limit(Some(ast::Expr::Value(ast::Value::Number(
                         fetch.to_string(),
                         false,
@@ -220,7 +240,13 @@ impl Unparser<'_> {
                 )
             }
             LogicalPlan::Sort(sort) => {
-                query.order_by(self.sort_to_sql(sort.expr.clone())?);
+                if let Some(query_ref) = query {
+                    query_ref.order_by(self.sort_to_sql(sort.expr.clone())?);
+                } else {
+                    return internal_err!(
+                        "Sort operator only valid in a statement context."
+                    );
+                }
 
                 self.select_to_sql_recursively(
                     sort.input.as_ref(),
@@ -347,8 +373,33 @@ impl Unparser<'_> {
 
                 Ok(())
             }
-            LogicalPlan::Union(_union) => {
-                not_impl_err!("Unsupported operator: {plan:?}")
+            LogicalPlan::Union(union) => {
+                if union.inputs.len() != 2 {
+                    return not_impl_err!(
+                        "UNION ALL expected 2 inputs, but found {}",
+                        union.inputs.len()
+                    );
+                }
+
+                let input_exprs: Vec<SetExpr> = union
+                    .inputs
+                    .iter()
+                    .map(|input| self.select_to_sql_expr(input, &mut None))
+                    .collect::<Result<Vec<_>>>()?;
+
+                let union_expr = ast::SetExpr::SetOperation {
+                    op: ast::SetOperator::Union,
+                    set_quantifier: ast::SetQuantifier::All,
+                    left: Box::new(input_exprs[0].clone()),
+                    right: Box::new(input_exprs[1].clone()),
+                };
+
+                query
+                    .as_mut()
+                    .expect("to have a query builder")
+                    .body(Box::new(union_expr));
+
+                Ok(())
             }
             LogicalPlan::Window(_window) => {
                 not_impl_err!("Unsupported operator: {plan:?}")
diff --git a/datafusion/sql/tests/sql_integration.rs 
b/datafusion/sql/tests/sql_integration.rs
index 3247d7bb20..bbedaca6a8 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -4641,6 +4641,14 @@ fn roundtrip_statement() -> Result<()> {
             group by "Last Name", p.id 
             having count_first_name>5 and count_first_name<10
             order by count_first_name, "Last Name""#,
+            r#"SELECT j1_string as string FROM j1
+            UNION ALL
+            SELECT j2_string as string FROM j2"#,
+            r#"SELECT j1_string as string FROM j1
+            UNION ALL
+            SELECT j2_string as string FROM j2
+            ORDER BY string DESC
+            LIMIT 10"#
         ];
 
     // For each test sql string, we transform as follows:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to