This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new cde74016e9 [Minor]: Produce better plan when group by contains all of 
the ordering requirements (#7542)
cde74016e9 is described below

commit cde74016e930ffd9c55eed403b84bcd026f38d0f
Author: Mustafa Akur <[email protected]>
AuthorDate: Wed Sep 13 23:50:21 2023 +0300

    [Minor]: Produce better plan when group by contains all of the ordering 
requirements (#7542)
    
    * Convert partial + final to single aggregate in two tests
    
    * Address reviews
    
    * Use any instead of loop
---
 .../core/src/physical_plan/aggregates/mod.rs       | 37 +++++++++++++++++++---
 datafusion/physical-expr/src/lib.rs                |  2 +-
 datafusion/physical-expr/src/physical_expr.rs      | 11 +++++++
 datafusion/sqllogictest/test_files/groupby.slt     | 26 +++++++--------
 4 files changed, 56 insertions(+), 20 deletions(-)

diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs 
b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 14350ce1bb..c31c6badd3 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -36,7 +36,7 @@ use datafusion_expr::Accumulator;
 use datafusion_physical_expr::{
     equivalence::project_equivalence_properties,
     expressions::Column,
-    normalize_out_expr_with_columns_map, reverse_order_bys,
+    normalize_out_expr_with_columns_map, physical_exprs_contains, 
reverse_order_bys,
     utils::{convert_to_expr, get_indices_of_matching_exprs},
     AggregateExpr, LexOrdering, LexOrderingReq, OrderingEquivalenceProperties,
     PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
@@ -575,6 +575,27 @@ fn calc_required_input_ordering(
     
Ok((!required_input_ordering.is_empty()).then_some(required_input_ordering))
 }
 
+/// Check whether group by expression contains all of the expression inside 
`requirement`
+// As an example Group By (c,b,a) contains all of the expressions in the 
`requirement`: (a ASC, b DESC)
+fn group_by_contains_all_requirements(
+    group_by: &PhysicalGroupBy,
+    requirement: &LexOrdering,
+) -> bool {
+    let physical_exprs = group_by
+        .expr()
+        .iter()
+        .map(|(expr, _alias)| expr.clone())
+        .collect::<Vec<_>>();
+    // When we have multiple groups (grouping set)
+    // since group by may be calculated on the subset of the group_by.expr()
+    // it is not guaranteed to have all of the requirements among group by 
expressions.
+    // Hence do the analysis: whether group by contains all requirements in 
the single group case.
+    group_by.groups.len() <= 1
+        && requirement
+            .iter()
+            .all(|req| physical_exprs_contains(&physical_exprs, &req.expr))
+}
+
 impl AggregateExec {
     /// Create a new hash aggregate execution plan
     pub fn try_new(
@@ -601,16 +622,22 @@ impl AggregateExec {
             .iter()
             .zip(order_by_expr)
             .map(|(aggr_expr, fn_reqs)| {
-                // If the aggregation function is order-sensitive and we are
-                // performing a "first stage" calculation, keep the ordering
-                // requirement as is; otherwise ignore the ordering 
requirement.
+                // If
+                // - aggregation function is order-sensitive and
+                // - aggregation is performing a "first stage" calculation, and
+                // - at least one of the aggregate function requirement is not 
inside group by expression
+                // keep the ordering requirement as is; otherwise ignore the 
ordering requirement.
                 // In non-first stage modes, we accumulate data (using 
`merge_batch`)
                 // from different partitions (i.e. merge partial results). 
During
                 // this merge, we consider the ordering of each partial result.
                 // Hence, we do not need to use the ordering requirement in 
such
                 // modes as long as partial results are generated with the
                 // correct ordering.
-                fn_reqs.filter(|_| is_order_sensitive(aggr_expr) && 
mode.is_first_stage())
+                fn_reqs.filter(|req| {
+                    is_order_sensitive(aggr_expr)
+                        && mode.is_first_stage()
+                        && !group_by_contains_all_requirements(&group_by, req)
+                })
             })
             .collect::<Vec<_>>();
         let mut aggregator_reverse_reqs = None;
diff --git a/datafusion/physical-expr/src/lib.rs 
b/datafusion/physical-expr/src/lib.rs
index a8e49bfbd6..85081c24c3 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -61,7 +61,7 @@ pub use equivalence::{
 };
 
 pub use partitioning::{Distribution, Partitioning};
-pub use physical_expr::{PhysicalExpr, PhysicalExprRef};
+pub use physical_expr::{physical_exprs_contains, PhysicalExpr, 
PhysicalExprRef};
 pub use planner::create_physical_expr;
 pub use scalar_function::ScalarFunctionExpr;
 pub use sort_expr::{
diff --git a/datafusion/physical-expr/src/physical_expr.rs 
b/datafusion/physical-expr/src/physical_expr.rs
index ce3b7b6cf4..81702d8bfa 100644
--- a/datafusion/physical-expr/src/physical_expr.rs
+++ b/datafusion/physical-expr/src/physical_expr.rs
@@ -181,3 +181,14 @@ pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
         any
     }
 }
+
+/// It is similar to contains method of vector.
+/// Finds whether `expr` is among `physical_exprs`.
+pub fn physical_exprs_contains(
+    physical_exprs: &[Arc<dyn PhysicalExpr>],
+    expr: &Arc<dyn PhysicalExpr>,
+) -> bool {
+    physical_exprs
+        .iter()
+        .any(|physical_expr| physical_expr.eq(expr))
+}
diff --git a/datafusion/sqllogictest/test_files/groupby.slt 
b/datafusion/sqllogictest/test_files/groupby.slt
index 0c53a58ccd..c93617f352 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -2127,19 +2127,18 @@ Projection: annotated_data_infinite2.a, 
annotated_data_infinite2.b, FIRST_VALUE(
 ----TableScan: annotated_data_infinite2 projection=[a, b, c]
 physical_plan
 ProjectionExec: expr=[a@0 as a, b@1 as b, 
FIRST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a 
DESC NULLS FIRST]@2 as first_c]
---AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], 
aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
-----AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], 
aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
-------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, 
c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS 
LAST, c@2 ASC NULLS LAST], has_header=true
+--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], 
aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
+----CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, 
c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS 
LAST, c@2 ASC NULLS LAST], has_header=true
 
 query III
 SELECT a, b, FIRST_VALUE(c ORDER BY a DESC) as first_c
   FROM annotated_data_infinite2
   GROUP BY a, b
 ----
