This is an automated email from the ASF dual-hosted git repository.
linwei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 2b6341c48a Improve CombinePartialFinalAggregate code (#12128)
2b6341c48a is described below
commit 2b6341c48a69528bb7793b11f46a29a1826e8c2d
Author: 张林伟 <[email protected]>
AuthorDate: Sat Aug 24 08:39:36 2024 +0800
Improve CombinePartialFinalAggregate code (#12128)
---
.../combine_partial_final_agg.rs | 107 ++++++++++-----------
1 file changed, 51 insertions(+), 56 deletions(-)
diff --git
a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index f65a4c837a..8cbb187f7b 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -51,62 +51,57 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate
{
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_down(|plan| {
- let transformed =
- plan.as_any()
- .downcast_ref::<AggregateExec>()
- .and_then(|agg_exec| {
- if matches!(
- agg_exec.mode(),
- AggregateMode::Final |
AggregateMode::FinalPartitioned
- ) {
- agg_exec
- .input()
- .as_any()
- .downcast_ref::<AggregateExec>()
- .and_then(|input_agg_exec| {
- if matches!(
- input_agg_exec.mode(),
- AggregateMode::Partial
- ) && can_combine(
- (
- agg_exec.group_expr(),
- agg_exec.aggr_expr(),
- agg_exec.filter_expr(),
- ),
- (
- input_agg_exec.group_expr(),
- input_agg_exec.aggr_expr(),
- input_agg_exec.filter_expr(),
- ),
- ) {
- let mode =
- if agg_exec.mode() ==
&AggregateMode::Final {
- AggregateMode::Single
- } else {
-
AggregateMode::SinglePartitioned
- };
- AggregateExec::try_new(
- mode,
-
input_agg_exec.group_expr().clone(),
-
input_agg_exec.aggr_expr().to_vec(),
-
input_agg_exec.filter_expr().to_vec(),
- input_agg_exec.input().clone(),
- input_agg_exec.input_schema(),
- )
- .map(|combined_agg| {
-
combined_agg.with_limit(agg_exec.limit())
- })
- .ok()
- .map(Arc::new)
- } else {
- None
- }
- })
- } else {
- None
- }
- });
-
+ // Check if the plan is AggregateExec
+ let Some(agg_exec) = plan.as_any().downcast_ref::<AggregateExec>()
else {
+ return Ok(Transformed::no(plan));
+ };
+
+ if !matches!(
+ agg_exec.mode(),
+ AggregateMode::Final | AggregateMode::FinalPartitioned
+ ) {
+ return Ok(Transformed::no(plan));
+ }
+
+ // Check if the input is AggregateExec
+ let Some(input_agg_exec) =
+ agg_exec.input().as_any().downcast_ref::<AggregateExec>()
+ else {
+ return Ok(Transformed::no(plan));
+ };
+
+ let transformed = if matches!(input_agg_exec.mode(),
AggregateMode::Partial)
+ && can_combine(
+ (
+ agg_exec.group_expr(),
+ agg_exec.aggr_expr(),
+ agg_exec.filter_expr(),
+ ),
+ (
+ input_agg_exec.group_expr(),
+ input_agg_exec.aggr_expr(),
+ input_agg_exec.filter_expr(),
+ ),
+ ) {
+ let mode = if agg_exec.mode() == &AggregateMode::Final {
+ AggregateMode::Single
+ } else {
+ AggregateMode::SinglePartitioned
+ };
+ AggregateExec::try_new(
+ mode,
+ input_agg_exec.group_expr().clone(),
+ input_agg_exec.aggr_expr().to_vec(),
+ input_agg_exec.filter_expr().to_vec(),
+ input_agg_exec.input().clone(),
+ input_agg_exec.input_schema(),
+ )
+ .map(|combined_agg| combined_agg.with_limit(agg_exec.limit()))
+ .ok()
+ .map(Arc::new)
+ } else {
+ None
+ };
Ok(if let Some(transformed) = transformed {
Transformed::yes(transformed)
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]