rkrishn7 commented on code in PR #21170:
URL: https://github.com/apache/datafusion/pull/21170#discussion_r2996334958


##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -1483,9 +1510,29 @@ pub fn ensure_distribution(
         plan.with_new_children(children_plans)?
     };
 
-    Ok(Transformed::yes(DistributionContext::new(
-        plan, data, children,
-    )))
+    let mut dist_context = DistributionContext::new(Arc::clone(&plan), data, children);
+
+    // If `fetch` was not consumed by `add_merge_on_top` (e.g. when the parent
+    // had `UnspecifiedDistribution` or the child already had a single
+    // partition), the limit that was originally embedded in a distribution
+    // changing operator would be silently lost. Re-introduce it so the
+    // query still returns the correct number of rows.
+    if let Some(fetch_val) = fetch.take() {
+        let limit_plan: Arc<dyn ExecutionPlan> = if let Some(spm) = spm {
+            // Re-insert the original SortPreservingMergeExec with fetch.
+            spm.with_fetch(Some(fetch_val)).unwrap()

Review Comment:
   Hmm, is `spm` actually stale here? It was captured before the plan's 
children were processed above, so it may still wrap the old input. I think we 
might want to call the `SortPreservingMergeExec` constructor directly here 
with the new plan.
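
   A minimal sketch of the concern, using a toy model rather than the real
   DataFusion types. `Node`, `merge_over`, `with_fetch`, and
   `stale_and_fresh` are hypothetical names for illustration only. The sketch
   assumes that setting a fetch on a previously captured node keeps that
   node's original input, which is exactly the behaviour in question:

```rust
use std::sync::Arc;

// Toy stand-in for a physical plan node. This is NOT the DataFusion
// `ExecutionPlan` API; every name here is hypothetical.
#[derive(Debug)]
struct Node {
    name: &'static str,
    fetch: Option<usize>,
    input: Option<Arc<Node>>,
}

impl Node {
    fn leaf(name: &'static str) -> Arc<Node> {
        Arc::new(Node { name, fetch: None, input: None })
    }

    // Builds a merge-like node on top of `input`.
    fn merge_over(input: Arc<Node>) -> Arc<Node> {
        Arc::new(Node { name: "merge", fetch: None, input: Some(input) })
    }

    // Copies the node with a new fetch. The input is carried over unchanged
    // (an assumption of this toy model).
    fn with_fetch(&self, fetch: Option<usize>) -> Arc<Node> {
        Arc::new(Node { name: self.name, fetch, input: self.input.clone() })
    }

    fn input_name(&self) -> &'static str {
        self.input.as_ref().map(|n| n.name).unwrap_or("<none>")
    }
}

fn stale_and_fresh() -> (Arc<Node>, Arc<Node>) {
    // The merge node is captured before the children are processed.
    let captured = Node::merge_over(Node::leaf("old_child"));
    // The child is rewritten afterwards.
    let new_plan = Node::leaf("new_child");

    // Re-using the captured node keeps pointing at the old child.
    let stale = captured.with_fetch(Some(10));
    // Constructing a new merge node over the rewritten plan does not.
    let fresh = Node::merge_over(new_plan).with_fetch(Some(10));
    (stale, fresh)
}

fn main() {
    let (stale, fresh) = stale_and_fresh();
    println!(
        "stale (fetch={:?}) wraps {}, fresh (fetch={:?}) wraps {}",
        stale.fetch,
        stale.input_name(),
        fresh.fetch,
        fresh.input_name()
    );
}
```

   If the real operator behaves like the toy `with_fetch`, building the merge
   node from the new plan avoids re-attaching the pre-rewrite subtree.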



##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -1483,9 +1510,29 @@ pub fn ensure_distribution(
         plan.with_new_children(children_plans)?
     };
 
-    Ok(Transformed::yes(DistributionContext::new(
-        plan, data, children,
-    )))
+    let mut dist_context = DistributionContext::new(Arc::clone(&plan), data, children);
+
+    // If `fetch` was not consumed by `add_merge_on_top` (e.g. when the parent
+    // had `UnspecifiedDistribution` or the child already had a single
+    // partition), the limit that was originally embedded in a distribution
+    // changing operator would be silently lost. Re-introduce it so the
+    // query still returns the correct number of rows.
+    if let Some(fetch_val) = fetch.take() {
+        let limit_plan: Arc<dyn ExecutionPlan> = if let Some(spm) = spm {
+            // Re-insert the original SortPreservingMergeExec with fetch.
+            spm.with_fetch(Some(fetch_val)).unwrap()
+        } else {
+            // The fetch came from a CoalescePartitionsExec. Re-introduce
+            // it as a CoalescePartitionsExec(fetch=N) wrapping the output.
+            Arc::new(

Review Comment:
   Do we want to check if the new plan outputs more than a single partition? 
And if so, use `GlobalLimitExec` instead of `CoalescePartitionsExec`?
   
   Not sure this one matters too much though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

