This is an automated email from the ASF dual-hosted git repository. dheres pushed a commit to branch add_collect_statistics in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
commit 4d98d2682d98b4d280f314da00d4758fb3182b22 Author: Daniël Heres <[email protected]> AuthorDate: Tue May 30 13:53:15 2023 +0200 Add config to collect statistics --- ballista/client/src/prelude.rs | 2 +- ballista/core/src/config.rs | 10 ++++++++++ .../scheduler/src/state/execution_graph/execution_stage.rs | 10 ++++++++-- ballista/scheduler/src/state/session_manager.rs | 2 ++ benchmarks/src/bin/tpch.rs | 3 ++- python/src/context.rs | 2 ++ 6 files changed, 25 insertions(+), 4 deletions(-) diff --git a/ballista/client/src/prelude.rs b/ballista/client/src/prelude.rs index 5b73728e..c2984789 100644 --- a/ballista/client/src/prelude.rs +++ b/ballista/client/src/prelude.rs @@ -22,7 +22,7 @@ pub use ballista_core::{ BallistaConfig, BALLISTA_DEFAULT_BATCH_SIZE, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, BALLISTA_JOB_NAME, BALLISTA_PARQUET_PRUNING, BALLISTA_PLUGIN_DIR, BALLISTA_REPARTITION_AGGREGATIONS, BALLISTA_REPARTITION_JOINS, - BALLISTA_REPARTITION_WINDOWS, BALLISTA_WITH_INFORMATION_SCHEMA, + BALLISTA_REPARTITION_WINDOWS, BALLISTA_WITH_INFORMATION_SCHEMA, BALLISTA_COLLECT_STATISTICS }, error::{BallistaError, Result}, }; diff --git a/ballista/core/src/config.rs b/ballista/core/src/config.rs index c3981bef..653167ba 100644 --- a/ballista/core/src/config.rs +++ b/ballista/core/src/config.rs @@ -34,6 +34,8 @@ pub const BALLISTA_REPARTITION_JOINS: &str = "ballista.repartition.joins"; pub const BALLISTA_REPARTITION_AGGREGATIONS: &str = "ballista.repartition.aggregations"; pub const BALLISTA_REPARTITION_WINDOWS: &str = "ballista.repartition.windows"; pub const BALLISTA_PARQUET_PRUNING: &str = "ballista.parquet.pruning"; +pub const BALLISTA_COLLECT_STATISTICS: &str = "ballista.collect_statistics"; + pub const BALLISTA_WITH_INFORMATION_SCHEMA: &str = "ballista.with_information_schema"; /// give a plugin files dir, and then the dynamic library files in this dir will be load when scheduler state init. 
pub const BALLISTA_PLUGIN_DIR: &str = "ballista.plugin_dir"; @@ -182,6 +184,10 @@ impl BallistaConfig { ConfigEntry::new(BALLISTA_WITH_INFORMATION_SCHEMA.to_string(), "Sets whether enable information_schema".to_string(), DataType::Boolean, Some("false".to_string())), + ConfigEntry::new(BALLISTA_COLLECT_STATISTICS.to_string(), + "Configuration for collecting statistics during scan".to_string(), + DataType::Boolean, Some("true".to_string()) + ), ConfigEntry::new(BALLISTA_PLUGIN_DIR.to_string(), "Sets the plugin dir".to_string(), DataType::Utf8, Some("".to_string())), @@ -224,6 +230,10 @@ impl BallistaConfig { self.get_bool_setting(BALLISTA_PARQUET_PRUNING) } + pub fn collect_statistics(&self) -> bool { + self.get_bool_setting(BALLISTA_COLLECT_STATISTICS) + } + pub fn default_with_information_schema(&self) -> bool { self.get_bool_setting(BALLISTA_WITH_INFORMATION_SCHEMA) } diff --git a/ballista/scheduler/src/state/execution_graph/execution_stage.rs b/ballista/scheduler/src/state/execution_graph/execution_stage.rs index b187d8e9..628e8c6f 100644 --- a/ballista/scheduler/src/state/execution_graph/execution_stage.rs +++ b/ballista/scheduler/src/state/execution_graph/execution_stage.rs @@ -22,6 +22,7 @@ use std::iter::FromIterator; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; +use datafusion::physical_optimizer::aggregate_statistics::AggregateStatistics; use datafusion::physical_optimizer::join_selection::JoinSelection; use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::display::DisplayableExecutionPlan; @@ -362,9 +363,14 @@ impl UnresolvedStage { &input_locations, )?; - // Optimize join order based on new resolved statistics + // Optimize plan based on new resolved statistics let optimize_join = JoinSelection::new(); - let plan = optimize_join.optimize(plan, &SessionConfig::new())?; + let optimize_aggregate = AggregateStatistics::new(); + + let cfg: SessionConfig = SessionConfig::new(); + + let plan: Arc<dyn 
ExecutionPlan> = optimize_join.optimize(plan, &cfg)?; + let plan: Arc<dyn ExecutionPlan> = optimize_aggregate.optimize(plan, &cfg)?; Ok(ResolvedStage::new( self.stage_id, diff --git a/ballista/scheduler/src/state/session_manager.rs b/ballista/scheduler/src/state/session_manager.rs index eb7df9f2..56701d0b 100644 --- a/ballista/scheduler/src/state/session_manager.rs +++ b/ballista/scheduler/src/state/session_manager.rs @@ -121,6 +121,7 @@ pub fn create_datafusion_context( .with_repartition_joins(ballista_config.repartition_joins()) .with_repartition_aggregations(ballista_config.repartition_aggregations()) .with_repartition_windows(ballista_config.repartition_windows()) + .with_collect_statistics(ballista_config.collect_statistics()) .with_parquet_pruning(ballista_config.parquet_pruning()); let config = propagate_ballista_configs(config, ballista_config); @@ -142,6 +143,7 @@ pub fn update_datafusion_context( .with_repartition_joins(ballista_config.repartition_joins()) .with_repartition_aggregations(ballista_config.repartition_aggregations()) .with_repartition_windows(ballista_config.repartition_windows()) + .with_collect_statistics(ballista_config.collect_statistics()) .with_parquet_pruning(ballista_config.parquet_pruning()); let config = propagate_ballista_configs(config, ballista_config); mut_state.config = config; diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index aabfea6f..624cf9e4 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -20,7 +20,7 @@ use ballista::context::BallistaContext; use ballista::prelude::{ BallistaConfig, BALLISTA_DEFAULT_BATCH_SIZE, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, - BALLISTA_JOB_NAME, + BALLISTA_JOB_NAME, BALLISTA_COLLECT_STATISTICS, }; use datafusion::arrow::array::*; use datafusion::arrow::util::display::array_value_to_string; @@ -363,6 +363,7 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> { &format!("Query derived from TPC-H q{}", opt.query), ) 
.set(BALLISTA_DEFAULT_BATCH_SIZE, &format!("{}", opt.batch_size)) + .set(BALLISTA_COLLECT_STATISTICS, "true") .build() .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?; diff --git a/python/src/context.rs b/python/src/context.rs index 26c5661a..8b56f690 100644 --- a/python/src/context.rs +++ b/python/src/context.rs @@ -72,6 +72,7 @@ impl PySessionContext { repartition_windows: bool, parquet_pruning: bool, target_partitions: Option<usize>, + collect_statistics: bool, // TODO: config_options ) -> Self { let cfg = SessionConfig::new() @@ -81,6 +82,7 @@ impl PySessionContext { .with_repartition_joins(repartition_joins) .with_repartition_aggregations(repartition_aggregations) .with_repartition_windows(repartition_windows) + .with_collect_statistics(collect_statistics) .with_parquet_pruning(parquet_pruning); let cfg_full = match target_partitions {
