JanKaul opened a new issue, #22090:
URL: https://github.com/apache/datafusion/issues/22090

   ### Describe the bug
   
   `RepartitionExec`'s distribution channels (`distributor_channels.rs`) only 
throttle producers when **every** output channel has at least one buffered item 
("Gate A"). With balanced hash partitioning and a single slow consumer, the 
slow channel gains about one batch per input batch, so its buffered depth grows 
linearly with the number of input batches.
   
   ### To reproduce
   
   Trace with N=4 partitions, ch3's consumer slower than the others:
   
   1. After the first input batch each channel has ≥1 item → Gate A closes.
   2. ch0 drains → Gate A opens.
   3. Producer pushes the next input batch's sub-batches: one 
`take`-materialized batch per partition. Push to ch0 succeeds (was empty); ch1, 
ch2, and ch3 already hold data, so pushing to them does not change the 
empty-channel counter during the round.
   4. After the round all channels are non-empty again → Gate A re-closes. ch3 
has gained 1.
   
   Each cycle adds another batch to ch3, so its depth grows linearly with the 
number of input batches until OOM (or until the per-partition 
`SharedMemoryReservation` triggers spilling).
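
The trace above can be reproduced with a toy model. This is a minimal sketch, not DataFusion's actual channel code: it assumes the gate is checked once per input batch, that an open gate lets the producer push one sub-batch to every channel, and that the slow consumer drains one batch every `slow_period` steps. The function name and parameters are hypothetical.

```rust
/// Toy model of Gate A (hypothetical; not the DataFusion implementation).
/// Returns the buffered depth of each channel after `steps` rounds.
pub fn simulate_gate_a(
    num_channels: usize,
    slow: usize,        // index of the slow consumer's channel
    slow_period: usize, // slow consumer drains once every `slow_period` steps
    steps: usize,
) -> Vec<usize> {
    assert!(slow_period >= 1, "slow_period must be at least 1");
    let mut depth = vec![0usize; num_channels];

    for step in 0..steps {
        // Gate A is open while at least one channel is empty.
        if depth.iter().any(|&d| d == 0) {
            // Assumption: one sub-batch per partition for each input batch.
            for d in depth.iter_mut() {
                *d += 1;
            }
        }

        // Fast consumers drain one batch every step; the slow one lags.
        for (i, d) in depth.iter_mut().enumerate() {
            let drains = i != slow || step % slow_period == slow_period - 1;
            if drains && *d > 0 {
                *d -= 1;
            }
        }
    }
    depth
}
```

Under these assumptions, `simulate_gate_a(4, 3, 10, 1000)` returns `[0, 0, 0, 900]`, and doubling `steps` doubles the depth of ch3, which matches the linear growth described above.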
   
   ### Cause
   
   Gate A's invariant is "no live channel is empty." A single fast channel 
oscillating between empty and non-empty keeps reopening the gate, and every 
gate-open window lets the producer push to **every** partition, including the 
slow one. Skewed input data is *not* required; a perfectly balanced hash 
distribution plus one lagging consumer is enough.
   
   ### Expected behavior
   
   The producer should be throttled when total buffered memory crosses a 
configured threshold, regardless of how many channels are technically non-empty.
   
   ### Proposed fix
   
   Add a second gate condition — total buffered bytes across all channels ≥ a 
configured budget — and close the gate when **either** condition fires (`A || 
B`). Gate A still gives O(1) per-channel depth for balanced workloads with even 
consumer rates; Gate B caps total memory whenever some channel never drains. 
Velox's `LocalExchangeMemoryManager` is the design template.
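
A minimal sketch of the combined condition follows. It only illustrates the `A || B` logic; the type `TwoConditionGate`, its fields, and its methods are hypothetical names, not the existing `distributor_channels.rs` API or Velox's. Waker handling and locking are left out on purpose.

```rust
/// Hypothetical gate that closes when either condition fires.
#[derive(Debug)]
pub struct TwoConditionGate {
    empty_channels: usize, // Gate A state: number of channels with no data
    buffered_bytes: usize, // Gate B state: bytes buffered across all channels
    byte_budget: usize,    // Gate B threshold (assumed to come from config)
}

impl TwoConditionGate {
    pub fn new(num_channels: usize, byte_budget: usize) -> Self {
        Self {
            empty_channels: num_channels,
            buffered_bytes: 0,
            byte_budget,
        }
    }

    /// Producers should wait while this returns true.
    pub fn is_closed(&self) -> bool {
        let gate_a = self.empty_channels == 0;
        let gate_b = self.buffered_bytes >= self.byte_budget;
        gate_a || gate_b
    }

    /// Record a push of `bytes` into a channel.
    pub fn on_push(&mut self, channel_was_empty: bool, bytes: usize) {
        if channel_was_empty {
            self.empty_channels = self.empty_channels.saturating_sub(1);
        }
        self.buffered_bytes += bytes;
    }

    /// Record a pop of `bytes` from a channel.
    pub fn on_pop(&mut self, channel_now_empty: bool, bytes: usize) {
        if channel_now_empty {
            self.empty_channels += 1;
        }
        self.buffered_bytes = self.buffered_bytes.saturating_sub(bytes);
    }
}
```

With a budget of 100 bytes and four channels, two 60-byte pushes into the same channel close the gate even though three channels are still empty, which is the case Gate A alone does not catch.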
   
   I'll send a PR shortly.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

