alamb commented on a change in pull request #9695: URL: https://github.com/apache/arrow/pull/9695#discussion_r595533737
########## File path: rust/datafusion/src/logical_plan/plan.rs ########## @@ -120,6 +120,15 @@ pub enum LogicalPlan { /// The partitioning scheme partitioning_scheme: Partitioning, }, + /// Union multiple inputs + Union { + /// Inputs to merge + inputs: Vec<LogicalPlan>, + /// Union schema. Should be the same for all inputs. + schema: DFSchemaRef, + /// Union select alias Review comment: ```suggestion /// Union output relation alias ``` ########## File path: rust/datafusion/src/physical_plan/union.rs ########## @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Some of these functions reference the Postgres documentation +// or implementation to ensure compatibility and are subject to +// the Postgres license. + +//! 
The Union operator combines multiple inputs with the same schema + +use std::{any::Any, sync::Arc}; + +use arrow::datatypes::SchemaRef; + +use super::{ExecutionPlan, Partitioning, SendableRecordBatchStream}; +use crate::error::Result; +use async_trait::async_trait; + +/// UNION ALL execution plan +#[derive(Debug)] +pub struct UnionExec { + /// Input execution plan + inputs: Vec<Arc<dyn ExecutionPlan>>, +} + +impl UnionExec { + /// Create a new UnionExec + pub fn new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Self { + UnionExec { inputs } + } +} + +#[async_trait] +impl ExecutionPlan for UnionExec { + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.inputs[0].schema() + } + + fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { + self.inputs.clone() + } + + /// Get the output partitioning of this plan Review comment: ```suggestion /// Output of the union is the union of all output partitions of all inputs ``` (this makes sense BTW) ########## File path: rust/datafusion/src/sql/planner.rs ########## @@ -104,19 +104,75 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { /// Generate a logic plan from an SQL query pub fn query_to_plan(&self, query: &Query) -> Result<LogicalPlan> { - let plan = match &query.body { - SetExpr::Select(s) => self.select_to_plan(s.as_ref()), - _ => Err(DataFusionError::NotImplemented(format!( - "Query {} not implemented yet", - query.body - ))), - }?; + self.query_to_plan_with_alias(query, None) + } + + /// Generate a logic plan from an SQL query with optional alias + pub fn query_to_plan_with_alias( + &self, + query: &Query, + alias: Option<String>, + ) -> Result<LogicalPlan> { + let set_expr = &query.body; + let plan = self.set_expr_to_plan(set_expr, alias)?; let plan = self.order_by(&plan, &query.order_by)?; self.limit(&plan, &query.limit) } + fn set_expr_to_plan( + &self, + set_expr: &SetExpr, + alias: Option<String>, + ) -> Result<LogicalPlan> 
{ + match set_expr { + SetExpr::Select(s) => self.select_to_plan(s.as_ref()), + SetExpr::SetOperation { + op, + left, + right, + all, + } => match (op, all) { + (SetOperator::Union, true) => { + let left_plan = self.set_expr_to_plan(left.as_ref(), None)?; + let right_plan = self.set_expr_to_plan(right.as_ref(), None)?; + let inputs = vec![left_plan, right_plan] + .into_iter() + .flat_map(|p| match p { + LogicalPlan::Union { inputs, .. } => inputs, + x => vec![x], + }) + .collect::<Vec<_>>(); + if inputs.is_empty() { + return Err(DataFusionError::Execution(format!( + "Empty UNION: {}", + set_expr + ))); + } + if !inputs.iter().all(|s| s.schema() == inputs[0].schema()) { + return Err(DataFusionError::Execution( + "UNION ALL schemas are expected to be the same".to_string(), + )); + } + Ok(LogicalPlan::Union { + schema: inputs[0].schema().clone(), + inputs, + alias, + }) + } + _ => Err(DataFusionError::NotImplemented(format!( + "Only UNION ALL is supported: {}", Review comment: ```suggestion "Only UNION ALL is supported, found {}", ``` ########## File path: rust/datafusion/src/optimizer/limit_push_down.rs ########## @@ -181,6 +207,28 @@ mod test { Ok(()) } + #[test] + fn limit_should_push_down_union() -> Result<()> { Review comment: 👍 ########## File path: rust/datafusion/src/sql/planner.rs ########## @@ -2359,6 +2420,35 @@ mod tests { quick_test(sql, expected); } + #[test] + fn union() { + let sql = "SELECT order_id from orders UNION ALL SELECT order_id FROM orders"; + let expected = "Union\ + \n Projection: #order_id\ + \n TableScan: orders projection=None\ + \n Projection: #order_id\ + \n TableScan: orders projection=None"; + quick_test(sql, expected); + } + + #[test] + fn union_4_combined_in_one() { + let sql = "SELECT order_id from orders + UNION ALL SELECT order_id FROM orders + UNION ALL SELECT order_id FROM orders + UNION ALL SELECT order_id FROM orders"; + let expected = "Union\ + \n Projection: #order_id\ + \n TableScan: orders projection=None\ + \n 
Projection: #order_id\ + \n TableScan: orders projection=None\ + \n Projection: #order_id\ + \n TableScan: orders projection=None\ + \n Projection: #order_id\ + \n TableScan: orders projection=None"; + quick_test(sql, expected); + } + Review comment: I recommend two other (error) tests to ensure the error checks are working ``` "SELECT order_id from orders UNION ALL SELECT customer_id FROM order" ``` ``` "SELECT order_id from orders UNION SELECT order_id FROM order" ``` ########## File path: rust/datafusion/src/physical_plan/union.rs ########## @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Some of these functions reference the Postgres documentation +// or implementation to ensure compatibility and are subject to +// the Postgres license. + +//! 
The Union operator combines multiple inputs with the same schema + +use std::{any::Any, sync::Arc}; + +use arrow::datatypes::SchemaRef; + +use super::{ExecutionPlan, Partitioning, SendableRecordBatchStream}; +use crate::error::Result; +use async_trait::async_trait; + +/// UNION ALL execution plan +#[derive(Debug)] +pub struct UnionExec { + /// Input execution plan + inputs: Vec<Arc<dyn ExecutionPlan>>, +} + +impl UnionExec { + /// Create a new UnionExec + pub fn new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Self { + UnionExec { inputs } + } +} + +#[async_trait] +impl ExecutionPlan for UnionExec { + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.inputs[0].schema() + } + + fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { + self.inputs.clone() + } + + /// Get the output partitioning of this plan + fn output_partitioning(&self) -> Partitioning { + // Sums all the output partitions + let num_partitions = self + .inputs + .iter() + .map(|plan| plan.output_partitioning().partition_count()) + .sum(); + // TODO: this loses partitioning info in case of same partitioning scheme Review comment: It might be worth filing a follow-on JIRA ticket; alternatively, we can deal with it if it ever becomes a problem ########## File path: rust/datafusion/src/dataframe.rs ########## @@ -132,6 +132,20 @@ pub trait DataFrame: Send + Sync { /// ``` fn limit(&self, n: usize) -> Result<Arc<dyn DataFrame>>; + /// Calculate the union two dataframes. Review comment: ```suggestion /// Calculate the union of two [`DataFrame`]s. The two [`DataFrame`]s must have exactly the same schema. ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org