This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 27a3c81493 Substrait union/union all (#7117)
27a3c81493 is described below

commit 27a3c81493210435f23b64e8fc65c988bf665579
Author: Nuttiiya Seekhao <[email protected]>
AuthorDate: Mon Jul 31 16:52:08 2023 -0400

    Substrait union/union all (#7117)
    
    * Add union
    
    * Clippy fix
    
    * Removed assert()
---
 datafusion/substrait/src/logical_plan/consumer.rs  | 27 ++++++++++++++++++++++
 datafusion/substrait/src/logical_plan/producer.rs  | 21 ++++++++++++++++-
 .../tests/cases/roundtrip_logical_plan.rs          | 18 +++++++++++++++
 3 files changed, 65 insertions(+), 1 deletion(-)

diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs
index 66cc37c60b..2c37f7ede1 100644
--- a/datafusion/substrait/src/logical_plan/consumer.rs
+++ b/datafusion/substrait/src/logical_plan/consumer.rs
@@ -47,6 +47,7 @@ use substrait::proto::{
     join_rel, plan_rel, r#type,
     read_rel::ReadType,
     rel::RelType,
+    set_rel,
     sort_field::{SortDirection, SortKind::*},
     AggregateFunction, Expression, Plan, Rel, Type,
 };
@@ -472,6 +473,32 @@ pub async fn from_substrait_rel(
                 "Only NamedTable reads are supported".to_string(),
             )),
         },
+        Some(RelType::Set(set)) => match set_rel::SetOp::from_i32(set.op) {
+            Some(set_op) => match set_op {
+                set_rel::SetOp::UnionAll => {
+                    if !set.inputs.is_empty() {
+                        let mut union_builder = Ok(LogicalPlanBuilder::from(
+                            from_substrait_rel(ctx, &set.inputs[0], extensions).await?,
+                        ));
+                        for input in &set.inputs[1..] {
+                            union_builder = union_builder?
+                                .union(from_substrait_rel(ctx, input, extensions).await?);
+                        }
+                        union_builder?.build()
+                    } else {
+                        Err(DataFusionError::NotImplemented(
+                            "Union relation requires at least one input".to_string(),
+                        ))
+                    }
+                }
+                _ => Err(DataFusionError::NotImplemented(format!(
+                    "Unsupported set operator: {set_op:?}"
+                ))),
+            },
+            None => Err(DataFusionError::NotImplemented(
+                "Invalid set operation type None".to_string(),
+            )),
+        },
         Some(RelType::ExtensionLeaf(extension)) => {
             let Some(ext_detail) = &extension.detail else {
                 return Err(DataFusionError::Substrait(
diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs
index e044f00504..07da2dc0be 100644
--- a/datafusion/substrait/src/logical_plan/producer.rs
+++ b/datafusion/substrait/src/logical_plan/producer.rs
@@ -62,10 +62,11 @@ use substrait::{
         join_rel, plan_rel, r#type,
         read_rel::{NamedTable, ReadType},
         rel::RelType,
+        set_rel,
         sort_field::{SortDirection, SortKind},
         AggregateFunction, AggregateRel, AggregationPhase, Expression, ExtensionLeafRel,
         ExtensionMultiRel, ExtensionSingleRel, FetchRel, FilterRel, FunctionArgument,
-        JoinRel, NamedStruct, Plan, PlanRel, ProjectRel, ReadRel, Rel, RelRoot,
+        JoinRel, NamedStruct, Plan, PlanRel, ProjectRel, ReadRel, Rel, RelRoot, SetRel,
         SortField, SortRel,
     },
     version,
@@ -321,6 +322,24 @@ pub fn to_substrait_rel(
             // since there is no corresponding relation type in Substrait
             to_substrait_rel(alias.input.as_ref(), ctx, extension_info)
         }
+        LogicalPlan::Union(union) => {
+            let input_rels = union
+                .inputs
+                .iter()
+                .map(|input| to_substrait_rel(input.as_ref(), ctx, extension_info))
+                .collect::<Result<Vec<_>>>()?
+                .into_iter()
+                .map(|ptr| *ptr)
+                .collect();
+            Ok(Box::new(Rel {
+                rel_type: Some(substrait::proto::rel::RelType::Set(SetRel {
+                    common: None,
+                    inputs: input_rels,
+                    op: set_rel::SetOp::UnionAll as i32, // UNION DISTINCT gets translated to AGGREGATION + UNION ALL
+                    advanced_extension: None,
+                })),
+            }))
+        }
         LogicalPlan::Window(window) => {
             let input = to_substrait_rel(window.input.as_ref(), ctx, extension_info)?;
             // If the input is a Project relation, we can just append the WindowFunction expressions
diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
index b4a3b2cf32..f297264c3d 100644
--- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
@@ -420,6 +420,24 @@ async fn roundtrip_ilike() -> Result<()> {
     roundtrip("SELECT f FROM data WHERE f ILIKE 'a%b'").await
 }
 
+#[tokio::test]
+async fn roundtrip_union() -> Result<()> {
+    roundtrip("SELECT a, e FROM data UNION SELECT a, e FROM data").await
+}
+
+#[tokio::test]
+async fn roundtrip_union2() -> Result<()> {
+    roundtrip(
+        "SELECT a, b FROM data UNION SELECT a, b FROM data UNION SELECT a, b FROM data",
+    )
+    .await
+}
+
+#[tokio::test]
+async fn roundtrip_union_all() -> Result<()> {
+    roundtrip("SELECT a, e FROM data UNION ALL SELECT a, e FROM data").await
+}
+
 #[tokio::test]
 async fn simple_intersect() -> Result<()> {
     assert_expected_plan(

Reply via email to