zhuqi-lucas opened a new issue, #21174: URL: https://github.com/apache/datafusion/issues/21174
### Describe the bug The `EnforceDistribution` optimizer rule [documents itself as idempotent](https://github.com/apache/datafusion/blob/d5302768c9cc0b0c79622985c847018bc9f5abbc/datafusion/physical-optimizer/src/enforce_distribution.rs#L161): > is idempotent; i.e. it can be applied multiple times, each time producing the same result. However, running `EnforceDistribution` twice on the same plan can produce materially different results. This is problematic for use cases that run `EnforceDistribution` multiple times (e.g., before and after custom optimizer rules that introduce new distribution-changing operators). ### Root causes The fundamental issue is the "strip-and-rebuild" design: `remove_dist_changing_operators` strips all distribution-changing operators (`CoalescePartitionsExec`, `SortPreservingMergeExec`, `RepartitionExec`), then the rule re-inserts them as needed. However, several pieces of state are lost or changed during this cycle: #### 1. `data` flag changes between passes `DistributionContext.data` tracks whether a subtree contains distribution-changing operators. After the first pass strips them, the second pass sees `data=false`, which changes the control flow in `ensure_distribution`: ```rust // L1407-1428 let streaming_benefit = if child.data { // false on 2nd pass preserving_order_enables_streaming(&plan, &child.plan)? } else { false }; if (!ordering_satisfied || !order_preserving_variants_desirable) && !streaming_benefit && child.data // false on 2nd pass → skip replace_order_preserving_variants { child = replace_order_preserving_variants(child)?; } ``` #### 2. `replace_order_preserving_variants` makes irreversible transformations - `SortPreservingMergeExec` → `CoalescePartitionsExec` (L1086-1091) - `RepartitionExec(preserve_order=true)` → regular `RepartitionExec` (L1092-1101) On the second pass, these original operators are gone, so the function makes different decisions. #### 3. Hash partitioning decisions depend on current partition count ```rust // L896-901 n_target > input.plan.output_partitioning().partition_count() ``` After the first pass adds/removes `RepartitionExec`, the partition count changes, leading to different decisions on the second pass. #### 4. `fetch` values lost during strip-and-rebuild (fixed in #21170) `remove_dist_changing_operators` strips operators that carry `fetch` (limit push-down). Without preserving the fetch, the plan changes between passes. This specific case is addressed in #21170. ### To Reproduce ``` Pass 1 input: AggregateExec (requires SinglePartition, maintains_input_order) SortPreservingMergeExec RepartitionExec(preserve_order=true, Hash) DataSourceExec (sorted, 2 partitions) Pass 1 output: AggregateExec SortPreservingMergeExec ← data=true in child context RepartitionExec(preserve_order=true, Hash) DataSourceExec Pass 2 input = Pass 1 output, but now: - remove_dist_changing_operators strips SPM and RepartitionExec - data=false for the stripped subtree - streaming_benefit evaluation differs - replace_order_preserving_variants may or may not be called → Different plan structure ``` ### Expected behavior `EnforceDistribution` should be truly idempotent: `enforce(enforce(plan)) == enforce(plan)` for all valid input plans. Either: 1. Fix the rule to be idempotent, or 2. Update the documentation to accurately reflect that it is NOT idempotent. ### Additional context - Related fix for `fetch` loss: #21170 - The `data` flag dependency is the most pervasive source of non-idempotency and likely the hardest to fix. - Use cases that run `EnforceDistribution` multiple times (e.g., custom optimizer rules that insert distribution-changing operators between passes) are affected by this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
