gabotechs commented on code in PR #19501:
URL: https://github.com/apache/datafusion/pull/19501#discussion_r2655060697


##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -518,13 +518,29 @@ impl Accumulator for DistinctArrayAggAccumulator {
         Ok(ScalarValue::List(arr))
     }
 
-    fn size(&self) -> usize {
-        size_of_val(self) + ScalarValue::size_of_hashset(&self.values)
-            - size_of_val(&self.values)
+    fn size(&self, pool: Option<&dyn MemoryPool>) -> usize {
+        let mut total = size_of_val(self)
+            + size_of_val(&self.values)
+            + (size_of::<ScalarValue>() * self.values.capacity())
             + self.datatype.size()
             - size_of_val(&self.datatype)
             - size_of_val(&self.sort_options)
-            + size_of::<Option<SortOptions>>()
+            + size_of::<Option<SortOptions>>();
+
+        for scalar in &self.values {
+            if let Some(array) = scalar.get_array_ref() {
+                total += size_of::<Arc<dyn Array>>();
+                if let Some(pool) = pool {
+                    claim_buffers_recursive(&array.to_data(), pool);

Review Comment:
   I see that if a `&dyn MemoryPool` is passed, the `array` size does not 
count towards the `total` size; it is claimed in the `&dyn MemoryPool` 
instead.
   
   Imagine this scenario:
   
   - The underlying `array` is huge.
   - A `dyn MemoryPool` is passed, so the `array` size does not count towards 
the `total_size`; it is only claimed in the Arrow buffer memory pool.
   - In 
[GroupedHashAggregateStream::update_memory_reservation](https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/aggregates/row_hash.rs#L1064-L1064), 
the `total_size` is very small, because the `array` size did not count towards it.
   - When `reservation.try_resize()` is called with the small `total_size`, the 
reservation succeeds, even though the accumulator holds a huge array.
   
   Isn't this a problematic scenario?
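
   A toy model of the scenario above may make the concern concrete. `ToyPool`, `ToyReservation` and `reported_size` are hypothetical stand-ins written only for this sketch; they are not DataFusion's `MemoryPool` or `MemoryReservation`, and the byte counts are made up.

```rust
use std::cell::Cell;

/// Hypothetical pool with a fixed byte limit (not DataFusion's `MemoryPool`).
struct ToyPool {
    limit: usize,
    used: Cell<usize>,
}

/// Hypothetical reservation that may only grow within the pool's limit.
struct ToyReservation<'a> {
    pool: &'a ToyPool,
    size: usize,
}

impl<'a> ToyReservation<'a> {
    /// Resize the reservation, failing if the pool limit would be exceeded.
    fn try_resize(&mut self, new_size: usize) -> Result<(), String> {
        let used_by_others = self.pool.used.get() - self.size;
        if used_by_others + new_size > self.pool.limit {
            return Err(format!("cannot reserve {new_size} bytes"));
        }
        self.pool.used.set(used_by_others + new_size);
        self.size = new_size;
        Ok(())
    }
}

/// Mimics the shape of `size(pool)`: when the array bytes are claimed
/// elsewhere, they are left out of the returned total.
fn reported_size(struct_bytes: usize, array_bytes: usize, claimed_elsewhere: bool) -> usize {
    if claimed_elsewhere {
        struct_bytes
    } else {
        struct_bytes + array_bytes
    }
}

/// Returns whether the resize succeeded (without claiming, with claiming).
fn demo() -> (bool, bool) {
    let pool = ToyPool { limit: 1024, used: Cell::new(0) };
    let mut reservation = ToyReservation { pool: &pool, size: 0 };
    let (struct_bytes, array_bytes) = (64, 1 << 20);

    // The array counts towards the total: the 1 KiB limit rejects it.
    let counted = reservation
        .try_resize(reported_size(struct_bytes, array_bytes, false))
        .is_ok();
    // The array is claimed elsewhere: only 64 bytes reach the reservation.
    let claimed = reservation
        .try_resize(reported_size(struct_bytes, array_bytes, true))
        .is_ok();
    (counted, claimed)
}
```

   In this model the second resize succeeds although the same 1 MiB array is still held, which is the situation the question is about. Whether the real pool that receives the claim enforces the limit on its own is not something this sketch can answer.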



##########
datafusion/functions-aggregate/src/first_last.rs:
##########
@@ -1524,11 +1576,36 @@ impl Accumulator for LastValueAccumulator {
         Ok(self.last.clone())
     }
 
-    fn size(&self) -> usize {
-        size_of_val(self) - size_of_val(&self.last)
-            + self.last.size()
-            + ScalarValue::size_of_vec(&self.orderings)
-            - size_of_val(&self.orderings)
+    fn size(&self, pool: Option<&dyn MemoryPool>) -> usize {
+        let mut total =
+            size_of_val(self) - size_of_val(&self.last) - size_of_val(&self.orderings)
+                + size_of::<ScalarValue>() * self.orderings.capacity();
+
+        if let Some(array) = self.last.get_array_ref() {
+            total += size_of::<Arc<dyn Array>>();
+            if let Some(pool) = pool {
+                claim_buffers_recursive(&array.to_data(), pool);
+            } else {
+                total += self.last.size() - size_of_val(&self.last);
+            }
+        } else {
+            total += self.last.size();
+        }
+
+        for scalar in &self.orderings {
+            if let Some(array) = scalar.get_array_ref() {
+                total += size_of::<Arc<dyn Array>>();
+                if let Some(pool) = pool {
+                    claim_buffers_recursive(&array.to_data(), pool);
+                } else {
+                    total += scalar.size() - size_of_val(scalar);
+                }
+            } else {
+                total += scalar.size() - size_of_val(scalar);
+            }

Review Comment:
   This pattern (claim the array buffers when a pool is passed, otherwise add 
the scalar's size to `total`) seems to be repeated several times across the 
project. Maybe a helper could be useful?
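
   A rough sketch of what such a helper could look like, written against stand-in types so it compiles on its own. `ToyPool`, `CountingPool`, `ToyScalar`, `scalar_extra_size` and `scalars_extra_size` are all hypothetical names; a real helper would take `&ScalarValue` and `Option<&dyn MemoryPool>` and call `claim_buffers_recursive` where the sketch calls `claim`.

```rust
use std::cell::Cell;
use std::mem::size_of;
use std::sync::Arc;

/// Hypothetical stand-in for `MemoryPool`; it only records claimed bytes.
trait ToyPool {
    fn claim(&self, bytes: usize);
}

/// Simple pool that sums every claim it receives.
struct CountingPool {
    claimed: Cell<usize>,
}

impl ToyPool for CountingPool {
    fn claim(&self, bytes: usize) {
        self.claimed.set(self.claimed.get() + bytes);
    }
}

/// Hypothetical stand-in for `ScalarValue`.
enum ToyScalar {
    /// Scalar with some heap bytes but no backing array.
    Plain { heap_bytes: usize },
    /// Scalar backed by an `Arc`'d array owning `buffer_bytes` of buffers.
    ArrayBacked { buffer_bytes: usize },
}

/// Bytes to add to `total` for one scalar, beyond its inline size.
/// With a pool, array buffers are claimed there and left out of the result.
fn scalar_extra_size(scalar: &ToyScalar, pool: Option<&dyn ToyPool>) -> usize {
    match scalar {
        ToyScalar::ArrayBacked { buffer_bytes } => {
            // Stand-in for `size_of::<Arc<dyn Array>>()` (also a fat pointer).
            let pointer = size_of::<Arc<[u8]>>();
            match pool {
                Some(pool) => {
                    pool.claim(*buffer_bytes);
                    pointer
                }
                None => pointer + *buffer_bytes,
            }
        }
        ToyScalar::Plain { heap_bytes } => *heap_bytes,
    }
}

/// Each call site then collapses to a single sum over its scalars.
fn scalars_extra_size(scalars: &[ToyScalar], pool: Option<&dyn ToyPool>) -> usize {
    scalars.iter().map(|s| scalar_extra_size(s, pool)).sum()
}
```

   With something along these lines, the loop over `self.orderings` and the branch for `self.last` would each become one call, and the claim-or-count decision would live in a single place.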



##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -390,27 +393,25 @@ impl Accumulator for ArrayAggAccumulator {
         Ok(SingleRowListArrayBuilder::new(concated_array).build_list_scalar())
     }
 
-    fn size(&self) -> usize {
-        size_of_val(self)
+    fn size(&self, pool: Option<&dyn MemoryPool>) -> usize {
+        let mut total = size_of_val(self)
             + (size_of::<ArrayRef>() * self.values.capacity())
-            + self
+            + self.datatype.size()
+            - size_of_val(&self.datatype);
+
+        if let Some(pool) = pool {
+            for arr in &self.values {
+                claim_buffers_recursive(&arr.to_data(), pool);

Review Comment:
   I imagine that `.size()` here can be called an arbitrary number of times. 
What would happen with `claim_buffers_recursive` if it is called many times 
on the same buffers?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


