Dandandan commented on a change in pull request #9865:
URL: https://github.com/apache/arrow/pull/9865#discussion_r605958280
########## File path: rust/datafusion/src/physical_optimizer/coalesce_batches.rs ##########

@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! CoalesceBatches optimizer that groups small batches together
+//! into bigger batches to avoid the overhead of small batches
+
+use super::optimizer::PhysicalOptimizerRule;
+use crate::{
+    error::Result,
+    physical_plan::{
+        coalesce_batches::CoalesceBatchesExec, filter::FilterExec,
+        hash_join::HashJoinExec, repartition::RepartitionExec,
+    },
+};
+use std::sync::Arc;
+
+/// Optimizer that introduces CoalesceBatchesExec to avoid the overhead of small batches
+pub struct CoalesceBatches {}
+
+impl CoalesceBatches {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+impl PhysicalOptimizerRule for CoalesceBatches {
+    fn optimize(
+        &self,
+        plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
+        config: &crate::execution::context::ExecutionConfig,
+    ) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
+        // wrap operators in CoalesceBatches to avoid lots of tiny batches when we have
+        // highly selective filters
+        let children = plan
+            .children()
+            .iter()
+            .map(|child| self.optimize(child.clone(), config))
+            .collect::<Result<Vec<_>>>()?;
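For context on what the wrapped `CoalesceBatchesExec` node does at runtime, here is a minimal, self-contained sketch of the batch-coalescing idea: small input batches are buffered until a target row count is reached, then emitted as one larger batch. The `Batch` type and `coalesce_batches` function below are illustrative stand-ins, not DataFusion's actual Arrow-based implementation.

```rust
/// Illustrative stand-in for an Arrow RecordBatch: just a vector of rows.
#[derive(Debug, Clone, PartialEq)]
struct Batch {
    rows: Vec<i64>,
}

/// Buffer incoming rows and emit a batch once `target_rows` have accumulated,
/// flushing any remainder at the end. This mirrors the behavior a
/// CoalesceBatchesExec-style operator provides for its input stream.
fn coalesce_batches(input: Vec<Batch>, target_rows: usize) -> Vec<Batch> {
    let mut output = Vec::new();
    let mut buffer: Vec<i64> = Vec::new();
    for batch in input {
        buffer.extend(batch.rows);
        if buffer.len() >= target_rows {
            // Emit the accumulated rows as one larger batch.
            output.push(Batch {
                rows: std::mem::take(&mut buffer),
            });
        }
    }
    if !buffer.is_empty() {
        // Final (possibly small) batch with the leftover rows.
        output.push(Batch { rows: buffer });
    }
    output
}

fn main() {
    // Ten one-row batches, as a highly selective filter might produce.
    let tiny: Vec<Batch> = (0..10).map(|i| Batch { rows: vec![i] }).collect();
    let coalesced = coalesce_batches(tiny, 4);
    for b in &coalesced {
        println!("{} rows", b.rows.len());
    }
    // Ten rows with a target of 4 yield batches of 4, 4, and 2 rows.
}
```

The point of wrapping operators like `FilterExec` this way is that downstream operators then amortize their per-batch overhead over more rows, instead of paying it once per tiny batch.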
Review comment:
I think that might be a useful direction! In some cases it can indeed be more efficient for nodes to write to multiple buffers than to produce smaller batches and concatenate them afterwards, although based on what I saw in the profiling info, it currently does not look like it would be an enormous performance improvement. Probably not something in the scope of this PR, as it's already getting pretty big.

Some other notes:
* In this PR I think I had to create the physical optimizer abstraction, as otherwise I felt the planner would become unmaintainable. Planning and optimization are now separated rather than done in the same pass as before (I was actually a bit confused about how it worked before!)
* I currently added the AddMergeExec as an optimization pass, because that is how the code worked before, but it feels a bit off as an optimization pass. I will probably keep it like that for now, though.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org