This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new c3f1d7246 Add DataFrame `union_distinct` and fix documentation for `distinct` (#2574)
c3f1d7246 is described below
commit c3f1d72460c006f66c21cd3373556f550ab98212
Author: Andy Grove <[email protected]>
AuthorDate: Fri May 20 02:49:34 2022 -0600
Add DataFrame `union_distinct` and fix documentation for `distinct` (#2574)
---
datafusion/core/src/dataframe.rs | 29 ++++++++++++++++++++++++++---
datafusion/core/src/logical_plan/builder.rs | 7 ++++++-
datafusion/core/src/sql/planner.rs | 29 ++++++++++++++++++++---------
3 files changed, 52 insertions(+), 13 deletions(-)
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index d0670bb28..10d06ca47 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -208,7 +208,8 @@ impl DataFrame {
Ok(Arc::new(DataFrame::new(self.session_state.clone(), &plan)))
}
- /// Calculate the union two [`DataFrame`]s. The two [`DataFrame`]s must
have exactly the same schema
+ /// Calculate the union of two [`DataFrame`]s, preserving duplicate
rows.The
+ /// two [`DataFrame`]s must have exactly the same schema
///
/// ```
/// # use datafusion::prelude::*;
@@ -228,7 +229,30 @@ impl DataFrame {
Ok(Arc::new(DataFrame::new(self.session_state.clone(), &plan)))
}
- /// Calculate the union distinct two [`DataFrame`]s. The two
[`DataFrame`]s must have exactly the same schema
+ /// Calculate the distinct union of two [`DataFrame`]s. The
+ /// two [`DataFrame`]s must have exactly the same schema
+ ///
+ /// ```
+ /// # use datafusion::prelude::*;
+ /// # use datafusion::error::Result;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<()> {
+ /// let ctx = SessionContext::new();
+ /// let df = ctx.read_csv("tests/example.csv",
CsvReadOptions::new()).await?;
+ /// let df = df.union_distinct(df.clone())?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn union_distinct(&self, dataframe: Arc<DataFrame>) ->
Result<Arc<DataFrame>> {
+ Ok(Arc::new(DataFrame::new(
+ self.session_state.clone(),
+ &LogicalPlanBuilder::from(self.plan.clone())
+ .union_distinct(dataframe.plan.clone())?
+ .build()?,
+ )))
+ }
+
+ /// Filter out duplicate rows
///
/// ```
/// # use datafusion::prelude::*;
@@ -237,7 +261,6 @@ impl DataFrame {
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// let df = ctx.read_csv("tests/example.csv",
CsvReadOptions::new()).await?;
- /// let df = df.union(df.clone())?;
/// let df = df.distinct()?;
/// # Ok(())
/// # }
diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs
index e84313f82..f8b1fefdd 100644
--- a/datafusion/core/src/logical_plan/builder.rs
+++ b/datafusion/core/src/logical_plan/builder.rs
@@ -431,11 +431,16 @@ impl LogicalPlanBuilder {
})))
}
- /// Apply a union
+ /// Apply a union, preserving duplicate rows
pub fn union(&self, plan: LogicalPlan) -> Result<Self> {
Ok(Self::from(union_with_alias(self.plan.clone(), plan, None)?))
}
+ /// Apply a union, removing duplicate rows
+ pub fn union_distinct(&self, plan: LogicalPlan) -> Result<Self> {
+ self.union(plan)?.distinct()
+ }
+
/// Apply deduplication: Only distinct (different) values are returned)
pub fn distinct(&self) -> Result<Self> {
let projection_expr = expand_wildcard(self.plan.schema(), &self.plan)?;
diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs
index 85c2d8f0c..b4efc0868 100644
--- a/datafusion/core/src/sql/planner.rs
+++ b/datafusion/core/src/sql/planner.rs
@@ -28,8 +28,8 @@ use crate::datasource::TableProvider;
use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits};
use crate::logical_plan::Expr::Alias;
use crate::logical_plan::{
- and, col, lit, normalize_col, normalize_col_with_schemas,
union_with_alias, Column,
- CreateCatalog, CreateCatalogSchema, CreateExternalTable as
PlanCreateExternalTable,
+ and, col, lit, normalize_col, normalize_col_with_schemas, Column,
CreateCatalog,
+ CreateCatalogSchema, CreateExternalTable as PlanCreateExternalTable,
CreateMemoryTable, CreateView, DFSchema, DFSchemaRef, DropTable, Expr,
FileType,
LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ToDFSchema,
ToStringifiedPlan,
};
@@ -324,13 +324,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let right_plan =
self.set_expr_to_plan(*right, None, ctes,
outer_query_schema)?;
match (op, all) {
- (SetOperator::Union, true) => {
- union_with_alias(left_plan, right_plan, alias)
- }
- (SetOperator::Union, false) => {
- let union_plan = union_with_alias(left_plan,
right_plan, alias)?;
-
LogicalPlanBuilder::from(union_plan).distinct()?.build()
- }
+ (SetOperator::Union, true) =>
LogicalPlanBuilder::from(left_plan)
+ .union(right_plan)?
+ .build(),
+ (SetOperator::Union, false) =>
LogicalPlanBuilder::from(left_plan)
+ .union_distinct(right_plan)?
+ .build(),
(SetOperator::Intersect, true) => {
LogicalPlanBuilder::intersect(left_plan, right_plan,
true)
}
@@ -3915,6 +3914,18 @@ mod tests {
#[test]
fn union() {
+ let sql = "SELECT order_id from orders UNION SELECT order_id FROM
orders";
+ let expected = "Projection: #order_id\
+ \n Aggregate: groupBy=[[#order_id]], aggr=[[]]\
+ \n Union\n Projection: #orders.order_id\
+ \n TableScan: orders projection=None\
+ \n Projection: #orders.order_id\
+ \n TableScan: orders projection=None";
+ quick_test(sql, expected);
+ }
+
+ #[test]
+ fn union_all() {
let sql = "SELECT order_id from orders UNION ALL SELECT order_id FROM
orders";
let expected = "Union\
\n Projection: #orders.order_id\