This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 27a3c81493 Substrait union/union all (#7117)
27a3c81493 is described below
commit 27a3c81493210435f23b64e8fc65c988bf665579
Author: Nuttiiya Seekhao <[email protected]>
AuthorDate: Mon Jul 31 16:52:08 2023 -0400
Substrait union/union all (#7117)
* Add union
* Clippy fix
* Removed assert()
---
datafusion/substrait/src/logical_plan/consumer.rs | 27 ++++++++++++++++++++++
datafusion/substrait/src/logical_plan/producer.rs | 21 ++++++++++++++++-
.../tests/cases/roundtrip_logical_plan.rs | 18 +++++++++++++++
3 files changed, 65 insertions(+), 1 deletion(-)
diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs
index 66cc37c60b..2c37f7ede1 100644
--- a/datafusion/substrait/src/logical_plan/consumer.rs
+++ b/datafusion/substrait/src/logical_plan/consumer.rs
@@ -47,6 +47,7 @@ use substrait::proto::{
join_rel, plan_rel, r#type,
read_rel::ReadType,
rel::RelType,
+ set_rel,
sort_field::{SortDirection, SortKind::*},
AggregateFunction, Expression, Plan, Rel, Type,
};
@@ -472,6 +473,32 @@ pub async fn from_substrait_rel(
"Only NamedTable reads are supported".to_string(),
)),
},
+ Some(RelType::Set(set)) => match set_rel::SetOp::from_i32(set.op) {
+ Some(set_op) => match set_op {
+ set_rel::SetOp::UnionAll => {
+ if !set.inputs.is_empty() {
+ let mut union_builder = Ok(LogicalPlanBuilder::from(
+ from_substrait_rel(ctx, &set.inputs[0],
extensions).await?,
+ ));
+ for input in &set.inputs[1..] {
+ union_builder = union_builder?
+ .union(from_substrait_rel(ctx, input,
extensions).await?);
+ }
+ union_builder?.build()
+ } else {
+ Err(DataFusionError::NotImplemented(
+ "Union relation requires at least one
input".to_string(),
+ ))
+ }
+ }
+ _ => Err(DataFusionError::NotImplemented(format!(
+ "Unsupported set operator: {set_op:?}"
+ ))),
+ },
+ None => Err(DataFusionError::NotImplemented(
+ "Invalid set operation type None".to_string(),
+ )),
+ },
Some(RelType::ExtensionLeaf(extension)) => {
let Some(ext_detail) = &extension.detail else {
return Err(DataFusionError::Substrait(
diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs
index e044f00504..07da2dc0be 100644
--- a/datafusion/substrait/src/logical_plan/producer.rs
+++ b/datafusion/substrait/src/logical_plan/producer.rs
@@ -62,10 +62,11 @@ use substrait::{
join_rel, plan_rel, r#type,
read_rel::{NamedTable, ReadType},
rel::RelType,
+ set_rel,
sort_field::{SortDirection, SortKind},
AggregateFunction, AggregateRel, AggregationPhase, Expression,
ExtensionLeafRel,
ExtensionMultiRel, ExtensionSingleRel, FetchRel, FilterRel,
FunctionArgument,
- JoinRel, NamedStruct, Plan, PlanRel, ProjectRel, ReadRel, Rel, RelRoot,
+ JoinRel, NamedStruct, Plan, PlanRel, ProjectRel, ReadRel, Rel,
RelRoot, SetRel,
SortField, SortRel,
},
version,
@@ -321,6 +322,24 @@ pub fn to_substrait_rel(
// since there is no corresponding relation type in Substrait
to_substrait_rel(alias.input.as_ref(), ctx, extension_info)
}
+ LogicalPlan::Union(union) => {
+ let input_rels = union
+ .inputs
+ .iter()
+ .map(|input| to_substrait_rel(input.as_ref(), ctx,
extension_info))
+ .collect::<Result<Vec<_>>>()?
+ .into_iter()
+ .map(|ptr| *ptr)
+ .collect();
+ Ok(Box::new(Rel {
+ rel_type: Some(substrait::proto::rel::RelType::Set(SetRel {
+ common: None,
+ inputs: input_rels,
+ op: set_rel::SetOp::UnionAll as i32, // UNION DISTINCT
gets translated to AGGREGATION + UNION ALL
+ advanced_extension: None,
+ })),
+ }))
+ }
LogicalPlan::Window(window) => {
let input = to_substrait_rel(window.input.as_ref(), ctx,
extension_info)?;
// If the input is a Project relation, we can just append the
WindowFunction expressions
diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
index b4a3b2cf32..f297264c3d 100644
--- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
@@ -420,6 +420,24 @@ async fn roundtrip_ilike() -> Result<()> {
roundtrip("SELECT f FROM data WHERE f ILIKE 'a%b'").await
}
+#[tokio::test]
+async fn roundtrip_union() -> Result<()> {
+ roundtrip("SELECT a, e FROM data UNION SELECT a, e FROM data").await
+}
+
+#[tokio::test]
+async fn roundtrip_union2() -> Result<()> {
+ roundtrip(
+ "SELECT a, b FROM data UNION SELECT a, b FROM data UNION SELECT a, b
FROM data",
+ )
+ .await
+}
+
+#[tokio::test]
+async fn roundtrip_union_all() -> Result<()> {
+ roundtrip("SELECT a, e FROM data UNION ALL SELECT a, e FROM data").await
+}
+
#[tokio::test]
async fn simple_intersect() -> Result<()> {
assert_expected_plan(