This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new cbb05177b Implement extensible configuration mechanism (#2754)
cbb05177b is described below
commit cbb05177bce90618021c51622989b58edd882275
Author: Andy Grove <[email protected]>
AuthorDate: Wed Jun 22 08:47:40 2022 -0600
Implement extensible configuration mechanism (#2754)
---
datafusion/core/src/config.rs | 187 +++++++++++++++++++++++++++++++
datafusion/core/src/execution/context.rs | 48 +++++---
datafusion/core/src/lib.rs | 1 +
docs/source/index.rst | 1 +
docs/source/user-guide/configs.md | 26 +++++
5 files changed, 249 insertions(+), 14 deletions(-)
diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs
new file mode 100644
index 000000000..3834ad70d
--- /dev/null
+++ b/datafusion/core/src/config.rs
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! DataFusion Configuration Options
+
+use arrow::datatypes::DataType;
+use datafusion_common::ScalarValue;
+use std::collections::HashMap;
+
/// Key of the boolean option that controls whether the optimizer inserts
/// null-filtering predicates on nullable join keys
/// (`"datafusion.optimizer.filterNullJoinKeys"`).
pub const OPT_FILTER_NULL_JOIN_KEYS: &str = "datafusion.optimizer.filterNullJoinKeys";
+
+/// Definition of a configuration option
+pub struct ConfigDefinition {
+ /// key used to identifier this configuration option
+ key: String,
+ /// Description to be used in generated documentation
+ description: String,
+ /// Data type of this option
+ data_type: DataType,
+ /// Default value
+ default_value: ScalarValue,
+}
+
+impl ConfigDefinition {
+ /// Create a configuration option definition
+ pub fn new(
+ name: impl Into<String>,
+ description: impl Into<String>,
+ data_type: DataType,
+ default_value: ScalarValue,
+ ) -> Self {
+ Self {
+ key: name.into(),
+ description: description.into(),
+ data_type,
+ default_value,
+ }
+ }
+
+ /// Create a configuration option definition with a boolean value
+ pub fn new_bool(name: &str, description: &str, default_value: bool) ->
Self {
+ Self {
+ key: name.to_string(),
+ description: description.to_string(),
+ data_type: DataType::Boolean,
+ default_value: ScalarValue::Boolean(Some(default_value)),
+ }
+ }
+}
+
+/// Contains definitions for all built-in configuration options
+pub struct BuiltInConfigs {
+ /// Configuration option definitions
+ config_definitions: Vec<ConfigDefinition>,
+}
+
+impl Default for BuiltInConfigs {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl BuiltInConfigs {
+ /// Create a new BuiltInConfigs struct containing definitions for all
built-in
+ /// configuration options
+ pub fn new() -> Self {
+ Self {
+ config_definitions: vec![ConfigDefinition::new_bool(
+ OPT_FILTER_NULL_JOIN_KEYS,
+ "When set to true, the optimizer will insert filters before a
join between \
+ a nullable and non-nullable column to filter out nulls on the
nullable side. This \
+ filter can add additional overhead when the file format does
not fully support \
+ predicate push down.",
+ false,
+ )],
+ }
+ }
+
+ /// Generate documentation that can be included int he user guide
+ pub fn generate_config_markdown() -> String {
+ let configs = Self::new();
+ let mut docs = "| key | type | default | description |\n".to_string();
+ docs += "|-----|------|---------|-------------|\n";
+ for config in configs.config_definitions {
+ docs += &format!(
+ "| {} | {} | {} | {} |\n",
+ config.key, config.data_type, config.default_value,
config.description
+ );
+ }
+ docs
+ }
+}
+
+/// Configuration options struct. This can contain values for built-in and
custom options
+#[derive(Debug, Clone)]
+pub struct ConfigOptions {
+ options: HashMap<String, ScalarValue>,
+}
+
+impl Default for ConfigOptions {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl ConfigOptions {
+ /// Create new ConfigOptions struct
+ pub fn new() -> Self {
+ let mut options = HashMap::new();
+ let built_in = BuiltInConfigs::new();
+ for config_def in &built_in.config_definitions {
+ options.insert(config_def.key.clone(),
config_def.default_value.clone());
+ }
+ Self { options }
+ }
+
+ /// set a configuration option
+ pub fn set(&mut self, key: &str, value: ScalarValue) {
+ self.options.insert(key.to_string(), value);
+ }
+
+ /// set a boolean configuration option
+ pub fn set_bool(&mut self, key: &str, value: bool) {
+ self.set(key, ScalarValue::Boolean(Some(value)))
+ }
+
+ /// get a configuration option
+ pub fn get(&self, key: &str) -> Option<ScalarValue> {
+ self.options.get(key).cloned()
+ }
+
+ /// get a boolean configuration option
+ pub fn get_bool(&self, key: &str) -> bool {
+ match self.get(key) {
+ Some(ScalarValue::Boolean(Some(b))) => b,
+ _ => false,
+ }
+ }
+}
+
#[cfg(test)]
mod test {
    use crate::config::{BuiltInConfigs, ConfigOptions};

    /// Key of the built-in option, spelled out literally so the tests also pin
    /// its exact value.
    const FILTER_NULL_JOIN_KEYS: &str = "datafusion.optimizer.filterNullJoinKeys";

    #[test]
    fn docs() {
        let expected = concat!(
            "| key | type | default | description |\n",
            "|-----|------|---------|-------------|\n",
            "| datafusion.optimizer.filterNullJoinKeys | Boolean | false | ",
            "When set to true, the optimizer will insert filters before a join between ",
            "a nullable and non-nullable column to filter out nulls on the nullable side. ",
            "This filter can add additional overhead when the file format does not fully ",
            "support predicate push down. |\n",
        );
        assert_eq!(expected, BuiltInConfigs::generate_config_markdown());
    }

    #[test]
    fn get_then_set() {
        let mut options = ConfigOptions::new();
        // the built-in default is `false`
        assert!(!options.get_bool(FILTER_NULL_JOIN_KEYS));
        options.set_bool(FILTER_NULL_JOIN_KEYS, true);
        assert!(options.get_bool(FILTER_NULL_JOIN_KEYS));
    }

    #[test]
    fn get_invalid_config() {
        let options = ConfigOptions::new();
        let unknown_key = "not.valid";
        // unknown keys have no value and read as `false` when treated as booleans
        assert!(options.get(unknown_key).is_none());
        assert!(!options.get_bool(unknown_key));
    }
}
diff --git a/datafusion/core/src/execution/context.rs
b/datafusion/core/src/execution/context.rs
index ad71d4bb8..6b58d4570 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -80,6 +80,7 @@ use
crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use crate::physical_optimizer::repartition::Repartition;
+use crate::config::{ConfigOptions, OPT_FILTER_NULL_JOIN_KEYS};
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::logical_plan::plan::Explain;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json,
plan_to_parquet};
@@ -91,6 +92,7 @@ use crate::physical_plan::PhysicalPlanner;
use crate::variable::{VarProvider, VarType};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
+use datafusion_common::ScalarValue;
use datafusion_expr::TableSource;
use datafusion_optimizer::filter_null_join_keys::FilterNullJoinKeys;
use datafusion_sql::{
@@ -1014,6 +1016,8 @@ pub struct SessionConfig {
pub repartition_windows: bool,
/// Should DataFusion parquet reader using the predicate to prune data
pub parquet_pruning: bool,
+ /// Configuration options
+ pub config_options: ConfigOptions,
}
impl Default for SessionConfig {
@@ -1029,6 +1033,7 @@ impl Default for SessionConfig {
repartition_aggregations: true,
repartition_windows: true,
parquet_pruning: true,
+ config_options: ConfigOptions::new(),
}
}
}
@@ -1039,6 +1044,17 @@ impl SessionConfig {
Default::default()
}
+ /// Set a configuration option
+ pub fn set(mut self, key: &str, value: ScalarValue) -> Self {
+ self.config_options.set(key, value);
+ self
+ }
+
+ /// Set a boolean configuration option
+ pub fn set_bool(self, key: &str, value: bool) -> Self {
+ self.set(key, ScalarValue::Boolean(Some(value)))
+ }
+
/// Customize batch size
pub fn with_batch_size(mut self, n: usize) -> Self {
// batch size must be greater than zero
@@ -1200,22 +1216,26 @@ impl SessionState {
.register_catalog(config.default_catalog.clone(),
default_catalog);
}
+ let mut rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
+ // Simplify expressions first to maximize the chance
+ // of applying other optimizations
+ Arc::new(SimplifyExpressions::new()),
+ Arc::new(SubqueryFilterToJoin::new()),
+ Arc::new(EliminateFilter::new()),
+ Arc::new(CommonSubexprEliminate::new()),
+ Arc::new(EliminateLimit::new()),
+ Arc::new(ProjectionPushDown::new()),
+ ];
+ if config.config_options.get_bool(OPT_FILTER_NULL_JOIN_KEYS) {
+ rules.push(Arc::new(FilterNullJoinKeys::default()));
+ }
+ rules.push(Arc::new(FilterPushDown::new()));
+ rules.push(Arc::new(LimitPushDown::new()));
+ rules.push(Arc::new(SingleDistinctToGroupBy::new()));
+
SessionState {
session_id,
- optimizer: Optimizer::new(vec![
- // Simplify expressions first to maximize the chance
- // of applying other optimizations
- Arc::new(SimplifyExpressions::new()),
- Arc::new(SubqueryFilterToJoin::new()),
- Arc::new(EliminateFilter::new()),
- Arc::new(CommonSubexprEliminate::new()),
- Arc::new(EliminateLimit::new()),
- Arc::new(ProjectionPushDown::new()),
- Arc::new(FilterNullJoinKeys::default()),
- Arc::new(FilterPushDown::new()),
- Arc::new(LimitPushDown::new()),
- Arc::new(SingleDistinctToGroupBy::new()),
- ]),
+ optimizer: Optimizer::new(rules),
physical_optimizers: vec![
Arc::new(AggregateStatistics::new()),
Arc::new(HashBuildProbeOrder::new()),
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index 7a0cf475d..33501a10f 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -208,6 +208,7 @@ extern crate sqlparser;
pub mod avro_to_arrow;
pub mod catalog;
+pub mod config;
pub mod dataframe;
pub mod datasource;
pub mod error;
diff --git a/docs/source/index.rst b/docs/source/index.rst
index 81cd572d6..bcb2f0ed1 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -43,6 +43,7 @@ Table of content
user-guide/library
user-guide/cli
user-guide/sql/index
+ user-guide/configs
user-guide/faq
.. _toc.specs:
diff --git a/docs/source/user-guide/configs.md
b/docs/source/user-guide/configs.md
new file mode 100644
index 000000000..9560a1f60
--- /dev/null
+++ b/docs/source/user-guide/configs.md
@@ -0,0 +1,26 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Configuration Settings
+
+The following configuration options can be passed to `SessionConfig` to
control various aspects of query execution.
+
+| key                                     | type    | default | description |
+| --------------------------------------- | ------- | ------- | ----------- |
+| datafusion.optimizer.filterNullJoinKeys | Boolean | false   | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. |