-0 0 24
-0 1 49
-1 2 74
-1 3 99
+0 0 0
+0 1 25
+1 2 50
+1 3 75
 
 # test_source_sorted_groupby4
 
@@ -2154,19 +2153,18 @@ Projection: annotated_data_infinite2.a, 
annotated_data_infinite2.b, LAST_VALUE(a
 ----TableScan: annotated_data_infinite2 projection=[a, b, c]
 physical_plan
 ProjectionExec: expr=[a@0 as a, b@1 as b, 
LAST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a 
DESC NULLS FIRST]@2 as last_c]
---AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], 
aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
-----AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], 
aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
-------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, 
c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS 
LAST, c@2 ASC NULLS LAST], has_header=true
+--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], 
aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
+----CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, 
c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS 
LAST, c@2 ASC NULLS LAST], has_header=true
 
 query III
 SELECT a, b, LAST_VALUE(c ORDER BY a DESC) as last_c
   FROM annotated_data_infinite2
   GROUP BY a, b
 ----
-0 0 0
-0 1 25
-1 2 50
-1 3 75
+0 0 24
+0 1 49
+1 2 74
+1 3 99
 
 # when LAST_VALUE, or FIRST_VALUE value do not contain ordering requirement
 # queries should still work, However, result depends on the scanning order and

Reply via email to