This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c60412b7 Convert batch_size to config option (#2771)
7c60412b7 is described below

commit 7c60412b7beb599b8f6b9ed49528a450aa48e56b
Author: Andy Grove <[email protected]>
AuthorDate: Fri Jun 24 14:38:58 2022 -0600

    Convert batch_size to config option (#2771)
    
    * convert batch_size to config option
    
    * add comments
    
    * update test
    
    * fix avro
    
    * Update datafusion/core/src/execution/context.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * address PR feedback
    
    * Update datafusion/core/src/config.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/core/src/config.rs                      | 85 ++++++++++++++++++----
 datafusion/core/src/execution/context.rs           | 44 +++++++----
 .../src/physical_optimizer/coalesce_batches.rs     |  2 +-
 .../core/src/physical_plan/file_format/avro.rs     |  2 +-
 .../core/src/physical_plan/file_format/csv.rs      |  2 +-
 .../core/src/physical_plan/file_format/json.rs     |  2 +-
 .../core/src/physical_plan/file_format/parquet.rs  |  2 +-
 .../core/src/physical_plan/sort_merge_join.rs      |  2 +-
 datafusion/core/src/physical_plan/sorts/sort.rs    |  6 +-
 .../physical_plan/sorts/sort_preserving_merge.rs   |  4 +-
 docs/source/user-guide/configs.md                  |  1 +
 11 files changed, 111 insertions(+), 41 deletions(-)

diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs
index 3834ad70d..ee428cef1 100644
--- a/datafusion/core/src/config.rs
+++ b/datafusion/core/src/config.rs
@@ -21,8 +21,11 @@ use arrow::datatypes::DataType;
 use datafusion_common::ScalarValue;
 use std::collections::HashMap;
 
-/// Configuration option "datafusion.optimizer.filterNullJoinKeys"
-pub const OPT_FILTER_NULL_JOIN_KEYS: &str = 
"datafusion.optimizer.filterNullJoinKeys";
+/// Configuration option "datafusion.optimizer.filter_null_join_keys"
+pub const OPT_FILTER_NULL_JOIN_KEYS: &str = 
"datafusion.optimizer.filter_null_join_keys";
+
+/// Configuration option "datafusion.execution.batch_size"
+pub const OPT_BATCH_SIZE: &str = "datafusion.execution.batch_size";
 
 /// Definition of a configuration option
 pub struct ConfigDefinition {
@@ -53,13 +56,31 @@ impl ConfigDefinition {
     }
 
     /// Create a configuration option definition with a boolean value
-    pub fn new_bool(name: &str, description: &str, default_value: bool) -> 
Self {
-        Self {
-            key: name.to_string(),
-            description: description.to_string(),
-            data_type: DataType::Boolean,
-            default_value: ScalarValue::Boolean(Some(default_value)),
-        }
+    pub fn new_bool(
+        key: impl Into<String>,
+        description: impl Into<String>,
+        default_value: bool,
+    ) -> Self {
+        Self::new(
+            key,
+            description,
+            DataType::Boolean,
+            ScalarValue::Boolean(Some(default_value)),
+        )
+    }
+
+    /// Create a configuration option definition with a u64 value
+    pub fn new_u64(
+        key: impl Into<String>,
+        description: impl Into<String>,
+        default_value: u64,
+    ) -> Self {
+        Self::new(
+            key,
+            description,
+            DataType::UInt64,
+            ScalarValue::UInt64(Some(default_value)),
+        )
     }
 }
 
@@ -87,6 +108,13 @@ impl BuiltInConfigs {
                 filter can add additional overhead when the file format does 
not fully support \
                 predicate push down.",
                 false,
+            ),
+            ConfigDefinition::new_u64(
+            OPT_BATCH_SIZE,
+            "Default batch size while creating new batches, it's especially 
useful for \
+            buffer-in-memory batches since creating tiny batches would results 
in too much metadata \
+            memory consumption.",
+            8192,
             )],
         }
     }
@@ -139,6 +167,11 @@ impl ConfigOptions {
         self.set(key, ScalarValue::Boolean(Some(value)))
     }
 
