alamb commented on a change in pull request #8034: URL: https://github.com/apache/arrow/pull/8034#discussion_r476449126
########## File path: rust/datafusion/src/execution/physical_plan/planner.rs ########## @@ -61,6 +61,55 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { &self, logical_plan: &LogicalPlan, ctx_state: &ExecutionContextState, + ) -> Result<Arc<dyn ExecutionPlan>> { + let plan = self.create_initial_plan(logical_plan, ctx_state)?; + self.optimize_plan(plan, ctx_state) + } +} + +impl DefaultPhysicalPlanner { + /// Create a physical plan from a logical plan + fn optimize_plan( + &self, + plan: Arc<dyn ExecutionPlan>, + ctx_state: &ExecutionContextState, + ) -> Result<Arc<dyn ExecutionPlan>> { + let children = plan + .children() + .iter() + .map(|child| self.optimize_plan(child.clone(), ctx_state)) + .collect::<Result<Vec<_>>>()?; + + if children.len() == 0 { + // leaf node, children cannot be replaced + Ok(plan.clone()) + } else { + match plan.required_child_distribution() { + Distribution::UnspecifiedDistribution => plan.with_new_children(children), + Distribution::SinglePartition => plan.with_new_children( Review comment: This is clever -- I like it. 👍 ########## File path: rust/datafusion/src/execution/physical_plan/hash_aggregate.rs ########## @@ -111,6 +123,17 @@ impl ExecutionPlan for HashAggregateExec { self.schema.clone() } + fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { + vec![self.input.clone()] + } + + fn required_child_distribution(&self) -> Distribution { + match &self.mode { + AggregateMode::Partial => Distribution::UnspecifiedDistribution, Review comment: I would think the output distribution is likely the same as the input distribution if the input distribution is on the grouping keys. 
The only possible change here is if the mode is `Partial` and the input distribution is `SinglePartition` — then the output distribution is `SinglePartition` as well. ########## File path: rust/datafusion/src/execution/physical_plan/hash_aggregate.rs ########## @@ -121,21 +144,40 @@ impl ExecutionPlan for HashAggregateExec { partition: usize, ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> { let input = self.input.execute(partition)?; + let group_expr = self.group_expr.iter().map(|x| x.0.clone()).collect(); + let aggr_expr = self.aggr_expr.iter().map(|x| x.0.clone()).collect(); if self.group_expr.is_empty() { Ok(Arc::new(Mutex::new(HashAggregateIterator::new( self.schema.clone(), - self.aggr_expr.clone(), + aggr_expr, input, )))) } else { Ok(Arc::new(Mutex::new(GroupedHashAggregateIterator::new( self.schema.clone(), - self.group_expr.clone(), - self.aggr_expr.clone(), + group_expr, + aggr_expr, input, )))) } } + + fn with_new_children( + &self, + children: Vec<Arc<dyn ExecutionPlan>>, + ) -> Result<Arc<dyn ExecutionPlan>> { + match children.len() { Review comment: I like this pattern of matching child length ########## File path: rust/datafusion/src/execution/physical_plan/projection.rs ########## @@ -32,7 +32,7 @@ use arrow::record_batch::{RecordBatch, RecordBatchReader}; #[derive(Debug)] pub struct ProjectionExec { /// The projection expressions Review comment: ```suggestion /// The projection expressions stored as tuples of (expression, output column name) ``` ########## File path: rust/datafusion/src/execution/physical_plan/planner.rs ########## @@ -153,33 +202,28 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { .collect::<Result<Vec<_>>>()?; let initial_aggr = HashAggregateExec::try_new( + AggregateMode::Partial, Review comment: An improvement for a subsequent PR would be to only skip the partial aggregate if the input distribution is `Distribution::SinglePartition` ---------------------------------------------------------------- This is an automated message from the Apache Git 
Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org