This is an automated email from the ASF dual-hosted git repository. dheres pushed a commit to branch dynamically_optimize_aggregate in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
commit 7306f319f36b93d3c47715bf96063812b3ed1b7a Author: Daniël Heres <[email protected]> AuthorDate: Mon Nov 27 13:35:35 2023 +0100 Dynamically optimize aggregate based on shuffle stats --- ballista/scheduler/src/state/execution_graph/execution_stage.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/ballista/scheduler/src/state/execution_graph/execution_stage.rs b/ballista/scheduler/src/state/execution_graph/execution_stage.rs index fcac54d5..f082fe43 100644 --- a/ballista/scheduler/src/state/execution_graph/execution_stage.rs +++ b/ballista/scheduler/src/state/execution_graph/execution_stage.rs @@ -22,6 +22,7 @@ use std::iter::FromIterator; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; +use datafusion::physical_optimizer::aggregate_statistics::AggregateStatistics; use datafusion::physical_optimizer::join_selection::JoinSelection; use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::display::DisplayableExecutionPlan; @@ -348,9 +349,13 @@ impl UnresolvedStage { &input_locations, )?; - // Optimize join order based on new resolved statistics + // Optimize join order and statistics based on new resolved statistics let optimize_join = JoinSelection::new(); - let plan = optimize_join.optimize(plan, SessionConfig::default().options())?; + let config = SessionConfig::default(); + let plan = optimize_join.optimize(plan, config.options())?; + let optimize_aggregate = AggregateStatistics::new(); + let plan = + optimize_aggregate.optimize(plan, SessionConfig::default().options())?; Ok(ResolvedStage::new( self.stage_id,
