This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 44bbb0ec32 Simplifications (#7907)
44bbb0ec32 is described below
commit 44bbb0ec32e5957fdf0c9072d2518493c4909c6d
Author: Mustafa Akur <[email protected]>
AuthorDate: Tue Oct 24 08:44:03 2023 +0300
Simplifications (#7907)
---
datafusion/physical-plan/src/aggregates/mod.rs | 81 ++++++++++++--------------
datafusion/physical-plan/src/projection.rs | 20 +++----
2 files changed, 45 insertions(+), 56 deletions(-)
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index 7191d51fb7..1fa129680c 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -218,6 +218,23 @@ impl PhysicalGroupBy {
pub fn is_single(&self) -> bool {
self.null_expr.is_empty()
}
+
+ /// Calculate GROUP BY expressions according to input schema.
+ pub fn input_exprs(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+ self.expr
+ .iter()
+ .map(|(expr, _alias)| expr.clone())
+ .collect()
+ }
+
+ /// Return grouping expressions as they occur in the output schema.
+ fn output_exprs(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+ self.expr
+ .iter()
+ .enumerate()
+ .map(|(index, (_, name))| Arc::new(Column::new(name, index)) as _)
+ .collect()
+ }
}
impl PartialEq for PhysicalGroupBy {
@@ -319,11 +336,7 @@ fn get_working_mode(
// Since direction of the ordering is not important for GROUP BY columns,
// we convert PhysicalSortExpr to PhysicalExpr in the existing ordering.
let ordering_exprs = convert_to_expr(output_ordering);
- let groupby_exprs = group_by
- .expr
- .iter()
- .map(|(item, _)| item.clone())
- .collect::<Vec<_>>();
+ let groupby_exprs = group_by.input_exprs();
// Find where each expression of the GROUP BY clause occurs in the existing
// ordering (if it occurs):
let mut ordered_indices =
@@ -363,7 +376,7 @@ fn calc_aggregation_ordering(
) -> Option<AggregationOrdering> {
get_working_mode(input, group_by).map(|(mode, order_indices)| {
let existing_ordering = input.output_ordering().unwrap_or(&[]);
- let out_group_expr = output_group_expr_helper(group_by);
+ let out_group_expr = group_by.output_exprs();
// Calculate output ordering information for the operator:
let out_ordering = order_indices
.iter()
@@ -381,18 +394,6 @@ fn calc_aggregation_ordering(
})
}
-/// This function returns grouping expressions as they occur in the output schema.
-fn output_group_expr_helper(group_by: &PhysicalGroupBy) -> Vec<Arc<dyn PhysicalExpr>> {
- // Update column indices. Since the group by columns come first in the output schema, their
- // indices are simply 0..self.group_expr(len).
- group_by
- .expr()
- .iter()
- .enumerate()
- .map(|(index, (_, name))| Arc::new(Column::new(name, index)) as _)
- .collect()
-}
-
/// This function returns the ordering requirement of the first non-reversible
/// order-sensitive aggregate function such as ARRAY_AGG. This requirement serves
/// as the initial requirement while calculating the finest requirement among all
@@ -591,11 +592,7 @@ fn group_by_contains_all_requirements(
group_by: &PhysicalGroupBy,
requirement: &LexOrdering,
) -> bool {
- let physical_exprs = group_by
- .expr()
- .iter()
- .map(|(expr, _alias)| expr.clone())
- .collect::<Vec<_>>();
+ let physical_exprs = group_by.input_exprs();
// When we have multiple groups (grouping set)
// since group by may be calculated on the subset of the group_by.expr()
// it is not guaranteed to have all of the requirements among group by expressions.
@@ -735,7 +732,7 @@ impl AggregateExec {
/// Grouping expressions as they occur in the output schema
pub fn output_group_expr(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- output_group_expr_helper(&self.group_by)
+ self.group_by.output_exprs()
}
/// Aggregate expressions
@@ -894,28 +891,24 @@ impl ExecutionPlan for AggregateExec {
/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
- match &self.mode {
- AggregateMode::Partial | AggregateMode::Single => {
- // Partial and Single Aggregation will not change the output partitioning but need to respect the Alias
- let input_partition = self.input.output_partitioning();
- match input_partition {
- Partitioning::Hash(exprs, part) => {
- let normalized_exprs = exprs
- .into_iter()
- .map(|expr| {
- normalize_out_expr_with_columns_map(
- expr,
- &self.columns_map,
- )
- })
- .collect::<Vec<_>>();
- Partitioning::Hash(normalized_exprs, part)
- }
- _ => input_partition,
- }
+ let input_partition = self.input.output_partitioning();
+ if self.mode.is_first_stage() {
+ // First stage Aggregation will not change the output partitioning but need to respect the Alias
+ let input_partition = self.input.output_partitioning();
+ if let Partitioning::Hash(exprs, part) = input_partition {
+ let normalized_exprs = exprs
+ .into_iter()
+ .map(|expr| {
+ normalize_out_expr_with_columns_map(expr, &self.columns_map)
+ })
+ .collect::<Vec<_>>();
+ Partitioning::Hash(normalized_exprs, part)
+ } else {
+ input_partition
}
+ } else {
// Final Aggregation's output partitioning is the same as its real input
- _ => self.input.output_partitioning(),
+ input_partition
}
}
diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs
index 029dd24d7d..a374154c99 100644
--- a/datafusion/physical-plan/src/projection.rs
+++ b/datafusion/physical-plan/src/projection.rs
@@ -224,18 +224,14 @@ impl ExecutionPlan for ProjectionExec {
fn output_partitioning(&self) -> Partitioning {
// Output partition need to respect the alias
let input_partition = self.input.output_partitioning();
- match input_partition {
- Partitioning::Hash(exprs, part) => {
- let normalized_exprs = exprs
- .into_iter()
- .map(|expr| {
- normalize_out_expr_with_columns_map(expr, &self.columns_map)
- })
- .collect::<Vec<_>>();
-
- Partitioning::Hash(normalized_exprs, part)
- }
- _ => input_partition,
+ if let Partitioning::Hash(exprs, part) = input_partition {
+ let normalized_exprs = exprs
+ .into_iter()
+ .map(|expr| normalize_out_expr_with_columns_map(expr, &self.columns_map))
+ .collect::<Vec<_>>();
+ Partitioning::Hash(normalized_exprs, part)
+ } else {
+ input_partition
}
}