This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 7c60412b7 Convert batch_size to config option (#2771)
7c60412b7 is described below
commit 7c60412b7beb599b8f6b9ed49528a450aa48e56b
Author: Andy Grove <[email protected]>
AuthorDate: Fri Jun 24 14:38:58 2022 -0600
Convert batch_size to config option (#2771)
* convert batch_size to config option
* add comments
* update test
* fix avro
* Update datafusion/core/src/execution/context.rs
Co-authored-by: Andrew Lamb <[email protected]>
* address PR feedback
* Update datafusion/core/src/config.rs
Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/core/src/config.rs | 85 ++++++++++++++++++----
datafusion/core/src/execution/context.rs | 44 +++++++----
.../src/physical_optimizer/coalesce_batches.rs | 2 +-
.../core/src/physical_plan/file_format/avro.rs | 2 +-
.../core/src/physical_plan/file_format/csv.rs | 2 +-
.../core/src/physical_plan/file_format/json.rs | 2 +-
.../core/src/physical_plan/file_format/parquet.rs | 2 +-
.../core/src/physical_plan/sort_merge_join.rs | 2 +-
datafusion/core/src/physical_plan/sorts/sort.rs | 6 +-
.../physical_plan/sorts/sort_preserving_merge.rs | 4 +-
docs/source/user-guide/configs.md | 1 +
11 files changed, 111 insertions(+), 41 deletions(-)
diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs
index 3834ad70d..ee428cef1 100644
--- a/datafusion/core/src/config.rs
+++ b/datafusion/core/src/config.rs
@@ -21,8 +21,11 @@ use arrow::datatypes::DataType;
use datafusion_common::ScalarValue;
use std::collections::HashMap;
-/// Configuration option "datafusion.optimizer.filterNullJoinKeys"
-pub const OPT_FILTER_NULL_JOIN_KEYS: &str =
"datafusion.optimizer.filterNullJoinKeys";
+/// Configuration option "datafusion.optimizer.filter_null_join_keys"
+pub const OPT_FILTER_NULL_JOIN_KEYS: &str =
"datafusion.optimizer.filter_null_join_keys";
+
+/// Configuration option "datafusion.execution.batch_size"
+pub const OPT_BATCH_SIZE: &str = "datafusion.execution.batch_size";
/// Definition of a configuration option
pub struct ConfigDefinition {
@@ -53,13 +56,31 @@ impl ConfigDefinition {
}
/// Create a configuration option definition with a boolean value
- pub fn new_bool(name: &str, description: &str, default_value: bool) ->
Self {
- Self {
- key: name.to_string(),
- description: description.to_string(),
- data_type: DataType::Boolean,
- default_value: ScalarValue::Boolean(Some(default_value)),
- }
+ pub fn new_bool(
+ key: impl Into<String>,
+ description: impl Into<String>,
+ default_value: bool,
+ ) -> Self {
+ Self::new(
+ key,
+ description,
+ DataType::Boolean,
+ ScalarValue::Boolean(Some(default_value)),
+ )
+ }
+
+ /// Create a configuration option definition with a u64 value
+ pub fn new_u64(
+ key: impl Into<String>,
+ description: impl Into<String>,
+ default_value: u64,
+ ) -> Self {
+ Self::new(
+ key,
+ description,
+ DataType::UInt64,
+ ScalarValue::UInt64(Some(default_value)),
+ )
}
}
@@ -87,6 +108,13 @@ impl BuiltInConfigs {
filter can add additional overhead when the file format does
not fully support \
predicate push down.",
false,
+ ),
+ ConfigDefinition::new_u64(
+ OPT_BATCH_SIZE,
+ "Default batch size while creating new batches, it's especially
useful for \
+ buffer-in-memory batches since creating tiny batches would result
in too much metadata \
+ memory consumption.",
+ 8192,
)],
}
}
@@ -139,6 +167,11 @@ impl ConfigOptions {
self.set(key, ScalarValue::Boolean(Some(value)))
}
+ /// set a `u64` configuration option
+ pub fn set_u64(&mut self, key: &str, value: u64) {
+ self.set(key, ScalarValue::UInt64(Some(value)))
+ }
+
/// get a configuration option
pub fn get(&self, key: &str) -> Option<ScalarValue> {
self.options.get(key).cloned()
@@ -151,6 +184,19 @@ impl ConfigOptions {
_ => false,
}
}
+
+ /// get a u64 configuration option
+ pub fn get_u64(&self, key: &str) -> u64 {
+ match self.get(key) {
+ Some(ScalarValue::UInt64(Some(n))) => n,
+ _ => 0,
+ }
+ }
+
+ /// Access the underlying hashmap
+ pub fn options(&self) -> &HashMap<String, ScalarValue> {
+ &self.options
+ }
}
#[cfg(test)]
@@ -160,18 +206,25 @@ mod test {
#[test]
fn docs() {
let docs = BuiltInConfigs::generate_config_markdown();
- assert_eq!("| key | type | default | description |\
- \n|-----|------|---------|-------------|\
- \n| datafusion.optimizer.filterNullJoinKeys | Boolean | false | When
set to true, the optimizer \
- will insert filters before a join between a nullable and non-nullable
column to filter out \
- nulls on the nullable side. This filter can add additional overhead
when the file format does \
- not fully support predicate push down. |\n", docs);
+ // uncomment this println to see the docs so they can be
copy-and-pasted to
+ // docs/source/user-guide/configs.md until this task is automated
+ // in https://github.com/apache/arrow-datafusion/issues/2770
+ //println!("{}", docs);
+ let mut lines = docs.lines();
+ assert_eq!(
+ lines.next().unwrap(),
+ "| key | type | default | description |"
+ );
+ let configs = BuiltInConfigs::default();
+ for config in configs.config_definitions {
+ assert!(docs.contains(&config.key));
+ }
}
#[test]
fn get_then_set() {
let mut config = ConfigOptions::new();
- let config_key = "datafusion.optimizer.filterNullJoinKeys";
+ let config_key = "datafusion.optimizer.filter_null_join_keys";
assert!(!config.get_bool(config_key));
config.set_bool(config_key, true);
assert!(config.get_bool(config_key));
diff --git a/datafusion/core/src/execution/context.rs
b/datafusion/core/src/execution/context.rs
index 6b58d4570..16cd1adc2 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -80,7 +80,7 @@ use
crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use crate::physical_optimizer::repartition::Repartition;
-use crate::config::{ConfigOptions, OPT_FILTER_NULL_JOIN_KEYS};
+use crate::config::{ConfigOptions, OPT_BATCH_SIZE, OPT_FILTER_NULL_JOIN_KEYS};
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::logical_plan::plan::Explain;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json,
plan_to_parquet};
@@ -974,8 +974,6 @@ impl QueryPlanner for DefaultQueryPlanner {
}
}
-/// Session Configuration entry name for 'BATCH_SIZE'
-pub const BATCH_SIZE: &str = "batch_size";
/// Session Configuration entry name for 'TARGET_PARTITIONS'
pub const TARGET_PARTITIONS: &str = "target_partitions";
/// Session Configuration entry name for 'REPARTITION_JOINS'
@@ -990,10 +988,6 @@ pub const PARQUET_PRUNING: &str = "parquet_pruning";
/// Configuration options for session context
#[derive(Clone)]
pub struct SessionConfig {
- /// Default batch size while creating new batches, it's especially useful
- /// for buffer-in-memory batches since creating tiny batches would results
- /// in too much metadata memory consumption.
- pub batch_size: usize,
/// Number of partitions for query execution. Increasing partitions can
increase concurrency.
pub target_partitions: usize,
/// Default catalog name for table resolution
@@ -1023,7 +1017,6 @@ pub struct SessionConfig {
impl Default for SessionConfig {
fn default() -> Self {
Self {
- batch_size: 8192,
target_partitions: num_cpus::get(),
default_catalog: DEFAULT_CATALOG.to_owned(),
default_schema: DEFAULT_SCHEMA.to_owned(),
@@ -1055,12 +1048,16 @@ impl SessionConfig {
self.set(key, ScalarValue::Boolean(Some(value)))
}
+ /// Set a generic `u64` configuration option
+ pub fn set_u64(self, key: &str, value: u64) -> Self {
+ self.set(key, ScalarValue::UInt64(Some(value)))
+ }
+
/// Customize batch size
- pub fn with_batch_size(mut self, n: usize) -> Self {
+ pub fn with_batch_size(self, n: usize) -> Self {
// batch size must be greater than zero
assert!(n > 0);
- self.batch_size = n;
- self
+ self.set_u64(OPT_BATCH_SIZE, n.try_into().unwrap())
}
/// Customize target_partitions
@@ -1118,10 +1115,27 @@ impl SessionConfig {
self
}
- /// Convert configuration to name-value pairs
+ /// Get the currently configured batch size
+ pub fn batch_size(&self) -> usize {
+ self.config_options
+ .get_u64(OPT_BATCH_SIZE)
+ .try_into()
+ .unwrap()
+ }
+
+ /// Get the current configuration options
+ pub fn config_options(&self) -> &ConfigOptions {
+ &self.config_options
+ }
+
+ /// Convert configuration options to name-value pairs with values
converted to strings. Note
+ /// that this method will eventually be deprecated and replaced by
[config_options].
pub fn to_props(&self) -> HashMap<String, String> {
let mut map = HashMap::new();
- map.insert(BATCH_SIZE.to_owned(), format!("{}", self.batch_size));
+ // copy configs from config_options
+ for (k, v) in self.config_options.options() {
+ map.insert(k.to_string(), format!("{}", v));
+ }
map.insert(
TARGET_PARTITIONS.to_owned(),
format!("{}", self.target_partitions),
@@ -1496,7 +1510,9 @@ impl TaskContext {
session_config
} else {
session_config
-
.with_batch_size(props.get(BATCH_SIZE).unwrap().parse().unwrap())
+ .with_batch_size(
+
props.get(OPT_BATCH_SIZE).unwrap().parse().unwrap(),
+ )
.with_target_partitions(
props.get(TARGET_PARTITIONS).unwrap().parse().unwrap(),
)
diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index 2b8582126..a87fce934 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -76,7 +76,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
// we should do that once
https://issues.apache.org/jira/browse/ARROW-11059 is
// implemented. For now, we choose half the configured batch
size to avoid copies
// when a small number of rows are removed from a batch
- let target_batch_size = config.batch_size / 2;
+ let target_batch_size = config.batch_size() / 2;
Arc::new(CoalesceBatchesExec::new(plan.clone(),
target_batch_size))
} else {
plan.clone()
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs
b/datafusion/core/src/physical_plan/file_format/avro.rs
index ae1efb297..d03951990 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -111,7 +111,7 @@ impl ExecutionPlan for AvroExec {
) -> Result<SendableRecordBatchStream> {
let proj = self.base_config.projected_file_column_names();
- let batch_size = context.session_config().batch_size;
+ let batch_size = context.session_config().batch_size();
let file_schema = Arc::clone(&self.base_config.file_schema);
// The avro reader cannot limit the number of records, so `remaining`
is ignored.
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs
b/datafusion/core/src/physical_plan/file_format/csv.rs
index 7dddb70e9..0f8273ec1 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -115,7 +115,7 @@ impl ExecutionPlan for CsvExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
- let batch_size = context.session_config().batch_size;
+ let batch_size = context.session_config().batch_size();
let file_schema = Arc::clone(&self.base_config.file_schema);
let file_projection =
self.base_config.file_column_projection_indices();
let has_header = self.has_header;
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs
b/datafusion/core/src/physical_plan/file_format/json.rs
index 397fee5fe..a829218f4 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -96,7 +96,7 @@ impl ExecutionPlan for NdJsonExec {
) -> Result<SendableRecordBatchStream> {
let proj = self.base_config.projected_file_column_names();
- let batch_size = context.session_config().batch_size;
+ let batch_size = context.session_config().batch_size();
let file_schema = Arc::clone(&self.base_config.file_schema);
// The json reader cannot limit the number of records, so `remaining`
is ignored.
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs
b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 54694f119..e719499c3 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -225,7 +225,7 @@ impl ExecutionPlan for ParquetExec {
metrics: self.metrics.clone(),
object_store,
pruning_predicate: self.pruning_predicate.clone(),
- batch_size: context.session_config().batch_size,
+ batch_size: context.session_config().batch_size(),
schema: self.projected_schema.clone(),
projection,
remaining_rows: self.base_config.limit,
diff --git a/datafusion/core/src/physical_plan/sort_merge_join.rs
b/datafusion/core/src/physical_plan/sort_merge_join.rs
index ffbd27df9..39ff4c2ec 100644
--- a/datafusion/core/src/physical_plan/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/sort_merge_join.rs
@@ -191,7 +191,7 @@ impl ExecutionPlan for SortMergeJoinExec {
let buffered = buffered.execute(partition, context.clone())?;
// create output buffer
- let batch_size = context.session_config().batch_size;
+ let batch_size = context.session_config().batch_size();
// create join stream
Ok(Box::pin(SMJStream::try_new(
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs
b/datafusion/core/src/physical_plan/sorts/sort.rs
index c75f73f56..697ccb18d 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -134,7 +134,7 @@ impl ExternalSorter {
/// MergeSort in mem batches as well as spills into total order with
`SortPreservingMergeStream`.
async fn sort(&self) -> Result<SendableRecordBatchStream> {
let partition = self.partition_id();
- let batch_size = self.session_config.batch_size;
+ let batch_size = self.session_config.batch_size();
let mut in_mem_batches = self.in_mem_batches.lock().await;
if self.spilled_before().await {
@@ -168,7 +168,7 @@ impl ExternalSorter {
self.schema.clone(),
&self.expr,
tracking_metrics,
- self.session_config.batch_size,
+ self.session_config.batch_size(),
)))
} else if in_mem_batches.len() > 0 {
let tracking_metrics = self
@@ -268,7 +268,7 @@ impl MemoryConsumer for ExternalSorter {
&mut *in_mem_batches,
self.schema.clone(),
&*self.expr,
- self.session_config.batch_size,
+ self.session_config.batch_size(),
tracking_metrics,
);
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index f7ce73834..dc788be33 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -224,7 +224,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
schema,
&self.expr,
tracking_metrics,
- context.session_config().batch_size,
+ context.session_config().batch_size(),
));
debug!("Got stream result from
SortPreservingMergeStream::new_from_receivers");
@@ -1190,7 +1190,7 @@ mod tests {
batches.schema(),
sort.as_slice(),
tracking_metrics,
- task_ctx.session_config().batch_size,
+ task_ctx.session_config().batch_size(),
);
let mut merged =
common::collect(Box::pin(merge_stream)).await.unwrap();
diff --git a/docs/source/user-guide/configs.md
b/docs/source/user-guide/configs.md
index c82ab2eb7..fecd54106 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -30,3 +30,4 @@ The following configuration options can be passed to
`SessionConfig` to control
| key | type | default | description
|
| --------------------------------------- | ------- | ------- |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
| datafusion.optimizer.filterNullJoinKeys | Boolean | false | When set to
true, the optimizer will insert filters before a join between a nullable and
non-nullable column to filter out nulls on the nullable side. This filter can
add additional overhead when the file format does not fully support predicate
push down. |
+| datafusion.execution.batch_size | UInt64 | 8192 | Default batch
size while creating new batches, it's especially useful for buffer-in-memory
batches since creating tiny batches would result in too much metadata memory
consumption.
|