This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 51adfb01 Fix regression with TPC-H benchmark (#1060)
51adfb01 is described below

commit 51adfb01c3a6c3c3672774a2d6654c851ecd3072
Author: Andy Grove <agr...@apache.org>
AuthorDate: Tue Sep 24 04:58:18 2024 -0600

    Fix regression with TPC-H benchmark (#1060)
    
    * Fix regression with TPC-H benchmark
    
    * add link to issue
---
 ballista/executor/src/execution_loop.rs                        |  6 ++++++
 .../scheduler/src/state/execution_graph/execution_stage.rs     | 10 ++++++----
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs
index 111120bd..0c169e2d 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -32,6 +32,8 @@ use ballista_core::error::BallistaError;
 use ballista_core::serde::scheduler::{ExecutorSpecification, PartitionId};
 use ballista_core::serde::BallistaCodec;
 use datafusion::execution::context::TaskContext;
+use datafusion::functions::datetime::date_part;
+use datafusion::functions::unicode::substr;
 use datafusion::functions_aggregate::covariance::{covar_pop_udaf, covar_samp_udaf};
 use datafusion::functions_aggregate::sum::sum_udaf;
 use datafusion::functions_aggregate::variance::var_samp_udaf;
@@ -199,6 +201,10 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U: 'static + AsExecutionP
     task_aggregate_functions.insert("covar_pop".to_string(), covar_pop_udaf());
     task_aggregate_functions.insert("SUM".to_string(), sum_udaf());
 
+    // TODO which other functions need adding here?
+    task_scalar_functions.insert("date_part".to_string(), date_part());
+    task_scalar_functions.insert("substr".to_string(), substr());
+
     for window_func in executor.window_functions.clone() {
         task_window_functions.insert(window_func.0, window_func.1);
     }
diff --git a/ballista/scheduler/src/state/execution_graph/execution_stage.rs b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
index e9b62a06..65a2d012 100644
--- a/ballista/scheduler/src/state/execution_graph/execution_stage.rs
+++ b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
@@ -23,7 +23,6 @@ use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 
 use datafusion::physical_optimizer::aggregate_statistics::AggregateStatistics;
-use datafusion::physical_optimizer::join_selection::JoinSelection;
 use datafusion::physical_optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
@@ -349,10 +348,13 @@ impl UnresolvedStage {
             &input_locations,
         )?;
 
+        // TODO reinstate this logic once https://github.com/apache/datafusion/issues/10978
+        // is fixed
         // Optimize join order and statistics based on new resolved statistics
-        let optimize_join = JoinSelection::new();
-        let config = SessionConfig::default();
-        let plan = optimize_join.optimize(plan, config.options())?;
+        // let optimize_join = JoinSelection::new();
+        // let config = SessionConfig::default();
+        // let plan = optimize_join.optimize(plan, config.options())?;
+
         let optimize_aggregate = AggregateStatistics::new();
         let plan =
             optimize_aggregate.optimize(plan, SessionConfig::default().options())?;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to