2010YOUY01 commented on code in PR #11983:
URL: https://github.com/apache/datafusion/pull/11983#discussion_r1716940344


##########
datafusion/core/src/physical_optimizer/coalesce_batches.rs:
##########
@@ -43,45 +50,126 @@ impl CoalesceBatches {
         Self::default()
     }
 }
-impl PhysicalOptimizerRule for CoalesceBatches {
-    fn optimize(
-        &self,
-        plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
-        config: &ConfigOptions,
-    ) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
-        if !config.execution.coalesce_batches {
-            return Ok(plan);
-        }
 
-        let target_batch_size = config.execution.batch_size;
-        plan.transform_up(|plan| {
-            let plan_any = plan.as_any();
-            // The goal here is to detect operators that could produce small 
batches and only
-            // wrap those ones with a CoalesceBatchesExec operator. An 
alternate approach here
-            // would be to build the coalescing logic directly into the 
operators
-            // See https://github.com/apache/datafusion/issues/139
-            let wrap_in_coalesce = 
plan_any.downcast_ref::<FilterExec>().is_some()
-                || plan_any.downcast_ref::<HashJoinExec>().is_some()
+#[inline]
+fn get_limit(plan: &dyn Any) -> Option<usize> {
+    if let Some(limit_exec) = plan.downcast_ref::<GlobalLimitExec>() {
+        limit_exec.fetch()
+    } else {
+        plan.downcast_ref::<LocalLimitExec>()
+            .map(|limit_exec| limit_exec.fetch())
+    }
+}
+
+#[inline]
+fn need_scan_all(plan: &dyn Any) -> bool {
+    plan.downcast_ref::<SortMergeJoinExec>().is_some()
+        || plan.downcast_ref::<AggregateExec>().is_some()
+        || plan.downcast_ref::<SortExec>().is_some()
+        || plan.downcast_ref::<SortPreservingMergeExec>().is_some()
+        || plan.downcast_ref::<WindowAggExec>().is_some()
+}
+
+#[inline]
+fn need_wrap_in_coalesce(plan: &dyn Any) -> bool {
+    // The goal here is to detect operators that could produce small batches 
and only
+    // wrap those ones with a CoalesceBatchesExec operator. An alternate 
approach here
+    // would be to build the coalescing logic directly into the operators
+    // See https://github.com/apache/arrow-datafusion/issues/139
+    plan.downcast_ref::<FilterExec>().is_some()
+                || plan.downcast_ref::<HashJoinExec>().is_some()
                 // Don't need to add CoalesceBatchesExec after a round robin 
RepartitionExec
-                || plan_any
+                || plan
                     .downcast_ref::<RepartitionExec>()
                     .map(|repart_exec| {
                         !matches!(
                             repart_exec.partitioning().clone(),
                             Partitioning::RoundRobinBatch(_)
                         )
                     })
-                    .unwrap_or(false);
-            if wrap_in_coalesce {
-                Ok(Transformed::yes(Arc::new(CoalesceBatchesExec::new(
-                    plan,
-                    target_batch_size,
-                ))))
-            } else {
-                Ok(Transformed::no(plan))
-            }
+                    .unwrap_or(false)
+}
+
+fn wrap_in_coalesce_rewrite_inner(
+    mut limit: Option<usize>,
+    partition: usize,
+    default_batch_size: usize,
+    plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
+) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
+    // If the entire table needs to be scanned, the limit at the upper level 
does not take effect
+    if need_scan_all(plan.as_any()) {
+        limit = None
+    }
+    let children = plan
+        .children()
+        .iter()
+        .map(|&child| {
+            // Update to downstream limit
+            let limit = match get_limit(child.as_any()) {
+                None => limit,
+                v => v,
+            };
+            wrap_in_coalesce_rewrite_inner(
+                limit,
+                partition,
+                default_batch_size,
+                child.clone(),
+            )
         })
-        .data()
+        .collect::<Result<Vec<_>>>()?;
+
+    let wrap_in_coalesce = need_wrap_in_coalesce(plan.as_any());
+
+    // Take the smaller of `limit/partition` and `default_batch_size` as 
target_batch_size
+    let target_batch_size = match limit {
+        Some(limit) => std::cmp::min(ceil(limit, partition), 
default_batch_size),
+        None => default_batch_size,
+    };
+
+    let plan = if children.is_empty() {
+        plan
+    } else {
+        plan.with_new_children(children)?
+    };
+
+    Ok(if wrap_in_coalesce {
+        Arc::new(CoalesceBatchesExec::new(plan, target_batch_size))

Review Comment:
   We can use this API to limit the maximum number of rows to fetch, instead 
of overriding `target_batch_size`:
   
https://github.com/apache/datafusion/blob/e4be013064943786c9915bbc79c18ee82106340a/datafusion/physical-plan/src/coalesce_batches.rs#L106-L110



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to