acking-you commented on code in PR #11983:
URL: https://github.com/apache/datafusion/pull/11983#discussion_r1718127681
##########
datafusion/core/src/physical_optimizer/coalesce_batches.rs:
##########
@@ -43,45 +50,129 @@ impl CoalesceBatches {
         Self::default()
     }
 }
-impl PhysicalOptimizerRule for CoalesceBatches {
-    fn optimize(
-        &self,
-        plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
-        config: &ConfigOptions,
-    ) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
-        if !config.execution.coalesce_batches {
-            return Ok(plan);
-        }
-        let target_batch_size = config.execution.batch_size;
-        plan.transform_up(|plan| {
-            let plan_any = plan.as_any();
-            // The goal here is to detect operators that could produce small batches and only
-            // wrap those ones with a CoalesceBatchesExec operator. An alternate approach here
-            // would be to build the coalescing logic directly into the operators
-            // See https://github.com/apache/datafusion/issues/139
-            let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
-                || plan_any.downcast_ref::<HashJoinExec>().is_some()
+#[inline]
+fn get_limit(plan: &dyn Any) -> Option<usize> {
+    if let Some(limit_exec) = plan.downcast_ref::<GlobalLimitExec>() {
+        limit_exec.fetch().map(|fetch| limit_exec.skip() + fetch)
+    } else {
+        plan.downcast_ref::<LocalLimitExec>()
+            .map(|limit_exec| limit_exec.fetch())
+    }
+}
+
+#[inline]
+fn need_scan_all(plan: &dyn Any) -> bool {
+    plan.downcast_ref::<SortMergeJoinExec>().is_some()
+        || plan.downcast_ref::<AggregateExec>().is_some()
+        || plan.downcast_ref::<SortExec>().is_some()
+        || plan.downcast_ref::<SortPreservingMergeExec>().is_some()
+        || plan.downcast_ref::<WindowAggExec>().is_some()
+}
+
+#[inline]
+fn need_wrap_in_coalesce(plan: &dyn Any) -> bool {
+    // The goal here is to detect operators that could produce small batches and only
+    // wrap those ones with a CoalesceBatchesExec operator. An alternate approach here
+    // would be to build the coalescing logic directly into the operators
+    // See https://github.com/apache/arrow-datafusion/issues/139
+    plan.downcast_ref::<FilterExec>().is_some()
+        || plan.downcast_ref::<HashJoinExec>().is_some()
         // Don't need to add CoalesceBatchesExec after a round robin RepartitionExec
-            || plan_any
+        || plan
             .downcast_ref::<RepartitionExec>()
             .map(|repart_exec| {
                 !matches!(
                     repart_exec.partitioning().clone(),
                     Partitioning::RoundRobinBatch(_)
                 )
             })
-            .unwrap_or(false);
-        if wrap_in_coalesce {
-            Ok(Transformed::yes(Arc::new(CoalesceBatchesExec::new(
-                plan,
-                target_batch_size,
-            ))))
-        } else {
-            Ok(Transformed::no(plan))
-        }
+            .unwrap_or(false)
+}
+
+fn wrap_in_coalesce_rewrite_inner(
+    mut limit: Option<usize>,
+    partition: usize,
+    default_batch_size: usize,
+    plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
+) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
+    // If the entire table needs to be scanned, the limit at the upper level does not take effect
+    if need_scan_all(plan.as_any()) {

Review Comment:
   > I think we should turn this around: allow any approved plan nodes instead of disallowing some. Otherwise this will be wrong for any added/forgotten nodes or user defined nodes.

   After carefully considering the revised plan, I realized that identifying the operators that require a full table scan is still necessary regardless of the changes: this optimization is always decided by two things, whether the plan contains a full-table-scan operator and whether it contains a limit operator.
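   For illustration, here is a minimal self-contained sketch of the allow-list inversion under discussion. It uses a hypothetical `PlanKind` enum in place of DataFusion's `ExecutionPlan` trait objects, so it compiles on its own and none of these names are the PR's actual API. The idea: you still classify full-scan operators, but unknown nodes default to "scans all input", so a forgotten or user-defined operator costs a missed optimization rather than a wrong plan.

   ```rust
   // Hypothetical stand-in for DataFusion's ExecutionPlan nodes; not the PR's API.
   #[derive(Debug, Clone, Copy)]
   enum PlanKind {
       Filter,          // streams: emits rows as it reads them
       Projection,      // streams
       CoalesceBatches, // streams
       Sort,            // must consume all input before emitting its first row
       Aggregate,       // likewise pipeline-breaking
       UserDefined,     // unknown to the optimizer
   }

   /// Allow-list variant of `need_scan_all`: only operators known to stream
   /// are exempt; anything unrecognized defaults to "scans all input".
   fn need_scan_all(kind: PlanKind) -> bool {
       !matches!(
           kind,
           PlanKind::Filter | PlanKind::Projection | PlanKind::CoalesceBatches
       )
   }

   /// Loosely mirrors the check at the line under review: a limit hint
   /// survives through streaming operators and is dropped at any operator
   /// that has to scan its entire input.
   fn propagate_limit(limit: Option<usize>, kind: PlanKind) -> Option<usize> {
       if need_scan_all(kind) {
           None
       } else {
           limit
       }
   }

   fn main() {
       for kind in [PlanKind::Filter, PlanKind::Sort, PlanKind::UserDefined] {
           println!(
               "{kind:?}: need_scan_all = {}, limit 10 propagates as {:?}",
               need_scan_all(kind),
               propagate_limit(Some(10), kind)
           );
       }
   }
   ```

   With that conservative default, both points above can hold at once: the rule still needs the full-scan/limit classification, but it no longer has to enumerate every pipeline-breaking node to stay correct.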
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org