16pierre commented on issue #17334: URL: https://github.com/apache/datafusion/issues/17334#issuecomment-3237651689
> Besides I think maybe it's okay to remove memory reservations in RepartitionExec, its memory usage should be bounded by batch_mem_size * partition_count, which is a constant.

Interestingly, when experimenting locally with the workarounds suggested in this issue (splitting the memory pools), I've been hitting OOM issues where `RepartitionExec` was consuming _much more_ than `batch_mem_size * partition_count` (FYI I'm using `RoundRobin`).

Still investigating. I suspect that on each iteration of the `RepartitionExec#pull_from_input` loop, we push one RecordBatch to each candidate in `output_channels`, no matter how many batches are currently buffered. See `batches_until_yield` and `tx.send`:

https://github.com/apache/datafusion/blob/4bc66c80d560581f527dee5774fb4f0479786d3e/datafusion/physical-plan/src/repartition/mod.rs#L934-L935

https://github.com/apache/datafusion/blob/4bc66c80d560581f527dee5774fb4f0479786d3e/datafusion/physical-plan/src/repartition/mod.rs#L957

At first glance, I'm not seeing clear hints in the `tx.send` codepath that it enforces a bounded capacity and can apply back-pressure. That could make sense: I guess blocking on this call could lead to all kinds of multi-threading/deadlock problems. I may be wrong here though, this codepath isn't trivial.

The docs seem to point in this direction too:

https://github.com/apache/datafusion/blob/e718c1a5c5770c071c9c2e14a7681a7f1a2f3f23/datafusion/physical-plan/src/repartition/distributor_channels.rs#L37-L39

I interpret this as: if the partitions are processed at unbalanced speeds, the `RepartitionExec` node could buffer a large number of RecordBatches on some partitions, at least temporarily. Sorting could very well be a big offender here, because some partitions naturally get stuck for long durations when they need to sort-spill.

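
To make that interpretation concrete, here is a toy model of the behaviour I suspect. It is not DataFusion code: `simulate`, `ticks` and `consume_every` are hypothetical names, and the gate rule (sending is allowed whenever at least one output queue is empty) is only my reading of the linked doc comment, so treat it as an assumption.

```rust
use std::collections::VecDeque;

/// Toy model only, NOT DataFusion code.
///
/// For `ticks` steps, a single producer tries to send one batch per tick,
/// choosing the target queue round-robin. ASSUMPTION: a send is allowed
/// whenever at least one queue is empty (a shared "gate"), regardless of
/// how full the target queue already is.
///
/// Consumer `i` pops one batch every `consume_every[i]` ticks;
/// a value of 0 means that consumer is stalled (e.g. busy sort-spilling).
///
/// Returns the peak number of buffered batches observed per queue.
fn simulate(ticks: usize, consume_every: &[usize]) -> Vec<usize> {
    let partitions = consume_every.len();
    let mut queues: Vec<VecDeque<usize>> = vec![VecDeque::new(); partitions];
    let mut peak = vec![0usize; partitions];
    let mut sent = 0usize;

    for tick in 1..=ticks {
        // Producer side: the gate is open if any queue is empty.
        if queues.iter().any(|q| q.is_empty()) {
            let target = sent % partitions; // round-robin target
            queues[target].push_back(sent);
            peak[target] = peak[target].max(queues[target].len());
            sent += 1;
        }

        // Consumer side: each non-stalled consumer pops at its own pace.
        for (i, q) in queues.iter_mut().enumerate() {
            let every = consume_every[i];
            if every != 0 && tick % every == 0 {
                q.pop_front();
            }
        }
    }
    peak
}
```

Under these assumptions, `simulate(1000, &[1, 1, 1, 0])` (three fast partitions, one stalled) returns `[1, 1, 1, 250]`: the stalled partition keeps accumulating batches because the fast ones keep the gate open. The buffer only stays at one batch per partition when every consumer is stalled, as in `simulate(1000, &[0, 0, 0, 0])`, which returns `[1, 1, 1, 1]`.
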
If this understanding is correct, maybe I should consider controlling the parallelism differently than with a `RepartitionExec` here, for example by parallelizing at the datasource layer, coalescing some of the intermediate Union nodes, etc.
