zhuqi-lucas opened a new issue, #21174:
URL: https://github.com/apache/datafusion/issues/21174

   ### Describe the bug
   
   The `EnforceDistribution` optimizer rule [documents itself as idempotent](https://github.com/apache/datafusion/blob/d5302768c9cc0b0c79622985c847018bc9f5abbc/datafusion/physical-optimizer/src/enforce_distribution.rs#L161):
   
   > is idempotent; i.e. it can be applied multiple times, each time producing 
the same result.
   
   However, running `EnforceDistribution` twice on the same plan can produce 
materially different results. This is problematic for use cases that run 
`EnforceDistribution` multiple times (e.g., before and after custom optimizer 
rules that introduce new distribution-changing operators).
   
   ### Root causes
   
   The fundamental issue is the "strip-and-rebuild" design: 
`remove_dist_changing_operators` strips all distribution-changing operators 
(`CoalescePartitionsExec`, `SortPreservingMergeExec`, `RepartitionExec`), then 
the rule re-inserts them as needed. However, several pieces of state are lost 
or changed during this cycle:
   
   #### 1. `data` flag changes between passes
   
   `DistributionContext.data` tracks whether a subtree contains 
distribution-changing operators. After the first pass strips them, the second 
pass sees `data=false`, which changes the control flow in `ensure_distribution`:
   
   ```rust
   // L1407-1428
   let streaming_benefit = if child.data {  // false on 2nd pass
       preserving_order_enables_streaming(&plan, &child.plan)?
   } else {
       false
   };
   
   if (!ordering_satisfied || !order_preserving_variants_desirable)
       && !streaming_benefit
       && child.data  // false on 2nd pass → skip replace_order_preserving_variants
   {
       child = replace_order_preserving_variants(child)?;
   }
   ```
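
To make the effect of the flag concrete, the guard above can be reduced to a pure function over booleans. This is only a sketch: `should_replace_variants` and its parameter names are hypothetical stand-ins, not DataFusion APIs, and `streaming_if_checked` stands for whatever `preserving_order_enables_streaming` would have returned had it been called.

```rust
/// Hypothetical reduction of the guard quoted above to plain booleans.
/// `streaming_if_checked` is the answer the streaming check would give
/// if it were evaluated; it is only consulted when `data` is true.
fn should_replace_variants(
    ordering_satisfied: bool,
    order_preserving_variants_desirable: bool,
    streaming_if_checked: bool,
    data: bool,
) -> bool {
    // Mirrors `if child.data { ... } else { false }` from the snippet.
    let streaming_benefit = if data { streaming_if_checked } else { false };

    (!ordering_satisfied || !order_preserving_variants_desirable)
        && !streaming_benefit
        && data
}
```

With `data = true` the outcome depends on the other three inputs. With `data = false` the function returns `false` for every combination, so the replacement is skipped regardless of what the plan looks like.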
   
   #### 2. `replace_order_preserving_variants` makes irreversible transformations
   
   - `SortPreservingMergeExec` → `CoalescePartitionsExec` (L1086-1091)
   - `RepartitionExec(preserve_order=true)` → regular `RepartitionExec` 
(L1092-1101)
   
   On the second pass, these original operators are gone, so the function makes 
different decisions.
   
   #### 3. Hash partitioning decisions depend on current partition count
   
   ```rust
   // L896-901
   n_target > input.plan.output_partitioning().partition_count()
   ```
   
   After the first pass adds/removes `RepartitionExec`, the partition count 
changes, leading to different decisions on the second pass.
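
Taken in isolation, the comparison behaves like the small helper below. This is an illustrative sketch only: `needs_more_partitions` is a hypothetical name, and the partition counts in the usage note are made up rather than taken from a real plan.

```rust
/// Hypothetical stand-in for the quoted comparison:
/// `n_target > input.plan.output_partitioning().partition_count()`.
fn needs_more_partitions(n_target: usize, current_partition_count: usize) -> bool {
    n_target > current_partition_count
}
```

For a target of 8, an input with 2 partitions answers `true`, while the same subtree observed with 8 partitions (for example, with a `RepartitionExec` still in place) answers `false`. Any decision built on this comparison therefore depends on which operators are present at the moment it is evaluated.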
   
   #### 4. `fetch` values lost during strip-and-rebuild (fixed in #21170)
   
   `remove_dist_changing_operators` strips operators that carry `fetch` (limit 
push-down). Without preserving the fetch, the plan changes between passes. This 
specific case is addressed in #21170.
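
The toy model below sketches why dropping `fetch` changes the plan. It is not the implementation from #21170: the `Node` type and the `strip`, `rebuild_lossy`, and `rebuild_preserving` functions are hypothetical and only mimic the strip-and-rebuild shape on a single chain of operators.

```rust
/// Toy plan node; `Coalesce` stands in for any stripped operator that
/// can carry a `fetch` limit.
#[derive(Clone, Debug, PartialEq)]
enum Node {
    Source,
    Coalesce { fetch: Option<usize>, input: Box<Node> },
}

/// Remove every `Coalesce` from the top of the chain and return the
/// remaining plan together with the tightest `fetch` that was seen.
fn strip(node: Node) -> (Node, Option<usize>) {
    match node {
        Node::Coalesce { fetch, input } => {
            let (inner, inner_fetch) = strip(*input);
            let merged = match (fetch, inner_fetch) {
                (Some(a), Some(b)) => Some(a.min(b)),
                (a, b) => a.or(b),
            };
            (inner, merged)
        }
        other => (other, None),
    }
}

/// Rebuild without carrying the limit over: the `fetch` is silently lost.
fn rebuild_lossy(plan: Node) -> Node {
    let (inner, _dropped_fetch) = strip(plan);
    Node::Coalesce { fetch: None, input: Box::new(inner) }
}

/// Rebuild and re-attach the limit that was stripped.
fn rebuild_preserving(plan: Node) -> Node {
    let (inner, fetch) = strip(plan);
    Node::Coalesce { fetch, input: Box::new(inner) }
}
```

Starting from `Coalesce { fetch: Some(5), .. }` over `Source`, `rebuild_lossy` returns a plan that differs from its input, while `rebuild_preserving` returns the input unchanged.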
   
   ### To Reproduce
   
   ```
   Pass 1 input:
     AggregateExec (requires SinglePartition, maintains_input_order)
       SortPreservingMergeExec
         RepartitionExec(preserve_order=true, Hash)
           DataSourceExec (sorted, 2 partitions)
   
   Pass 1 output:
     AggregateExec
       SortPreservingMergeExec       ← data=true in child context
         RepartitionExec(preserve_order=true, Hash)
           DataSourceExec
   
   Pass 2 input = Pass 1 output, but now:
     - remove_dist_changing_operators strips SPM and RepartitionExec
     - data=false for the stripped subtree
     - streaming_benefit evaluation differs
     - replace_order_preserving_variants may or may not be called
     → Different plan structure
   ```
   
   ### Expected behavior
   
   `EnforceDistribution` should be truly idempotent: `enforce(enforce(plan)) == enforce(plan)` for all valid input plans. Either:
   1. Fix the rule to be idempotent, or
   2. Update the documentation to accurately reflect that it is NOT idempotent.
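
The property `enforce(enforce(plan)) == enforce(plan)` can be written as a generic check. The sketch below is an assumption-laden illustration, not a DataFusion test: `is_idempotent`, `ToyPlan`, and the two toy rules are hypothetical, and a real test would need some way to compare physical plans for equality.

```rust
/// Returns true when applying `rule` a second time changes nothing.
fn is_idempotent<P, F>(rule: F, plan: &P) -> bool
where
    P: PartialEq,
    F: Fn(&P) -> P,
{
    let once = rule(plan);
    let twice = rule(&once);
    once == twice
}

/// Toy plan summarized by its partition count only.
#[derive(Clone, Debug, PartialEq)]
struct ToyPlan {
    partitions: usize,
}

/// Idempotent toy rule: raise the partition count to at least 8.
fn raise_to_target(plan: &ToyPlan) -> ToyPlan {
    ToyPlan { partitions: plan.partitions.max(8) }
}

/// Non-idempotent toy rule: the decision depends on the current count,
/// which the rule itself changes, so a second pass can act again.
fn double_if_below_target(plan: &ToyPlan) -> ToyPlan {
    if plan.partitions < 8 {
        ToyPlan { partitions: plan.partitions * 2 }
    } else {
        plan.clone()
    }
}
```

Starting from 2 partitions, `raise_to_target` passes the check, while `double_if_below_target` fails it because the first pass yields 4 partitions and the second yields 8.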
   
   ### Additional context
   
   - Related fix for `fetch` loss: #21170
   - The `data` flag dependency is the most pervasive source of non-idempotency 
and likely the hardest to fix.
   - Use cases that run `EnforceDistribution` multiple times (e.g., custom 
optimizer rules that insert distribution-changing operators between passes) are 
affected by this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

