This is an automated email from the ASF dual-hosted git repository.
nju_yaho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 9b6d9e68 Refine create_datafusion_context() (#778)
9b6d9e68 is described below
commit 9b6d9e68b7903cd894b866fdd9acd79b82e8f904
Author: yahoNanJing <[email protected]>
AuthorDate: Thu May 18 10:04:36 2023 +0800
Refine create_datafusion_context() (#778)
Co-authored-by: yangzhong <[email protected]>
---
ballista/scheduler/src/state/session_manager.rs | 54 +++----------------------
1 file changed, 5 insertions(+), 49 deletions(-)
diff --git a/ballista/scheduler/src/state/session_manager.rs b/ballista/scheduler/src/state/session_manager.rs
index e07dbe90..e89f6dae 100644
--- a/ballista/scheduler/src/state/session_manager.rs
+++ b/ballista/scheduler/src/state/session_manager.rs
@@ -21,8 +21,6 @@ use ballista_core::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};
use crate::cluster::JobState;
-use datafusion::common::ScalarValue;
-use log::warn;
use std::sync::Arc;
#[derive(Clone)]
@@ -60,58 +58,16 @@ pub fn create_datafusion_context(
ballista_config: &BallistaConfig,
session_builder: SessionBuilder,
) -> Arc<SessionContext> {
- let config = SessionConfig::new()
+ let config =
+
SessionConfig::from_string_hash_map(ballista_config.settings().clone()).unwrap();
+ let config = config
.with_target_partitions(ballista_config.default_shuffle_partitions())
.with_batch_size(ballista_config.default_batch_size())
.with_repartition_joins(ballista_config.repartition_joins())
.with_repartition_aggregations(ballista_config.repartition_aggregations())
.with_repartition_windows(ballista_config.repartition_windows())
- .with_parquet_pruning(ballista_config.parquet_pruning());
- let config = propagate_ballista_configs(config, ballista_config);
-
+ .with_parquet_pruning(ballista_config.parquet_pruning())
+ .set_bool("datafusion.optimizer.enable_round_robin_repartition",
false);
let session_state = session_builder(config);
Arc::new(SessionContext::with_state(session_state))
}
-
-fn propagate_ballista_configs(
- config: SessionConfig,
- ballista_config: &BallistaConfig,
-) -> SessionConfig {
- let mut config = config;
- // TODO we cannot just pass string values along to DataFusion configs
- // and we will need to improve that in the next release of DataFusion
- // see https://github.com/apache/arrow-datafusion/issues/3500
- for (k, v) in ballista_config.settings() {
- // see https://arrow.apache.org/datafusion/user-guide/configs.html for
explanation of these configs
- match k.as_str() {
- "datafusion.optimizer.filter_null_join_keys" => {
- config = config.set(
- k,
-
ScalarValue::Boolean(Some(v.parse::<bool>().unwrap_or(false))),
- )
- }
- "datafusion.execution.coalesce_batches" => {
- config = config.set(
- k,
-
ScalarValue::Boolean(Some(v.parse::<bool>().unwrap_or(true))),
- )
- }
- "datafusion.execution.coalesce_target_batch_size" => {
- config = config.set(
- k,
-
ScalarValue::UInt64(Some(v.parse::<u64>().unwrap_or(4096))),
- )
- }
- "datafusion.optimizer.skip_failed_rules" => {
- config = config.set(
- k,
-
ScalarValue::Boolean(Some(v.parse::<bool>().unwrap_or(true))),
- )
- }
- _ => {
- warn!("Ignoring unknown configuration option {} = {}", k, v);
- }
- }
- }
- config
-}