crepererum commented on code in PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#discussion_r1024216048
##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -418,6 +487,130 @@ impl std::fmt::Debug for AggregationState {
}
}
+/// Accounting data structure for memory usage; it implements `MemoryConsumer` and calls `drop_consumer` on its memory manager when dropped.
+struct AggregationStateMemoryConsumer {
+ /// Consumer ID; returned by reference from `id()` and passed to `drop_consumer` on drop.
+ id: MemoryConsumerId,
+
+ /// Linked memory manager; a cloned `Arc` is handed out by `memory_manager()`.
+ memory_manager: Arc<MemoryManager>,
+
+ /// Currently used size in bytes; increased by `alloc` after `try_grow` succeeds and reported by `mem_used()`.
+ used: usize,
+}
+
+#[async_trait]
+impl MemoryConsumer for AggregationStateMemoryConsumer {
+    /// Fixed display name of this consumer.
+    fn name(&self) -> String {
+        String::from("AggregationState")
+    }
+
+    /// Borrows the consumer ID stored in this struct.
+    fn id(&self) -> &crate::execution::MemoryConsumerId {
+        let consumer_id = &self.id;
+        consumer_id
+    }
+
+    /// Returns another `Arc` handle to the linked memory manager.
+    fn memory_manager(&self) -> Arc<MemoryManager> {
+        let manager = &self.memory_manager;
+        Arc::clone(manager)
+    }
+
+    /// This consumer always reports itself as `ConsumerType::Tracking`.
+    fn type_(&self) -> &ConsumerType {
+        &ConsumerType::Tracking
+    }
+
+    /// Spilling is not supported here: every call returns a
+    /// `DataFusionError::ResourcesExhausted` error.
+    async fn spill(&self) -> Result<usize> {
+        let reason = "Cannot spill AggregationState".to_owned();
+        Err(DataFusionError::ResourcesExhausted(reason))
+    }
+
+    /// Number of bytes currently recorded as used.
+    fn mem_used(&self) -> usize {
+        self.used
+    }
+}
+
+impl AggregationStateMemoryConsumer {
+    /// Calls `try_grow(bytes)` and, once that succeeds, adds `bytes` to `used`.
+    ///
+    /// # Errors
+    /// An error from `try_grow` is propagated with `?`; `used` is not modified in that case.
+    ///
+    /// # Panics
+    /// Panics with the message "overflow" if `used + bytes` does not fit in a `usize`.
+    async fn alloc(&mut self, bytes: usize) -> Result<()> {
+        self.try_grow(bytes).await?;
+
+        let new_total = self.used.checked_add(bytes);
+        self.used = new_total.expect("overflow");
+
+        Ok(())
+    }
+}
+
+impl Drop for AggregationStateMemoryConsumer {
+    /// Calls `drop_consumer` on the linked memory manager, passing this
+    /// consumer's ID and its currently recorded usage.
+    fn drop(&mut self) {
+        let consumer_id = self.id();
+        let used_bytes = self.mem_used();
+        self.memory_manager.drop_consumer(consumer_id, used_bytes);
+    }
+}
+
+/// Extension trait that pushes onto a vector while adding an estimate of any
+/// newly reserved memory to an external byte counter.
+trait VecAllocExt {
+    /// Element type stored in the collection.
+    type T;
+
+    /// Pushes `x`; if the collection is full, first reserves more space and
+    /// adds the estimated size of that reservation (in bytes) to `accounting`.
+    ///
+    /// # Panics
+    /// Panics with the message "overflow" if `accounting` would exceed `usize::MAX`.
+    fn push_accounted(&mut self, x: Self::T, accounting: &mut usize);
+}
+
+impl<T> VecAllocExt for Vec<T> {
+    type T = T;
+
+    fn push_accounted(&mut self, x: Self::T, accounting: &mut usize) {
+        if self.capacity() == self.len() {
+            // allocate more
+
+            // Reserve room for twice the current capacity in additional
+            // elements, but at least 2. `Vec::reserve` may allocate more than
+            // requested, so the accounted size is an estimate.
+            let bump_elements = (self.capacity() * 2).max(2);
+            // Account in units of the actual element type `T`; a fixed `u32`
+            // size mis-counts every element type that is not 4 bytes wide.
+            let bump_size = std::mem::size_of::<T>() * bump_elements;
+            self.reserve(bump_elements);
+            *accounting = (*accounting).checked_add(bump_size).expect("overflow");
+        }
+
+        self.push(x);
+    }
+}
+
+trait RawTableAllocExt {
+ type T;
+
+ fn insert_accounted(
+ &mut self,
+ x: Self::T,
+ hasher: impl Fn(&Self::T) -> u64,
Review Comment:
Long-term, I hope Rust stabilizes the [`Allocator`
trait](https://doc.rust-lang.org/std/alloc/trait.Allocator.html) so that we can
plug a tracking allocator into these data structures and measure their actual
memory usage (no need to guess).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]