This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 9b6d9e68 Refine create_datafusion_context() (#778)
9b6d9e68 is described below

commit 9b6d9e68b7903cd894b866fdd9acd79b82e8f904
Author: yahoNanJing <[email protected]>
AuthorDate: Thu May 18 10:04:36 2023 +0800

    Refine create_datafusion_context() (#778)
    
    Co-authored-by: yangzhong <[email protected]>
---
 ballista/scheduler/src/state/session_manager.rs | 54 +++----------------------
 1 file changed, 5 insertions(+), 49 deletions(-)

diff --git a/ballista/scheduler/src/state/session_manager.rs b/ballista/scheduler/src/state/session_manager.rs
index e07dbe90..e89f6dae 100644
--- a/ballista/scheduler/src/state/session_manager.rs
+++ b/ballista/scheduler/src/state/session_manager.rs
@@ -21,8 +21,6 @@ use ballista_core::error::Result;
 use datafusion::prelude::{SessionConfig, SessionContext};
 
 use crate::cluster::JobState;
-use datafusion::common::ScalarValue;
-use log::warn;
 use std::sync::Arc;
 
 #[derive(Clone)]
@@ -60,58 +58,16 @@ pub fn create_datafusion_context(
     ballista_config: &BallistaConfig,
     session_builder: SessionBuilder,
 ) -> Arc<SessionContext> {
-    let config = SessionConfig::new()
+    let config =
+        SessionConfig::from_string_hash_map(ballista_config.settings().clone()).unwrap();
+    let config = config
         .with_target_partitions(ballista_config.default_shuffle_partitions())
         .with_batch_size(ballista_config.default_batch_size())
         .with_repartition_joins(ballista_config.repartition_joins())
         .with_repartition_aggregations(ballista_config.repartition_aggregations())
         .with_repartition_windows(ballista_config.repartition_windows())
-        .with_parquet_pruning(ballista_config.parquet_pruning());
-    let config = propagate_ballista_configs(config, ballista_config);
-
+        .with_parquet_pruning(ballista_config.parquet_pruning())
+        .set_bool("datafusion.optimizer.enable_round_robin_repartition", false);
     let session_state = session_builder(config);
     Arc::new(SessionContext::with_state(session_state))
 }
-
-fn propagate_ballista_configs(
-    config: SessionConfig,
-    ballista_config: &BallistaConfig,
-) -> SessionConfig {
-    let mut config = config;
-    // TODO we cannot just pass string values along to DataFusion configs
-    // and we will need to improve that in the next release of DataFusion
-    // see https://github.com/apache/arrow-datafusion/issues/3500
-    for (k, v) in ballista_config.settings() {
-        // see https://arrow.apache.org/datafusion/user-guide/configs.html for explanation of these configs
-        match k.as_str() {
-            "datafusion.optimizer.filter_null_join_keys" => {
-                config = config.set(
-                    k,
-                    ScalarValue::Boolean(Some(v.parse::<bool>().unwrap_or(false))),
-                )
-            }
-            "datafusion.execution.coalesce_batches" => {
-                config = config.set(
-                    k,
-                    ScalarValue::Boolean(Some(v.parse::<bool>().unwrap_or(true))),
-                )
-            }
-            "datafusion.execution.coalesce_target_batch_size" => {
-                config = config.set(
-                    k,
-                    ScalarValue::UInt64(Some(v.parse::<u64>().unwrap_or(4096))),
-                )
-            }
-            "datafusion.optimizer.skip_failed_rules" => {
-                config = config.set(
-                    k,
-                    ScalarValue::Boolean(Some(v.parse::<bool>().unwrap_or(true))),
-                )
-            }
-            _ => {
-                warn!("Ignoring unknown configuration option {} = {}", k, v);
-            }
-        }
-    }
-    config
-}

Reply via email to