kosiew commented on code in PR #21425:
URL: https://github.com/apache/datafusion/pull/21425#discussion_r3071570320
##########
datafusion/execution/src/memory_pool/mod.rs:
##########
@@ -294,6 +319,7 @@ impl MemoryConsumer {
name: self.name.clone(),
can_spill: self.can_spill,
id: Self::new_unique_id(),
+ reclaimer: self.reclaimer.clone(),
Review Comment:
I’m a bit concerned about `clone_with_new_id` carrying over the same
`MemoryReclaimer` into a new consumer id. That means the reclaimer’s identity
no longer matches the tracked consumer id.
In particular, if a clone gets registered, `TrackConsumersPool::reclaim`
might pick the clone but end up invoking a callback that actually reclaims
state from the original operator or reservation. It also seems like this could
bypass `exclude_consumer_id`, since excluding the original id would not exclude
the clone that still has the same callback.
Would it make sense to avoid cloning the reclaimer here, or require
something like a callback factory or an explicit reset when creating a new
consumer id?
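
To make the first option concrete, here is a minimal sketch of a `clone_with_new_id` that drops the reclaimer instead of sharing it. The types below are simplified stand-ins rather than the PR's actual definitions: the `String` error type, the `NEXT_ID` counter, and the `has_reclaimer` accessor are assumptions for illustration only.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Simplified stand-in for the trait added in this PR (error type is assumed).
pub trait MemoryReclaimer: Send + Sync {
    fn reclaim(&self, target_bytes: usize) -> Result<usize, String>;
}

/// Simplified stand-in for `MemoryConsumer`.
pub struct MemoryConsumer {
    name: String,
    can_spill: bool,
    id: usize,
    reclaimer: Option<Arc<dyn MemoryReclaimer>>,
}

// Hypothetical id source; the real implementation may differ.
static NEXT_ID: AtomicUsize = AtomicUsize::new(0);

impl MemoryConsumer {
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            can_spill: false,
            id: Self::new_unique_id(),
            reclaimer: None,
        }
    }

    fn new_unique_id() -> usize {
        NEXT_ID.fetch_add(1, Ordering::Relaxed)
    }

    pub fn with_reclaimer(self, reclaimer: Arc<dyn MemoryReclaimer>) -> Self {
        Self { reclaimer: Some(reclaimer), ..self }
    }

    /// The clone gets a fresh id and NO reclaimer, so a callback is only
    /// ever reachable through the id it was registered under.
    pub fn clone_with_new_id(&self) -> Self {
        Self {
            name: self.name.clone(),
            can_spill: self.can_spill,
            id: Self::new_unique_id(),
            reclaimer: None,
        }
    }

    pub fn id(&self) -> usize {
        self.id
    }

    pub fn has_reclaimer(&self) -> bool {
        self.reclaimer.is_some()
    }
}
```

With this shape, a caller that wants the clone to be reclaimable has to attach a new callback explicitly, which keeps `exclude_consumer_id` meaningful.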
##########
datafusion/execution/src/memory_pool/mod.rs:
##########
@@ -307,6 +333,15 @@ impl MemoryConsumer {
Self { can_spill, ..self }
}
+ /// Configure a callback that can reclaim memory from this consumer when another consumer in
+ /// the same pool is under pressure.
+ pub fn with_reclaimer(self, reclaimer: Arc<dyn MemoryReclaimer>) -> Self {
Review Comment:
It looks like `with_reclaimer` and `with_can_spill(true)` now effectively
need to be used together, but the API doesn’t make that obvious.
Right now, a caller can attach a reclaimer that will silently never be used
because `TrackConsumersPool::reclaim` also filters on `can_spill`.
Maybe `with_reclaimer` could imply `can_spill`, or at least we could
document or assert that reclaimers are ignored unless `can_spill` is true?
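
A rough sketch of the "imply `can_spill`" variant, using simplified stand-in types rather than the PR's code (the `String` error type and the `is_reclaimable` helper are assumptions for illustration):

```rust
use std::sync::Arc;

/// Simplified stand-in for the trait added in this PR (error type is assumed).
pub trait MemoryReclaimer: Send + Sync {
    fn reclaim(&self, target_bytes: usize) -> Result<usize, String>;
}

/// Simplified stand-in for `MemoryConsumer`.
pub struct MemoryConsumer {
    name: String,
    can_spill: bool,
    reclaimer: Option<Arc<dyn MemoryReclaimer>>,
}

impl MemoryConsumer {
    pub fn new(name: impl Into<String>) -> Self {
        Self { name: name.into(), can_spill: false, reclaimer: None }
    }

    pub fn with_can_spill(self, can_spill: bool) -> Self {
        Self { can_spill, ..self }
    }

    /// Attaching a reclaimer also marks the consumer as spillable, so the
    /// callback cannot be silently filtered out by a `can_spill` check.
    pub fn with_reclaimer(self, reclaimer: Arc<dyn MemoryReclaimer>) -> Self {
        Self { can_spill: true, reclaimer: Some(reclaimer), ..self }
    }

    pub fn can_spill(&self) -> bool {
        self.can_spill
    }

    /// Hypothetical helper mirroring the filter the pool would apply.
    pub fn is_reclaimable(&self) -> bool {
        self.can_spill && self.reclaimer.is_some()
    }
}
```

Note that a later `with_can_spill(false)` would still disable the reclaimer in this sketch, so the interaction would need to be documented either way.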
##########
datafusion/execution/src/memory_pool/mod.rs:
##########
@@ -331,6 +371,12 @@ impl MemoryConsumer {
}
}
+/// Callback implemented by spillable operators that can synchronously reclaim existing
+/// reservations when another consumer in the same pool is under pressure.
+pub trait MemoryReclaimer: Send + Sync {
+ fn reclaim(&self, target_bytes: usize) -> Result<usize>;
Review Comment:
The `MemoryReclaimer` contract could use a bit more detail to make it safer
for implementers.
For example, it would help to clarify that:
- it should only return bytes that were actually released from pool-tracked
reservations
- it should not report more than `target_bytes`
- it should avoid holding strong references that could create cycles back to
`MemoryReservation` or `MemoryConsumer`
Spelling this out would make custom implementations a lot less error-prone.
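
For illustration, the three points above could be written into the trait docs and followed by an implementation roughly like the one below. This is only a sketch under assumptions: `SpillableState` and `WeakReclaimer` are hypothetical names, the `String` error type stands in for the crate's `Result`, and real code would also have to shrink the pool-tracked reservation, which is not modeled here.

```rust
use std::sync::{Arc, Mutex, Weak};

pub trait MemoryReclaimer: Send + Sync {
    /// Try to release up to `target_bytes`.
    ///
    /// Implementations should:
    /// - return only bytes actually released from pool-tracked reservations,
    /// - never report more than `target_bytes`,
    /// - avoid strong references back to the reservation or consumer.
    fn reclaim(&self, target_bytes: usize) -> Result<usize, String>;
}

/// Hypothetical operator state holding spillable buffered data.
pub struct SpillableState {
    buffered_bytes: usize,
}

impl SpillableState {
    pub fn new(buffered_bytes: usize) -> Self {
        Self { buffered_bytes }
    }

    /// Release at most `target_bytes` and report exactly what was freed.
    fn spill(&mut self, target_bytes: usize) -> usize {
        let freed = self.buffered_bytes.min(target_bytes);
        self.buffered_bytes -= freed;
        freed
    }
}

/// Holds only a `Weak` handle, so the callback cannot keep the operator
/// state alive or form a reference cycle through the consumer.
pub struct WeakReclaimer {
    state: Weak<Mutex<SpillableState>>,
}

impl WeakReclaimer {
    pub fn new(state: &Arc<Mutex<SpillableState>>) -> Self {
        Self { state: Arc::downgrade(state) }
    }
}

impl MemoryReclaimer for WeakReclaimer {
    fn reclaim(&self, target_bytes: usize) -> Result<usize, String> {
        // If the operator is already gone there is nothing left to release.
        let Some(state) = self.state.upgrade() else {
            return Ok(0);
        };
        let mut guard = state.lock().map_err(|e| e.to_string())?;
        Ok(guard.spill(target_bytes))
    }
}
```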
##########
datafusion/execution/src/memory_pool/pool.rs:
##########
@@ -488,6 +503,50 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
Ok(())
}
+ fn reclaim(
Review Comment:
Since `TrackConsumersPool` wraps an arbitrary `I: MemoryPool`, this override
ends up hiding any reclaim logic implemented by the inner pool.
If someone wraps a custom reclaim-aware pool just to get the top-consumer
diagnostics, `reclaim` will only use the tracked callbacks here and never
delegate.
Should we consider calling `self.inner.reclaim(...)` as a fallback after
trying local candidates? Or alternatively, document clearly that this wrapper
fully owns reclaim selection?
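
A sketch of the fallback variant, to show the shape I have in mind. Everything here is simplified: the one-method `MemoryPool` trait, the `reclaim` signature returning a plain `usize`, and the `Callback` type are assumptions and do not match the PR's actual signatures.

```rust
/// Simplified stand-in for the pool trait; the real `reclaim` signature
/// in the PR may differ.
pub trait MemoryPool {
    fn reclaim(&self, target_bytes: usize) -> usize;
}

/// Hypothetical representation of a tracked consumer's reclaim callback.
pub type Callback = Box<dyn Fn(usize) -> usize + Send + Sync>;

pub struct TrackConsumersPool<I> {
    inner: I,
    callbacks: Vec<Callback>,
}

impl<I: MemoryPool> TrackConsumersPool<I> {
    pub fn new(inner: I, callbacks: Vec<Callback>) -> Self {
        Self { inner, callbacks }
    }
}

impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
    fn reclaim(&self, target_bytes: usize) -> usize {
        let mut freed = 0;

        // First try the locally tracked candidates.
        for callback in &self.callbacks {
            if freed >= target_bytes {
                break;
            }
            freed += callback(target_bytes - freed);
        }

        // Then delegate whatever is still missing to the wrapped pool, so a
        // reclaim-aware inner pool is not hidden by this wrapper.
        if freed < target_bytes {
            freed += self.inner.reclaim(target_bytes - freed);
        }
        freed
    }
}
```

If delegation is not wanted, the same spot is where a doc comment could state that the wrapper fully owns reclaim selection.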