alamb commented on a change in pull request #8034:
URL: https://github.com/apache/arrow/pull/8034#discussion_r476449126



##########
File path: rust/datafusion/src/execution/physical_plan/planner.rs
##########
@@ -61,6 +61,55 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
         &self,
         logical_plan: &LogicalPlan,
         ctx_state: &ExecutionContextState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let plan = self.create_initial_plan(logical_plan, ctx_state)?;
+        self.optimize_plan(plan, ctx_state)
+    }
+}
+
+impl DefaultPhysicalPlanner {
+    /// Create a physical plan from a logical plan
+    fn optimize_plan(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        ctx_state: &ExecutionContextState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let children = plan
+            .children()
+            .iter()
+            .map(|child| self.optimize_plan(child.clone(), ctx_state))
+            .collect::<Result<Vec<_>>>()?;
+
+        if children.len() == 0 {
+            // leaf node, children cannot be replaced
+            Ok(plan.clone())
+        } else {
+            match plan.required_child_distribution() {
+                Distribution::UnspecifiedDistribution => 
plan.with_new_children(children),
+                Distribution::SinglePartition => plan.with_new_children(

Review comment:
       This is clever -- I like it. 👍 

##########
File path: rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
##########
@@ -111,6 +123,17 @@ impl ExecutionPlan for HashAggregateExec {
         self.schema.clone()
     }
 
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn required_child_distribution(&self) -> Distribution {
+        match &self.mode {
+            AggregateMode::Partial => Distribution::UnspecifiedDistribution,

Review comment:
       I would think the output distribution is likely the same as the input
distribution if the input distribution is on the grouping keys. The only
possible change here is when the mode is `Partial` and the input distribution
is `SinglePartition`, in which case the output is also `SinglePartition`.

##########
File path: rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
##########
@@ -121,21 +144,40 @@ impl ExecutionPlan for HashAggregateExec {
         partition: usize,
     ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
         let input = self.input.execute(partition)?;
+        let group_expr = self.group_expr.iter().map(|x| x.0.clone()).collect();
+        let aggr_expr = self.aggr_expr.iter().map(|x| x.0.clone()).collect();
         if self.group_expr.is_empty() {
             Ok(Arc::new(Mutex::new(HashAggregateIterator::new(
                 self.schema.clone(),
-                self.aggr_expr.clone(),
+                aggr_expr,
                 input,
             ))))
         } else {
             Ok(Arc::new(Mutex::new(GroupedHashAggregateIterator::new(
                 self.schema.clone(),
-                self.group_expr.clone(),
-                self.aggr_expr.clone(),
+                group_expr,
+                aggr_expr,
                 input,
             ))))
         }
     }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {

Review comment:
       I like this pattern of matching on the number of children.

##########
File path: rust/datafusion/src/execution/physical_plan/projection.rs
##########
@@ -32,7 +32,7 @@ use arrow::record_batch::{RecordBatch, RecordBatchReader};
 #[derive(Debug)]
 pub struct ProjectionExec {
     /// The projection expressions

Review comment:
       ```suggestion
       /// The projection expressions stored as tuples of (expression, output column name)
       ```

##########
File path: rust/datafusion/src/execution/physical_plan/planner.rs
##########
@@ -153,33 +202,28 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
                     .collect::<Result<Vec<_>>>()?;
 
                 let initial_aggr = HashAggregateExec::try_new(
+                    AggregateMode::Partial,

Review comment:
       An improvement for a subsequent PR would be to skip the partial
aggregate only if the input distribution is `Distribution::SinglePartition`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to