zhuqi-lucas commented on code in PR #21170:
URL: https://github.com/apache/datafusion/pull/21170#discussion_r2999232892


##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -1483,9 +1510,29 @@ pub fn ensure_distribution(
         plan.with_new_children(children_plans)?
     };
 
-    Ok(Transformed::yes(DistributionContext::new(
-        plan, data, children,
-    )))
+    let mut dist_context = DistributionContext::new(Arc::clone(&plan), data, children);
+
+    // If `fetch` was not consumed by `add_merge_on_top` (e.g. when the parent
+    // had `UnspecifiedDistribution` or the child already had a single
+    // partition), the limit that was originally embedded in a distribution
+    // changing operator would be silently lost. Re-introduce it so the
+    // query still returns the correct number of rows.
+    if let Some(fetch_val) = fetch.take() {
+        let limit_plan: Arc<dyn ExecutionPlan> = if let Some(spm) = spm {
+            // Re-insert the original SortPreservingMergeExec with fetch.
+            spm.with_fetch(Some(fetch_val)).unwrap()

Review Comment:
   Good catch! Yes, `spm` was indeed stale — it was captured in 
`remove_dist_changing_operators` before the child subtree went through 
`ensure_distribution`, so it referenced the old (pre-rewrite) child plan.
   
   Fixed in 8d2ee25: instead of capturing the entire `Arc<dyn ExecutionPlan>`, 
we now only capture the `LexOrdering` from the SPM. In the fallback path, we 
reconstruct a fresh `SortPreservingMergeExec::new(ordering, dist_context.plan)` 
using the current (rewritten) child. This also eliminates the `unwrap()`.
   
   Additionally, the `spm` capture was gated on `fetch.is_none()`, which meant it was skipped whenever an outer operator (e.g. `CoalescePartitionsExec`) had already set `fetch`. I decoupled the two by gating on `spm_ordering.is_none()` instead. Added a regression test (`nested_coalesce_over_spm_preserves_spm_ordering`) confirming that the old code produced a `CoalescePartitionsExec` (losing the sort semantics) while the fix correctly produces a `SortPreservingMergeExec`.
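The fix described above can be sketched with stand-in types (hypothetical simplifications, not the actual DataFusion structs): because only the ordering is captured, the merge node is rebuilt over whatever the current (rewritten) child is, so no stale `Arc` is held and no `unwrap()` is needed.

```rust
use std::sync::Arc;

// Stand-in for DataFusion's `LexOrdering` (hypothetical, simplified).
#[derive(Clone, Debug, PartialEq)]
struct LexOrdering(Vec<String>);

// Minimal plan model for illustration (hypothetical, not the real API).
#[derive(Debug)]
enum Plan {
    Source(&'static str),
    Spm {
        ordering: LexOrdering,
        child: Arc<Plan>,
        fetch: Option<usize>,
    },
}

// Rebuild a fresh SortPreservingMerge over the *current* (rewritten) child,
// instead of reusing a node captured before the subtree was rewritten.
fn rebuild_spm(ordering: LexOrdering, current_child: Arc<Plan>, fetch: usize) -> Plan {
    Plan::Spm {
        ordering,
        child: current_child,
        fetch: Some(fetch),
    }
}
```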



##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -1483,9 +1510,29 @@ pub fn ensure_distribution(
         plan.with_new_children(children_plans)?
     };
 
-    Ok(Transformed::yes(DistributionContext::new(
-        plan, data, children,
-    )))
+    let mut dist_context = DistributionContext::new(Arc::clone(&plan), data, children);
+
+    // If `fetch` was not consumed by `add_merge_on_top` (e.g. when the parent
+    // had `UnspecifiedDistribution` or the child already had a single
+    // partition), the limit that was originally embedded in a distribution
+    // changing operator would be silently lost. Re-introduce it so the
+    // query still returns the correct number of rows.
+    if let Some(fetch_val) = fetch.take() {
+        let limit_plan: Arc<dyn ExecutionPlan> = if let Some(spm) = spm {
+            // Re-insert the original SortPreservingMergeExec with fetch.
+            spm.with_fetch(Some(fetch_val)).unwrap()
+        } else {
+            // The fetch came from a CoalescePartitionsExec. Re-introduce
+            // it as a CoalescePartitionsExec(fetch=N) wrapping the output.
+            Arc::new(

Review Comment:
   Good point. In the current flow, `fetch` is left unconsumed only when `add_merge_on_top` didn't fire, i.e. the parent required `UnspecifiedDistribution` or the child already had a single partition. For a single partition, the `CoalescePartitionsExec` is essentially a no-op wrapper that applies the fetch; in multi-partition cases, the fetch has already been consumed by `add_merge_on_top`. So in practice I believe the current behavior is correct, but I'll keep an eye out for edge cases. Thanks for raising it!
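The fallback being discussed can be sketched as follows (a hypothetical simplification, not the real `ensure_distribution` code): an unconsumed fetch is re-applied over the rewritten plan, either as a sort-preserving merge when an ordering was captured, or as a coalesce-style wrapper otherwise.

```rust
use std::sync::Arc;

// Minimal plan model for illustration (hypothetical, not the DataFusion API).
#[derive(Debug)]
enum Node {
    Source,
    SortPreservingMerge { fetch: Option<usize>, child: Arc<Node> },
    CoalescePartitions { fetch: Option<usize>, child: Arc<Node> },
}

// If `fetch` was not consumed by the merge-insertion step, re-introduce it
// so the limit embedded in the removed operator is not silently lost.
fn reapply_fetch(plan: Arc<Node>, fetch: Option<usize>, has_spm_ordering: bool) -> Arc<Node> {
    match fetch {
        // Fetch was already consumed (e.g. by the merge insertion): no-op.
        None => plan,
        // An SPM was removed earlier: restore it, with the fetch, over the
        // current child so sort semantics are preserved.
        Some(n) if has_spm_ordering => {
            Arc::new(Node::SortPreservingMerge { fetch: Some(n), child: plan })
        }
        // Otherwise the fetch came from a CoalescePartitionsExec; re-wrap.
        Some(n) => Arc::new(Node::CoalescePartitions { fetch: Some(n), child: plan }),
    }
}
```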



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

