This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new d5a9b7423 Add config option for coalesce_batches physical optimization rule, make optional (#2791)
d5a9b7423 is described below

commit d5a9b742344c1a6d34e88cac4e34950125dd880b
Author: Andy Grove <[email protected]>
AuthorDate: Tue Jun 28 11:02:29 2022 -0700

    Add config option for coalesce_batches physical optimization rule, make optional (#2791)
    
    * Add config option for coalesce_batches
    
    * remove hard-coded setting names from documentation
    
    * fix
    
    * update docs
    
    * tests
---
 datafusion/core/src/config.rs                      | 33 ++++++++++--
 datafusion/core/src/execution/context.rs           | 29 +++++++---
 .../src/physical_optimizer/coalesce_batches.rs     | 61 ++++++++++------------
 datafusion/core/src/physical_optimizer/mod.rs      |  2 +
 .../core/src/physical_plan/coalesce_batches.rs     | 47 ++++++++++++++++-
 docs/source/user-guide/configs.md                  | 10 ++--
 6 files changed, 131 insertions(+), 51 deletions(-)

diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs
index ee428cef1..40bbf0f4a 100644
--- a/datafusion/core/src/config.rs
+++ b/datafusion/core/src/config.rs
@@ -19,6 +19,7 @@
 
 use arrow::datatypes::DataType;
 use datafusion_common::ScalarValue;
+use itertools::Itertools;
 use std::collections::HashMap;
 
 /// Configuration option "datafusion.optimizer.filter_null_join_keys"
@@ -27,6 +28,13 @@ pub const OPT_FILTER_NULL_JOIN_KEYS: &str = 
"datafusion.optimizer.filter_null_jo
 /// Configuration option "datafusion.execution.batch_size"
 pub const OPT_BATCH_SIZE: &str = "datafusion.execution.batch_size";
 
+/// Configuration option "datafusion.execution.coalesce_batches"
+pub const OPT_COALESCE_BATCHES: &str = "datafusion.execution.coalesce_batches";
+
+/// Configuration option "datafusion.execution.coalesce_target_batch_size"
+pub const OPT_COALESCE_TARGET_BATCH_SIZE: &str =
+    "datafusion.execution.coalesce_target_batch_size";
+
 /// Definition of a configuration option
 pub struct ConfigDefinition {
     /// key used to identifier this configuration option
@@ -115,6 +123,21 @@ impl BuiltInConfigs {
             buffer-in-memory batches since creating tiny batches would results 
in too much metadata \
             memory consumption.",
             8192,
+            ),
+            ConfigDefinition::new_bool(
+                OPT_COALESCE_BATCHES,
+                format!("When set to true, record batches will be examined 
between each operator and \
+                small batches will be coalesced into larger batches. This is 
helpful when there \
+                are highly selective filters or joins that could produce tiny 
output batches. The \
+                target batch size is determined by the configuration setting \
+                '{}'.", OPT_COALESCE_TARGET_BATCH_SIZE),
+                true,
+            ),
+             ConfigDefinition::new_u64(
+                 OPT_COALESCE_TARGET_BATCH_SIZE,
+                 format!("Target batch size when coalescing batches. Uses in 
conjunction with the \
+            configuration setting '{}'.", OPT_COALESCE_BATCHES),
+                 4096,
             )],
         }
     }
@@ -124,7 +147,11 @@ impl BuiltInConfigs {
         let configs = Self::new();
         let mut docs = "| key | type | default | description |\n".to_string();
         docs += "|-----|------|---------|-------------|\n";
-        for config in configs.config_definitions {
+        for config in configs
+            .config_definitions
+            .iter()
+            .sorted_by_key(|c| c.key.as_str())
+        {
             docs += &format!(
                 "| {} | {} | {} | {} |\n",
                 config.key, config.data_type, config.default_value, 
config.description
@@ -206,10 +233,6 @@ mod test {
     #[test]
     fn docs() {
         let docs = BuiltInConfigs::generate_config_markdown();
-        // uncomment this println to see the docs so they can be 
copy-and-pasted to
-        // docs/source/user-guide/configs.md until this task is automated
-        // in https://github.com/apache/arrow-datafusion/issues/2770
-        //println!("{}", docs);
         let mut lines = docs.lines();
         assert_eq!(
             lines.next().unwrap(),
diff --git a/datafusion/core/src/execution/context.rs 
b/datafusion/core/src/execution/context.rs
index 1eb34c710..458b91526 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -81,7 +81,10 @@ use 
crate::physical_optimizer::coalesce_batches::CoalesceBatches;
 use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
 use crate::physical_optimizer::repartition::Repartition;
 
-use crate::config::{ConfigOptions, OPT_BATCH_SIZE, OPT_FILTER_NULL_JOIN_KEYS};
+use crate::config::{
+    ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, 
OPT_COALESCE_TARGET_BATCH_SIZE,
+    OPT_FILTER_NULL_JOIN_KEYS,
+};
 use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use crate::logical_plan::plan::Explain;
 use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, 
plan_to_parquet};
@@ -1249,16 +1252,26 @@ impl SessionState {
         rules.push(Arc::new(LimitPushDown::new()));
         rules.push(Arc::new(SingleDistinctToGroupBy::new()));
 
+        let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
+            Arc::new(AggregateStatistics::new()),
+            Arc::new(HashBuildProbeOrder::new()),
+        ];
+        if config.config_options.get_bool(OPT_COALESCE_BATCHES) {
+            physical_optimizers.push(Arc::new(CoalesceBatches::new(
+                config
+                    .config_options
+                    .get_u64(OPT_COALESCE_TARGET_BATCH_SIZE)
+                    .try_into()
+                    .unwrap(),
+            )));
+        }
+        physical_optimizers.push(Arc::new(Repartition::new()));
+        physical_optimizers.push(Arc::new(AddCoalescePartitionsExec::new()));
+
         SessionState {
             session_id,
             optimizer: Optimizer::new(rules),
-            physical_optimizers: vec![
-                Arc::new(AggregateStatistics::new()),
-                Arc::new(HashBuildProbeOrder::new()),
-                Arc::new(CoalesceBatches::new()),
-                Arc::new(Repartition::new()),
-                Arc::new(AddCoalescePartitionsExec::new()),
-            ],
+            physical_optimizers,
             query_planner: Arc::new(DefaultQueryPlanner {}),
             catalog_list,
             scalar_functions: HashMap::new(),
diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs 
b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index a87fce934..51d56d28d 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -18,25 +18,29 @@
 //! CoalesceBatches optimizer that groups batches together rows
 //! in bigger batches to avoid overhead with small batches
 
-use super::optimizer::PhysicalOptimizerRule;
-use crate::physical_plan::with_new_children_if_necessary;
 use crate::{
     error::Result,
+    physical_optimizer::PhysicalOptimizerRule,
     physical_plan::{
         coalesce_batches::CoalesceBatchesExec, filter::FilterExec,
         hash_join::HashJoinExec, repartition::RepartitionExec,
+        with_new_children_if_necessary,
     },
 };
 use std::sync::Arc;
 
-/// Optimizer that introduces CoalesceBatchesExec to avoid overhead with small 
batches
+/// Optimizer rule that introduces CoalesceBatchesExec to avoid overhead with 
small batches that
+/// are produced by highly selective filters
 #[derive(Default)]
-pub struct CoalesceBatches {}
+pub struct CoalesceBatches {
+    /// Target batch size
+    target_batch_size: usize,
+}
 
 impl CoalesceBatches {
     #[allow(missing_docs)]
-    pub fn new() -> Self {
-        Self {}
+    pub fn new(target_batch_size: usize) -> Self {
+        Self { target_batch_size }
     }
 }
 impl PhysicalOptimizerRule for CoalesceBatches {
@@ -45,39 +49,30 @@ impl PhysicalOptimizerRule for CoalesceBatches {
         plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
         config: &crate::execution::context::SessionConfig,
     ) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
-        // wrap operators in CoalesceBatches to avoid lots of tiny batches 
when we have
-        // highly selective filters
-        let children = plan
-            .children()
-            .iter()
-            .map(|child| self.optimize(child.clone(), config))
-            .collect::<Result<Vec<_>>>()?;
-
-        let plan_any = plan.as_any();
-        // TODO we should do this in a more generic way either by wrapping all 
operators
-        // or having an API so that operators can declare when their inputs or 
outputs
-        // need to be wrapped in a coalesce batches operator.
-        // See https://issues.apache.org/jira/browse/ARROW-11068
-        let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
-            || plan_any.downcast_ref::<HashJoinExec>().is_some()
-            || plan_any.downcast_ref::<RepartitionExec>().is_some();
-
-        // TODO we should also do this for AggregateExec but we need to update 
tests
-        // as part of this work - see 
https://issues.apache.org/jira/browse/ARROW-11068
-        // || plan_any.downcast_ref::<AggregateExec>().is_some();
-
         if plan.children().is_empty() {
             // leaf node, children cannot be replaced
             Ok(plan.clone())
         } else {
+            // recurse down first
+            let children = plan
+                .children()
+                .iter()
+                .map(|child| self.optimize(child.clone(), config))
+                .collect::<Result<Vec<_>>>()?;
             let plan = with_new_children_if_necessary(plan, children)?;
+            // The goal here is to detect operators that could produce small 
batches and only
+            // wrap those ones with a CoalesceBatchesExec operator. An 
alternate approach here
+            // would be to build the coalescing logic directly into the 
operators
+            // See https://github.com/apache/arrow-datafusion/issues/139
+            let plan_any = plan.as_any();
+            let wrap_in_coalesce = 
plan_any.downcast_ref::<FilterExec>().is_some()
+                || plan_any.downcast_ref::<HashJoinExec>().is_some()
+                || plan_any.downcast_ref::<RepartitionExec>().is_some();
             Ok(if wrap_in_coalesce {
-                // TODO we should add specific configuration settings for 
coalescing batches and
-                // we should do that once 
https://issues.apache.org/jira/browse/ARROW-11059 is
-                // implemented. For now, we choose half the configured batch 
size to avoid copies
-                // when a small number of rows are removed from a batch
-                let target_batch_size = config.batch_size() / 2;
-                Arc::new(CoalesceBatchesExec::new(plan.clone(), 
target_batch_size))
+                Arc::new(CoalesceBatchesExec::new(
+                    plan.clone(),
+                    self.target_batch_size,
+                ))
             } else {
                 plan.clone()
             })
diff --git a/datafusion/core/src/physical_optimizer/mod.rs 
b/datafusion/core/src/physical_optimizer/mod.rs
index ed4505778..55550bcd2 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -26,3 +26,5 @@ pub mod optimizer;
 pub mod pruning;
 pub mod repartition;
 mod utils;
+
+pub use optimizer::PhysicalOptimizerRule;
diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs 
b/datafusion/core/src/physical_plan/coalesce_batches.rs
index 75ecaf53e..3f39caaef 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -297,11 +297,56 @@ pub fn concat_batches(
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::config::{OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE};
+    use crate::datasource::MemTable;
+    use crate::physical_plan::filter::FilterExec;
+    use crate::physical_plan::projection::ProjectionExec;
     use crate::physical_plan::{memory::MemoryExec, 
repartition::RepartitionExec};
-    use crate::prelude::SessionContext;
+    use crate::prelude::{SessionConfig, SessionContext};
     use crate::test::create_vec_batches;
     use arrow::datatypes::{DataType, Field, Schema};
 
+    #[tokio::test]
+    async fn test_custom_batch_size() -> Result<()> {
+        let ctx = SessionContext::with_config(
+            SessionConfig::new().set_u64(OPT_COALESCE_TARGET_BATCH_SIZE, 1234),
+        );
+        let plan = create_physical_plan(ctx).await?;
+        let projection = 
plan.as_any().downcast_ref::<ProjectionExec>().unwrap();
+        let coalesce = projection
+            .input()
+            .as_any()
+            .downcast_ref::<CoalesceBatchesExec>()
+            .unwrap();
+        assert_eq!(1234, coalesce.target_batch_size);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_disable_coalesce() -> Result<()> {
+        let ctx = SessionContext::with_config(
+            SessionConfig::new().set_bool(OPT_COALESCE_BATCHES, false),
+        );
+        let plan = create_physical_plan(ctx).await?;
+        let projection = 
plan.as_any().downcast_ref::<ProjectionExec>().unwrap();
+        // projection should directly wrap filter with no coalesce step
+        let _filter = projection
+            .input()
+            .as_any()
+            .downcast_ref::<FilterExec>()
+            .unwrap();
+        Ok(())
+    }
+
+    async fn create_physical_plan(ctx: SessionContext) -> Result<Arc<dyn 
ExecutionPlan>> {
+        let schema = test_schema();
+        let partition = create_vec_batches(&schema, 10);
+        let table = MemTable::try_new(schema, vec![partition])?;
+        ctx.register_table("a", Arc::new(table))?;
+        let plan = ctx.create_logical_plan("SELECT * FROM a WHERE c0 < 1")?;
+        ctx.create_physical_plan(&plan).await
+    }
+
     #[tokio::test(flavor = "multi_thread")]
     async fn test_concat_batches() -> Result<()> {
         let schema = test_schema();
diff --git a/docs/source/user-guide/configs.md 
b/docs/source/user-guide/configs.md
index fecd54106..1fc0b5fa8 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -27,7 +27,9 @@ Instead, edit dev/update_config_docs.sh or the docstrings in 
datafusion/core/src
 
 The following configuration options can be passed to `SessionConfig` to 
control various aspects of query execution.
 
-| key                                     | type    | default | description    
                                                                                
                                                                                
                                                                                
 |
-| --------------------------------------- | ------- | ------- | 
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 |
-| datafusion.optimizer.filterNullJoinKeys | Boolean | false   | When set to 
true, the optimizer will insert filters before a join between a nullable and 
non-nullable column to filter out nulls on the nullable side. This filter can 
add additional overhead when the file format does not fully support predicate 
push down. |
-| datafusion.execution.batch_size         | UInt32  | 8192    | Default batch 
size while creating new batches, it's especially useful for buffer-in-memory 
batches since creating tiny batches would results in too much metadata memory 
consumption.                                                                    
       |
+| key                                             | type    | default | 
description                                                                     
                                                                                
                                                                                
                                                                                
                              |
+| ----------------------------------------------- | ------- | ------- | 
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 |
+| datafusion.execution.batch_size                 | UInt64  | 8192    | 
Default batch size while creating new batches, it's especially useful for 
buffer-in-memory batches since creating tiny batches would results in too much 
metadata memory consumption.                                                    
                                                                                
                                     |
+| datafusion.execution.coalesce_batches           | Boolean | true    | When 
set to true, record batches will be examined between each operator and small 
batches will be coalesced into larger batches. This is helpful when there are 
highly selective filters or joins that could produce tiny output batches. The 
target batch size is determined by the configuration setting 
'datafusion.execution.coalesce_target_batch_size'. |
+| datafusion.execution.coalesce_target_batch_size | UInt64  | 4096    | Target 
batch size when coalescing batches. Uses in conjunction with the configuration 
setting 'datafusion.execution.coalesce_batches'.                                
                                                                                
                                                                                
                        |
+| datafusion.optimizer.filter_null_join_keys      | Boolean | false   | When 
set to true, the optimizer will insert filters before a join between a nullable 
and non-nullable column to filter out nulls on the nullable side. This filter 
can add additional overhead when the file format does not fully support 
predicate push down.                                                            
                                   |

Reply via email to