yjshen commented on pull request #1526:
URL: https://github.com/apache/arrow-datafusion/pull/1526#issuecomment-1008690139
@houqp @alamb Thanks for your detailed and insightful review!
### Resolved:
- Removed the maintained total tracker memory and the background threads that updated it. Instead, the memory manager collects the trackers' total each time its `can_grow` method runs (see the sketch after this list).
- Renamed controlling consumer to requesting consumer, or requester for short.
- `MemoryManager` now holds consumers as `Weak<dyn MemoryConsumer>`.
- Use `TempDir` instead of manually retrying scratch-dir creation, and the `rand` crate instead of `uuid`.
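
A minimal sketch of that pull-based accounting, assuming a simplified `MemoryConsumer` trait and manager shape (the names are illustrative, not the exact API in this PR):

```rust
use std::sync::{Mutex, Weak};

/// Simplified stand-in for the consumer trait.
pub trait MemoryConsumer: Send + Sync {
    /// Bytes currently held by this consumer.
    fn mem_used(&self) -> usize;
}

pub struct MemoryManager {
    pool_size: usize,
    /// Weak references, so a dropped stream stops counting against the pool.
    trackers: Mutex<Vec<Weak<dyn MemoryConsumer>>>,
}

impl MemoryManager {
    /// Pull-based accounting: rather than a background thread maintaining a
    /// running total, sum the live trackers each time a grow request arrives.
    pub fn can_grow(&self, additional: usize) -> bool {
        let mut trackers = self.trackers.lock().unwrap();
        // Prune trackers whose consumers have already been dropped.
        trackers.retain(|weak| weak.upgrade().is_some());
        let tracked: usize = trackers
            .iter()
            .filter_map(|weak| weak.upgrade())
            .map(|consumer| consumer.mem_used())
            .sum();
        tracked + additional <= self.pool_size
    }
}
```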
### To discuss:
> I didn't see any code that registered any Tracking consumers yet.

There is one in `SortMergingStream`: while merging multiple partially ordered results from spill files together with the last piece of batches still in memory, that last piece is created as a `StreamWrapper` backed by in-memory batches, and it just reports its usage as the total size of those batches.
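
For illustration, such a tracking-only report could be computed from the buffered batches with arrow's `get_array_memory_size`; the helper name here is hypothetical:

```rust
use arrow::array::Array;
use arrow::record_batch::RecordBatch;

/// Illustrative only: the in-memory piece just reports the total footprint
/// of the batches it still holds; it never requests additional memory.
fn in_memory_batches_size(batches: &[RecordBatch]) -> usize {
    batches
        .iter()
        .flat_map(|batch| batch.columns().iter())
        .map(|array| array.get_array_memory_size())
        .sum()
}
```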
>
> In terms of plumbing, what do you think about:
>
> 1. making all `ExecutionPlans` `MemoryConsumers` and providing default implementations (that reported 0 usage)
> 2. Registering all ExecutionPlans somehow as MemoryConsumers as part of physical plan creation?
>
> That way all implementations of ExecutionPlan could report their usage without having to explicitly register themselves with the memory manager. Also the manager could report on how many operators were not providing any statistics, etc

I think there is a gap between `ExecutionPlan` and `MemoryConsumer`. Since an `execute` method is called multiple times with different `partition` indices, it is always the `SendableRecordBatchStream` implementations, such as `SortPreservingMergeStream` or `CrossJoinStream`, that actually take up memory. Should I make it like:
```rust
/// Trait for types that stream [arrow::record_batch::RecordBatch]
pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> + MemoryConsumer {
    /// Returns the schema of this `RecordBatchStream`.
    ///
    /// Implementations of this trait should guarantee that all `RecordBatch`es returned
    /// by this stream have the same schema as returned from this method.
    fn schema(&self) -> SchemaRef;
}

/// A pinned, shareable stream of record batches.
pub type SendableRecordBatchStream = Pin<Arc<dyn RecordBatchStream + Send + Sync>>;
```
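
Under that proposal, the tail of an `execute()` implementation might look like the following self-contained sketch (the stand-in traits, `MergeStream`, and the free-function `register_consumer` hook are assumptions for illustration, not this PR's exact API):

```rust
use std::pin::Pin;
use std::sync::Arc;

// Stand-in traits so this sketch compiles on its own; the real
// `RecordBatchStream` / `MemoryConsumer` live in the PR.
trait MemoryConsumer: Send + Sync {
    fn mem_used(&self) -> usize;
}
trait RecordBatchStream: MemoryConsumer {}

type SendableRecordBatchStream = Pin<Arc<dyn RecordBatchStream + Send + Sync>>;

// Illustrative concrete stream; a real one would hold buffered batches.
struct MergeStream;
impl MemoryConsumer for MergeStream {
    fn mem_used(&self) -> usize {
        0
    }
}
impl RecordBatchStream for MergeStream {}

// Assumed runtime hook, mirroring the `register_consumer` signature below.
fn register_consumer(_consumer: &Arc<dyn MemoryConsumer>) {}

// The last lines of a hypothetical `execute()`: create the `Arc` first so it
// can be registered, then pin it for return. `Pin::new` is safe here because
// the concrete stream type is `Unpin`.
fn execute_tail() -> SendableRecordBatchStream {
    let stream = Arc::new(MergeStream);
    register_consumer(&(stream.clone() as Arc<dyn MemoryConsumer>));
    Pin::new(stream)
}
```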
Should I make `SendableRecordBatchStream` a pinned `Arc` instead of a pinned `Box`, and register each stream's `Arc` with the runtime on the last line of every `execute()`? Also, registering consumers through:
```rust
pub fn register_consumer(&self, memory_consumer: &Arc<dyn MemoryConsumer>) {
```
may sometimes be awkward:
```rust
runtime.register_consumer(&(streams.clone() as Arc<dyn MemoryConsumer>));
```
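
One possible way to smooth that call site over, sketched against the `register_consumer` signature above (the generic helper itself is hypothetical, not part of this PR):

```rust
use std::sync::Arc;

/// Hypothetical convenience helper: accepts any concrete consumer and does
/// the `Arc<T>` -> `Arc<dyn MemoryConsumer>` coercion internally, so call
/// sites can pass the concrete stream `Arc` directly.
pub fn register_consumer_typed<T: MemoryConsumer + 'static>(
    runtime: &RuntimeEnv,
    consumer: &Arc<T>,
) {
    let as_dyn: Arc<dyn MemoryConsumer> = consumer.clone();
    runtime.register_consumer(&as_dyn);
}

// The awkward call above then becomes:
// register_consumer_typed(&runtime, &streams);
```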
Any thoughts?