This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new a8dedc8 Add support for UNION sql (#1068)
a8dedc8 is described below
commit a8dedc87a60f93faad1d78e77801de4859e28be9
Author: carlos <[email protected]>
AuthorDate: Mon Oct 4 20:57:18 2021 +0800
Add support for UNION sql (#1068)
---
README.md | 2 +-
datafusion/src/dataframe.rs | 15 +++++++++++++++
datafusion/src/execution/dataframe_impl.rs | 9 +++++++++
datafusion/src/logical_plan/builder.rs | 9 +++++++++
datafusion/src/sql/planner.rs | 10 ++++++++--
datafusion/tests/sql.rs | 10 ++++++++++
6 files changed, 52 insertions(+), 3 deletions(-)
diff --git a/README.md b/README.md
index ed7e8bd..b2c56a3 100644
--- a/README.md
+++ b/README.md
@@ -212,7 +212,7 @@ DataFusion also includes a simple command-line interactive
SQL utility. See the
- [x] Common table expressions
- [ ] Set Operations
- [x] UNION ALL
- - [ ] UNION
+ - [x] UNION
- [ ] INTERSECT
- [ ] MINUS
- [x] Joins
diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs
index da64ffa..5b157d0 100644
--- a/datafusion/src/dataframe.rs
+++ b/datafusion/src/dataframe.rs
@@ -147,6 +147,21 @@ pub trait DataFrame: Send + Sync {
/// ```
fn union(&self, dataframe: Arc<dyn DataFrame>) -> Result<Arc<dyn
DataFrame>>;
+ /// Calculate the union distinct two [`DataFrame`]s. The two
[`DataFrame`]s must have exactly the same schema
+ ///
+ /// ```
+ /// # use datafusion::prelude::*;
+ /// # use datafusion::error::Result;
+ /// # fn main() -> Result<()> {
+ /// let mut ctx = ExecutionContext::new();
+ /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+ /// let df = df.union(df.clone())?;
+ /// let df = df.distinct()?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ fn distinct(&self) -> Result<Arc<dyn DataFrame>>;
+
/// Sort the DataFrame by the specified sorting expressions. Any
expression can be turned into
/// a sort expression by calling its
[sort](../logical_plan/enum.Expr.html#method.sort) method.
///
diff --git a/datafusion/src/execution/dataframe_impl.rs
b/datafusion/src/execution/dataframe_impl.rs
index cd39dd6..e57db17 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -214,6 +214,15 @@ impl DataFrame for DataFrameImpl {
.build()?;
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
}
+
+ /// Remove duplicate rows by delegating to [`LogicalPlanBuilder::distinct`].
+ fn distinct(&self) -> Result<Arc<dyn DataFrame>> {
+ // Build the deduplicated plan first, then wrap it with a clone of this frame's context state.
+ let deduped = LogicalPlanBuilder::from(self.to_logical_plan())
+ .distinct()?
+ .build()?;
+ Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &deduped)))
+ }
}
#[cfg(test)]
diff --git a/datafusion/src/logical_plan/builder.rs
b/datafusion/src/logical_plan/builder.rs
index d902d6f..20908d7 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -291,6 +291,15 @@ impl LogicalPlanBuilder {
Ok(Self::from(union_with_alias(self.plan.clone(), plan, None)?))
}
+ /// Apply deduplication: only distinct (different) rows are returned.
+ pub fn distinct(&self) -> Result<Self> {
+ // Grouping by every column with no aggregate expressions yields one row per distinct tuple.
+ let group_expr = expand_wildcard(self.plan.schema(), &self.plan)?;
+ LogicalPlanBuilder::from(self.plan.clone())
+ .aggregate(group_expr, vec![])?
+ .project(vec![Expr::Wildcard])
+ }
+
/// Apply a join with on constraint
pub fn join(
&self,
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index 140ae26..78601bc 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -170,8 +170,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let right_plan = self.set_expr_to_plan(right.as_ref(),
None, ctes)?;
union_with_alias(left_plan, right_plan, alias)
}
+ (SetOperator::Union, false) => {
+ let left_plan = self.set_expr_to_plan(left.as_ref(), None,
ctes)?;
+ let right_plan = self.set_expr_to_plan(right.as_ref(),
None, ctes)?;
+ let union_plan = union_with_alias(left_plan, right_plan,
alias)?;
+ LogicalPlanBuilder::from(union_plan).distinct()?.build()
+ }
_ => Err(DataFusionError::NotImplemented(format!(
- "Only UNION ALL is supported, found {}",
+ "Only UNION ALL and UNION [DISTINCT] are supported, found
{}",
op
))),
},
@@ -3440,7 +3446,7 @@ mod tests {
let sql = "SELECT order_id from orders EXCEPT SELECT order_id FROM
orders";
let err = logical_plan(sql).expect_err("query should have failed");
assert_eq!(
- "NotImplemented(\"Only UNION ALL is supported, found EXCEPT\")",
+ "NotImplemented(\"Only UNION ALL and UNION [DISTINCT] are
supported, found EXCEPT\")",
format!("{:?}", err)
);
}
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index d3aa38d..d8e580c 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -5066,3 +5066,13 @@ async fn avro_explain() {
];
assert_eq!(expected, actual);
}
+
+#[tokio::test]
+async fn union_distinct() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+ let sql = "SELECT 1 as x UNION SELECT 1 as x UNION SELECT 2 as x ORDER BY x"; // ORDER BY pins row order
+ let actual = execute(&mut ctx, sql).await;
+ let expected = vec![vec!["1"], vec!["2"]]; // duplicate 1 collapses, distinct 2 survives
+ assert_eq!(expected, actual);
+ Ok(())
+}