yjshen edited a comment on pull request #1526:
URL: https://github.com/apache/arrow-datafusion/pull/1526#issuecomment-1008690139
@houqp @alamb Thanks for your detailed and insightful review!

### Resolved:

- The maintained total of trackers' memory, and the background threads that updated it, are removed. Instead, total tracker memory is collected each time the memory manager runs its `can_grow` method (a sketch of this collection pass is at the end of this comment).
- Renamed the controlling consumer to requesting consumer ("requester" for short).
- `MemoryManager` now holds `Weak<dyn MemoryConsumer>` references.
- Use `TempDir` instead of manual retries when creating scratch directories, and use the `rand` crate instead of `uuid`.

### To discuss:

> I didn't see any code that registered any Tracking consumers yet.

There is one in `SortMergingStream`: while merging, it combines multiple partially ordered results from spill files with the last piece of batches that are still in memory. That last piece is created as a `StreamWrapper` backed by in-memory batches, and it reports its usage as the total size of those batches.

> In terms of plumbing, what do you think about:
>
> 1. making all `ExecutionPlans` `MemoryConsumers` and providing default implementations (that reported 0 usage)
> 2. Registering all ExecutionPlans somehow as MemoryConsumers as part of physical plan creation?
>
> That way all implementations of ExecutionPlan could report their usage without having to explicitly register themselves with the memory manager. Also the manager could report on how many operators were not providing any statistics, etc

I think there is a gap between `ExecutionPlan` and `MemoryConsumer`. Since an `execute` method is called multiple times with different `partition` values, it is always the `SendableRecordBatchStream` (such as `SortPreservingMergeStream` or `CrossJoinStream`) that actually takes up memory. Should I make it like:

```rust
/// Trait for types that stream [arrow::record_batch::RecordBatch]
pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> + MemoryConsumer {
    /// Returns the schema of this `RecordBatchStream`.
    ///
    /// Implementations of this trait should guarantee that all `RecordBatch`es returned by this
    /// stream have the same schema as returned from this method.
    fn schema(&self) -> SchemaRef;
}

/// A pinned, reference-counted stream of record batches.
pub type SendableRecordBatchStream = Pin<Arc<dyn RecordBatchStream + Send + Sync>>;
```

That is, should I make `SendableRecordBatchStream` a pinned `Arc` instead of a pinned `Box`, and register each stream's `Arc` with the runtime in the last line of each `execute()`?

Also, registering consumers through:

```rust
pub fn register_consumer(&self, memory_consumer: &Arc<dyn MemoryConsumer>) {
```

can sometimes be awkward:

```rust
runtime.register_consumer(&(streams.clone() as Arc<dyn MemoryConsumer>));
```

Any thoughts?
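For illustration, here is a minimal sketch of the shape I have in mind for the two points above: collecting total memory lazily inside `can_grow` over `Weak<dyn MemoryConsumer>` references, and a generic `register_consumer` that avoids the explicit cast at the call site. The trait body, field names, and the `pool_size` check are simplified stand-ins, not the code in this PR:

```rust
use std::sync::{Arc, Mutex, Weak};

/// Stand-in for the consumer trait; the real trait has more methods,
/// this sketch keeps only what `can_grow` needs.
pub trait MemoryConsumer: Send + Sync {
    fn mem_used(&self) -> usize;
}

pub struct MemoryManager {
    pool_size: usize,
    consumers: Mutex<Vec<Weak<dyn MemoryConsumer>>>,
}

impl MemoryManager {
    /// Generic over the concrete consumer type, so call sites can pass
    /// `&streams` directly instead of casting to `Arc<dyn MemoryConsumer>`.
    pub fn register_consumer<C: MemoryConsumer + 'static>(&self, consumer: &Arc<C>) {
        // Unsized coercion erases the concrete type; the Weak still points
        // at the caller's allocation, so no background bookkeeping is needed.
        let erased: Arc<dyn MemoryConsumer> = consumer.clone();
        self.consumers.lock().unwrap().push(Arc::downgrade(&erased));
    }

    /// Collect total consumer memory on each grow request, instead of
    /// maintaining a running total updated by background threads.
    pub fn can_grow(&self, additional: usize) -> bool {
        let used: usize = self
            .consumers
            .lock()
            .unwrap()
            .iter()
            .filter_map(Weak::upgrade) // consumers that were dropped are skipped
            .map(|c| c.mem_used())
            .sum();
        used + additional <= self.pool_size
    }
}
```

With the generic signature, the call site above becomes just `runtime.register_consumer(&streams)`, and the coercion to `Arc<dyn MemoryConsumer>` happens inside the manager.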