This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 6f1212d Implement EXCEPT & EXCEPT DISTINCT (#1259)
6f1212d is described below
commit 6f1212d140c8dfb9f267f67e50e7dffb79c7874c
Author: Carlos <[email protected]>
AuthorDate: Tue Nov 9 03:07:14 2021 +0800
Implement EXCEPT & EXCEPT DISTINCT (#1259)
---
datafusion/src/logical_plan/builder.rs | 59 ++++++++++++++++++++++++++++++
datafusion/src/sql/planner.rs | 48 ++++++++++---------------
datafusion/tests/sql.rs | 66 +++++++++++++++++++++++++++++++++-
3 files changed, 143 insertions(+), 30 deletions(-)
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index ac8e0c3..0c7950c 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -715,6 +715,65 @@ impl LogicalPlanBuilder {
}
}
+ /// Process intersect set operator: delegates to `intersect_or_except` with `JoinType::Semi`, forwarding `is_all` unchanged
+ pub(crate) fn intersect(
+ left_plan: LogicalPlan,
+ right_plan: LogicalPlan,
+ is_all: bool,
+ ) -> Result<LogicalPlan> {
+ LogicalPlanBuilder::intersect_or_except(
+ left_plan,
+ right_plan,
+ JoinType::Semi,
+ is_all,
+ )
+ }
+
+ /// Process except set operator: delegates to `intersect_or_except` with `JoinType::Anti`, forwarding `is_all` unchanged
+ pub(crate) fn except(
+ left_plan: LogicalPlan,
+ right_plan: LogicalPlan,
+ is_all: bool,
+ ) -> Result<LogicalPlan> {
+ LogicalPlanBuilder::intersect_or_except(
+ left_plan,
+ right_plan,
+ JoinType::Anti,
+ is_all,
+ )
+ }
+
+ /// Process intersect or except: joins `left_plan` to `right_plan` with `join_type`, using every paired column as a join key
+ fn intersect_or_except(
+ left_plan: LogicalPlan,
+ right_plan: LogicalPlan,
+ join_type: JoinType,
+ is_all: bool,
+ ) -> Result<LogicalPlan> {
+ let join_keys = left_plan
+ .schema()
+ .fields()
+ .iter()
+ .zip(right_plan.schema().fields().iter()) // pairs columns by position; `zip` stops at the shorter schema, so extra columns are ignored
+ .map(|(left_field, right_field)| {
+ (
+ (Column::from_name(left_field.name())),
+ (Column::from_name(right_field.name())),
+ )
+ })
+ .unzip();
+ if is_all {
+ LogicalPlanBuilder::from(left_plan)
+ .join_detailed(&right_plan, join_type, join_keys, true)? // NOTE(review): trailing `true` is presumably the null-equals-null flag -- confirm against `join_detailed`
+ .build()
+ } else {
+ LogicalPlanBuilder::from(left_plan)
+ .distinct()? // without ALL, de-duplicate the left input before joining
+ .join_detailed(&right_plan, join_type, join_keys, true)?
+ .build()
+ }
+ }
+
/// Build the plan
pub fn build(&self) -> Result<LogicalPlan> {
Ok(self.plan.clone())
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index 50875a8..2053d76 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -226,25 +226,25 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let left_plan = self.set_expr_to_plan(left.as_ref(), None,
ctes)?;
let right_plan = self.set_expr_to_plan(right.as_ref(), None,
ctes)?;
match (op, all) {
- (SetOperator::Union, true) => {
- union_with_alias(left_plan, right_plan, alias)
- }
- (SetOperator::Union, false) => {
- let union_plan = union_with_alias(left_plan, right_plan,
alias)?;
- LogicalPlanBuilder::from(union_plan).distinct()?.build()
- }
- (SetOperator::Intersect, true) => {
- let join_keys =
left_plan.schema().fields().iter().zip(right_plan.schema().fields().iter()).map(|(left_field,
right_field)| ((Column::from_name(left_field.name())),
(Column::from_name(right_field.name())))).unzip();
-
LogicalPlanBuilder::from(left_plan).join_detailed(&right_plan, JoinType::Semi,
join_keys, true)?.build()
- }
- (SetOperator::Intersect, false) => {
- let join_keys =
left_plan.schema().fields().iter().zip(right_plan.schema().fields().iter()).map(|(left_field,
right_field)| ((Column::from_name(left_field.name())),
(Column::from_name(right_field.name())))).unzip();
-
LogicalPlanBuilder::from(left_plan).distinct()?.join_detailed(&right_plan,
JoinType::Semi, join_keys, true)?.build()
- }
- _ => Err(DataFusionError::NotImplemented(format!(
- "Only UNION ALL and UNION [DISTINCT] and INTERSECT and
INTERSECT [DISTINCT] are supported, found {}",
- op
- ))),
+ (SetOperator::Union, true) => {
+ union_with_alias(left_plan, right_plan, alias)
+ }
+ (SetOperator::Union, false) => {
+ let union_plan = union_with_alias(left_plan,
right_plan, alias)?;
+
LogicalPlanBuilder::from(union_plan).distinct()?.build()
+ }
+ (SetOperator::Intersect, true) => {
+ LogicalPlanBuilder::intersect(left_plan, right_plan,
true)
+ }
+ (SetOperator::Intersect, false) => {
+ LogicalPlanBuilder::intersect(left_plan, right_plan,
false)
+ }
+ (SetOperator::Except, true) => {
+ LogicalPlanBuilder::except(left_plan, right_plan, true)
+ }
+ (SetOperator::Except, false) => {
+ LogicalPlanBuilder::except(left_plan, right_plan,
false)
+ }
}
}
_ => Err(DataFusionError::NotImplemented(format!(
@@ -3581,16 +3581,6 @@ mod tests {
}
#[test]
- fn except_not_supported() {
- let sql = "SELECT order_id from orders EXCEPT SELECT order_id FROM
orders";
- let err = logical_plan(sql).expect_err("query should have failed");
- assert_eq!(
- "NotImplemented(\"Only UNION ALL and UNION [DISTINCT] and
INTERSECT and INTERSECT [DISTINCT] are supported, found EXCEPT\")",
- format!("{:?}", err)
- );
- }
-
- #[test]
fn select_typedstring() {
let sql = "SELECT date '2020-12-10' AS date FROM person";
let expected = "Projection: CAST(Utf8(\"2020-12-10\") AS Date32) AS
date\
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index e6064af..016781c 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -5623,7 +5623,7 @@ async fn intersect_with_null_equal() {
let sql = "SELECT * FROM (SELECT null AS id1, 1 AS id2) t1
INTERSECT SELECT * FROM (SELECT null AS id1, 1 AS id2) t2";
- let expected: Vec<Vec<String>> = vec![vec!["NULL".to_string(),
"1".to_string()]];
+ let expected = vec![vec!["NULL", "1"]];
let mut ctx = create_join_context_qualified().unwrap();
let actual = execute(&mut ctx, sql).await;
@@ -5669,3 +5669,67 @@ async fn test_intersect_distinct() -> Result<()> {
assert_batches_eq!(expected, &actual);
Ok(())
}
+
+#[tokio::test]
+async fn except_with_null_not_equal() {
+ let sql = "SELECT * FROM (SELECT null AS id1, 1 AS id2) t1
+ EXCEPT SELECT * FROM (SELECT null AS id1, 2 AS id2) t2";
+
+ let expected = vec![vec!["NULL", "1"]]; // id2 differs between the two sides (1 vs 2), so the left row is expected to remain
+
+ let mut ctx = create_join_context_qualified().unwrap();
+ let actual = execute(&mut ctx, sql).await;
+
+ assert_eq!(expected, actual);
+}
+
+#[tokio::test]
+async fn except_with_null_equal() {
+ let sql = "SELECT * FROM (SELECT null AS id1, 1 AS id2) t1
+ EXCEPT SELECT * FROM (SELECT null AS id1, 1 AS id2) t2";
+
+ let expected: &[&[&str]] = &[]; // both sides select the same (NULL, 1) row, so an empty result is expected
+ let mut ctx = create_join_context_qualified().unwrap();
+ let actual = execute(&mut ctx, sql).await;
+
+ assert_eq!(expected, actual);
+}
+
+#[tokio::test]
+async fn test_expect_all() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+ register_alltypes_parquet(&mut ctx).await;
+ // execute the query; NOTE(review): the fn name says "expect" but the SQL uses EXCEPT ALL -- likely a typo for "except"
+ let sql = "SELECT int_col, double_col FROM alltypes_plain where int_col >
0 EXCEPT ALL SELECT int_col, double_col FROM alltypes_plain where int_col < 1";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+---------+------------+",
+ "| int_col | double_col |",
+ "+---------+------------+",
+ "| 1 | 10.1 |",
+ "| 1 | 10.1 |",
+ "| 1 | 10.1 |",
+ "| 1 | 10.1 |",
+ "+---------+------------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_expect_distinct() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+ register_alltypes_parquet(&mut ctx).await;
+ // execute the query; NOTE(review): the fn name says "expect" but the SQL uses EXCEPT (no ALL) -- likely a typo for "except"
+ let sql = "SELECT int_col, double_col FROM alltypes_plain where int_col >
0 EXCEPT SELECT int_col, double_col FROM alltypes_plain where int_col < 1";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+---------+------------+",
+ "| int_col | double_col |",
+ "+---------+------------+",
+ "| 1 | 10.1 |",
+ "+---------+------------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ Ok(())
+}