zhuqi-lucas commented on code in PR #21170:
URL: https://github.com/apache/datafusion/pull/21170#discussion_r2999234054


##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -1011,20 +1014,39 @@ fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
 /// ```text
 /// "DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
 /// ```
+#[expect(clippy::type_complexity)]
 fn remove_dist_changing_operators(
     mut distribution_context: DistributionContext,
-) -> Result<DistributionContext> {
+) -> Result<(
+    DistributionContext,
+    Option<usize>,
+    Option<Arc<dyn ExecutionPlan>>,
+)> {
+    let mut fetch = None;
+    let mut spm: Option<Arc<dyn ExecutionPlan>> = None;
     while is_repartition(&distribution_context.plan)
         || is_coalesce_partitions(&distribution_context.plan)
         || is_sort_preserving_merge(&distribution_context.plan)
     {
+        // Preserve any `fetch` (limit) that was pushed into a
+        // `SortPreservingMergeExec` or `CoalescePartitionsExec` by
+        // `LimitPushdown`. Without this, the limit would be lost when
+        // the operator is stripped.
+        if let Some(child_fetch) = distribution_context.plan.fetch() {
+            if is_sort_preserving_merge(&distribution_context.plan) {
+                if fetch.is_none() {
+                    spm = Some(Arc::clone(&distribution_context.plan));
+                }
+            }

Review Comment:
   Agreed, fixed in 8d2ee25. The `spm_ordering` capture is now independent of `fetch` — it is guarded by `spm_ordering.is_none()` instead of `fetch.is_none()`. I also switched from storing the full plan to storing just the `LexOrdering`, which avoids the stale-reference issue entirely.



##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -3915,6 +3915,86 @@ fn test_replace_order_preserving_variants_with_fetch() -> Result<()> {
     Ok(())
 }
 
+/// When `LimitPushdown` merges a `GlobalLimitExec` into `CoalescePartitionsExec`
+/// as `fetch`, `remove_dist_changing_operators` must preserve that fetch value.
+/// Otherwise, queries with LIMIT over multi-partition sources silently lose
+/// the limit and return duplicate/extra rows.
+///
+/// Regression test for: https://github.com/apache/datafusion/issues/21169
+#[test]
+fn coalesce_partitions_fetch_preserved_by_enforce_distribution() -> Result<()> {
+    // Simulate what LimitPushdown produces:
+    // CoalescePartitionsExec(fetch=1)
+    //   DataSourceExec (2 partitions)
+    let parquet = parquet_exec_multiple();
+    let coalesce_with_fetch: Arc<dyn ExecutionPlan> =
+        Arc::new(CoalescePartitionsExec::new(parquet).with_fetch(Some(1)));
+
+    let result = ensure_distribution_helper(coalesce_with_fetch, 10, false)?;
+
+    // The fetch=1 must survive. It can appear either as:
+    // - CoalescePartitionsExec: fetch=1  (re-inserted with fetch), or
+    // - GlobalLimitExec: skip=0, fetch=1 (fallback when merge wasn't re-added)

Review Comment:
   Fixed in 8d2ee25 — updated the comment to reflect the actual behavior.



##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -1483,9 +1509,30 @@ pub fn ensure_distribution(
         plan.with_new_children(children_plans)?
     };
 
-    Ok(Transformed::yes(DistributionContext::new(
-        plan, data, children,
-    )))
+    let mut dist_context = DistributionContext::new(Arc::clone(&plan), data, children);
+
+    // If `fetch` was not consumed by `add_merge_on_top` (e.g. when the parent
+    // had `UnspecifiedDistribution` or the child already had a single
+    // partition), the limit that was originally embedded in a distribution
+    // changing operator would be silently lost. Re-introduce it so the
+    // query still returns the correct number of rows.
+    if let Some(fetch_val) = fetch.take() {
+        let limit_plan: Arc<dyn ExecutionPlan> = if let Some(spm) = spm {
+            // Re-insert the original SortPreservingMergeExec with fetch.
+            spm.with_fetch(Some(fetch_val)).unwrap()

Review Comment:
   Agreed on both points. Fixed in 8d2ee25:
   1. No longer reuse the stale SPM — we capture only the `LexOrdering` and 
reconstruct a fresh `SortPreservingMergeExec::new(ordering, dist_context.plan)` 
with the current child.
   2. The `unwrap()` is eliminated since we construct the SPM directly rather 
than going through the trait's `with_fetch` → `Option` path.
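   The reconstruct-or-fall-back step described above can be mocked up with a toy plan enum — a hypothetical stand-in, not DataFusion's `ExecutionPlan` or its real constructors:

   ```rust
   // Toy model of a physical plan; `Plan` stands in for `Arc<dyn ExecutionPlan>`.
   #[derive(Debug)]
   enum Plan {
       Source,
       SortPreservingMerge { ordering: &'static str, fetch: usize, child: Box<Plan> },
       GlobalLimit { fetch: usize, child: Box<Plan> },
   }

   // If a fetch survived the stripping loop, reattach it: build a *fresh* merge
   // around the current child when an ordering was captured (no stale plan
   // reuse, no fallible `with_fetch` + `unwrap`), else fall back to a limit.
   fn reinsert_fetch(
       child: Plan,
       fetch: Option<usize>,
       spm_ordering: Option<&'static str>,
   ) -> Plan {
       match fetch {
           None => child,
           Some(f) => match spm_ordering {
               Some(ordering) => Plan::SortPreservingMerge {
                   ordering,
                   fetch: f,
                   child: Box::new(child),
               },
               None => Plan::GlobalLimit { fetch: f, child: Box::new(child) },
           },
       }
   }

   fn main() {
       // Ordering captured: rebuild the merge with the fetch attached.
       let plan = reinsert_fetch(Plan::Source, Some(1), Some("a ASC"));
       assert!(matches!(plan, Plan::SortPreservingMerge { fetch: 1, .. }));

       // No ordering captured: fall back to a plain limit.
       let plan = reinsert_fetch(Plan::Source, Some(1), None);
       assert!(matches!(plan, Plan::GlobalLimit { fetch: 1, .. }));
   }
   ```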



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

