kosiew commented on code in PR #21425:
URL: https://github.com/apache/datafusion/pull/21425#discussion_r3071570320


##########
datafusion/execution/src/memory_pool/mod.rs:
##########
@@ -294,6 +319,7 @@ impl MemoryConsumer {
             name: self.name.clone(),
             can_spill: self.can_spill,
             id: Self::new_unique_id(),
+            reclaimer: self.reclaimer.clone(),

Review Comment:
   I’m a bit concerned about `clone_with_new_id` carrying over the same 
`MemoryReclaimer` into a new consumer id. That means the reclaimer’s identity 
no longer matches the tracked consumer id.
   
   In particular, if a clone gets registered, `TrackConsumersPool::reclaim` 
might pick the clone but end up invoking a callback that actually reclaims 
state from the original operator or reservation. It also seems like this could 
bypass `exclude_consumer_id`, since excluding the original id would not exclude 
the clone that still has the same callback.
   
   Would it make sense to avoid cloning the reclaimer here, or require 
something like a callback factory or an explicit reset when creating a new 
consumer id?
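   Here is a minimal sketch of both options. It uses simplified stand-in types rather than the real `MemoryConsumer` from this PR; the `Result` alias, the field set, and the `clone_with_new_id_and_reclaimer` name are hypothetical. The plain clone gets a fresh id and no reclaimer. The factory variant builds a callback bound to the new id.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Stand-in for DataFusion's `Result` alias (assumption for this sketch).
pub type Result<T> = std::result::Result<T, String>;

pub trait MemoryReclaimer: Send + Sync {
    fn reclaim(&self, target_bytes: usize) -> Result<usize>;
}

static NEXT_ID: AtomicUsize = AtomicUsize::new(0);

// Simplified consumer; deliberately not `Clone`, so ids are only minted explicitly.
pub struct MemoryConsumer {
    name: String,
    can_spill: bool,
    id: usize,
    reclaimer: Option<Arc<dyn MemoryReclaimer>>,
}

impl MemoryConsumer {
    pub fn new(name: impl Into<String>) -> Self {
        Self { name: name.into(), can_spill: false, id: Self::new_unique_id(), reclaimer: None }
    }

    fn new_unique_id() -> usize {
        NEXT_ID.fetch_add(1, Ordering::Relaxed)
    }

    pub fn with_reclaimer(self, reclaimer: Arc<dyn MemoryReclaimer>) -> Self {
        Self { reclaimer: Some(reclaimer), ..self }
    }

    /// Option A: fresh id, no reclaimer. A callback can never be selected
    /// under an id that does not own the state it reclaims.
    pub fn clone_with_new_id(&self) -> Self {
        Self {
            name: self.name.clone(),
            can_spill: self.can_spill,
            id: Self::new_unique_id(),
            reclaimer: None,
        }
    }

    /// Option B (hypothetical API): the caller supplies a factory that
    /// builds a reclaimer bound to the newly minted id.
    pub fn clone_with_new_id_and_reclaimer(
        &self,
        make_reclaimer: impl FnOnce(usize) -> Arc<dyn MemoryReclaimer>,
    ) -> Self {
        let id = Self::new_unique_id();
        Self {
            name: self.name.clone(),
            can_spill: self.can_spill,
            id,
            reclaimer: Some(make_reclaimer(id)),
        }
    }

    pub fn id(&self) -> usize {
        self.id
    }

    pub fn has_reclaimer(&self) -> bool {
        self.reclaimer.is_some()
    }
}
```

   With option A, excluding the original id via `exclude_consumer_id` can no longer be bypassed through a clone, because the clone has no callback to select.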



##########
datafusion/execution/src/memory_pool/mod.rs:
##########
@@ -307,6 +333,15 @@ impl MemoryConsumer {
         Self { can_spill, ..self }
     }
 
+    /// Configure a callback that can reclaim memory from this consumer when another consumer in
+    /// the same pool is under pressure.
+    pub fn with_reclaimer(self, reclaimer: Arc<dyn MemoryReclaimer>) -> Self {

Review Comment:
   It looks like `with_reclaimer` and `with_can_spill(true)` now effectively 
need to be used together, but the API doesn’t make that obvious.
   
   Right now, a caller can attach a reclaimer to a non-spillable consumer, and 
that reclaimer will silently never be used, because `TrackConsumersPool::reclaim` 
also filters on `can_spill`.
   
   Maybe `with_reclaimer` could imply `can_spill`, or at least we could 
document or assert that reclaimers are ignored unless `can_spill` is true?
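   Below is a small sketch of the "imply" option, using simplified stand-in types. The `Result` alias and the `reclaim_candidate` helper are hypothetical. `reclaim_candidate` only mirrors the `can_spill` filter described for `TrackConsumersPool::reclaim`.

```rust
use std::sync::Arc;

// Stand-in for DataFusion's `Result` alias (assumption for this sketch).
pub type Result<T> = std::result::Result<T, String>;

pub trait MemoryReclaimer: Send + Sync {
    fn reclaim(&self, target_bytes: usize) -> Result<usize>;
}

pub struct MemoryConsumer {
    name: String,
    can_spill: bool,
    reclaimer: Option<Arc<dyn MemoryReclaimer>>,
}

impl MemoryConsumer {
    pub fn new(name: impl Into<String>) -> Self {
        Self { name: name.into(), can_spill: false, reclaimer: None }
    }

    pub fn name(&self) -> &str {
        &self.name
    }

    pub fn with_can_spill(self, can_spill: bool) -> Self {
        Self { can_spill, ..self }
    }

    /// Attaching a reclaimer marks the consumer as spillable, so a
    /// `can_spill` filter in the pool cannot silently skip the callback.
    pub fn with_reclaimer(self, reclaimer: Arc<dyn MemoryReclaimer>) -> Self {
        Self { can_spill: true, reclaimer: Some(reclaimer), ..self }
    }

    /// Hypothetical helper mirroring the pool-side filter: the reclaimer is
    /// only offered when the consumer is spillable.
    pub fn reclaim_candidate(&self) -> Option<&Arc<dyn MemoryReclaimer>> {
        if self.can_spill {
            self.reclaimer.as_ref()
        } else {
            None
        }
    }
}
```

   An explicit `with_can_spill(false)` after `with_reclaimer` still disables the callback. A `debug_assert!` on that path would be the "assert" variant.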



##########
datafusion/execution/src/memory_pool/mod.rs:
##########
@@ -331,6 +371,12 @@ impl MemoryConsumer {
     }
 }
 
+/// Callback implemented by spillable operators that can synchronously reclaim existing
+/// reservations when another consumer in the same pool is under pressure.
+pub trait MemoryReclaimer: Send + Sync {
+    fn reclaim(&self, target_bytes: usize) -> Result<usize>;

Review Comment:
   The `MemoryReclaimer` contract could use a bit more detail to make it safer 
for implementers.
   
   For example, it would help to clarify that:
   - it should only return bytes that were actually released from pool-tracked 
reservations
   - it should not report more than `target_bytes`
   - it should avoid holding strong references that could create cycles back to 
`MemoryReservation` or `MemoryConsumer`
   
   Spelling this out would make custom implementations a lot less error-prone.
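   One way to spell the contract out is sketched below with stand-in types. `OperatorState` and `SpillReclaimer` are hypothetical, and `reserved` merely models a pool-tracked reservation. The reclaimer holds only a `Weak` handle, releases at most `target_bytes`, and reports exactly what it released.

```rust
use std::sync::{Arc, Mutex, Weak};

// Stand-in for DataFusion's `Result` alias (assumption for this sketch).
pub type Result<T> = std::result::Result<T, String>;

/// Callback implemented by spillable operators that can synchronously reclaim
/// existing reservations when another consumer in the same pool is under pressure.
///
/// Proposed contract:
/// - Return only bytes actually released from pool-tracked reservations.
/// - Never report more than `target_bytes`.
/// - Do not hold strong references back to the reservation or consumer.
pub trait MemoryReclaimer: Send + Sync {
    fn reclaim(&self, target_bytes: usize) -> Result<usize>;
}

/// Hypothetical operator state: `reserved` stands in for a pool-tracked
/// reservation, `spilled` for bytes already written to disk.
pub struct OperatorState {
    pub reserved: usize,
    pub spilled: usize,
}

/// Holds only a `Weak` handle, so the reclaimer cannot keep the operator alive.
pub struct SpillReclaimer {
    state: Weak<Mutex<OperatorState>>,
}

impl SpillReclaimer {
    pub fn new(state: &Arc<Mutex<OperatorState>>) -> Self {
        Self { state: Arc::downgrade(state) }
    }
}

impl MemoryReclaimer for SpillReclaimer {
    fn reclaim(&self, target_bytes: usize) -> Result<usize> {
        // If the operator has already been dropped, there is nothing to release.
        let Some(strong) = self.state.upgrade() else {
            return Ok(0);
        };
        let mut guard = strong
            .lock()
            .map_err(|_| "operator state mutex poisoned".to_string())?;
        // Release at most `target_bytes`, and never more than is actually reserved.
        let released = target_bytes.min(guard.reserved);
        guard.reserved -= released;
        guard.spilled += released;
        Ok(released)
    }
}
```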



##########
datafusion/execution/src/memory_pool/pool.rs:
##########
@@ -488,6 +503,50 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
         Ok(())
     }
 
+    fn reclaim(

Review Comment:
   Since `TrackConsumersPool` wraps an arbitrary `I: MemoryPool`, this override 
ends up hiding any reclaim logic implemented by the inner pool.
   
   If someone wraps a custom reclaim-aware pool just to get the top-consumer 
diagnostics, `reclaim` will only use the tracked callbacks here and never 
delegate.
   
   Should we consider calling `self.inner.reclaim(...)` as a fallback after 
trying local candidates? Or alternatively, document clearly that this wrapper 
fully owns reclaim selection?
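   Here is a rough sketch of the fallback variant. It assumes a simplified `MemoryPool` trait whose `reclaim` takes `target_bytes` and an optional `exclude_consumer_id`; the PR's actual signature is not shown in this hunk. `TrackedConsumer` is a hypothetical record. Local spillable candidates are tried largest first, and the inner pool is asked for whatever remains.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Stand-in for DataFusion's `Result` alias (assumption for this sketch).
pub type Result<T> = std::result::Result<T, String>;

pub trait MemoryReclaimer: Send + Sync {
    fn reclaim(&self, target_bytes: usize) -> Result<usize>;
}

/// Simplified pool trait (assumed signature); the default frees nothing.
pub trait MemoryPool {
    fn reclaim(&self, target_bytes: usize, exclude_consumer_id: Option<usize>) -> Result<usize> {
        let _ = (target_bytes, exclude_consumer_id);
        Ok(0)
    }
}

/// Hypothetical per-consumer bookkeeping kept by the wrapper.
pub struct TrackedConsumer {
    pub can_spill: bool,
    pub reserved: usize,
    pub reclaimer: Option<Arc<dyn MemoryReclaimer>>,
}

pub struct TrackConsumersPool<I: MemoryPool> {
    inner: I,
    tracked: HashMap<usize, TrackedConsumer>,
}

impl<I: MemoryPool> TrackConsumersPool<I> {
    pub fn new(inner: I, tracked: HashMap<usize, TrackedConsumer>) -> Self {
        Self { inner, tracked }
    }
}

impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
    fn reclaim(&self, target_bytes: usize, exclude_consumer_id: Option<usize>) -> Result<usize> {
        // Local candidates: spillable, have a reclaimer, not excluded; largest first.
        let mut candidates: Vec<(&usize, &TrackedConsumer)> = self
            .tracked
            .iter()
            .filter(|(id, c)| {
                c.can_spill && c.reclaimer.is_some() && Some(**id) != exclude_consumer_id
            })
            .collect();
        candidates.sort_by(|a, b| b.1.reserved.cmp(&a.1.reserved));

        let mut freed = 0;
        for (_, consumer) in candidates {
            if freed >= target_bytes {
                break;
            }
            if let Some(r) = &consumer.reclaimer {
                freed += r.reclaim(target_bytes - freed)?;
            }
        }
        // Fallback: let the wrapped pool try to cover whatever is still missing.
        if freed < target_bytes {
            freed += self.inner.reclaim(target_bytes - freed, exclude_consumer_id)?;
        }
        Ok(freed)
    }
}
```

   The "wrapper fully owns selection" alternative would drop the final `self.inner.reclaim(...)` call and document that choice instead.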



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
