This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 740a4fa2c Unify most of `SessionConfig` settings into `ConfigOptions` 
(#4492)
740a4fa2c is described below

commit 740a4fa2c6ba4b85875a433bb86e5b00435a5969
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Dec 5 14:56:32 2022 -0500

    Unify most of `SessionConfig` settings into `ConfigOptions` (#4492)
    
    * Unify most `SessionConfig` settings into `ConfigOptions`
    
    * Update set target_partitions in show_variable test
    
    * Normalize setting in docs
    
    * fix clippy
---
 benchmarks/src/bin/tpch.rs                         |   2 +-
 datafusion/core/benches/sort_limit_query_sql.rs    |   5 +-
 datafusion/core/src/config.rs                      |  98 +++++++++++-
 datafusion/core/src/dataframe.rs                   |  10 +-
 datafusion/core/src/datasource/listing/table.rs    |   2 +-
 .../core/src/datasource/listing_table_factory.rs   |   4 +-
 datafusion/core/src/execution/context.rs           | 176 ++++++++++++++-------
 datafusion/core/src/execution/options.rs           |   2 +-
 .../core/src/physical_optimizer/enforcement.rs     |   2 +-
 .../core/src/physical_optimizer/repartition.rs     |   4 +-
 datafusion/core/src/physical_plan/planner.rs       |  22 +--
 .../test_files/information_schema.slt              |  13 ++
 docs/source/user-guide/configs.md                  |   7 +
 13 files changed, 254 insertions(+), 93 deletions(-)

diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 5aa4a3848..699be4238 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -410,7 +410,7 @@ async fn get_table(
     let options = ListingOptions::new(format)
         .with_file_extension(extension)
         .with_target_partitions(target_partitions)
-        .with_collect_stat(ctx.config.collect_statistics);
+        .with_collect_stat(ctx.config.collect_statistics());
 
     let table_path = ListingTableUrl::parse(path)?;
     let config = 
ListingTableConfig::new(table_path).with_listing_options(options);
diff --git a/datafusion/core/benches/sort_limit_query_sql.rs 
b/datafusion/core/benches/sort_limit_query_sql.rs
index e7aa33bd7..efee5de13 100644
--- a/datafusion/core/benches/sort_limit_query_sql.rs
+++ b/datafusion/core/benches/sort_limit_query_sql.rs
@@ -23,6 +23,7 @@ use datafusion::datasource::listing::{
     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
 };
 
+use datafusion::prelude::SessionConfig;
 use parking_lot::Mutex;
 use std::sync::Arc;
 
@@ -85,8 +86,8 @@ fn create_context() -> Arc<Mutex<SessionContext>> {
 
     rt.block_on(async {
         // create local session context
-        let ctx = SessionContext::new();
-        ctx.state.write().config.target_partitions = 1;
+        let ctx =
+            
SessionContext::with_config(SessionConfig::new().with_target_partitions(1));
 
         let table_provider = Arc::new(csv.await);
         let mem_table = MemTable::load(table_provider, Some(partitions), 
&ctx.state())
diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs
index c738bc0cf..034dd29fd 100644
--- a/datafusion/core/src/config.rs
+++ b/datafusion/core/src/config.rs
@@ -27,6 +27,28 @@ use std::env;
 use std::fmt::{Debug, Formatter};
 use std::sync::Arc;
 
+/// Configuration option "datafusion.execution.target_partitions"
+pub const OPT_TARGET_PARTITIONS: &str = 
"datafusion.execution.target_partitions";
+
+/// Configuration option "datafusion.catalog.create_default_catalog_and_schema"
+pub const OPT_CREATE_DEFAULT_CATALOG_AND_SCHEMA: &str =
+    "datafusion.catalog.create_default_catalog_and_schema";
+/// Configuration option "datafusion.catalog.information_schema"
+pub const OPT_INFORMATION_SCHEMA: &str = 
"datafusion.catalog.information_schema";
+
+/// Configuration option "datafusion.optimizer.repartition_joins"
+pub const OPT_REPARTITION_JOINS: &str = 
"datafusion.optimizer.repartition_joins";
+
+/// Configuration option "datafusion.optimizer.repartition_aggregations"
+pub const OPT_REPARTITION_AGGREGATIONS: &str =
+    "datafusion.optimizer.repartition_aggregations";
+
+/// Configuration option "datafusion.optimizer.repartition_windows"
+pub const OPT_REPARTITION_WINDOWS: &str = 
"datafusion.optimizer.repartition_windows";
+
+/// Configuration option "datafusion.execuction_collect_statistics"
+pub const OPT_COLLECT_STATISTICS: &str = 
"datafusion.execuction_collect_statistics";
+
 /// Configuration option "datafusion.optimizer.filter_null_join_keys"
 pub const OPT_FILTER_NULL_JOIN_KEYS: &str = 
"datafusion.optimizer.filter_null_join_keys";
 
@@ -199,7 +221,54 @@ impl BuiltInConfigs {
     /// configuration options
     pub fn new() -> Self {
         Self {
-            config_definitions: vec![ConfigDefinition::new_bool(
+            config_definitions: vec![ConfigDefinition::new_u64(
+                OPT_TARGET_PARTITIONS,
+                "Number of partitions for query execution. Increasing 
partitions can increase \
+                 concurrency. Defaults to the number of cpu cores on the 
system.",
+                num_cpus::get() as u64,
+            ),
+
+            ConfigDefinition::new_bool(
+                OPT_CREATE_DEFAULT_CATALOG_AND_SCHEMA,
+                "Whether the default catalog and schema should be created 
automatically.",
+                true
+            ),
+
+            ConfigDefinition::new_bool(
+                OPT_INFORMATION_SCHEMA,
+                "Should DataFusion provide access to `information_schema` \
+                 virtual tables for displaying schema information",
+                false
+            ),
+
+            ConfigDefinition::new_bool(
+                OPT_REPARTITION_JOINS,
+                "Should DataFusion repartition data using the join keys to 
execute joins in parallel \
+                 using the provided `target_partitions` level",
+                true
+            ),
+
+            ConfigDefinition::new_bool(
+                OPT_REPARTITION_AGGREGATIONS,
+                "Should DataFusion repartition data using the aggregate keys 
to execute aggregates \
+                 in parallel using the provided `target_partitions` level",
+                true
+            ),
+
+            ConfigDefinition::new_bool(
+                OPT_REPARTITION_WINDOWS,
+                "Should DataFusion collect statistics after listing files",
+                true
+            ),
+
+            ConfigDefinition::new_bool(
+                OPT_COLLECT_STATISTICS,
+                "Should DataFusion repartition data using the partitions keys 
to execute window \
+                 functions in parallel using the provided `target_partitions` 
level",
+                false
+            ),
+
+            ConfigDefinition::new_bool(
                 OPT_FILTER_NULL_JOIN_KEYS,
                 "When set to true, the optimizer will insert filters before a 
join between \
                 a nullable and non-nullable column to filter out nulls on the 
nullable side. This \
@@ -336,11 +405,14 @@ impl BuiltInConfigs {
         let configs = Self::new();
         let mut docs = "| key | type | default | description |\n".to_string();
         docs += "|-----|------|---------|-------------|\n";
-        for config in configs
+
+        let config_definitions: Vec<_> = configs
             .config_definitions
-            .iter()
-            .sorted_by_key(|c| c.key.as_str())
-        {
+            .into_iter()
+            .map(normalize_for_display)
+            .collect();
+
+        for config in config_definitions.iter().sorted_by_key(|c| 
c.key.as_str()) {
             let _ = writeln!(
                 &mut docs,
                 "| {} | {} | {} | {} |",
@@ -351,6 +423,16 @@ impl BuiltInConfigs {
     }
 }
 
+/// Normalizes a config definition prior to markdown display
+fn normalize_for_display(mut v: ConfigDefinition) -> ConfigDefinition {
+    // Since the default value of target_partitions depends on the number of 
cores,
+    // set the default value to 0 in the docs.
+    if v.key == OPT_TARGET_PARTITIONS {
+        v.default_value = ScalarValue::UInt64(Some(0))
+    }
+    v
+}
+
 /// Configuration options struct. This can contain values for built-in and 
custom options
 #[derive(Clone)]
 pub struct ConfigOptions {
@@ -437,6 +519,12 @@ impl ConfigOptions {
         self.set(key, ScalarValue::UInt64(Some(value)))
     }
 
+    /// set a `usize` configuration option
+    pub fn set_usize(&mut self, key: &str, value: usize) {
+        let value: u64 = value.try_into().expect("convert u64 to usize");
+        self.set(key, ScalarValue::UInt64(Some(value)))
+    }
+
     /// set a `String` configuration option
     pub fn set_string(&mut self, key: &str, value: impl Into<String>) {
         self.set(key, ScalarValue::Utf8(Some(value.into())))
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index 495536fcd..d7dd04f88 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -839,6 +839,7 @@ mod tests {
     use std::vec;
 
     use super::*;
+    use crate::execution::context::SessionConfig;
     use crate::execution::options::{CsvReadOptions, ParquetReadOptions};
     use crate::physical_plan::ColumnarValue;
     use crate::physical_plan::Partitioning;
@@ -1538,8 +1539,7 @@ mod tests {
         assert_eq!(4016, union_rows.iter().map(|x| 
x.num_rows()).sum::<usize>());
 
         let physical_plan = union.create_physical_plan().await?;
-        let default_partition_count =
-            SessionContext::new().copied_config().target_partitions;
+        let default_partition_count = SessionConfig::new().target_partitions();
 
         // For partition aware union, the output partition count should not be 
changed.
         assert_eq!(
@@ -1594,8 +1594,7 @@ mod tests {
         assert_eq!(916, union_rows.iter().map(|x| 
x.num_rows()).sum::<usize>());
 
         let physical_plan = union.create_physical_plan().await?;
-        let default_partition_count =
-            SessionContext::new().copied_config().target_partitions;
+        let default_partition_count = SessionConfig::new().target_partitions();
 
         // For non-partition aware union, the output partitioning count should 
be the combination of all output partitions count
         assert!(matches!(
@@ -1624,8 +1623,7 @@ mod tests {
             JoinType::RightAnti,
         ];
 
-        let default_partition_count =
-            SessionContext::new().copied_config().target_partitions;
+        let default_partition_count = SessionConfig::new().target_partitions();
 
         for join_type in all_join_types {
             let join = left.join(
diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index 4ca044f0b..22fcb9216 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -169,7 +169,7 @@ impl ListingTableConfig {
 
         let listing_options = ListingOptions::new(format)
             .with_file_extension(file_extension)
-            .with_target_partitions(state.config.target_partitions);
+            .with_target_partitions(state.config.target_partitions());
 
         Ok(Self {
             table_paths: self.table_paths,
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs 
b/datafusion/core/src/datasource/listing_table_factory.rs
index 537288646..ad6b820ba 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -125,9 +125,9 @@ impl TableProviderFactory for ListingTableFactory {
         };
 
         let options = ListingOptions::new(file_format)
-            .with_collect_stat(state.config.collect_statistics)
+            .with_collect_stat(state.config.collect_statistics())
             .with_file_extension(file_extension)
-            .with_target_partitions(state.config.target_partitions)
+            .with_target_partitions(state.config.target_partitions())
             .with_table_partition_cols(table_partition_cols)
             .with_file_sort_order(None);
 
diff --git a/datafusion/core/src/execution/context.rs 
b/datafusion/core/src/execution/context.rs
index a9eb42bab..0117fee37 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -21,7 +21,11 @@ use crate::{
         catalog::{CatalogList, MemoryCatalogList},
         information_schema::CatalogWithInformationSchema,
     },
-    config::OPT_PARQUET_ENABLE_PRUNING,
+    config::{
+        OPT_COLLECT_STATISTICS, OPT_CREATE_DEFAULT_CATALOG_AND_SCHEMA,
+        OPT_INFORMATION_SCHEMA, OPT_PARQUET_ENABLE_PRUNING, 
OPT_REPARTITION_AGGREGATIONS,
+        OPT_REPARTITION_JOINS, OPT_REPARTITION_WINDOWS, OPT_TARGET_PARTITIONS,
+    },
     datasource::listing::{ListingOptions, ListingTable},
     datasource::{MemTable, ViewTable},
     logical_expr::{PlanType, ToStringifiedPlan},
@@ -612,7 +616,7 @@ impl SessionContext {
         options: AvroReadOptions<'_>,
     ) -> Result<Arc<DataFrame>> {
         let table_path = ListingTableUrl::parse(table_path)?;
-        let target_partitions = self.copied_config().target_partitions;
+        let target_partitions = self.copied_config().target_partitions();
 
         let listing_options = options.to_listing_options(target_partitions);
 
@@ -639,7 +643,7 @@ impl SessionContext {
         options: NdJsonReadOptions<'_>,
     ) -> Result<Arc<DataFrame>> {
         let table_path = ListingTableUrl::parse(table_path)?;
-        let target_partitions = self.copied_config().target_partitions;
+        let target_partitions = self.copied_config().target_partitions();
 
         let listing_options = options.to_listing_options(target_partitions);
 
@@ -674,7 +678,7 @@ impl SessionContext {
         options: CsvReadOptions<'_>,
     ) -> Result<Arc<DataFrame>> {
         let table_path = ListingTableUrl::parse(table_path)?;
-        let target_partitions = self.copied_config().target_partitions;
+        let target_partitions = self.copied_config().target_partitions();
         let listing_options = options.to_listing_options(target_partitions);
         let resolved_schema = match options.schema {
             Some(s) => Arc::new(s.to_owned()),
@@ -774,7 +778,7 @@ impl SessionContext {
         options: CsvReadOptions<'_>,
     ) -> Result<()> {
         let listing_options =
-            options.to_listing_options(self.copied_config().target_partitions);
+            
options.to_listing_options(self.copied_config().target_partitions());
 
         self.register_listing_table(
             name,
@@ -797,7 +801,7 @@ impl SessionContext {
         options: NdJsonReadOptions<'_>,
     ) -> Result<()> {
         let listing_options =
-            options.to_listing_options(self.copied_config().target_partitions);
+            
options.to_listing_options(self.copied_config().target_partitions());
 
         self.register_listing_table(
             name,
@@ -834,7 +838,7 @@ impl SessionContext {
         options: AvroReadOptions<'_>,
     ) -> Result<()> {
         let listing_options =
-            options.to_listing_options(self.copied_config().target_partitions);
+            
options.to_listing_options(self.copied_config().target_partitions());
 
         self.register_listing_table(
             name,
@@ -859,7 +863,7 @@ impl SessionContext {
         catalog: Arc<dyn CatalogProvider>,
     ) -> Option<Arc<dyn CatalogProvider>> {
         let name = name.into();
-        let information_schema = self.copied_config().information_schema;
+        let information_schema = self.copied_config().information_schema();
         let state = self.state.read();
         let catalog = if information_schema {
             Arc::new(CatalogWithInformationSchema::new(
@@ -1148,28 +1152,11 @@ impl Hasher for IdHasher {
 /// Configuration options for session context
 #[derive(Clone)]
 pub struct SessionConfig {
-    /// Number of partitions for query execution. Increasing partitions can 
increase concurrency.
-    pub target_partitions: usize,
     /// Default catalog name for table resolution
     default_catalog: String,
-    /// Default schema name for table resolution
+    /// Default schema name for table resolution (not in ConfigOptions
+    /// due to `resolve_table_ref` which passes back references)
     default_schema: String,
-    /// Whether the default catalog and schema should be created automatically
-    create_default_catalog_and_schema: bool,
-    /// Should DataFusion provide access to `information_schema`
-    /// virtual tables for displaying schema information
-    information_schema: bool,
-    /// Should DataFusion repartition data using the join keys to execute 
joins in parallel
-    /// using the provided `target_partitions` level
-    pub repartition_joins: bool,
-    /// Should DataFusion repartition data using the aggregate keys to execute 
aggregates in parallel
-    /// using the provided `target_partitions` level
-    pub repartition_aggregations: bool,
-    /// Should DataFusion repartition data using the partition keys to execute 
window functions in
-    /// parallel using the provided `target_partitions` level
-    pub repartition_windows: bool,
-    /// Should DataFusion collect statistics after listing files
-    pub collect_statistics: bool,
     /// Configuration options
     pub config_options: Arc<RwLock<ConfigOptions>>,
     /// Opaque extensions.
@@ -1179,15 +1166,8 @@ pub struct SessionConfig {
 impl Default for SessionConfig {
     fn default() -> Self {
         Self {
-            target_partitions: num_cpus::get(),
             default_catalog: DEFAULT_CATALOG.to_owned(),
             default_schema: DEFAULT_SCHEMA.to_owned(),
-            create_default_catalog_and_schema: true,
-            information_schema: false,
-            repartition_joins: true,
-            repartition_aggregations: true,
-            repartition_windows: true,
-            collect_statistics: false,
             config_options: Arc::new(RwLock::new(ConfigOptions::new())),
             // Assume no extensions by default.
             extensions: HashMap::with_capacity_and_hasher(
@@ -1228,6 +1208,12 @@ impl SessionConfig {
         self.set(key, ScalarValue::UInt64(Some(value)))
     }
 
+    /// Set a generic `usize` configuration option
+    pub fn set_usize(self, key: &str, value: usize) -> Self {
+        let value: u64 = value.try_into().expect("convert usize to u64");
+        self.set(key, ScalarValue::UInt64(Some(value)))
+    }
+
     /// Set a generic `str` configuration option
     pub fn set_str(self, key: &str, value: &str) -> Self {
         self.set(key, ScalarValue::Utf8(Some(value.to_string())))
@@ -1240,12 +1226,67 @@ impl SessionConfig {
         self.set_u64(OPT_BATCH_SIZE, n.try_into().unwrap())
     }
 
-    /// Customize target_partitions
-    pub fn with_target_partitions(mut self, n: usize) -> Self {
+    /// Customize [`OPT_TARGET_PARTITIONS`]
+    pub fn with_target_partitions(self, n: usize) -> Self {
         // partition count must be greater than zero
         assert!(n > 0);
-        self.target_partitions = n;
-        self
+        self.set_usize(OPT_TARGET_PARTITIONS, n)
+    }
+
+    /// get target_partitions
+    pub fn target_partitions(&self) -> usize {
+        self.config_options
+            .read()
+            .get_usize(OPT_TARGET_PARTITIONS)
+            .expect("target partitions must be set")
+    }
+
+    /// Is the information schema enabled?
+    pub fn information_schema(&self) -> bool {
+        self.config_options
+            .read()
+            .get_bool(OPT_INFORMATION_SCHEMA)
+            .unwrap_or_default()
+    }
+
+    /// Should the context create the default catalog and schema?
+    pub fn create_default_catalog_and_schema(&self) -> bool {
+        self.config_options
+            .read()
+            .get_bool(OPT_CREATE_DEFAULT_CATALOG_AND_SCHEMA)
+            .unwrap_or_default()
+    }
+
+    /// Are joins repartitioned during execution?
+    pub fn repartition_joins(&self) -> bool {
+        self.config_options
+            .read()
+            .get_bool(OPT_REPARTITION_JOINS)
+            .unwrap_or_default()
+    }
+
+    /// Are aggregates repartitioned during execution?
+    pub fn repartition_aggregations(&self) -> bool {
+        self.config_options
+            .read()
+            .get_bool(OPT_REPARTITION_AGGREGATIONS)
+            .unwrap_or_default()
+    }
+
+    /// Are window functions repartitioned during execution?
+    pub fn repartition_window_functions(&self) -> bool {
+        self.config_options
+            .read()
+            .get_bool(OPT_REPARTITION_WINDOWS)
+            .unwrap_or_default()
+    }
+
+    /// Are statistics collected during execution?
+    pub fn collect_statistics(&self) -> bool {
+        self.config_options
+            .read()
+            .get_bool(OPT_COLLECT_STATISTICS)
+            .unwrap_or_default()
     }
 
     /// Selects a name for the default catalog and schema
@@ -1260,32 +1301,42 @@ impl SessionConfig {
     }
 
     /// Controls whether the default catalog and schema will be automatically 
created
-    pub fn create_default_catalog_and_schema(mut self, create: bool) -> Self {
-        self.create_default_catalog_and_schema = create;
+    pub fn with_create_default_catalog_and_schema(self, create: bool) -> Self {
+        self.config_options
+            .write()
+            .set_bool(OPT_CREATE_DEFAULT_CATALOG_AND_SCHEMA, create);
         self
     }
 
     /// Enables or disables the inclusion of `information_schema` virtual 
tables
-    pub fn with_information_schema(mut self, enabled: bool) -> Self {
-        self.information_schema = enabled;
+    pub fn with_information_schema(self, enabled: bool) -> Self {
+        self.config_options
+            .write()
+            .set_bool(OPT_INFORMATION_SCHEMA, enabled);
         self
     }
 
     /// Enables or disables the use of repartitioning for joins to improve 
parallelism
-    pub fn with_repartition_joins(mut self, enabled: bool) -> Self {
-        self.repartition_joins = enabled;
+    pub fn with_repartition_joins(self, enabled: bool) -> Self {
+        self.config_options
+            .write()
+            .set_bool(OPT_REPARTITION_JOINS, enabled);
         self
     }
 
     /// Enables or disables the use of repartitioning for aggregations to 
improve parallelism
-    pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
-        self.repartition_aggregations = enabled;
+    pub fn with_repartition_aggregations(self, enabled: bool) -> Self {
+        self.config_options
+            .write()
+            .set_bool(OPT_REPARTITION_AGGREGATIONS, enabled);
         self
     }
 
     /// Enables or disables the use of repartitioning for window functions to 
improve parallelism
-    pub fn with_repartition_windows(mut self, enabled: bool) -> Self {
-        self.repartition_windows = enabled;
+    pub fn with_repartition_windows(self, enabled: bool) -> Self {
+        self.config_options
+            .write()
+            .set_bool(OPT_REPARTITION_WINDOWS, enabled);
         self
     }
 
@@ -1306,8 +1357,10 @@ impl SessionConfig {
     }
 
     /// Enables or disables the collection of statistics after listing files
-    pub fn with_collect_statistics(mut self, enabled: bool) -> Self {
-        self.collect_statistics = enabled;
+    pub fn with_collect_statistics(self, enabled: bool) -> Self {
+        self.config_options
+            .write()
+            .set_bool(OPT_COLLECT_STATISTICS, enabled);
         self
     }
 
@@ -1336,19 +1389,19 @@ impl SessionConfig {
         }
         map.insert(
             TARGET_PARTITIONS.to_owned(),
-            format!("{}", self.target_partitions),
+            format!("{}", self.target_partitions()),
         );
         map.insert(
             REPARTITION_JOINS.to_owned(),
-            format!("{}", self.repartition_joins),
+            format!("{}", self.repartition_joins()),
         );
         map.insert(
             REPARTITION_AGGREGATIONS.to_owned(),
-            format!("{}", self.repartition_aggregations),
+            format!("{}", self.repartition_aggregations()),
         );
         map.insert(
             REPARTITION_WINDOWS.to_owned(),
-            format!("{}", self.repartition_windows),
+            format!("{}", self.repartition_window_functions()),
         );
         map.insert(
             PARQUET_PRUNING.to_owned(),
@@ -1356,7 +1409,7 @@ impl SessionConfig {
         );
         map.insert(
             COLLECT_STATISTICS.to_owned(),
-            format!("{}", self.collect_statistics),
+            format!("{}", self.collect_statistics()),
         );
 
         map
@@ -1483,7 +1536,7 @@ impl SessionState {
         let session_id = Uuid::new_v4().to_string();
 
         let catalog_list = Arc::new(MemoryCatalogList::new()) as Arc<dyn 
CatalogList>;
-        if config.create_default_catalog_and_schema {
+        if config.create_default_catalog_and_schema() {
             let default_catalog = MemoryCatalogProvider::new();
 
             default_catalog
@@ -1495,7 +1548,8 @@ impl SessionState {
 
             Self::register_default_schema(&config, &runtime, &default_catalog);
 
-            let default_catalog: Arc<dyn CatalogProvider> = if 
config.information_schema {
+            let default_catalog: Arc<dyn CatalogProvider> = if 
config.information_schema()
+            {
                 Arc::new(CatalogWithInformationSchema::new(
                     Arc::downgrade(&catalog_list),
                     Arc::downgrade(&config.config_options),
@@ -2280,7 +2334,7 @@ mod tests {
     #[tokio::test]
     async fn disabled_default_catalog_and_schema() -> Result<()> {
         let ctx = SessionContext::with_config(
-            SessionConfig::new().create_default_catalog_and_schema(false),
+            SessionConfig::new().with_create_default_catalog_and_schema(false),
         );
 
         assert!(matches!(
@@ -2299,7 +2353,7 @@ mod tests {
     #[tokio::test]
     async fn custom_catalog_and_schema() {
         let config = SessionConfig::new()
-            .create_default_catalog_and_schema(true)
+            .with_create_default_catalog_and_schema(true)
             .with_default_catalog_and_schema("my_catalog", "my_schema");
         catalog_and_schema_test(config).await;
     }
@@ -2307,7 +2361,7 @@ mod tests {
     #[tokio::test]
     async fn custom_catalog_and_schema_no_default() {
         let config = SessionConfig::new()
-            .create_default_catalog_and_schema(false)
+            .with_create_default_catalog_and_schema(false)
             .with_default_catalog_and_schema("my_catalog", "my_schema");
         catalog_and_schema_test(config).await;
     }
@@ -2315,7 +2369,7 @@ mod tests {
     #[tokio::test]
     async fn custom_catalog_and_schema_and_information_schema() {
         let config = SessionConfig::new()
-            .create_default_catalog_and_schema(true)
+            .with_create_default_catalog_and_schema(true)
             .with_information_schema(true)
             .with_default_catalog_and_schema("my_catalog", "my_schema");
         catalog_and_schema_test(config).await;
diff --git a/datafusion/core/src/execution/options.rs 
b/datafusion/core/src/execution/options.rs
index 430d10862..21a82227c 100644
--- a/datafusion/core/src/execution/options.rs
+++ b/datafusion/core/src/execution/options.rs
@@ -223,7 +223,7 @@ impl<'a> ParquetReadOptions<'a> {
 
         ListingOptions::new(Arc::new(file_format))
             .with_file_extension(self.file_extension)
-            .with_target_partitions(config.target_partitions)
+            .with_target_partitions(config.target_partitions())
             .with_table_partition_cols(self.table_partition_cols.clone())
     }
 }
diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs 
b/datafusion/core/src/physical_optimizer/enforcement.rs
index cc9070ccb..3110061c4 100644
--- a/datafusion/core/src/physical_optimizer/enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/enforcement.rs
@@ -72,7 +72,7 @@ impl PhysicalOptimizerRule for BasicEnforcement {
         plan: Arc<dyn ExecutionPlan>,
         config: &SessionConfig,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let target_partitions = config.target_partitions;
+        let target_partitions = config.target_partitions();
         let top_down_join_key_reordering = config
             .config_options()
             .read()
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs 
b/datafusion/core/src/physical_optimizer/repartition.rs
index 1b4f53946..42c5e0c3f 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -223,10 +223,10 @@ impl PhysicalOptimizerRule for Repartition {
         config: &SessionConfig,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // Don't run optimizer if target_partitions == 1
-        if config.target_partitions == 1 {
+        if config.target_partitions() == 1 {
             Ok(plan)
         } else {
-            optimize_partitions(config.target_partitions, plan, false, false)
+            optimize_partitions(config.target_partitions(), plan, false, false)
         }
     }
 
diff --git a/datafusion/core/src/physical_plan/planner.rs 
b/datafusion/core/src/physical_plan/planner.rs
index 62d44fac7..596c1888f 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -522,8 +522,8 @@ impl DefaultPhysicalPlanner {
                     let partition_keys = 
window_expr_common_partition_keys(window_expr)?;
 
                     let can_repartition = !partition_keys.is_empty()
-                        && session_state.config.target_partitions > 1
-                        && session_state.config.repartition_windows;
+                        && session_state.config.target_partitions() > 1
+                        && session_state.config.repartition_window_functions();
 
                     let physical_partition_keys = if can_repartition
                     {
@@ -661,8 +661,8 @@ impl DefaultPhysicalPlanner {
                     let final_group: Vec<Arc<dyn PhysicalExpr>> = 
initial_aggr.output_group_expr();
 
                     let can_repartition = !groups.is_empty()
-                        && session_state.config.target_partitions > 1
-                        && session_state.config.repartition_aggregations;
+                        && session_state.config.target_partitions() > 1
+                        && session_state.config.repartition_aggregations();
 
                     let (initial_aggr, next_partition_mode): (
                         Arc<dyn ExecutionPlan>,
@@ -836,7 +836,7 @@ impl DefaultPhysicalPlanner {
                         })
                         .collect::<Result<Vec<_>>>()?;
                     // If we have a `LIMIT` can run sort/limts in parallel 
(similar to TopK)
-                    Ok(if fetch.is_some() && 
session_state.config.target_partitions > 1 {
+                    Ok(if fetch.is_some() && 
session_state.config.target_partitions() > 1 {
                         let sort = SortExec::new_with_partitioning(
                             sort_expr,
                             physical_input,
@@ -937,8 +937,8 @@ impl DefaultPhysicalPlanner {
                         .read()
                         .get_bool(OPT_PREFER_HASH_JOIN)
                         .unwrap_or_default();
-                    if session_state.config.target_partitions > 1
-                        && session_state.config.repartition_joins
+                    if session_state.config.target_partitions() > 1
+                        && session_state.config.repartition_joins()
                         && !prefer_hash_join
                     {
                         // Use SortMergeJoin if hash join is not preferred
@@ -957,11 +957,11 @@ impl DefaultPhysicalPlanner {
                                 *null_equals_null,
                             )?))
                         }
-                    } else if session_state.config.target_partitions > 1
-                        && session_state.config.repartition_joins
+                    } else if session_state.config.target_partitions() > 1
+                        && session_state.config.repartition_joins()
                         && prefer_hash_join {
                          let partition_mode = {
-                            if session_state.config.collect_statistics {
+                            if session_state.config.collect_statistics() {
                                 PartitionMode::Auto
                             } else {
                                 PartitionMode::Partitioned
@@ -1771,7 +1771,7 @@ mod tests {
 
     async fn plan(logical_plan: &LogicalPlan) -> Result<Arc<dyn 
ExecutionPlan>> {
         let mut session_state = make_session_state();
-        session_state.config.target_partitions = 4;
+        session_state.config = session_state.config.with_target_partitions(4);
         // optimize the logical plan
         let logical_plan = session_state.optimize(logical_plan)?;
         let planner = DefaultPhysicalPlanner::default();
diff --git 
a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt 
b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
index 36202fb43..0599649da 100644
--- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
@@ -15,12 +15,21 @@
 # specific language governing permissions and limitations
 # under the License.
 
+# target_partitions defaults to num_cores, so set
+# to a known value that is unlikely to be
+# the real number of cores on a system
+statement ok
+SET datafusion.execution.target_partitions=7
+
 # show all variables
 query R
 SHOW ALL
 ----
+datafusion.catalog.create_default_catalog_and_schema true
+datafusion.catalog.information_schema true
 datafusion.catalog.location NULL
 datafusion.catalog.type NULL
+datafusion.execuction_collect_statistics false
 datafusion.execution.batch_size 8192
 datafusion.execution.coalesce_batches true
 datafusion.execution.coalesce_target_batch_size 4096
@@ -30,6 +39,7 @@ datafusion.execution.parquet.pruning true
 datafusion.execution.parquet.pushdown_filters false
 datafusion.execution.parquet.reorder_filters false
 datafusion.execution.parquet.skip_metadata true
+datafusion.execution.target_partitions 7
 datafusion.execution.time_zone +00:00
 datafusion.explain.logical_plan_only false
 datafusion.explain.physical_plan_only false
@@ -37,6 +47,9 @@ datafusion.optimizer.filter_null_join_keys false
 datafusion.optimizer.hash_join_single_partition_threshold 1048576
 datafusion.optimizer.max_passes 3
 datafusion.optimizer.prefer_hash_join true
+datafusion.optimizer.repartition_aggregations true
+datafusion.optimizer.repartition_joins true
+datafusion.optimizer.repartition_windows true
 datafusion.optimizer.skip_failed_rules true
 datafusion.optimizer.top_down_join_key_reordering true
 
diff --git a/docs/source/user-guide/configs.md 
b/docs/source/user-guide/configs.md
index 8bb316433..e900d9c0b 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -37,8 +37,11 @@ Environment variables are read during `SessionConfig` 
initialisation so they mus
 
 | key                                                       | type    | 
default | description                                                           
                                                                                
                                                                                
                                                                                
                                        |
 | --------------------------------------------------------- | ------- | 
------- | 
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 |
+| datafusion.catalog.create_default_catalog_and_schema      | Boolean | true   
 | Whether the default catalog and schema should be created automatically.      
                                                                                
                                                                                
                                                                                
                                 |
+| datafusion.catalog.information_schema                     | Boolean | false  
 | Should DataFusion provide access to `information_schema` virtual tables for 
displaying schema information                                                   
                                                                                
                                                                                
                                  |
 | datafusion.catalog.location                               | Utf8    | NULL   
 | Location scanned to load tables for `default` schema, defaults to None       
                                                                                
                                                                                
                                                                                
                                 |
 | datafusion.catalog.type                                   | Utf8    | NULL   
 | Type of `TableProvider` to use when loading `default` schema. Defaults to 
None                                                                            
                                                                                
                                                                                
                                    |
+| datafusion.execuction_collect_statistics                  | Boolean | false  
 | Should DataFusion repartition data using the partitions keys to execute 
window functions in parallel using the provided `target_partitions` level       
                                                                                
                                                                                
                                      |
 | datafusion.execution.batch_size                           | UInt64  | 8192   
 | Default batch size while creating new batches, it's especially useful for 
buffer-in-memory batches since creating tiny batches would results in too much 
metadata memory consumption.                                                    
                                                                                
                                     |
 | datafusion.execution.coalesce_batches                     | Boolean | true   
 | When set to true, record batches will be examined between each operator and 
small batches will be coalesced into larger batches. This is helpful when there 
are highly selective filters or joins that could produce tiny output batches. 
The target batch size is determined by the configuration setting 
'datafusion.execution.coalesce_target_batch_size'. |
 | datafusion.execution.coalesce_target_batch_size           | UInt64  | 4096   
 | Target batch size when coalescing batches. Uses in conjunction with the 
configuration setting 'datafusion.execution.coalesce_batches'.                  
                                                                                
                                                                                
                                      |
@@ -48,6 +51,7 @@ Environment variables are read during `SessionConfig` 
initialisation so they mus
 | datafusion.execution.parquet.pushdown_filters             | Boolean | false  
 | If true, filter expressions are be applied during the parquet decoding 
operation to reduce the number of rows decoded.                                 
                                                                                
                                                                                
                                       |
 | datafusion.execution.parquet.reorder_filters              | Boolean | false  
 | If true, filter expressions evaluated during the parquet decoding opearation 
will be reordered heuristically to minimize the cost of evaluation. If false, 
the filters are applied in the same order as written in the query.              
                                                                                
                                   |
 | datafusion.execution.parquet.skip_metadata                | Boolean | true   
 | If true, the parquet reader skip the optional embedded metadata that may be 
in the file Schema. This setting can help avoid schema conflicts when querying 
multiple parquet files with schemas containing compatible types but different 
metadata.                                                                       
                                     |
+| datafusion.execution.target_partitions                    | UInt64  | 0      
 | Number of partitions for query execution. Increasing partitions can increase 
concurrency. Defaults to the number of cpu cores on the system.                 
                                                                                
                                                                                
                                 |
 | datafusion.execution.time_zone                            | Utf8    | +00:00 
 | The session time zone which some function require e.g. EXTRACT(HOUR from 
SOME_TIME) shift the underline datetime according to the time zone,             
                                                                                
                                                                                
                                     |
 | then extract the hour.                                    |
 | datafusion.explain.logical_plan_only                      | Boolean | false  
 | When set to true, the explain statement will only print logical plans.       
                                                                                
                                                                                
                                                                                
                                 |
@@ -56,5 +60,8 @@ Environment variables are read during `SessionConfig` 
initialisation so they mus
 | datafusion.optimizer.hash_join_single_partition_threshold | UInt64  | 
1048576 | The maximum estimated size in bytes for one input side of a HashJoin 
will be collected into a single partition                                       
                                                                                
                                                                                
                                         |
 | datafusion.optimizer.max_passes                           | UInt64  | 3      
 | Number of times that the optimizer will attempt to optimize the plan         
                                                                                
                                                                                
                                                                                
                                 |
 | datafusion.optimizer.prefer_hash_join                     | Boolean | true   
 | When set to true, the physical plan optimizer will prefer HashJoin over 
SortMergeJoin. HashJoin can work more efficientlythan SortMergeJoin but 
consumes more memory. Defaults to true                                          
                                                                                
                                              |
+| datafusion.optimizer.repartition_aggregations             | Boolean | true   
 | Should DataFusion repartition data using the aggregate keys to execute 
aggregates in parallel using the provided `target_partitions` level             
                                                                                
                                                                                
                                       |
+| datafusion.optimizer.repartition_joins                    | Boolean | true   
 | Should DataFusion repartition data using the join keys to execute joins in 
parallel using the provided `target_partitions` level                           
                                                                                
                                                                                
                                   |
+| datafusion.optimizer.repartition_windows                  | Boolean | true   
 | Should DataFusion collect statistics after listing files                     
                                                                                
                                                                                
                                                                                
                                 |
 | datafusion.optimizer.skip_failed_rules                    | Boolean | true   
 | When set to true, the logical plan optimizer will produce warning messages 
if any optimization rules produce errors and then proceed to the next rule. 
When set to false, any rules that produce errors will cause the query to fail.  
                                                                                
                                       |
 | datafusion.optimizer.top_down_join_key_reordering         | Boolean | true   
 | When set to true, the physical plan optimizer will run a top down process to 
reorder the join keys. Defaults to true                                         
                                                                                
                                                                                
                                 |


Reply via email to