This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new c3f1d7246 Add DataFrame `union_distinct` and fix documentation for 
`distinct` (#2574)
c3f1d7246 is described below

commit c3f1d72460c006f66c21cd3373556f550ab98212
Author: Andy Grove <[email protected]>
AuthorDate: Fri May 20 02:49:34 2022 -0600

    Add DataFrame `union_distinct` and fix documentation for `distinct` (#2574)
---
 datafusion/core/src/dataframe.rs            | 29 ++++++++++++++++++++++++++---
 datafusion/core/src/logical_plan/builder.rs |  7 ++++++-
 datafusion/core/src/sql/planner.rs          | 29 ++++++++++++++++++++---------
 3 files changed, 52 insertions(+), 13 deletions(-)

diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index d0670bb28..10d06ca47 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -208,7 +208,8 @@ impl DataFrame {
         Ok(Arc::new(DataFrame::new(self.session_state.clone(), &plan)))
     }
 
-    /// Calculate the union two [`DataFrame`]s.  The two [`DataFrame`]s must 
have exactly the same schema
+    /// Calculate the union of two [`DataFrame`]s, preserving duplicate 
rows.  The
+    /// two [`DataFrame`]s must have exactly the same schema
     ///
     /// ```
     /// # use datafusion::prelude::*;
@@ -228,7 +229,30 @@ impl DataFrame {
         Ok(Arc::new(DataFrame::new(self.session_state.clone(), &plan)))
     }
 
-    /// Calculate the union distinct two [`DataFrame`]s.  The two 
[`DataFrame`]s must have exactly the same schema
+    /// Calculate the distinct union of two [`DataFrame`]s.  The
+    /// two [`DataFrame`]s must have exactly the same schema
+    ///
+    /// ```
+    /// # use datafusion::prelude::*;
+    /// # use datafusion::error::Result;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// let ctx = SessionContext::new();
+    /// let df = ctx.read_csv("tests/example.csv", 
CsvReadOptions::new()).await?;
+    /// let df = df.union_distinct(df.clone())?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn union_distinct(&self, dataframe: Arc<DataFrame>) -> 
Result<Arc<DataFrame>> {
+        Ok(Arc::new(DataFrame::new(
+            self.session_state.clone(),
+            &LogicalPlanBuilder::from(self.plan.clone())
+                .union_distinct(dataframe.plan.clone())?
+                .build()?,
+        )))
+    }
+
+    /// Filter out duplicate rows
     ///
     /// ```
     /// # use datafusion::prelude::*;
@@ -237,7 +261,6 @@ impl DataFrame {
     /// # async fn main() -> Result<()> {
     /// let ctx = SessionContext::new();
     /// let df = ctx.read_csv("tests/example.csv", 
CsvReadOptions::new()).await?;
-    /// let df = df.union(df.clone())?;
     /// let df = df.distinct()?;
     /// # Ok(())
     /// # }
diff --git a/datafusion/core/src/logical_plan/builder.rs 
b/datafusion/core/src/logical_plan/builder.rs
index e84313f82..f8b1fefdd 100644
--- a/datafusion/core/src/logical_plan/builder.rs
+++ b/datafusion/core/src/logical_plan/builder.rs
@@ -431,11 +431,16 @@ impl LogicalPlanBuilder {
         })))
     }
 
-    /// Apply a union
+    /// Apply a union, preserving duplicate rows
     pub fn union(&self, plan: LogicalPlan) -> Result<Self> {
         Ok(Self::from(union_with_alias(self.plan.clone(), plan, None)?))
     }
 
+    /// Apply a union, removing duplicate rows
+    pub fn union_distinct(&self, plan: LogicalPlan) -> Result<Self> {
+        self.union(plan)?.distinct()
+    }
+
     /// Apply deduplication: Only distinct (different) values are returned
     pub fn distinct(&self) -> Result<Self> {
         let projection_expr = expand_wildcard(self.plan.schema(), &self.plan)?;
diff --git a/datafusion/core/src/sql/planner.rs 
b/datafusion/core/src/sql/planner.rs
index 85c2d8f0c..b4efc0868 100644
--- a/datafusion/core/src/sql/planner.rs
+++ b/datafusion/core/src/sql/planner.rs
@@ -28,8 +28,8 @@ use crate::datasource::TableProvider;
 use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits};
 use crate::logical_plan::Expr::Alias;
 use crate::logical_plan::{
-    and, col, lit, normalize_col, normalize_col_with_schemas, 
union_with_alias, Column,
-    CreateCatalog, CreateCatalogSchema, CreateExternalTable as 
PlanCreateExternalTable,
+    and, col, lit, normalize_col, normalize_col_with_schemas, Column, 
CreateCatalog,
+    CreateCatalogSchema, CreateExternalTable as PlanCreateExternalTable,
     CreateMemoryTable, CreateView, DFSchema, DFSchemaRef, DropTable, Expr, 
FileType,
     LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ToDFSchema, 
ToStringifiedPlan,
 };
@@ -324,13 +324,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 let right_plan =
                     self.set_expr_to_plan(*right, None, ctes, 
outer_query_schema)?;
                 match (op, all) {
-                    (SetOperator::Union, true) => {
-                        union_with_alias(left_plan, right_plan, alias)
-                    }
-                    (SetOperator::Union, false) => {
-                        let union_plan = union_with_alias(left_plan, 
right_plan, alias)?;
-                        
LogicalPlanBuilder::from(union_plan).distinct()?.build()
-                    }
+                    (SetOperator::Union, true) => 
LogicalPlanBuilder::from(left_plan)
+                        .union(right_plan)?
+                        .build(),
+                    (SetOperator::Union, false) => 
LogicalPlanBuilder::from(left_plan)
+                        .union_distinct(right_plan)?
+                        .build(),
                     (SetOperator::Intersect, true) => {
                         LogicalPlanBuilder::intersect(left_plan, right_plan, 
true)
                     }
@@ -3915,6 +3914,18 @@ mod tests {
 
     #[test]
     fn union() {
+        let sql = "SELECT order_id from orders UNION SELECT order_id FROM 
orders";
+        let expected = "Projection: #order_id\
+        \n  Aggregate: groupBy=[[#order_id]], aggr=[[]]\
+        \n    Union\n      Projection: #orders.order_id\
+        \n        TableScan: orders projection=None\
+        \n      Projection: #orders.order_id\
+        \n        TableScan: orders projection=None";
+        quick_test(sql, expected);
+    }
+
+    #[test]
+    fn union_all() {
         let sql = "SELECT order_id from orders UNION ALL SELECT order_id FROM 
orders";
         let expected = "Union\
             \n  Projection: #orders.order_id\

Reply via email to