alamb commented on code in PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#discussion_r1023140383


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -418,6 +529,104 @@ impl std::fmt::Debug for AggregationState {
     }
 }
 
+/// Accounting data structure for memory usage.
+struct AggregationStateMemoryConsumer {
+    /// Consumer ID.
+    id: MemoryConsumerId,
+
+    /// Linked memory manager.
+    memory_manager: Arc<MemoryManager>,
+
+    /// Currently used size in bytes.
+    used: usize,
+}
+
+#[async_trait]
+impl MemoryConsumer for AggregationStateMemoryConsumer {
+    fn name(&self) -> String {
+        "AggregationState".to_owned()
+    }
+
+    fn id(&self) -> &crate::execution::MemoryConsumerId {
+        &self.id
+    }
+
+    fn memory_manager(&self) -> Arc<MemoryManager> {
+        Arc::clone(&self.memory_manager)
+    }
+
+    fn type_(&self) -> &ConsumerType {
+        &ConsumerType::Tracking
+    }
+
+    async fn spill(&self) -> Result<usize> {
+        Err(DataFusionError::ResourcesExhausted(
+            "Cannot spill AggregationState".to_owned(),
+        ))
+    }
+
+    fn mem_used(&self) -> usize {
+        self.used
+    }
+}
+
+impl Drop for AggregationStateMemoryConsumer {
+    fn drop(&mut self) {
+        self.memory_manager
+            .drop_consumer(self.id(), self.mem_used());
+    }
+}
+
+/// Memory pool that can be used in a function scope.
+///
+/// This is helpful if there are many small memory allocations (so the overhead of tracking them in [`MemoryManager`] is
+/// high due to lock contention) and pre-calculating the entire allocation for a whole [`RecordBatch`] is complicated or
+/// expensive.
+///
+/// The pool tries to allocate a whole block of memory and gives back overallocated memory on [drop](Self::drop).

Review Comment:
   👌 
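   To make the doc comment above concrete, here is a minimal, std-only sketch of a function-scoped pool. `SharedBudget` and `ScopedPool` are hypothetical stand-ins, not the PR's `MemoryManager` or pool types: the pool takes memory from a mutex-protected budget in whole blocks, serves small requests without touching the lock, and returns the unused tail of its reservation on drop. It assumes that bytes actually handed out stay charged after the drop, i.e. responsibility for them passes to a longer-lived consumer.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical shared budget standing in for `MemoryManager`: every call
/// takes a lock, which is the contention the scoped pool tries to avoid.
#[derive(Debug)]
pub struct SharedBudget {
    available: Mutex<usize>,
}

impl SharedBudget {
    pub fn new(limit: usize) -> Arc<Self> {
        Arc::new(Self { available: Mutex::new(limit) })
    }

    /// Takes `bytes` from the budget; returns false if not enough is left.
    pub fn try_acquire(&self, bytes: usize) -> bool {
        let mut available = self.available.lock().unwrap();
        if *available >= bytes {
            *available -= bytes;
            true
        } else {
            false
        }
    }

    pub fn release(&self, bytes: usize) {
        *self.available.lock().unwrap() += bytes;
    }

    pub fn available(&self) -> usize {
        *self.available.lock().unwrap()
    }
}

/// Hypothetical function-scoped pool: reserves whole blocks from the shared
/// budget and hands out small allocations locally.
pub struct ScopedPool {
    budget: Arc<SharedBudget>,
    block_size: usize,
    reserved: usize, // bytes taken from the shared budget
    used: usize,     // bytes handed out to callers
}

impl ScopedPool {
    pub fn new(budget: Arc<SharedBudget>, block_size: usize) -> Self {
        assert!(block_size > 0, "block_size must be non-zero");
        Self { budget, block_size, reserved: 0, used: 0 }
    }

    /// Records a small allocation; only locks the shared budget when the
    /// bytes reserved so far are not enough.
    pub fn alloc(&mut self, bytes: usize) -> bool {
        if self.used + bytes > self.reserved {
            let missing = self.used + bytes - self.reserved;
            // Round up to whole blocks so most calls stay lock-free.
            let blocks = (missing + self.block_size - 1) / self.block_size;
            let request = blocks * self.block_size;
            if !self.budget.try_acquire(request) {
                return false;
            }
            self.reserved += request;
        }
        self.used += bytes;
        true
    }

    pub fn used(&self) -> usize {
        self.used
    }
}

impl Drop for ScopedPool {
    fn drop(&mut self) {
        // Give back only the over-allocated tail; `used` stays charged.
        self.budget.release(self.reserved - self.used);
    }
}
```

   With a 100-byte block, allocations of 10, 30 and 150 bytes take the budget lock only twice, and dropping the pool returns the 10 bytes that were reserved but never handed out.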



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -70,6 +72,16 @@ use hashbrown::raw::RawTable;
 /// [Compact]: datafusion_row::layout::RowType::Compact
 /// [WordAligned]: datafusion_row::layout::RowType::WordAligned
 pub(crate) struct GroupedHashAggregateStreamV2 {

Review Comment:
   This looks very much like other stream adapters we have in DataFusion. Perhaps we can give it a more general name, such as `SendableRecordBatchStreamWrapper`, and put it in
   
https://github.com/apache/arrow-datafusion/blob/c9361e0210861962074eb10d7e480949bb862b97/datafusion/core/src/physical_plan/stream.rs#L34
   
   We can always do this in a follow-on PR as well.



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -270,10 +315,25 @@ fn group_aggregate_batch(
                 // Existing entry for this group value
                 Some((_hash, group_idx)) => {
                     let group_state = &mut group_states[*group_idx];
+
                     // 1.3
                     if group_state.indices.is_empty() {
                         groups_with_rows.push(*group_idx);
                     };
+
+                    // ensure we have enough indices allocated
+                    if group_state.indices.capacity() == group_state.indices.len() {
+                        // allocate more
+
+                        // growth factor: 2, but at least 2 elements
+                        let bump_elements = (group_state.indices.capacity() * 2).max(2);

Review Comment:
   I wonder if we could somehow encapsulate the memory manager interactions in methods on `GroupAggrState` rather than manipulating its fields directly as if it were a plain struct. I don't think that is strictly necessary.
   
   However, encapsulating them might:
   1. Keep this code manageable for future readers
   2. Allow the memory allocation routines to be unit tested (for example, checking that the tracked memory is incremented correctly when new groups are added)
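   A minimal sketch of what that encapsulation could look like. `MemoryTracker`, `GroupState`, and `push_index` are hypothetical names, the `Vec<u32>` index type is an assumption, and the tracker is a plain byte counter instead of the real `MemoryManager`, so the accounting can be unit tested without locks or async code. "Growth factor 2" is read here as doubling the capacity.

```rust
use std::mem::size_of;

/// Hypothetical stand-in for the shared memory accounting: just a counter.
#[derive(Debug, Default)]
pub struct MemoryTracker {
    used: usize,
}

impl MemoryTracker {
    pub fn grow(&mut self, bytes: usize) {
        self.used += bytes;
    }

    pub fn used(&self) -> usize {
        self.used
    }
}

/// Hypothetical, simplified per-group state holding only the row indices.
#[derive(Debug, Default)]
pub struct GroupState {
    indices: Vec<u32>,
}

impl GroupState {
    /// Appends a row index, doubling `indices` (at least 2 elements) when it
    /// is full and reporting every newly reserved byte to `tracker`.
    pub fn push_index(&mut self, row: u32, tracker: &mut MemoryTracker) {
        if self.indices.capacity() == self.indices.len() {
            let old_capacity = self.indices.capacity();
            self.indices.reserve_exact(old_capacity.max(2));
            // Account for what was actually reserved, not what was requested.
            let added = self.indices.capacity() - old_capacity;
            tracker.grow(added * size_of::<u32>());
        }
        self.indices.push(row);
    }

    pub fn indices(&self) -> &[u32] {
        &self.indices
    }

    /// Bytes currently reserved for this group's indices.
    pub fn allocated_bytes(&self) -> usize {
        self.indices.capacity() * size_of::<u32>()
    }
}
```

   Because all growth goes through `push_index`, a test only needs to push a few rows and check that `tracker.used()` equals `allocated_bytes()`.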
   
   



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -270,10 +315,25 @@ fn group_aggregate_batch(
                 // Existing entry for this group value
                 Some((_hash, group_idx)) => {
                     let group_state = &mut group_states[*group_idx];
+
                     // 1.3
                     if group_state.indices.is_empty() {
                         groups_with_rows.push(*group_idx);
                     };
+
+                    // ensure we have enough indices allocated
+                    if group_state.indices.capacity() == group_state.indices.len() {
+                        // allocate more
+
+                        // growth factor: 2, but at least 2 elements

Review Comment:
   Growth factors like this are sometimes capped at some large value (like 1 GB) to avoid the up-to-2x memory overhead that doubling causes at large sizes.
   
   If we use 2x growth with no cap, we can end up in situations where the table would fit in 36 GB, but the code tries to grow from 32 GB to 64 GB and hits the limit even though the query could otherwise complete. This could always be handled in a follow-on PR; users who want the current behavior can always disable the memory manager, let the allocations happen, and risk OOMs.
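   A minimal sketch of the capped growth described above. `MAX_BUMP_ELEMENTS`, `bump_elements`, and `grow_if_full` are hypothetical names, not DataFusion APIs; the cap value is illustrative only, and the returned byte count is what a caller would report to the memory manager.

```rust
/// Hypothetical cap on elements added per growth step:
/// 1 << 28 `u32` elements is 1 GiB. Illustrative value only.
pub const MAX_BUMP_ELEMENTS: usize = 1 << 28;

/// Additional elements to reserve when a full vector of `capacity` elements
/// must grow: double it (at least 2 elements), but add at most `max_bump`.
pub fn bump_elements(capacity: usize, max_bump: usize) -> usize {
    capacity.max(2).min(max_bump)
}

/// Grows `v` by one capped step if it is full and returns the number of
/// bytes newly reserved (0 if no growth was needed).
pub fn grow_if_full<T>(v: &mut Vec<T>, max_bump: usize) -> usize {
    assert!(max_bump > 0, "max_bump must be non-zero");
    if v.capacity() != v.len() {
        return 0;
    }
    let old_capacity = v.capacity();
    v.reserve_exact(bump_elements(old_capacity, max_bump));
    (v.capacity() - old_capacity) * std::mem::size_of::<T>()
}
```

   With this cap, a vector already holding 32 GiB of `u32` indices grows in 1 GiB steps instead of requesting another 32 GiB at once, so a table that needs about 36 GB can stay under a limit that uncapped doubling would exceed. The trade-off is more reallocations (and copies) once the cap is reached.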
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
