yjshen edited a comment on pull request #1526:
URL: 
https://github.com/apache/arrow-datafusion/pull/1526#issuecomment-1008690139


   @houqp @alamb  Thanks for your detailed and insightful review!
   
   ### Resolved:
   
   - Removed the maintained total of trackers' memory and the background threads that updated it. Instead, total tracker memory is collected each time the memory manager runs its `can_grow` method.
   - Renamed "controlling consumer" to "requesting consumer", or "requester" for short.
   - `MemoryManager` now holds `Weak<dyn MemoryConsumer>`.
   - Use `TempDir` instead of manually retrying scratch directory creation. Use the `rand` crate instead of the `uuid` crate.
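
To make the first and third points concrete, here is a minimal sketch of the idea, not the code in this PR. The trait shape, the `max_mem` field, `total_tracker_mem`, and `FixedUsage` are simplified assumptions for illustration. The manager keeps only weak references and sums tracker usage on demand inside `can_grow`, so no background thread is needed and dropped consumers fall out of the total.

```rust
use std::sync::{Arc, Mutex, Weak};

/// Hypothetical, simplified stand-in for the consumer trait.
trait MemoryConsumer: Send + Sync {
    /// Bytes currently held by this consumer.
    fn mem_used(&self) -> usize;
}

/// Hypothetical manager that holds weak references only, so it never
/// keeps a finished stream alive.
struct MemoryManager {
    max_mem: usize,
    trackers: Mutex<Vec<Weak<dyn MemoryConsumer>>>,
}

impl MemoryManager {
    fn new(max_mem: usize) -> Self {
        Self { max_mem, trackers: Mutex::new(Vec::new()) }
    }

    fn register_consumer(&self, consumer: &Arc<dyn MemoryConsumer>) {
        self.trackers.lock().unwrap().push(Arc::downgrade(consumer));
    }

    /// Sum tracker usage on demand, pruning consumers that were dropped.
    fn total_tracker_mem(&self) -> usize {
        let mut trackers = self.trackers.lock().unwrap();
        let mut total = 0;
        trackers.retain(|weak| match weak.upgrade() {
            Some(consumer) => {
                total += consumer.mem_used();
                true
            }
            None => false,
        });
        total
    }

    /// A request fits if current tracker usage plus `required` stays
    /// within the budget.
    fn can_grow(&self, required: usize) -> bool {
        self.total_tracker_mem() + required <= self.max_mem
    }
}

/// Toy consumer that reports a fixed number of bytes.
struct FixedUsage(usize);

impl MemoryConsumer for FixedUsage {
    fn mem_used(&self) -> usize {
        self.0
    }
}
```

With a budget of 100 bytes and one registered `FixedUsage(60)`, a request for 30 bytes fits and a request for 50 does not. Once the consumer's `Arc` is dropped, the weak reference no longer upgrades and the 50-byte request fits.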
   
   ### To discuss:
   > I didn't see any code that registered any Tracking consumers yet.
   
   There is one in `SortMergingStream`, which merges the partially ordered results from spill files with the last batches that are still in memory. That last piece is created as a `StreamWrapper` backed by in-memory batches, and it simply reports the total size of those batches as its usage.
   > 
   > In terms of plumbing, what do you think about:
   > 
   > 1. making all `ExecutionPlans` `MemoryConsumers` and providing default 
implementations (that reported 0 usage)
   > 2. Registering all ExecutionPlans somehow as MemoryConsumers as part of 
physical plan creation?
   > 
   > That way all implementations of ExecutionPlan could report their usage 
without having to explicitly register themselves with the memory manager. Also 
the manager could report on how many operators were not providing any 
statistics, etc
   
   I think there is a gap between `ExecutionPlan` and `MemoryConsumer`. Since `execute` may be called multiple times with different `partition` values, it is always the `SendableRecordBatchStream` (such as `SortPreservingMergeStream` or `CrossJoinStream`) that takes up memory. Should I make it like:
   
   ```rust
   /// Trait for types that stream [arrow::record_batch::RecordBatch]
   pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> + MemoryConsumer {
       /// Returns the schema of this `RecordBatchStream`.
       ///
       /// Implementation of this trait should guarantee that all `RecordBatch`'s returned by this
       /// stream should have the same schema as returned from this method.
       fn schema(&self) -> SchemaRef;
   }

   /// Trait for a stream of record batches.
   pub type SendableRecordBatchStream = Pin<Arc<dyn RecordBatchStream + Send + Sync>>;
   ```
   Should I make `SendableRecordBatchStream` a `Pin<Arc<...>>` instead of a `Pin<Box<...>>`, and register each stream `Arc` with the runtime on the last line of each `execute()`? Also, registering consumers through:
   
   ```rs
       pub fn register_consumer(&self, memory_consumer: &Arc<dyn MemoryConsumer>) {
   ```
   may sometimes be awkward:
   
   ```rs
       runtime.register_consumer(&(streams.clone() as Arc<dyn MemoryConsumer>));
   ``` 
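
One possible way to avoid the cast at call sites, sketched here under assumptions rather than taken from this PR: make the registration method generic over the concrete consumer type and do the coercion to `Arc<dyn MemoryConsumer>` inside it. `RuntimeEnv`, `DummyStream`, `live_consumers`, and the one-method trait below are hypothetical stand-ins.

```rust
use std::sync::{Arc, Mutex, Weak};

/// Hypothetical, simplified stand-in for the consumer trait.
trait MemoryConsumer: Send + Sync {
    fn name(&self) -> String;
}

/// Hypothetical runtime that stores weak references to consumers.
struct RuntimeEnv {
    consumers: Mutex<Vec<Weak<dyn MemoryConsumer>>>,
}

impl RuntimeEnv {
    fn new() -> Self {
        Self { consumers: Mutex::new(Vec::new()) }
    }

    /// Generic over the concrete type, so callers pass `&Arc<MyStream>`
    /// and the unsized coercion happens here instead of at the call site.
    fn register_consumer<C: MemoryConsumer + 'static>(&self, consumer: &Arc<C>) {
        let as_dyn: Arc<dyn MemoryConsumer> = consumer.clone();
        self.consumers.lock().unwrap().push(Arc::downgrade(&as_dyn));
    }

    /// Number of registered consumers that are still alive.
    fn live_consumers(&self) -> usize {
        self.consumers
            .lock()
            .unwrap()
            .iter()
            .filter(|weak| weak.upgrade().is_some())
            .count()
    }
}

/// Toy consumer used only to exercise the sketch.
struct DummyStream;

impl MemoryConsumer for DummyStream {
    fn name(&self) -> String {
        "DummyStream".to_string()
    }
}
```

A call site then reads `runtime.register_consumer(&stream)` with `stream: Arc<DummyStream>`. The trade-off is that this form no longer accepts an existing `Arc<dyn MemoryConsumer>` directly, so a second, non-generic entry point might still be needed.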
   
   Any thoughts?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

