vbarua commented on code in PR #12863:
URL: https://github.com/apache/datafusion/pull/12863#discussion_r1802075260


##########
datafusion/substrait/src/logical_plan/consumer.rs:
##########
@@ -875,27 +935,65 @@ pub async fn from_substrait_rel(
             Ok(set_op) => match set_op {
                 set_rel::SetOp::UnionAll => {
                     if !set.inputs.is_empty() {
-                        let mut union_builder = Ok(LogicalPlanBuilder::from(
-                            from_substrait_rel(ctx, &set.inputs[0], 
extensions).await?,
-                        ));
-                        for input in &set.inputs[1..] {
-                            union_builder = union_builder?
-                                .union(from_substrait_rel(ctx, input, 
extensions).await?);
-                        }
-                        union_builder?.build()
+                        union_rels(&set.inputs, ctx, extensions, false).await
+                    } else {
+                        not_impl_err!("Union relation requires at least one 
input")
+                    }
+                }
+                set_rel::SetOp::UnionDistinct => {
+                    if !set.inputs.is_empty() {
+                        union_rels(&set.inputs, ctx, extensions, true).await
                     } else {
                         not_impl_err!("Union relation requires at least one 
input")
                     }
                 }
                 set_rel::SetOp::IntersectionPrimary => {
-                    if set.inputs.len() == 2 {
+                    if set.inputs.len() >= 2 {
                         LogicalPlanBuilder::intersect(
                             from_substrait_rel(ctx, &set.inputs[0], 
extensions).await?,
-                            from_substrait_rel(ctx, &set.inputs[1], 
extensions).await?,
+                            union_rels(&set.inputs[1..], ctx, extensions, 
false).await?,
                             false,
                         )
                     } else {
-                        not_impl_err!("Primary Intersect relation with more 
than two inputs isn't supported")
+                        not_impl_err!(
+                            "Primary Intersect relation requires at least two 
inputs"
+                        )
+                    }
+                }
+                set_rel::SetOp::IntersectionMultiset => {
+                    if set.inputs.len() >= 2 {
+                        intersect_rels(&set.inputs, ctx, extensions, 
true).await
+                    } else {
+                        not_impl_err!(
+                            "Multiset Intersect relation requires at least two 
inputs"
+                        )
+                    }
+                }
+                set_rel::SetOp::IntersectionMultisetAll => {
+                    if set.inputs.len() >= 2 {
+                        intersect_rels(&set.inputs, ctx, extensions, 
false).await
+                    } else {
+                        not_impl_err!(
+                            "MultisetAll Intersect relation requires at least 
two inputs"
+                        )
+                    }
+                }

Review Comment:
   This made me go check how DataFusion actually handles executing an INTERSECT 
ALL query. The current DataFusion behaviour doesn't conform to the Substrait 
spec, which itself is based on the SQL spec.
   
   Assuming https://github.com/apache/datafusion/issues/12955 is a valid bug, 
this mapping is correct.



##########
datafusion/substrait/src/logical_plan/producer.rs:
##########
@@ -764,6 +768,7 @@ pub fn operator_to_name(op: Operator) -> &'static str {
     }
 }
 
+#[allow(deprecated)]

Review Comment:
   > I think so. The grouping_expressions field was deprecated in the proto 
definitions, but it should still be populated for some time so as not to break 
backwards compatibility.
   
   +1 for retaining both for backwards compatibility.
   
   I've created https://github.com/apache/datafusion/issues/12957 to track that 
we deferred implementing this.



##########
datafusion/substrait/src/logical_plan/consumer.rs:
##########
@@ -875,27 +935,65 @@ pub async fn from_substrait_rel(
             Ok(set_op) => match set_op {
                 set_rel::SetOp::UnionAll => {
                     if !set.inputs.is_empty() {
-                        let mut union_builder = Ok(LogicalPlanBuilder::from(
-                            from_substrait_rel(ctx, &set.inputs[0], 
extensions).await?,
-                        ));
-                        for input in &set.inputs[1..] {
-                            union_builder = union_builder?
-                                .union(from_substrait_rel(ctx, input, 
extensions).await?);
-                        }
-                        union_builder?.build()
+                        union_rels(&set.inputs, ctx, extensions, false).await
+                    } else {
+                        not_impl_err!("Union relation requires at least one 
input")
+                    }
+                }
+                set_rel::SetOp::UnionDistinct => {
+                    if !set.inputs.is_empty() {
+                        union_rels(&set.inputs, ctx, extensions, true).await
                     } else {
                         not_impl_err!("Union relation requires at least one 
input")
                     }
                 }
                 set_rel::SetOp::IntersectionPrimary => {
-                    if set.inputs.len() == 2 {
+                    if set.inputs.len() >= 2 {
                         LogicalPlanBuilder::intersect(
                             from_substrait_rel(ctx, &set.inputs[0], 
extensions).await?,
-                            from_substrait_rel(ctx, &set.inputs[1], 
extensions).await?,
+                            union_rels(&set.inputs[1..], ctx, extensions, 
false).await?,
                             false,

Review Comment:
   As an example of the comment on `union_rels` about `is_all` vs `distinct`: the 
`false` passed to `union_rels` means `distinct: false`, while the `false` passed 
to `intersect` means `is_all: false`.



##########
datafusion/substrait/src/logical_plan/consumer.rs:
##########
@@ -197,6 +197,65 @@ fn split_eq_and_noneq_join_predicate_with_nulls_equality(
     (accum_join_keys, nulls_equal_nulls, join_filter)
 }
 
+async fn union_rels(
+    rels: &[Rel],
+    ctx: &SessionContext,
+    extensions: &Extensions,
+    distinct: bool,

Review Comment:
   minor: For consistency with the `LogicalPlanBuilder` methods, it would be nice 
to use `is_all: bool` in these helpers instead of `distinct: bool`. During this 
review I had some difficulty keeping track of what each bool meant in each 
function.
   



##########
datafusion/substrait/src/logical_plan/consumer.rs:
##########
@@ -875,27 +935,65 @@ pub async fn from_substrait_rel(
             Ok(set_op) => match set_op {
                 set_rel::SetOp::UnionAll => {
                     if !set.inputs.is_empty() {
-                        let mut union_builder = Ok(LogicalPlanBuilder::from(
-                            from_substrait_rel(ctx, &set.inputs[0], 
extensions).await?,
-                        ));
-                        for input in &set.inputs[1..] {
-                            union_builder = union_builder?
-                                .union(from_substrait_rel(ctx, input, 
extensions).await?);
-                        }
-                        union_builder?.build()
+                        union_rels(&set.inputs, ctx, extensions, false).await
+                    } else {
+                        not_impl_err!("Union relation requires at least one 
input")
+                    }
+                }
+                set_rel::SetOp::UnionDistinct => {
+                    if !set.inputs.is_empty() {
+                        union_rels(&set.inputs, ctx, extensions, true).await
                     } else {
                         not_impl_err!("Union relation requires at least one 
input")
                     }
                 }
                 set_rel::SetOp::IntersectionPrimary => {
-                    if set.inputs.len() == 2 {
+                    if set.inputs.len() >= 2 {
                         LogicalPlanBuilder::intersect(
                             from_substrait_rel(ctx, &set.inputs[0], 
extensions).await?,
-                            from_substrait_rel(ctx, &set.inputs[1], 
extensions).await?,
+                            union_rels(&set.inputs[1..], ctx, extensions, 
false).await?,
                             false,
                         )
                     } else {
-                        not_impl_err!("Primary Intersect relation with more 
than two inputs isn't supported")
+                        not_impl_err!(
+                            "Primary Intersect relation requires at least two 
inputs"
+                        )
+                    }
+                }
+                set_rel::SetOp::IntersectionMultiset => {
+                    if set.inputs.len() >= 2 {
+                        intersect_rels(&set.inputs, ctx, extensions, 
true).await
+                    } else {
+                        not_impl_err!(
+                            "Multiset Intersect relation requires at least two 
inputs"
+                        )
+                    }
+                }
+                set_rel::SetOp::IntersectionMultisetAll => {
+                    if set.inputs.len() >= 2 {
+                        intersect_rels(&set.inputs, ctx, extensions, 
false).await
+                    } else {
+                        not_impl_err!(
+                            "MultisetAll Intersect relation requires at least 
two inputs"
+                        )
+                    }
+                }
+                set_rel::SetOp::MinusPrimary => {
+                    if set.inputs.len() >= 2 {
+                        except_rels(&set.inputs, ctx, extensions, true).await
+                    } else {
+                        not_impl_err!(
+                            "Primary Minus relation requires at least two 
inputs"
+                        )
+                    }
+                }
+                set_rel::SetOp::MinusPrimaryAll => {
+                    if set.inputs.len() >= 2 {
+                        except_rels(&set.inputs, ctx, extensions, false).await
+                    } else {
+                        not_impl_err!(
+                            "PrimaryAll Minus relation requires at least two 
inputs"
+                        )

Review Comment:
   This made me go check how DataFusion actually handles executing an EXCEPT 
ALL query. The current DataFusion behaviour doesn't conform to the Substrait 
spec, which itself is based on the SQL spec.
   
   Assuming https://github.com/apache/datafusion/issues/12956 is a valid bug, 
this mapping is correct.



##########
datafusion/substrait/tests/cases/roundtrip_logical_plan.rs:
##########
@@ -675,6 +675,60 @@ async fn simple_intersect_consume() -> Result<()> {
     .await
 }
 
+#[tokio::test]
+async fn primary_intersect_consume() -> Result<()> {
+    let proto_plan =
+        
read_json("tests/testdata/test_plans/intersect_primary.substrait.json");
+
+    assert_substrait_sql(
+        proto_plan,
+        "SELECT a FROM data INTERSECT (SELECT a FROM data2 UNION ALL SELECT a 
FROM data2)",
+    )
+    .await
+}
+
+#[tokio::test]
+async fn multiset_intersect_consume() -> Result<()> {
+    let proto_plan =
+        
read_json("tests/testdata/test_plans/intersect_multiset.substrait.json");
+
+    assert_substrait_sql(
+        proto_plan,
+        "SELECT a FROM data INTERSECT SELECT a FROM data2 INTERSECT SELECT a 
FROM data2",
+    )
+    .await
+}

Review Comment:
   You added the file `intersect_multiset_all.substrait.json`, but it isn't used 
by any of the tests. Could you add a consume test for the 
`IntersectionMultisetAll` case that uses it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to