+    /// set a `u64` configuration option
+    pub fn set_u64(&mut self, key: &str, value: u64) {
+        self.set(key, ScalarValue::UInt64(Some(value)))
+    }
+
     /// get a configuration option
     pub fn get(&self, key: &str) -> Option<ScalarValue> {
         self.options.get(key).cloned()
@@ -151,6 +184,19 @@ impl ConfigOptions {
             _ => false,
         }
     }
+
+    /// get a u64 configuration option
+    pub fn get_u64(&self, key: &str) -> u64 {
+        match self.get(key) {
+            Some(ScalarValue::UInt64(Some(n))) => n,
+            _ => 0,
+        }
+    }
+
+    /// Access the underlying hashmap
+    pub fn options(&self) -> &HashMap<String, ScalarValue> {
+        &self.options
+    }
 }
 
 #[cfg(test)]
@@ -160,18 +206,25 @@ mod test {
     #[test]
     fn docs() {
         let docs = BuiltInConfigs::generate_config_markdown();
-        assert_eq!("| key | type | default | description |\
-        \n|-----|------|---------|-------------|\
-        \n| datafusion.optimizer.filterNullJoinKeys | Boolean | false | When 
set to true, the optimizer \
-        will insert filters before a join between a nullable and non-nullable 
column to filter out \
-        nulls on the nullable side. This filter can add additional overhead 
when the file format does \
-        not fully support predicate push down. |\n", docs);
+        // uncomment this println to see the docs so they can be 
copy-and-pasted to
+        // docs/source/user-guide/configs.md until this task is automated
+        // in https://github.com/apache/arrow-datafusion/issues/2770
+        //println!("{}", docs);
+        let mut lines = docs.lines();
+        assert_eq!(
+            lines.next().unwrap(),
+            "| key | type | default | description |"
+        );
+        let configs = BuiltInConfigs::default();
+        for config in configs.config_definitions {
+            assert!(docs.contains(&config.key));
+        }
     }
 
     #[test]
     fn get_then_set() {
         let mut config = ConfigOptions::new();
-        let config_key = "datafusion.optimizer.filterNullJoinKeys";
+        let config_key = "datafusion.optimizer.filter_null_join_keys";
         assert!(!config.get_bool(config_key));
         config.set_bool(config_key, true);
         assert!(config.get_bool(config_key));
diff --git a/datafusion/core/src/execution/context.rs 
b/datafusion/core/src/execution/context.rs
index 6b58d4570..16cd1adc2 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -80,7 +80,7 @@ use 
crate::physical_optimizer::coalesce_batches::CoalesceBatches;
 use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
 use crate::physical_optimizer::repartition::Repartition;
 
-use crate::config::{ConfigOptions, OPT_FILTER_NULL_JOIN_KEYS};
+use crate::config::{ConfigOptions, OPT_BATCH_SIZE, OPT_FILTER_NULL_JOIN_KEYS};
 use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use crate::logical_plan::plan::Explain;
 use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, 
plan_to_parquet};
@@ -974,8 +974,6 @@ impl QueryPlanner for DefaultQueryPlanner {
     }
 }
 
-/// Session Configuration entry name for 'BATCH_SIZE'
-pub const BATCH_SIZE: &str = "batch_size";
 /// Session Configuration entry name for 'TARGET_PARTITIONS'
 pub const TARGET_PARTITIONS: &str = "target_partitions";
 /// Session Configuration entry name for 'REPARTITION_JOINS'
@@ -990,10 +988,6 @@ pub const PARQUET_PRUNING: &str = "parquet_pruning";
 /// Configuration options for session context
 #[derive(Clone)]
 pub struct SessionConfig {
-    /// Default batch size while creating new batches, it's especially useful
-    /// for buffer-in-memory batches since creating tiny batches would results
-    /// in too much metadata memory consumption.
-    pub batch_size: usize,
     /// Number of partitions for query execution. Increasing partitions can 
increase concurrency.
     pub target_partitions: usize,
     /// Default catalog name for table resolution
@@ -1023,7 +1017,6 @@ pub struct SessionConfig {
 impl Default for SessionConfig {
     fn default() -> Self {
         Self {
-            batch_size: 8192,
             target_partitions: num_cpus::get(),
             default_catalog: DEFAULT_CATALOG.to_owned(),
             default_schema: DEFAULT_SCHEMA.to_owned(),
@@ -1055,12 +1048,16 @@ impl SessionConfig {
         self.set(key, ScalarValue::Boolean(Some(value)))
     }
 
+    /// Set a generic `u64` configuration option
+    pub fn set_u64(self, key: &str, value: u64) -> Self {
+        self.set(key, ScalarValue::UInt64(Some(value)))
+    }
+
     /// Customize batch size
-    pub fn with_batch_size(mut self, n: usize) -> Self {
+    pub fn with_batch_size(self, n: usize) -> Self {
         // batch size must be greater than zero
         assert!(n > 0);
-        self.batch_size = n;
-        self
+        self.set_u64(OPT_BATCH_SIZE, n.try_into().unwrap())
     }
 
     /// Customize target_partitions
@@ -1118,10 +1115,27 @@ impl SessionConfig {
         self
     }
 
-    /// Convert configuration to name-value pairs
+    /// Get the currently configured batch size
+    pub fn batch_size(&self) -> usize {
+        self.config_options
+            .get_u64(OPT_BATCH_SIZE)
+            .try_into()
+            .unwrap()
+    }
+
+    /// Get the current configuration options
+    pub fn config_options(&self) -> &ConfigOptions {
+        &self.config_options
+    }
+
+    /// Convert configuration options to name-value pairs with values 
converted to strings. Note
+    /// that this method will eventually be deprecated and replaced by 
[config_options].
     pub fn to_props(&self) -> HashMap<String, String> {
         let mut map = HashMap::new();
-        map.insert(BATCH_SIZE.to_owned(), format!("{}", self.batch_size));
+        // copy configs from config_options
+        for (k, v) in self.config_options.options() {
+            map.insert(k.to_string(), format!("{}", v));
+        }
         map.insert(
             TARGET_PARTITIONS.to_owned(),
             format!("{}", self.target_partitions),
@@ -1496,7 +1510,9 @@ impl TaskContext {
                     session_config
                 } else {
                     session_config
-                        
.with_batch_size(props.get(BATCH_SIZE).unwrap().parse().unwrap())
+                        .with_batch_size(
+                            
props.get(OPT_BATCH_SIZE).unwrap().parse().unwrap(),
+                        )
                         .with_target_partitions(
                             
props.get(TARGET_PARTITIONS).unwrap().parse().unwrap(),
                         )
diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs 
b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index 2b8582126..a87fce934 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -76,7 +76,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
                 // we should do that once 
https://issues.apache.org/jira/browse/ARROW-11059 is
                 // implemented. For now, we choose half the configured batch 
size to avoid copies
                 // when a small number of rows are removed from a batch
-                let target_batch_size = config.batch_size / 2;
+                let target_batch_size = config.batch_size() / 2;
                 Arc::new(CoalesceBatchesExec::new(plan.clone(), 
target_batch_size))
             } else {
                 plan.clone()
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs 
b/datafusion/core/src/physical_plan/file_format/avro.rs
index ae1efb297..d03951990 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -111,7 +111,7 @@ impl ExecutionPlan for AvroExec {
     ) -> Result<SendableRecordBatchStream> {
         let proj = self.base_config.projected_file_column_names();
 
-        let batch_size = context.session_config().batch_size;
+        let batch_size = context.session_config().batch_size();
         let file_schema = Arc::clone(&self.base_config.file_schema);
 
         // The avro reader cannot limit the number of records, so `remaining` 
is ignored.
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs 
b/datafusion/core/src/physical_plan/file_format/csv.rs
index 7dddb70e9..0f8273ec1 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -115,7 +115,7 @@ impl ExecutionPlan for CsvExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        let batch_size = context.session_config().batch_size;
+        let batch_size = context.session_config().batch_size();
         let file_schema = Arc::clone(&self.base_config.file_schema);
         let file_projection = 
self.base_config.file_column_projection_indices();
         let has_header = self.has_header;
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs 
b/datafusion/core/src/physical_plan/file_format/json.rs
index 397fee5fe..a829218f4 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -96,7 +96,7 @@ impl ExecutionPlan for NdJsonExec {
     ) -> Result<SendableRecordBatchStream> {
         let proj = self.base_config.projected_file_column_names();
 
-        let batch_size = context.session_config().batch_size;
+        let batch_size = context.session_config().batch_size();
         let file_schema = Arc::clone(&self.base_config.file_schema);
 
         // The json reader cannot limit the number of records, so `remaining` 
is ignored.
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs 
b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 54694f119..e719499c3 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -225,7 +225,7 @@ impl ExecutionPlan for ParquetExec {
             metrics: self.metrics.clone(),
             object_store,
             pruning_predicate: self.pruning_predicate.clone(),
-            batch_size: context.session_config().batch_size,
+            batch_size: context.session_config().batch_size(),
             schema: self.projected_schema.clone(),
             projection,
             remaining_rows: self.base_config.limit,
diff --git a/datafusion/core/src/physical_plan/sort_merge_join.rs 
b/datafusion/core/src/physical_plan/sort_merge_join.rs
index ffbd27df9..39ff4c2ec 100644
--- a/datafusion/core/src/physical_plan/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/sort_merge_join.rs
@@ -191,7 +191,7 @@ impl ExecutionPlan for SortMergeJoinExec {
         let buffered = buffered.execute(partition, context.clone())?;
 
         // create output buffer
-        let batch_size = context.session_config().batch_size;
+        let batch_size = context.session_config().batch_size();
 
         // create join stream
         Ok(Box::pin(SMJStream::try_new(
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs 
b/datafusion/core/src/physical_plan/sorts/sort.rs
index c75f73f56..697ccb18d 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -134,7 +134,7 @@ impl ExternalSorter {
     /// MergeSort in mem batches as well as spills into total order with 
`SortPreservingMergeStream`.
     async fn sort(&self) -> Result<SendableRecordBatchStream> {
         let partition = self.partition_id();
-        let batch_size = self.session_config.batch_size;
+        let batch_size = self.session_config.batch_size();
         let mut in_mem_batches = self.in_mem_batches.lock().await;
 
         if self.spilled_before().await {
@@ -168,7 +168,7 @@ impl ExternalSorter {
                 self.schema.clone(),
                 &self.expr,
                 tracking_metrics,
-                self.session_config.batch_size,
+                self.session_config.batch_size(),
             )))
         } else if in_mem_batches.len() > 0 {
             let tracking_metrics = self
@@ -268,7 +268,7 @@ impl MemoryConsumer for ExternalSorter {
             &mut *in_mem_batches,
             self.schema.clone(),
             &*self.expr,
-            self.session_config.batch_size,
+            self.session_config.batch_size(),
             tracking_metrics,
         );
 
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs 
b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index f7ce73834..dc788be33 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -224,7 +224,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
                     schema,
                     &self.expr,
                     tracking_metrics,
-                    context.session_config().batch_size,
+                    context.session_config().batch_size(),
                 ));
 
                 debug!("Got stream result from 
SortPreservingMergeStream::new_from_receivers");
@@ -1190,7 +1190,7 @@ mod tests {
             batches.schema(),
             sort.as_slice(),
             tracking_metrics,
-            task_ctx.session_config().batch_size,
+            task_ctx.session_config().batch_size(),
         );
 
         let mut merged = 
common::collect(Box::pin(merge_stream)).await.unwrap();
diff --git a/docs/source/user-guide/configs.md 
b/docs/source/user-guide/configs.md
index c82ab2eb7..fecd54106 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -30,3 +30,4 @@ The following configuration options can be passed to 
`SessionConfig` to control
 | key                                     | type    | default | description    
                                                                                
                                                                                
                                                                                
 |
 | --------------------------------------- | ------- | ------- | 
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 |
 | datafusion.optimizer.filterNullJoinKeys | Boolean | false   | When set to 
true, the optimizer will insert filters before a join between a nullable and 
non-nullable column to filter out nulls on the nullable side. This filter can 
add additional overhead when the file format does not fully support predicate 
push down. |
+| datafusion.execution.batch_size         | UInt64  | 8192    | Default batch 
size while creating new batches, it's especially useful for buffer-in-memory 
batches since creating tiny batches would results in too much metadata memory 
consumption.                                                                    
       |

Reply via email to