This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new cde74016e9 [Minor]: Produce better plan when group by contains all of
the ordering requirements (#7542)
cde74016e9 is described below
commit cde74016e930ffd9c55eed403b84bcd026f38d0f
Author: Mustafa Akur <[email protected]>
AuthorDate: Wed Sep 13 23:50:21 2023 +0300
[Minor]: Produce better plan when group by contains all of the ordering
requirements (#7542)
* Convert partial + final to single aggregate in two tests
* Address reviews
* Use any instead of loop
---
.../core/src/physical_plan/aggregates/mod.rs | 37 +++++++++++++++++++---
datafusion/physical-expr/src/lib.rs | 2 +-
datafusion/physical-expr/src/physical_expr.rs | 11 +++++++
datafusion/sqllogictest/test_files/groupby.slt | 26 +++++++--------
4 files changed, 56 insertions(+), 20 deletions(-)
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 14350ce1bb..c31c6badd3 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -36,7 +36,7 @@ use datafusion_expr::Accumulator;
use datafusion_physical_expr::{
equivalence::project_equivalence_properties,
expressions::Column,
- normalize_out_expr_with_columns_map, reverse_order_bys,
+ normalize_out_expr_with_columns_map, physical_exprs_contains, reverse_order_bys,
utils::{convert_to_expr, get_indices_of_matching_exprs},
AggregateExpr, LexOrdering, LexOrderingReq, OrderingEquivalenceProperties,
PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
@@ -575,6 +575,27 @@ fn calc_required_input_ordering(
Ok((!required_input_ordering.is_empty()).then_some(required_input_ordering))
}
+/// Check whether group by expression contains all of the expression inside `requirement`
+// As an example Group By (c,b,a) contains all of the expressions in the `requirement`: (a ASC, b DESC)
+fn group_by_contains_all_requirements(
+ group_by: &PhysicalGroupBy,
+ requirement: &LexOrdering,
+) -> bool {
+ let physical_exprs = group_by
+ .expr()
+ .iter()
+ .map(|(expr, _alias)| expr.clone())
+ .collect::<Vec<_>>();
+ // When we have multiple groups (grouping set)
+ // since group by may be calculated on the subset of the group_by.expr()
+ // it is not guaranteed to have all of the requirements among group by expressions.
+ // Hence do the analysis: whether group by contains all requirements in the single group case.
+ group_by.groups.len() <= 1
+ && requirement
+ .iter()
+ .all(|req| physical_exprs_contains(&physical_exprs, &req.expr))
+}
+
impl AggregateExec {
/// Create a new hash aggregate execution plan
pub fn try_new(
@@ -601,16 +622,22 @@ impl AggregateExec {
.iter()
.zip(order_by_expr)
.map(|(aggr_expr, fn_reqs)| {
- // If the aggregation function is order-sensitive and we are
- // performing a "first stage" calculation, keep the ordering
- // requirement as is; otherwise ignore the ordering requirement.
+ // If
+ // - aggregation function is order-sensitive and
+ // - aggregation is performing a "first stage" calculation, and
+ // - at least one of the aggregate function requirement is not inside group by expression
+ // keep the ordering requirement as is; otherwise ignore the ordering requirement.
// In non-first stage modes, we accumulate data (using
`merge_batch`)
// from different partitions (i.e. merge partial results).
During
// this merge, we consider the ordering of each partial result.
// Hence, we do not need to use the ordering requirement in
such
// modes as long as partial results are generated with the
// correct ordering.
- fn_reqs.filter(|_| is_order_sensitive(aggr_expr) && mode.is_first_stage())
+ fn_reqs.filter(|req| {
+ is_order_sensitive(aggr_expr)
+ && mode.is_first_stage()
+ && !group_by_contains_all_requirements(&group_by, req)
+ })
})
.collect::<Vec<_>>();
let mut aggregator_reverse_reqs = None;
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index a8e49bfbd6..85081c24c3 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -61,7 +61,7 @@ pub use equivalence::{
};
pub use partitioning::{Distribution, Partitioning};
-pub use physical_expr::{PhysicalExpr, PhysicalExprRef};
+pub use physical_expr::{physical_exprs_contains, PhysicalExpr, PhysicalExprRef};
pub use planner::create_physical_expr;
pub use scalar_function::ScalarFunctionExpr;
pub use sort_expr::{
diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs
index ce3b7b6cf4..81702d8bfa 100644
--- a/datafusion/physical-expr/src/physical_expr.rs
+++ b/datafusion/physical-expr/src/physical_expr.rs
@@ -181,3 +181,14 @@ pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
any
}
}
+
+/// It is similar to contains method of vector.
+/// Finds whether `expr` is among `physical_exprs`.
+pub fn physical_exprs_contains(
+ physical_exprs: &[Arc<dyn PhysicalExpr>],
+ expr: &Arc<dyn PhysicalExpr>,
+) -> bool {
+ physical_exprs
+ .iter()
+ .any(|physical_expr| physical_expr.eq(expr))
+}
diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt
index 0c53a58ccd..c93617f352 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -2127,19 +2127,18 @@ Projection: annotated_data_infinite2.a,
annotated_data_infinite2.b, FIRST_VALUE(
----TableScan: annotated_data_infinite2 projection=[a, b, c]
physical_plan
ProjectionExec: expr=[a@0 as a, b@1 as b,
FIRST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a
DESC NULLS FIRST]@2 as first_c]
---AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b],
aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
-----AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b],
aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
-------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b,
c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS
LAST, c@2 ASC NULLS LAST], has_header=true
+--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b],
aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
+----CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b,
c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS
LAST, c@2 ASC NULLS LAST], has_header=true
query III
SELECT a, b, FIRST_VALUE(c ORDER BY a DESC) as first_c
FROM annotated_data_infinite2
GROUP BY a, b
----
-0 0 24
-0 1 49
-1 2 74
-1 3 99
+0 0 0
+0 1 25
+1 2 50
+1 3 75
# test_source_sorted_groupby4
@@ -2154,19 +2153,18 @@ Projection: annotated_data_infinite2.a,
annotated_data_infinite2.b, LAST_VALUE(a
----TableScan: annotated_data_infinite2 projection=[a, b, c]
physical_plan
ProjectionExec: expr=[a@0 as a, b@1 as b,
LAST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a
DESC NULLS FIRST]@2 as last_c]
---AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b],
aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
-----AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b],
aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
-------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b,
c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS
LAST, c@2 ASC NULLS LAST], has_header=true
+--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b],
aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
+----CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b,
c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS
LAST, c@2 ASC NULLS LAST], has_header=true
query III
SELECT a, b, LAST_VALUE(c ORDER BY a DESC) as last_c
FROM annotated_data_infinite2
GROUP BY a, b
----
-0 0 0
-0 1 25
-1 2 50
-1 3 75
+0 0 24
+0 1 49
+1 2 74
+1 3 99
# when LAST_VALUE, or FIRST_VALUE value do not contain ordering requirement
# queries should still work, However, result depends on the scanning order and