This is an automated email from the ASF dual-hosted git repository. agrove pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
     new 51adfb01 Fix regression with TPC-H benchmark (#1060)
51adfb01 is described below

commit 51adfb01c3a6c3c3672774a2d6654c851ecd3072
Author: Andy Grove <agr...@apache.org>
AuthorDate: Tue Sep 24 04:58:18 2024 -0600

    Fix regression with TPC-H benchmark (#1060)

    * Fix regression with TPC-H benchmark

    * add link to issue
---
 ballista/executor/src/execution_loop.rs                    |  6 ++++++
 .../scheduler/src/state/execution_graph/execution_stage.rs | 10 ++++++----
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs
index 111120bd..0c169e2d 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -32,6 +32,8 @@ use ballista_core::error::BallistaError;
 use ballista_core::serde::scheduler::{ExecutorSpecification, PartitionId};
 use ballista_core::serde::BallistaCodec;
 use datafusion::execution::context::TaskContext;
+use datafusion::functions::datetime::date_part;
+use datafusion::functions::unicode::substr;
 use datafusion::functions_aggregate::covariance::{covar_pop_udaf, covar_samp_udaf};
 use datafusion::functions_aggregate::sum::sum_udaf;
 use datafusion::functions_aggregate::variance::var_samp_udaf;
@@ -199,6 +201,10 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U: 'static + AsExecutionP
     task_aggregate_functions.insert("covar_pop".to_string(), covar_pop_udaf());
     task_aggregate_functions.insert("SUM".to_string(), sum_udaf());
 
+    // TODO which other functions need adding here?
+    task_scalar_functions.insert("date_part".to_string(), date_part());
+    task_scalar_functions.insert("substr".to_string(), substr());
+
     for window_func in executor.window_functions.clone() {
         task_window_functions.insert(window_func.0, window_func.1);
     }
diff --git a/ballista/scheduler/src/state/execution_graph/execution_stage.rs b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
index e9b62a06..65a2d012 100644
--- a/ballista/scheduler/src/state/execution_graph/execution_stage.rs
+++ b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
@@ -23,7 +23,6 @@ use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 
 use datafusion::physical_optimizer::aggregate_statistics::AggregateStatistics;
-use datafusion::physical_optimizer::join_selection::JoinSelection;
 use datafusion::physical_optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
@@ -349,10 +348,13 @@ impl UnresolvedStage {
             &input_locations,
         )?;
 
+        // TODO reinstate this logic once https://github.com/apache/datafusion/issues/10978
+        // is fixed
         // Optimize join order and statistics based on new resolved statistics
-        let optimize_join = JoinSelection::new();
-        let config = SessionConfig::default();
-        let plan = optimize_join.optimize(plan, config.options())?;
+        // let optimize_join = JoinSelection::new();
+        // let config = SessionConfig::default();
+        // let plan = optimize_join.optimize(plan, config.options())?;
+
         let optimize_aggregate = AggregateStatistics::new();
         let plan = optimize_aggregate.optimize(plan, SessionConfig::default().options())?;
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org