This is an automated email from the ASF dual-hosted git repository.

linwei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b6341c48a Improve CombinePartialFinalAggregate code (#12128)
2b6341c48a is described below

commit 2b6341c48a69528bb7793b11f46a29a1826e8c2d
Author: 张林伟 <[email protected]>
AuthorDate: Sat Aug 24 08:39:36 2024 +0800

    Improve CombinePartialFinalAggregate code (#12128)
---
 .../combine_partial_final_agg.rs                   | 107 ++++++++++-----------
 1 file changed, 51 insertions(+), 56 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index f65a4c837a..8cbb187f7b 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -51,62 +51,57 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
         _config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         plan.transform_down(|plan| {
-            let transformed =
-                plan.as_any()
-                    .downcast_ref::<AggregateExec>()
-                    .and_then(|agg_exec| {
-                        if matches!(
-                            agg_exec.mode(),
-                            AggregateMode::Final | AggregateMode::FinalPartitioned
-                        ) {
-                            agg_exec
-                                .input()
-                                .as_any()
-                                .downcast_ref::<AggregateExec>()
-                                .and_then(|input_agg_exec| {
-                                    if matches!(
-                                        input_agg_exec.mode(),
-                                        AggregateMode::Partial
-                                    ) && can_combine(
-                                        (
-                                            agg_exec.group_expr(),
-                                            agg_exec.aggr_expr(),
-                                            agg_exec.filter_expr(),
-                                        ),
-                                        (
-                                            input_agg_exec.group_expr(),
-                                            input_agg_exec.aggr_expr(),
-                                            input_agg_exec.filter_expr(),
-                                        ),
-                                    ) {
-                                        let mode =
-                                            if agg_exec.mode() == &AggregateMode::Final {
-                                                AggregateMode::Single
-                                            } else {
-                                                AggregateMode::SinglePartitioned
-                                            };
-                                        AggregateExec::try_new(
-                                            mode,
-                                            input_agg_exec.group_expr().clone(),
-                                            input_agg_exec.aggr_expr().to_vec(),
-                                            input_agg_exec.filter_expr().to_vec(),
-                                            input_agg_exec.input().clone(),
-                                            input_agg_exec.input_schema(),
-                                        )
-                                        .map(|combined_agg| {
-                                            combined_agg.with_limit(agg_exec.limit())
-                                        })
-                                        .ok()
-                                        .map(Arc::new)
-                                    } else {
-                                        None
-                                    }
-                                })
-                        } else {
-                            None
-                        }
-                    });
-
+            // Check if the plan is AggregateExec
+            let Some(agg_exec) = plan.as_any().downcast_ref::<AggregateExec>() else {
+                return Ok(Transformed::no(plan));
+            };
+
+            if !matches!(
+                agg_exec.mode(),
+                AggregateMode::Final | AggregateMode::FinalPartitioned
+            ) {
+                return Ok(Transformed::no(plan));
+            }
+
+            // Check if the input is AggregateExec
+            let Some(input_agg_exec) =
+                agg_exec.input().as_any().downcast_ref::<AggregateExec>()
+            else {
+                return Ok(Transformed::no(plan));
+            };
+
+            let transformed = if matches!(input_agg_exec.mode(), AggregateMode::Partial)
+                && can_combine(
+                    (
+                        agg_exec.group_expr(),
+                        agg_exec.aggr_expr(),
+                        agg_exec.filter_expr(),
+                    ),
+                    (
+                        input_agg_exec.group_expr(),
+                        input_agg_exec.aggr_expr(),
+                        input_agg_exec.filter_expr(),
+                    ),
+                ) {
+                let mode = if agg_exec.mode() == &AggregateMode::Final {
+                    AggregateMode::Single
+                } else {
+                    AggregateMode::SinglePartitioned
+                };
+                AggregateExec::try_new(
+                    mode,
+                    input_agg_exec.group_expr().clone(),
+                    input_agg_exec.aggr_expr().to_vec(),
+                    input_agg_exec.filter_expr().to_vec(),
+                    input_agg_exec.input().clone(),
+                    input_agg_exec.input_schema(),
+                )
+                .map(|combined_agg| combined_agg.with_limit(agg_exec.limit()))
+                .ok()
+                .map(Arc::new)
+            } else {
+                None
+            };
             Ok(if let Some(transformed) = transformed {
                 Transformed::yes(transformed)
             } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to