yjshen commented on a change in pull request #1921:
URL: https://github.com/apache/arrow-datafusion/pull/1921#discussion_r820040751
##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -340,7 +341,13 @@ impl MemoryManager {
} else if current < min_per_rqt {
// if we cannot acquire at lease 1/2n memory, just wait for
others
// to spill instead spill self frequently with limited total
mem
- self.cv.wait(&mut rqt_current_used);
+ let timeout = self
+ .cv
+ .wait_for(&mut rqt_current_used, Duration::from_secs(5));
Review comment:
By introducing tracking consumers, and a `mem_used()` method for all
consumers, we need each memory-consuming operator implementation to acquire
memory and release it eagerly when it's not used.
For example, the external sorter releases its requester's memory to zero and
transfer that to a tracking consumer.
>How do we know when other consumers will release? If no consumers release?
Each task is processing a finite partition of data. If no consumers release
and cause a stuck, I prefer to treat it as a potential bug, and we should fix
it instead of letting it slip through our fingers. We are not dealing with a
black box of operators like UDFs, but all controlled physical operators in the
engine.
Spilling others in the same task won't make the case easier. It has limited
scope and will also cause chained window operators in Spark task to deteriorate
performance badly, resulting in more spills and even crash or triggering
another speculative task.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]