This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new d5a9b7423 Add config option for coalesce_batches physical optimization
rule, make optional (#2791)
d5a9b7423 is described below
commit d5a9b742344c1a6d34e88cac4e34950125dd880b
Author: Andy Grove <[email protected]>
AuthorDate: Tue Jun 28 11:02:29 2022 -0700
Add config option for coalesce_batches physical optimization rule, make
optional (#2791)
* Add config option for coalesce_batches
* remove hard-coded setting names from documentation
* fix
* update docs
* tests
---
datafusion/core/src/config.rs | 33 ++++++++++--
datafusion/core/src/execution/context.rs | 29 +++++++---
.../src/physical_optimizer/coalesce_batches.rs | 61 ++++++++++------------
datafusion/core/src/physical_optimizer/mod.rs | 2 +
.../core/src/physical_plan/coalesce_batches.rs | 47 ++++++++++++++++-
docs/source/user-guide/configs.md | 10 ++--
6 files changed, 131 insertions(+), 51 deletions(-)
diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs
index ee428cef1..40bbf0f4a 100644
--- a/datafusion/core/src/config.rs
+++ b/datafusion/core/src/config.rs
@@ -19,6 +19,7 @@
use arrow::datatypes::DataType;
use datafusion_common::ScalarValue;
+use itertools::Itertools;
use std::collections::HashMap;
/// Configuration option "datafusion.optimizer.filter_null_join_keys"
@@ -27,6 +28,13 @@ pub const OPT_FILTER_NULL_JOIN_KEYS: &str =
"datafusion.optimizer.filter_null_jo
/// Configuration option "datafusion.execution.batch_size"
pub const OPT_BATCH_SIZE: &str = "datafusion.execution.batch_size";
+/// Configuration option "datafusion.execution.coalesce_batches"
+pub const OPT_COALESCE_BATCHES: &str = "datafusion.execution.coalesce_batches";
+
+/// Configuration option "datafusion.execution.coalesce_target_batch_size"
+pub const OPT_COALESCE_TARGET_BATCH_SIZE: &str =
+ "datafusion.execution.coalesce_target_batch_size";
+
/// Definition of a configuration option
pub struct ConfigDefinition {
/// key used to identify this configuration option
@@ -115,6 +123,21 @@ impl BuiltInConfigs {
buffer-in-memory batches since creating tiny batches would result
in too much metadata \
memory consumption.",
8192,
+ ),
+ ConfigDefinition::new_bool(
+ OPT_COALESCE_BATCHES,
+ format!("When set to true, record batches will be examined
between each operator and \
+ small batches will be coalesced into larger batches. This is
helpful when there \
+ are highly selective filters or joins that could produce tiny
output batches. The \
+ target batch size is determined by the configuration setting \
+ '{}'.", OPT_COALESCE_TARGET_BATCH_SIZE),
+ true,
+ ),
+ ConfigDefinition::new_u64(
+ OPT_COALESCE_TARGET_BATCH_SIZE,
+ format!("Target batch size when coalescing batches. Used in
conjunction with the \
+ configuration setting '{}'.", OPT_COALESCE_BATCHES),
+ 4096,
)],
}
}
@@ -124,7 +147,11 @@ impl BuiltInConfigs {
let configs = Self::new();
let mut docs = "| key | type | default | description |\n".to_string();
docs += "|-----|------|---------|-------------|\n";
- for config in configs.config_definitions {
+ for config in configs
+ .config_definitions
+ .iter()
+ .sorted_by_key(|c| c.key.as_str())
+ {
docs += &format!(
"| {} | {} | {} | {} |\n",
config.key, config.data_type, config.default_value,
config.description
@@ -206,10 +233,6 @@ mod test {
#[test]
fn docs() {
let docs = BuiltInConfigs::generate_config_markdown();
- // uncomment this println to see the docs so they can be
copy-and-pasted to
- // docs/source/user-guide/configs.md until this task is automated
- // in https://github.com/apache/arrow-datafusion/issues/2770
- //println!("{}", docs);
let mut lines = docs.lines();
assert_eq!(
lines.next().unwrap(),
diff --git a/datafusion/core/src/execution/context.rs
b/datafusion/core/src/execution/context.rs
index 1eb34c710..458b91526 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -81,7 +81,10 @@ use
crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use crate::physical_optimizer::repartition::Repartition;
-use crate::config::{ConfigOptions, OPT_BATCH_SIZE, OPT_FILTER_NULL_JOIN_KEYS};
+use crate::config::{
+ ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES,
OPT_COALESCE_TARGET_BATCH_SIZE,
+ OPT_FILTER_NULL_JOIN_KEYS,
+};
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::logical_plan::plan::Explain;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json,
plan_to_parquet};
@@ -1249,16 +1252,26 @@ impl SessionState {
rules.push(Arc::new(LimitPushDown::new()));
rules.push(Arc::new(SingleDistinctToGroupBy::new()));
+ let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync
+ Send>> = vec![
+ Arc::new(AggregateStatistics::new()),
+ Arc::new(HashBuildProbeOrder::new()),
+ ];
+ if config.config_options.get_bool(OPT_COALESCE_BATCHES) {
+ physical_optimizers.push(Arc::new(CoalesceBatches::new(
+ config
+ .config_options
+ .get_u64(OPT_COALESCE_TARGET_BATCH_SIZE)
+ .try_into()
+ .unwrap(),
+ )));
+ }
+ physical_optimizers.push(Arc::new(Repartition::new()));
+ physical_optimizers.push(Arc::new(AddCoalescePartitionsExec::new()));
+
SessionState {
session_id,
optimizer: Optimizer::new(rules),
- physical_optimizers: vec![
- Arc::new(AggregateStatistics::new()),
- Arc::new(HashBuildProbeOrder::new()),
- Arc::new(CoalesceBatches::new()),
- Arc::new(Repartition::new()),
- Arc::new(AddCoalescePartitionsExec::new()),
- ],
+ physical_optimizers,
query_planner: Arc::new(DefaultQueryPlanner {}),
catalog_list,
scalar_functions: HashMap::new(),
diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index a87fce934..51d56d28d 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -18,25 +18,29 @@
//! CoalesceBatches optimizer that groups rows together
//! in bigger batches to avoid overhead with small batches
-use super::optimizer::PhysicalOptimizerRule;
-use crate::physical_plan::with_new_children_if_necessary;
use crate::{
error::Result,
+ physical_optimizer::PhysicalOptimizerRule,
physical_plan::{
coalesce_batches::CoalesceBatchesExec, filter::FilterExec,
hash_join::HashJoinExec, repartition::RepartitionExec,
+ with_new_children_if_necessary,
},
};
use std::sync::Arc;
-/// Optimizer that introduces CoalesceBatchesExec to avoid overhead with small
batches
+/// Optimizer rule that introduces CoalesceBatchesExec to avoid overhead with
small batches that
+/// are produced by highly selective filters
#[derive(Default)]
-pub struct CoalesceBatches {}
+pub struct CoalesceBatches {
+ /// Target batch size
+ target_batch_size: usize,
+}
impl CoalesceBatches {
#[allow(missing_docs)]
- pub fn new() -> Self {
- Self {}
+ pub fn new(target_batch_size: usize) -> Self {
+ Self { target_batch_size }
}
}
impl PhysicalOptimizerRule for CoalesceBatches {
@@ -45,39 +49,30 @@ impl PhysicalOptimizerRule for CoalesceBatches {
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
config: &crate::execution::context::SessionConfig,
) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
- // wrap operators in CoalesceBatches to avoid lots of tiny batches
when we have
- // highly selective filters
- let children = plan
- .children()
- .iter()
- .map(|child| self.optimize(child.clone(), config))
- .collect::<Result<Vec<_>>>()?;
-
- let plan_any = plan.as_any();
- // TODO we should do this in a more generic way either by wrapping all
operators
- // or having an API so that operators can declare when their inputs or
outputs
- // need to be wrapped in a coalesce batches operator.
- // See https://issues.apache.org/jira/browse/ARROW-11068
- let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
- || plan_any.downcast_ref::<HashJoinExec>().is_some()
- || plan_any.downcast_ref::<RepartitionExec>().is_some();
-
- // TODO we should also do this for AggregateExec but we need to update
tests
- // as part of this work - see
https://issues.apache.org/jira/browse/ARROW-11068
- // || plan_any.downcast_ref::<AggregateExec>().is_some();
-
if plan.children().is_empty() {
// leaf node, children cannot be replaced
Ok(plan.clone())
} else {
+ // recurse down first
+ let children = plan
+ .children()
+ .iter()
+ .map(|child| self.optimize(child.clone(), config))
+ .collect::<Result<Vec<_>>>()?;
let plan = with_new_children_if_necessary(plan, children)?;
+ // The goal here is to detect operators that could produce small
batches and only
+ // wrap those ones with a CoalesceBatchesExec operator. An
alternate approach here
+ // would be to build the coalescing logic directly into the
operators
+ // See https://github.com/apache/arrow-datafusion/issues/139
+ let plan_any = plan.as_any();
+ let wrap_in_coalesce =
plan_any.downcast_ref::<FilterExec>().is_some()
+ || plan_any.downcast_ref::<HashJoinExec>().is_some()
+ || plan_any.downcast_ref::<RepartitionExec>().is_some();
Ok(if wrap_in_coalesce {
- // TODO we should add specific configuration settings for
coalescing batches and
- // we should do that once
https://issues.apache.org/jira/browse/ARROW-11059 is
- // implemented. For now, we choose half the configured batch
size to avoid copies
- // when a small number of rows are removed from a batch
- let target_batch_size = config.batch_size() / 2;
- Arc::new(CoalesceBatchesExec::new(plan.clone(),
target_batch_size))
+ Arc::new(CoalesceBatchesExec::new(
+ plan.clone(),
+ self.target_batch_size,
+ ))
} else {
plan.clone()
})
diff --git a/datafusion/core/src/physical_optimizer/mod.rs
b/datafusion/core/src/physical_optimizer/mod.rs
index ed4505778..55550bcd2 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -26,3 +26,5 @@ pub mod optimizer;
pub mod pruning;
pub mod repartition;
mod utils;
+
+pub use optimizer::PhysicalOptimizerRule;
diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs
b/datafusion/core/src/physical_plan/coalesce_batches.rs
index 75ecaf53e..3f39caaef 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -297,11 +297,56 @@ pub fn concat_batches(
#[cfg(test)]
mod tests {
use super::*;
+ use crate::config::{OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE};
+ use crate::datasource::MemTable;
+ use crate::physical_plan::filter::FilterExec;
+ use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::{memory::MemoryExec,
repartition::RepartitionExec};
- use crate::prelude::SessionContext;
+ use crate::prelude::{SessionConfig, SessionContext};
use crate::test::create_vec_batches;
use arrow::datatypes::{DataType, Field, Schema};
+ #[tokio::test]
+ async fn test_custom_batch_size() -> Result<()> {
+ let ctx = SessionContext::with_config(
+ SessionConfig::new().set_u64(OPT_COALESCE_TARGET_BATCH_SIZE, 1234),
+ );
+ let plan = create_physical_plan(ctx).await?;
+ let projection =
plan.as_any().downcast_ref::<ProjectionExec>().unwrap();
+ let coalesce = projection
+ .input()
+ .as_any()
+ .downcast_ref::<CoalesceBatchesExec>()
+ .unwrap();
+ assert_eq!(1234, coalesce.target_batch_size);
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_disable_coalesce() -> Result<()> {
+ let ctx = SessionContext::with_config(
+ SessionConfig::new().set_bool(OPT_COALESCE_BATCHES, false),
+ );
+ let plan = create_physical_plan(ctx).await?;
+ let projection =
plan.as_any().downcast_ref::<ProjectionExec>().unwrap();
+ // projection should directly wrap filter with no coalesce step
+ let _filter = projection
+ .input()
+ .as_any()
+ .downcast_ref::<FilterExec>()
+ .unwrap();
+ Ok(())
+ }
+
+ async fn create_physical_plan(ctx: SessionContext) -> Result<Arc<dyn
ExecutionPlan>> {
+ let schema = test_schema();
+ let partition = create_vec_batches(&schema, 10);
+ let table = MemTable::try_new(schema, vec![partition])?;
+ ctx.register_table("a", Arc::new(table))?;
+ let plan = ctx.create_logical_plan("SELECT * FROM a WHERE c0 < 1")?;
+ ctx.create_physical_plan(&plan).await
+ }
+
#[tokio::test(flavor = "multi_thread")]
async fn test_concat_batches() -> Result<()> {
let schema = test_schema();
diff --git a/docs/source/user-guide/configs.md
b/docs/source/user-guide/configs.md
index fecd54106..1fc0b5fa8 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -27,7 +27,9 @@ Instead, edit dev/update_config_docs.sh or the docstrings in
datafusion/core/src
The following configuration options can be passed to `SessionConfig` to
control various aspects of query execution.
-| key | type | default | description
|
-| --------------------------------------- | ------- | ------- |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
-| datafusion.optimizer.filterNullJoinKeys | Boolean | false | When set to
true, the optimizer will insert filters before a join between a nullable and
non-nullable column to filter out nulls on the nullable side. This filter can
add additional overhead when the file format does not fully support predicate
push down. |
-| datafusion.execution.batch_size | UInt32 | 8192 | Default batch
size while creating new batches, it's especially useful for buffer-in-memory
batches since creating tiny batches would results in too much metadata memory
consumption.
|
+| key | type | default |
description
|
+| ----------------------------------------------- | ------- | ------- |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
+| datafusion.execution.batch_size | UInt64 | 8192 |
Default batch size while creating new batches, it's especially useful for
buffer-in-memory batches since creating tiny batches would result in too much
metadata memory consumption.
|
+| datafusion.execution.coalesce_batches | Boolean | true | When
set to true, record batches will be examined between each operator and small
batches will be coalesced into larger batches. This is helpful when there are
highly selective filters or joins that could produce tiny output batches. The
target batch size is determined by the configuration setting
'datafusion.execution.coalesce_target_batch_size'. |
+| datafusion.execution.coalesce_target_batch_size | UInt64 | 4096 | Target
batch size when coalescing batches. Used in conjunction with the configuration
setting 'datafusion.execution.coalesce_batches'.
|
+| datafusion.optimizer.filter_null_join_keys | Boolean | false | When
set to true, the optimizer will insert filters before a join between a nullable
and non-nullable column to filter out nulls on the nullable side. This filter
can add additional overhead when the file format does not fully support
predicate push down.
|