This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new a8dedc8  Add support for UNION sql (#1068)
a8dedc8 is described below

commit a8dedc87a60f93faad1d78e77801de4859e28be9
Author: carlos <[email protected]>
AuthorDate: Mon Oct 4 20:57:18 2021 +0800

    Add support for UNION sql (#1068)
---
 README.md                                  |  2 +-
 datafusion/src/dataframe.rs                | 15 +++++++++++++++
 datafusion/src/execution/dataframe_impl.rs |  9 +++++++++
 datafusion/src/logical_plan/builder.rs     |  9 +++++++++
 datafusion/src/sql/planner.rs              | 10 ++++++++--
 datafusion/tests/sql.rs                    | 10 ++++++++++
 6 files changed, 52 insertions(+), 3 deletions(-)

diff --git a/README.md b/README.md
index ed7e8bd..b2c56a3 100644
--- a/README.md
+++ b/README.md
@@ -212,7 +212,7 @@ DataFusion also includes a simple command-line interactive 
SQL utility. See the
 - [x] Common table expressions
 - [ ] Set Operations
   - [x] UNION ALL
-  - [ ] UNION
+  - [x] UNION
   - [ ] INTERSECT
   - [ ] MINUS
 - [x] Joins
diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs
index da64ffa..5b157d0 100644
--- a/datafusion/src/dataframe.rs
+++ b/datafusion/src/dataframe.rs
@@ -147,6 +147,21 @@ pub trait DataFrame: Send + Sync {
     /// ```
     fn union(&self, dataframe: Arc<dyn DataFrame>) -> Result<Arc<dyn 
DataFrame>>;
 
+    /// Calculate the distinct union of two [`DataFrame`]s.  The two 
[`DataFrame`]s must have exactly the same schema.
+    ///
+    /// ```
+    /// # use datafusion::prelude::*;
+    /// # use datafusion::error::Result;
+    /// # fn main() -> Result<()> {
+    /// let mut ctx = ExecutionContext::new();
+    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+    /// let df = df.union(df.clone())?;
+    /// let df = df.distinct()?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    fn distinct(&self) -> Result<Arc<dyn DataFrame>>;
+
     /// Sort the DataFrame by the specified sorting expressions. Any 
expression can be turned into
     /// a sort expression by calling its 
[sort](../logical_plan/enum.Expr.html#method.sort) method.
     ///
diff --git a/datafusion/src/execution/dataframe_impl.rs 
b/datafusion/src/execution/dataframe_impl.rs
index cd39dd6..e57db17 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -214,6 +214,15 @@ impl DataFrame for DataFrameImpl {
             .build()?;
         Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
     }
+
+    fn distinct(&self) -> Result<Arc<dyn DataFrame>> {
+        Ok(Arc::new(DataFrameImpl::new(
+            self.ctx_state.clone(),
+            &LogicalPlanBuilder::from(self.to_logical_plan())
+                .distinct()?
+                .build()?,
+        )))
+    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/src/logical_plan/builder.rs 
b/datafusion/src/logical_plan/builder.rs
index d902d6f..20908d7 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -291,6 +291,15 @@ impl LogicalPlanBuilder {
         Ok(Self::from(union_with_alias(self.plan.clone(), plan, None)?))
     }
 
+    /// Apply deduplication: Only distinct (different) values are returned
+    pub fn distinct(&self) -> Result<Self> {
+        let projection_expr = expand_wildcard(self.plan.schema(), &self.plan)?;
+        let plan = LogicalPlanBuilder::from(self.plan.clone())
+            .aggregate(projection_expr, vec![])?
+            .build()?;
+        Self::from(plan).project(vec![Expr::Wildcard])
+    }
+
     /// Apply a join with on constraint
     pub fn join(
         &self,
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index 140ae26..78601bc 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -170,8 +170,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                     let right_plan = self.set_expr_to_plan(right.as_ref(), 
None, ctes)?;
                     union_with_alias(left_plan, right_plan, alias)
                 }
+                (SetOperator::Union, false) => {
+                    let left_plan = self.set_expr_to_plan(left.as_ref(), None, 
ctes)?;
+                    let right_plan = self.set_expr_to_plan(right.as_ref(), 
None, ctes)?;
+                    let union_plan = union_with_alias(left_plan, right_plan, 
alias)?;
+                    LogicalPlanBuilder::from(union_plan).distinct()?.build()
+                }
                 _ => Err(DataFusionError::NotImplemented(format!(
-                    "Only UNION ALL is supported, found {}",
+                    "Only UNION ALL and UNION [DISTINCT] are supported, found 
{}",
                     op
                 ))),
             },
@@ -3440,7 +3446,7 @@ mod tests {
         let sql = "SELECT order_id from orders EXCEPT SELECT order_id FROM 
orders";
         let err = logical_plan(sql).expect_err("query should have failed");
         assert_eq!(
-            "NotImplemented(\"Only UNION ALL is supported, found EXCEPT\")",
+            "NotImplemented(\"Only UNION ALL and UNION [DISTINCT] are 
supported, found EXCEPT\")",
             format!("{:?}", err)
         );
     }
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index d3aa38d..d8e580c 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -5066,3 +5066,13 @@ async fn avro_explain() {
     ];
     assert_eq!(expected, actual);
 }
+
+#[tokio::test]
+async fn union_distinct() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    let sql = "SELECT 1 as x UNION SELECT 1 as x";
+    let actual = execute(&mut ctx, sql).await;
+    let expected = vec![vec!["1"]];
+    assert_eq!(expected, actual);
+    Ok(())
+}

Reply via email to