Copilot commented on code in PR #21170:
URL: https://github.com/apache/datafusion/pull/21170#discussion_r2993392553
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -3915,6 +3915,86 @@ fn test_replace_order_preserving_variants_with_fetch() -> Result<()> {
Ok(())
}
+/// When `LimitPushdown` merges a `GlobalLimitExec` into `CoalescePartitionsExec`
+/// as `fetch`, `remove_dist_changing_operators` must preserve that fetch value.
+/// Otherwise, queries with LIMIT over multi-partition sources silently lose
+/// the limit and return duplicate/extra rows.
+///
+/// Regression test for: https://github.com/apache/datafusion/issues/21169
+#[test]
+fn coalesce_partitions_fetch_preserved_by_enforce_distribution() -> Result<()> {
+ // Simulate what LimitPushdown produces:
+ // CoalescePartitionsExec(fetch=1)
+ // DataSourceExec (2 partitions)
+ let parquet = parquet_exec_multiple();
+ let coalesce_with_fetch: Arc<dyn ExecutionPlan> =
+ Arc::new(CoalescePartitionsExec::new(parquet).with_fetch(Some(1)));
+
+ let result = ensure_distribution_helper(coalesce_with_fetch, 10, false)?;
+
+ // The fetch=1 must survive. It can appear either as:
+ // - CoalescePartitionsExec: fetch=1 (re-inserted with fetch), or
+ // - GlobalLimitExec: skip=0, fetch=1 (fallback when merge wasn't re-added)
Review Comment:
The test comment mentions a possible `GlobalLimitExec` fallback, but the
optimizer change here re-introduces the limit as
`CoalescePartitionsExec(fetch=N)` (or an SPM with fetch), not as
`GlobalLimitExec`. Updating this comment would keep the test documentation
aligned with the actual behavior under test.
```suggestion
// - SortPreservingMergeExec: fetch=1 (when a merge is re-added with fetch)
```
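One way to keep the test comment and assertion robust to either resulting shape is to check the surviving `fetch` rather than the exact operator. A stdlib-only sketch of that idea — the `PlanNode` enum and `root_fetch` helper are hypothetical stand-ins, not DataFusion types:

```rust
// Hypothetical plan shapes the optimizer might leave at the root;
// these are illustrative stand-ins, not DataFusion operators.
#[derive(Debug)]
enum PlanNode {
    CoalescePartitions { fetch: Option<usize> },
    SortPreservingMerge { fetch: Option<usize> },
    Other,
}

/// Returns the fetch carried by the root operator, if any, so a test
/// can assert "fetch=1 survived" without pinning the operator kind.
fn root_fetch(plan: &PlanNode) -> Option<usize> {
    match plan {
        PlanNode::CoalescePartitions { fetch }
        | PlanNode::SortPreservingMerge { fetch } => *fetch,
        PlanNode::Other => None,
    }
}
```

Asserting on `root_fetch(...) == Some(1)` would pass whether the optimizer re-inserts a fetch-carrying coalesce or a fetch-carrying merge.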
##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -1011,20 +1014,39 @@ fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
/// ```text
/// "DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
/// ```
+#[expect(clippy::type_complexity)]
fn remove_dist_changing_operators(
mut distribution_context: DistributionContext,
-) -> Result<DistributionContext> {
+) -> Result<(
+ DistributionContext,
+ Option<usize>,
+ Option<Arc<dyn ExecutionPlan>>,
+)> {
+ let mut fetch = None;
+ let mut spm: Option<Arc<dyn ExecutionPlan>> = None;
while is_repartition(&distribution_context.plan)
|| is_coalesce_partitions(&distribution_context.plan)
|| is_sort_preserving_merge(&distribution_context.plan)
{
+ // Preserve any `fetch` (limit) that was pushed into a
+ // `SortPreservingMergeExec` or `CoalescePartitionsExec` by
+ // `LimitPushdown`. Without this, the limit would be lost when
+ // the operator is stripped.
+ if let Some(child_fetch) = distribution_context.plan.fetch() {
+ if is_sort_preserving_merge(&distribution_context.plan) {
+ if fetch.is_none() {
+ spm = Some(Arc::clone(&distribution_context.plan));
+ }
+ }
Review Comment:
The logic that captures the original `SortPreservingMergeExec` for fallback
reinsertion is currently gated on `fetch.is_none()`. If multiple stripped
operators have `fetch` (e.g. outer `CoalescePartitionsExec(fetch=...)` and
inner `SortPreservingMergeExec(fetch=...)`), this can fail to record the SPM
even when the effective `fetch` comes from it, causing the fallback to
reintroduce a `CoalescePartitionsExec` and potentially lose the ordering
semantics the original SPM provided. Consider tracking `spm` independently
(e.g. `if is_sort_preserving_merge && spm.is_none()`) and/or storing the
ordering metadata rather than relying on `fetch.is_none()`.
```suggestion
if is_sort_preserving_merge(&distribution_context.plan) && spm.is_none() {
// Remember the original SortPreservingMergeExec so it can be
// reinserted later if necessary, independent of how `fetch` is
// aggregated across stripped operators.
spm = Some(Arc::clone(&distribution_context.plan));
}
if let Some(child_fetch) = distribution_context.plan.fetch() {
```
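The independent-tracking approach the comment suggests can be sketched with plain stdlib types. Everything here is a hypothetical stand-in (`Op`, `strip_ops` are not DataFusion APIs), and the sketch assumes the effective fetch across stripped operators is the tightest (minimum) limit:

```rust
// Hypothetical stand-ins for the stripped distribution-changing
// operators; not DataFusion types.
#[derive(Clone, Debug, PartialEq)]
enum Op {
    Repartition,
    Coalesce { fetch: Option<usize> },
    SortPreservingMerge { fetch: Option<usize> },
}

/// Strip operators from the top of the plan (outermost first),
/// aggregating `fetch` as a minimum while remembering the first
/// SortPreservingMergeExec independently of the fetch state.
fn strip_ops(stack: &[Op]) -> (Option<usize>, Option<Op>) {
    let mut fetch: Option<usize> = None;
    let mut spm: Option<Op> = None;
    for op in stack {
        // Capture the SPM regardless of whether a fetch was already
        // seen, so a later fallback can restore its ordering semantics.
        if matches!(op, Op::SortPreservingMerge { .. }) && spm.is_none() {
            spm = Some(op.clone());
        }
        let child_fetch = match op {
            Op::Coalesce { fetch } | Op::SortPreservingMerge { fetch } => *fetch,
            Op::Repartition => None,
        };
        if let Some(f) = child_fetch {
            // The effective limit is the tightest one seen so far.
            fetch = Some(fetch.map_or(f, |cur| cur.min(f)));
        }
    }
    (fetch, spm)
}
```

With an outer `Coalesce { fetch: Some(5) }` above an inner `SortPreservingMerge { fetch: Some(3) }`, this records the SPM even though a fetch was already seen, which is exactly the case the `fetch.is_none()` gate would miss.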
##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -1483,9 +1509,30 @@ pub fn ensure_distribution(
plan.with_new_children(children_plans)?
};
- Ok(Transformed::yes(DistributionContext::new(
- plan, data, children,
- )))
+    let mut dist_context = DistributionContext::new(Arc::clone(&plan), data, children);
+
+ // If `fetch` was not consumed by `add_merge_on_top` (e.g. when the parent
+ // had `UnspecifiedDistribution` or the child already had a single
+ // partition), the limit that was originally embedded in a distribution
+ // changing operator would be silently lost. Re-introduce it so the
+ // query still returns the correct number of rows.
+ if let Some(fetch_val) = fetch.take() {
+ let limit_plan: Arc<dyn ExecutionPlan> = if let Some(spm) = spm {
+ // Re-insert the original SortPreservingMergeExec with fetch.
+ spm.with_fetch(Some(fetch_val)).unwrap()
Review Comment:
When re-introducing `fetch` for the stripped `SortPreservingMergeExec`, this
uses `spm.with_fetch(...).unwrap()`, which (a) will panic if `with_fetch`
returns `None` and (b) reuses the original SPM with its original child
plan/cached properties. That can drop any rewrites `ensure_distribution` made
to the child subtree and also makes the returned `PlanContext` inconsistent
because `limit_plan`'s child no longer matches `vec![dist_context]`. Instead,
reconstruct a new `SortPreservingMergeExec` from the captured ordering (and any
other needed flags) using `dist_context.plan` as its input, or rebuild via
`with_new_children` and handle the `Option` without `unwrap`.
```suggestion
// Re-insert SortPreservingMergeExec with fetch, ensuring it wraps
// the updated `dist_context.plan` as its child.
if let Some(plan_with_fetch) = spm.with_fetch(Some(fetch_val)) {
plan_with_fetch.with_new_children(vec![Arc::clone(&dist_context.plan)])?
} else {
// If `with_fetch` is not supported, fall back to using
// CoalescePartitionsExec(fetch=N) wrapping the output.
Arc::new(
CoalescePartitionsExec::new(Arc::clone(&dist_context.plan))
.with_fetch(Some(fetch_val)),
)
}
```
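The panic-free fallback the comment describes can be sketched with a stdlib-only model of a fallible `with_fetch`. The `Plan` trait and the `Merge`/`Coalesce` structs below are hypothetical stand-ins, not the DataFusion `ExecutionPlan` API:

```rust
// Minimal hypothetical model of a plan node with an optional fetch.
trait Plan {
    /// Returns a fetch-limited copy, or `None` when the operator
    /// cannot absorb a fetch (mirrors the fallible `with_fetch`).
    fn with_fetch(&self, fetch: Option<usize>) -> Option<Box<dyn Plan>>;
    fn describe(&self) -> String;
}

/// Stand-in for a merge operator that rejects `with_fetch`.
struct Merge;

/// Stand-in for a coalesce operator that accepts a fetch.
struct Coalesce {
    fetch: Option<usize>,
}

impl Plan for Merge {
    fn with_fetch(&self, _fetch: Option<usize>) -> Option<Box<dyn Plan>> {
        None // simulate an operator that cannot absorb a fetch
    }
    fn describe(&self) -> String {
        "Merge".to_string()
    }
}

impl Plan for Coalesce {
    fn with_fetch(&self, fetch: Option<usize>) -> Option<Box<dyn Plan>> {
        Some(Box::new(Coalesce { fetch }))
    }
    fn describe(&self) -> String {
        format!("Coalesce(fetch={:?})", self.fetch)
    }
}

/// Re-introduce a pushed-down fetch without `unwrap`: try the
/// remembered merge first, and fall back to a fetch-carrying
/// coalesce when `with_fetch` is unsupported.
fn reinsert_fetch(spm: Option<&dyn Plan>, fetch_val: usize) -> Box<dyn Plan> {
    spm.and_then(|p| p.with_fetch(Some(fetch_val)))
        .unwrap_or_else(|| Box::new(Coalesce { fetch: Some(fetch_val) }))
}
```

Because the `None` case flows into `unwrap_or_else`, an operator that rejects `with_fetch` degrades to the coalesce fallback instead of panicking.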
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]