This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f1212d  Implement EXCEPT & EXCEPT DISTINCT (#1259)
6f1212d is described below

commit 6f1212d140c8dfb9f267f67e50e7dffb79c7874c
Author: Carlos <[email protected]>
AuthorDate: Tue Nov 9 03:07:14 2021 +0800

    Implement EXCEPT & EXCEPT DISTINCT (#1259)
---
 datafusion/src/logical_plan/builder.rs | 59 ++++++++++++++++++++++++++++++
 datafusion/src/sql/planner.rs          | 48 ++++++++++---------------
 datafusion/tests/sql.rs                | 66 +++++++++++++++++++++++++++++++++-
 3 files changed, 143 insertions(+), 30 deletions(-)

diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index ac8e0c3..0c7950c 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -715,6 +715,65 @@ impl LogicalPlanBuilder {
         }
     }
 
+    /// Plans `left INTERSECT [ALL] right`.
+    ///
+    /// Implemented as a semi join of `left_plan` against `right_plan` on every
+    /// column (NULLs compared as equal). Without `ALL` (`is_all == false`) the
+    /// left input is de-duplicated first.
+    pub(crate) fn intersect(
+        left_plan: LogicalPlan,
+        right_plan: LogicalPlan,
+        is_all: bool,
+    ) -> Result<LogicalPlan> {
+        LogicalPlanBuilder::intersect_or_except(
+            left_plan,
+            right_plan,
+            JoinType::Semi,
+            is_all,
+        )
+    }
+
+    /// Plans `left EXCEPT [ALL] right`.
+    ///
+    /// Implemented as an anti join of `left_plan` against `right_plan` on every
+    /// column (NULLs compared as equal). Without `ALL` (`is_all == false`) the
+    /// left input is de-duplicated first.
+    pub(crate) fn except(
+        left_plan: LogicalPlan,
+        right_plan: LogicalPlan,
+        is_all: bool,
+    ) -> Result<LogicalPlan> {
+        LogicalPlanBuilder::intersect_or_except(
+            left_plan,
+            right_plan,
+            JoinType::Anti,
+            is_all,
+        )
+    }
+
+    /// Shared implementation of INTERSECT / EXCEPT.
+    ///
+    /// Joins `left_plan` to `right_plan` with `join_type` (`Semi` for
+    /// INTERSECT, `Anti` for EXCEPT). Columns of the two inputs are paired by
+    /// position, and NULLs compare equal, as set operations require.
+    ///
+    /// # Errors
+    ///
+    /// Returns `DataFusionError::Plan` if the inputs have a different number
+    /// of columns. Otherwise propagates any error from building the plan.
+    ///
+    /// NOTE(review): a semi/anti join keeps or drops *every* copy of a
+    /// matching left row, so `ALL` does not follow SQL bag semantics when
+    /// duplicates appear on both sides. For example, `{a, a} EXCEPT ALL {a}`
+    /// yields no rows instead of one `a`.
+    fn intersect_or_except(
+        left_plan: LogicalPlan,
+        right_plan: LogicalPlan,
+        join_type: JoinType,
+        is_all: bool,
+    ) -> Result<LogicalPlan> {
+        let left_fields = left_plan.schema().fields();
+        let right_fields = right_plan.schema().fields();
+        // `zip` below would silently ignore the extra columns of the wider
+        // input and join on a subset of the columns, so reject that up front.
+        if left_fields.len() != right_fields.len() {
+            return Err(DataFusionError::Plan(format!(
+                "INTERSECT/EXCEPT inputs must have the same number of columns, found {} and {}",
+                left_fields.len(),
+                right_fields.len()
+            )));
+        }
+        let join_keys = left_fields
+            .iter()
+            .zip(right_fields.iter())
+            .map(|(left_field, right_field)| {
+                (
+                    Column::from_name(left_field.name()),
+                    Column::from_name(right_field.name()),
+                )
+            })
+            .unzip();
+        let builder = LogicalPlanBuilder::from(left_plan);
+        // Set semantics (no ALL): remove duplicate left rows before joining.
+        let builder = if is_all { builder } else { builder.distinct()? };
+        builder
+            .join_detailed(&right_plan, join_type, join_keys, true)?
+            .build()
+    }
+
+
     /// Build the plan
     pub fn build(&self) -> Result<LogicalPlan> {
         Ok(self.plan.clone())
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index 50875a8..2053d76 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -226,25 +226,25 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 let left_plan = self.set_expr_to_plan(left.as_ref(), None, 
ctes)?;
                 let right_plan = self.set_expr_to_plan(right.as_ref(), None, 
ctes)?;
                 match (op, all) {
-                (SetOperator::Union, true) => {
-                    union_with_alias(left_plan, right_plan, alias)
-                }
-                (SetOperator::Union, false) => {
-                    let union_plan = union_with_alias(left_plan, right_plan, 
alias)?;
-                    LogicalPlanBuilder::from(union_plan).distinct()?.build()
-                }
-                (SetOperator::Intersect, true) => {
-                    let join_keys = 
left_plan.schema().fields().iter().zip(right_plan.schema().fields().iter()).map(|(left_field,
 right_field)| ((Column::from_name(left_field.name())), 
(Column::from_name(right_field.name())))).unzip();
-                    
LogicalPlanBuilder::from(left_plan).join_detailed(&right_plan, JoinType::Semi, 
join_keys, true)?.build()
-                }
-                (SetOperator::Intersect, false) => {
-                    let join_keys = 
left_plan.schema().fields().iter().zip(right_plan.schema().fields().iter()).map(|(left_field,
 right_field)| ((Column::from_name(left_field.name())), 
(Column::from_name(right_field.name())))).unzip();
-                    
LogicalPlanBuilder::from(left_plan).distinct()?.join_detailed(&right_plan, 
JoinType::Semi, join_keys, true)?.build()
-                }
-                _ => Err(DataFusionError::NotImplemented(format!(
-                    "Only UNION ALL and UNION [DISTINCT] and INTERSECT and 
INTERSECT [DISTINCT] are supported, found {}",
-                    op
-                ))),
+                    (SetOperator::Union, true) => {
+                        union_with_alias(left_plan, right_plan, alias)
+                    }
+                    (SetOperator::Union, false) => {
+                        let union_plan = union_with_alias(left_plan, 
right_plan, alias)?;
+                        
LogicalPlanBuilder::from(union_plan).distinct()?.build()
+                    }
+                    (SetOperator::Intersect, true) => {
+                        LogicalPlanBuilder::intersect(left_plan, right_plan, 
true)
+                    }
+                    (SetOperator::Intersect, false) => {
+                        LogicalPlanBuilder::intersect(left_plan, right_plan, 
false)
+                    }
+                    (SetOperator::Except, true) => {
+                        LogicalPlanBuilder::except(left_plan, right_plan, true)
+                    }
+                    (SetOperator::Except, false) => {
+                        LogicalPlanBuilder::except(left_plan, right_plan, 
false)
+                    }
                 }
             }
             _ => Err(DataFusionError::NotImplemented(format!(
@@ -3581,16 +3581,6 @@ mod tests {
     }
 
     #[test]
-    fn except_not_supported() {
-        let sql = "SELECT order_id from orders EXCEPT SELECT order_id FROM 
orders";
-        let err = logical_plan(sql).expect_err("query should have failed");
-        assert_eq!(
-            "NotImplemented(\"Only UNION ALL and UNION [DISTINCT] and 
INTERSECT and INTERSECT [DISTINCT] are supported, found EXCEPT\")",
-            format!("{:?}", err)
-        );
-    }
-
-    #[test]
     fn select_typedstring() {
         let sql = "SELECT date '2020-12-10' AS date FROM person";
         let expected = "Projection: CAST(Utf8(\"2020-12-10\") AS Date32) AS 
date\
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index e6064af..016781c 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -5623,7 +5623,7 @@ async fn intersect_with_null_equal() {
     let sql = "SELECT * FROM (SELECT null AS id1, 1 AS id2) t1
             INTERSECT SELECT * FROM (SELECT null AS id1, 1 AS id2) t2";
 
-    let expected: Vec<Vec<String>> = vec![vec!["NULL".to_string(), 
"1".to_string()]];
+    let expected = vec![vec!["NULL", "1"]];
 
     let mut ctx = create_join_context_qualified().unwrap();
     let actual = execute(&mut ctx, sql).await;
@@ -5669,3 +5669,67 @@ async fn test_intersect_distinct() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+/// The rows differ in `id2`, so the left row survives EXCEPT
+/// even though both rows have NULL in `id1`.
+#[tokio::test]
+async fn except_with_null_not_equal() {
+    let mut ctx = create_join_context_qualified().unwrap();
+    let sql = "SELECT * FROM (SELECT null AS id1, 1 AS id2) t1
+            EXCEPT SELECT * FROM (SELECT null AS id1, 2 AS id2) t2";
+    let actual = execute(&mut ctx, sql).await;
+    assert_eq!(vec![vec!["NULL", "1"]], actual);
+}
+
+/// Identical rows (including a NULL column) cancel out under EXCEPT,
+/// so the query must return no rows.
+#[tokio::test]
+async fn except_with_null_equal() {
+    let mut ctx = create_join_context_qualified().unwrap();
+    let sql = "SELECT * FROM (SELECT null AS id1, 1 AS id2) t1
+            EXCEPT SELECT * FROM (SELECT null AS id1, 1 AS id2) t2";
+    let actual = execute(&mut ctx, sql).await;
+    let no_rows: &[&[&str]] = &[];
+    assert_eq!(no_rows, actual);
+}
+
+/// `EXCEPT ALL` keeps duplicate left rows: all four `(1, 10.1)` rows selected
+/// by `int_col > 0` are expected in the output.
+///
+/// NOTE(review): the two predicates select disjoint rows (assuming `int_col`
+/// is an integer column). This test therefore does not exercise how `ALL`
+/// treats rows present on both sides. The name's "expect" is presumably a
+/// typo for "except"; it is kept so existing test filters still match.
+#[tokio::test]
+async fn test_expect_all() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    register_alltypes_parquet(&mut ctx).await;
+    // execute the query
+    let sql = "SELECT int_col, double_col FROM alltypes_plain where int_col > 0 \
+               EXCEPT ALL SELECT int_col, double_col FROM alltypes_plain where int_col < 1";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+---------+------------+",
+        "| int_col | double_col |",
+        "+---------+------------+",
+        "| 1       | 10.1       |",
+        "| 1       | 10.1       |",
+        "| 1       | 10.1       |",
+        "| 1       | 10.1       |",
+        "+---------+------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+/// Plain `EXCEPT` (DISTINCT) collapses the duplicate `(1, 10.1)` left rows
+/// into a single output row.
+///
+/// NOTE(review): the name's "expect" is presumably a typo for "except"; it
+/// is kept so existing test filters still match.
+#[tokio::test]
+async fn test_expect_distinct() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    register_alltypes_parquet(&mut ctx).await;
+    // execute the query
+    let sql = "SELECT int_col, double_col FROM alltypes_plain where int_col > 0 \
+               EXCEPT SELECT int_col, double_col FROM alltypes_plain where int_col < 1";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+---------+------------+",
+        "| int_col | double_col |",
+        "+---------+------------+",
+        "| 1       | 10.1       |",
+        "+---------+------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}

Reply via email to