This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 44bbb0ec32 Simplifications (#7907)
44bbb0ec32 is described below

commit 44bbb0ec32e5957fdf0c9072d2518493c4909c6d
Author: Mustafa Akur <[email protected]>
AuthorDate: Tue Oct 24 08:44:03 2023 +0300

    Simplifications (#7907)
---
 datafusion/physical-plan/src/aggregates/mod.rs | 81 ++++++++++++--------------
 datafusion/physical-plan/src/projection.rs     | 20 +++----
 2 files changed, 45 insertions(+), 56 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/mod.rs 
b/datafusion/physical-plan/src/aggregates/mod.rs
index 7191d51fb7..1fa129680c 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -218,6 +218,23 @@ impl PhysicalGroupBy {
     pub fn is_single(&self) -> bool {
         self.null_expr.is_empty()
     }
+
+    /// Calculate GROUP BY expressions according to input schema.
+    pub fn input_exprs(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        self.expr
+            .iter()
+            .map(|(expr, _alias)| expr.clone())
+            .collect()
+    }
+
+    /// Return grouping expressions as they occur in the output schema.
+    fn output_exprs(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        self.expr
+            .iter()
+            .enumerate()
+            .map(|(index, (_, name))| Arc::new(Column::new(name, index)) as _)
+            .collect()
+    }
 }
 
 impl PartialEq for PhysicalGroupBy {
@@ -319,11 +336,7 @@ fn get_working_mode(
     // Since direction of the ordering is not important for GROUP BY columns,
     // we convert PhysicalSortExpr to PhysicalExpr in the existing ordering.
     let ordering_exprs = convert_to_expr(output_ordering);
-    let groupby_exprs = group_by
-        .expr
-        .iter()
-        .map(|(item, _)| item.clone())
-        .collect::<Vec<_>>();
+    let groupby_exprs = group_by.input_exprs();
     // Find where each expression of the GROUP BY clause occurs in the existing
     // ordering (if it occurs):
     let mut ordered_indices =
@@ -363,7 +376,7 @@ fn calc_aggregation_ordering(
 ) -> Option<AggregationOrdering> {
     get_working_mode(input, group_by).map(|(mode, order_indices)| {
         let existing_ordering = input.output_ordering().unwrap_or(&[]);
-        let out_group_expr = output_group_expr_helper(group_by);
+        let out_group_expr = group_by.output_exprs();
         // Calculate output ordering information for the operator:
         let out_ordering = order_indices
             .iter()
@@ -381,18 +394,6 @@ fn calc_aggregation_ordering(
     })
 }
 
-/// This function returns grouping expressions as they occur in the output 
schema.
-fn output_group_expr_helper(group_by: &PhysicalGroupBy) -> Vec<Arc<dyn 
PhysicalExpr>> {
-    // Update column indices. Since the group by columns come first in the 
output schema, their
-    // indices are simply 0..self.group_expr(len).
-    group_by
-        .expr()
-        .iter()
-        .enumerate()
-        .map(|(index, (_, name))| Arc::new(Column::new(name, index)) as _)
-        .collect()
-}
-
 /// This function returns the ordering requirement of the first non-reversible
 /// order-sensitive aggregate function such as ARRAY_AGG. This requirement 
serves
 /// as the initial requirement while calculating the finest requirement among 
all
@@ -591,11 +592,7 @@ fn group_by_contains_all_requirements(
     group_by: &PhysicalGroupBy,
     requirement: &LexOrdering,
 ) -> bool {
-    let physical_exprs = group_by
-        .expr()
-        .iter()
-        .map(|(expr, _alias)| expr.clone())
-        .collect::<Vec<_>>();
+    let physical_exprs = group_by.input_exprs();
     // When we have multiple groups (grouping set)
     // since group by may be calculated on the subset of the group_by.expr()
     // it is not guaranteed to have all of the requirements among group by 
expressions.
@@ -735,7 +732,7 @@ impl AggregateExec {
 
     /// Grouping expressions as they occur in the output schema
     pub fn output_group_expr(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        output_group_expr_helper(&self.group_by)
+        self.group_by.output_exprs()
     }
 
     /// Aggregate expressions
@@ -894,28 +891,24 @@ impl ExecutionPlan for AggregateExec {
 
     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
-        match &self.mode {
-            AggregateMode::Partial | AggregateMode::Single => {
-                // Partial and Single Aggregation will not change the output 
partitioning but need to respect the Alias
-                let input_partition = self.input.output_partitioning();
-                match input_partition {
-                    Partitioning::Hash(exprs, part) => {
-                        let normalized_exprs = exprs
-                            .into_iter()
-                            .map(|expr| {
-                                normalize_out_expr_with_columns_map(
-                                    expr,
-                                    &self.columns_map,
-                                )
-                            })
-                            .collect::<Vec<_>>();
-                        Partitioning::Hash(normalized_exprs, part)
-                    }
-                    _ => input_partition,
-                }
+        let input_partition = self.input.output_partitioning();
+        if self.mode.is_first_stage() {
+            // First stage Aggregation will not change the output partitioning 
but need to respect the Alias
+            let input_partition = self.input.output_partitioning();
+            if let Partitioning::Hash(exprs, part) = input_partition {
+                let normalized_exprs = exprs
+                    .into_iter()
+                    .map(|expr| {
+                        normalize_out_expr_with_columns_map(expr, 
&self.columns_map)
+                    })
+                    .collect::<Vec<_>>();
+                Partitioning::Hash(normalized_exprs, part)
+            } else {
+                input_partition
             }
+        } else {
             // Final Aggregation's output partitioning is the same as its real 
input
-            _ => self.input.output_partitioning(),
+            input_partition
         }
     }
 
diff --git a/datafusion/physical-plan/src/projection.rs 
b/datafusion/physical-plan/src/projection.rs
index 029dd24d7d..a374154c99 100644
--- a/datafusion/physical-plan/src/projection.rs
+++ b/datafusion/physical-plan/src/projection.rs
@@ -224,18 +224,14 @@ impl ExecutionPlan for ProjectionExec {
     fn output_partitioning(&self) -> Partitioning {
         // Output partition need to respect the alias
         let input_partition = self.input.output_partitioning();
-        match input_partition {
-            Partitioning::Hash(exprs, part) => {
-                let normalized_exprs = exprs
-                    .into_iter()
-                    .map(|expr| {
-                        normalize_out_expr_with_columns_map(expr, 
&self.columns_map)
-                    })
-                    .collect::<Vec<_>>();
-
-                Partitioning::Hash(normalized_exprs, part)
-            }
-            _ => input_partition,
+        if let Partitioning::Hash(exprs, part) = input_partition {
+            let normalized_exprs = exprs
+                .into_iter()
+                .map(|expr| normalize_out_expr_with_columns_map(expr, 
&self.columns_map))
+                .collect::<Vec<_>>();
+            Partitioning::Hash(normalized_exprs, part)
+        } else {
+            input_partition
         }
     }
 

Reply via email to