adriangb commented on code in PR #18014:
URL: https://github.com/apache/datafusion/pull/18014#discussion_r2432692258
##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1007,12 +1071,37 @@ impl RepartitionExec {
             let timer = metrics.send_time[partition].timer();
             // if there is still a receiver, send to it
-            if let Some((tx, reservation)) = output_channels.get_mut(&partition) {
-                reservation.lock().try_grow(size)?;
-
-                if tx.send(Some(Ok(batch))).await.is_err() {
+            if let Some((tx, reservation, spill_manager)) =
+                output_channels.get_mut(&partition)
+            {
+                let (batch_to_send, is_memory_batch) =
+                    match reservation.lock().try_grow(size) {
+                        Ok(_) => {
+                            // Memory available - send in-memory batch
+                            (RepartitionBatch::Memory(batch), true)
+                        }
+                        Err(_) => {
+                            // We're memory limited - spill this single batch to its own file
Review Comment:
One alternative could be to use a single spill file per channel and add some sort
of GC process: "if the spill file exceeds X GB and/or we have more than Y GB of
junk space, we pay the price of copying the live data over into a new spill file
to keep disk usage from blowing up". That might be a general approach to handle
cases like https://github.com/apache/datafusion/issues/18011 as well. But I'm
not sure the complexity is worth it: if that copy happens even once, I feel its
cost will vastly exceed the cost of the extra syscalls to create and delete
per-batch files.
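
To make that alternative concrete, here is a rough sketch of what a per-channel spill file with junk-space GC could look like. This is not DataFusion's `SpillManager` API: batches are modeled as opaque byte buffers, and `ChannelSpillFile`, `append`, `mark_consumed`, and the 1 GiB threshold are all hypothetical stand-ins for the "X GB / Y GB" knobs above.

```rust
// Hypothetical sketch of "one spill file per channel + GC"; std-only, not
// DataFusion's actual spill code. Batches are opaque byte buffers here.
use std::fs::{File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};

// Stand-in for the "Y GB of junk space" knob from the comment above.
const JUNK_COMPACTION_THRESHOLD: u64 = 1024 * 1024 * 1024;

struct SpilledBatch {
    offset: u64,
    len: u64,
    consumed: bool, // already read back by the receiver -> junk space
}

struct ChannelSpillFile {
    path: PathBuf,
    file: File,
    batches: Vec<SpilledBatch>,
    junk_bytes: u64,
}

impl ChannelSpillFile {
    fn create(path: &Path) -> io::Result<Self> {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(path)?;
        Ok(Self {
            path: path.to_path_buf(),
            file,
            batches: Vec::new(),
            junk_bytes: 0,
        })
    }

    /// Append one serialized batch to the tail of the shared spill file and
    /// return its index. Note: indices are invalidated by compaction; a real
    /// implementation would hand out stable ids instead.
    fn append(&mut self, bytes: &[u8]) -> io::Result<usize> {
        let offset = self.file.seek(SeekFrom::End(0))?;
        self.file.write_all(bytes)?;
        self.batches.push(SpilledBatch {
            offset,
            len: bytes.len() as u64,
            consumed: false,
        });
        Ok(self.batches.len() - 1)
    }

    /// Mark a batch as read back; once junk space grows past the threshold,
    /// pay the copy cost once and compact into a fresh file.
    fn mark_consumed(&mut self, idx: usize) -> io::Result<()> {
        if !self.batches[idx].consumed {
            self.batches[idx].consumed = true;
            self.junk_bytes += self.batches[idx].len;
        }
        if self.junk_bytes > JUNK_COMPACTION_THRESHOLD {
            self.compact()?;
        }
        Ok(())
    }

    /// Copy the still-live batches into a new file and swap it in place.
    fn compact(&mut self) -> io::Result<()> {
        let tmp_path = self.path.with_extension("compacting");
        let mut tmp = File::create(&tmp_path)?;
        let mut live = Vec::new();
        for b in self.batches.iter().filter(|b| !b.consumed) {
            let mut buf = vec![0u8; b.len as usize];
            self.file.seek(SeekFrom::Start(b.offset))?;
            self.file.read_exact(&mut buf)?;
            let offset = tmp.seek(SeekFrom::End(0))?;
            tmp.write_all(&buf)?;
            live.push(SpilledBatch { offset, len: b.len, consumed: false });
        }
        std::fs::rename(&tmp_path, &self.path)?;
        self.file = OpenOptions::new().read(true).write(true).open(&self.path)?;
        self.batches = live;
        self.junk_bytes = 0;
        Ok(())
    }
}
```

The trade-off is the one described above: appends and reads stay on a single file descriptor per channel, but a compaction pass re-copies every live batch, which is why the per-batch-file approach in the diff may well be cheaper in practice despite the extra create/delete syscalls.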