alamb commented on code in PR #13377:
URL: https://github.com/apache/datafusion/pull/13377#discussion_r1841846876


##########
datafusion/physical-plan/src/spill.rs:
##########
@@ -175,11 +247,103 @@ mod tests {
         )?;
 
         let file = BufReader::new(File::open(spill_file.path())?);
-        let reader = arrow::ipc::reader::FileReader::try_new(file, None)?;
+        let reader = FileReader::try_new(file, None)?;
 
         assert_eq!(reader.num_batches(), 4);
         assert_eq!(reader.schema(), schema);
 
         Ok(())
     }
+
+    #[test]
+    fn test_get_record_batch_memory_size() {
+        // Create a simple record batch with two columns
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("ints", DataType::Int32, true),
+            Field::new("float64", DataType::Float64, false),
+        ]));
+
+        let int_array =
+            Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), 
Some(5)]);
+        let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
+
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![Arc::new(int_array), Arc::new(float64_array)],
+        )
+        .unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        assert_eq!(size, 60);
+    }
+
+    #[test]
+    fn test_get_record_batch_memory_size_empty() {
+        // Test with empty record batch
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ints",
+            DataType::Int32,
+            false,
+        )]));
+
+        let int_array: Int32Array = Int32Array::from(vec![] as Vec<i32>);
+        let batch = RecordBatch::try_new(schema, 
vec![Arc::new(int_array)]).unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        assert_eq!(size, 0, "Empty batch should have 0 memory size");
+    }
+
+    #[test]
+    fn test_get_record_batch_memory_size_shared_buffer() {
+        // Test with slices that share the same underlying buffer
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("slice1", DataType::Int32, false),
+            Field::new("slice2", DataType::Int32, false),
+        ]));
+
+        let original = Int32Array::from(vec![1, 2, 3, 4, 5]);
+        let slice1 = original.slice(0, 3);
+        let slice2 = original.slice(2, 3);
+
+        let batch =

Review Comment:
   💯 



##########
datafusion/physical-plan/src/spill.rs:
##########
@@ -175,11 +247,103 @@ mod tests {
         )?;
 
         let file = BufReader::new(File::open(spill_file.path())?);
-        let reader = arrow::ipc::reader::FileReader::try_new(file, None)?;
+        let reader = FileReader::try_new(file, None)?;
 
         assert_eq!(reader.num_batches(), 4);
         assert_eq!(reader.schema(), schema);
 
         Ok(())
     }
+
+    #[test]
+    fn test_get_record_batch_memory_size() {
+        // Create a simple record batch with two columns
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("ints", DataType::Int32, true),
+            Field::new("float64", DataType::Float64, false),
+        ]));
+
+        let int_array =
+            Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), 
Some(5)]);
+        let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
+
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![Arc::new(int_array), Arc::new(float64_array)],
+        )
+        .unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        assert_eq!(size, 60);
+    }
+
+    #[test]
+    fn test_get_record_batch_memory_size_empty() {
+        // Test with empty record batch
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ints",
+            DataType::Int32,
+            false,
+        )]));
+
+        let int_array: Int32Array = Int32Array::from(vec![] as Vec<i32>);
+        let batch = RecordBatch::try_new(schema, 
vec![Arc::new(int_array)]).unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        assert_eq!(size, 0, "Empty batch should have 0 memory size");
+    }
+
+    #[test]
+    fn test_get_record_batch_memory_size_shared_buffer() {
+        // Test with slices that share the same underlying buffer
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("slice1", DataType::Int32, false),
+            Field::new("slice2", DataType::Int32, false),
+        ]));
+
+        let original = Int32Array::from(vec![1, 2, 3, 4, 5]);
+        let slice1 = original.slice(0, 3);
+        let slice2 = original.slice(2, 3);
+
+        let batch =
+            RecordBatch::try_new(schema, vec![Arc::new(slice1), 
Arc::new(slice2)])
+                .unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        // The size should only count the shared buffer once

Review Comment:
   It would be good in my mind to change this test so that rather than testing 
a hard coded size, it would compute the size of a single slice and verify that 
is the same
   
   that way the test would verify the actual invariant (that the sizes are the 
same) rather than relying on keeping the two values in sync



##########
datafusion/physical-plan/src/spill.rs:
##########
@@ -109,10 +111,80 @@ pub fn spill_record_batch_by_size(
     Ok(())
 }
 
+/// Calculate total used memory of this batch.

Review Comment:
   💯  for this comment



##########
datafusion/physical-plan/src/spill.rs:
##########
@@ -109,10 +111,80 @@ pub fn spill_record_batch_by_size(
     Ok(())
 }
 
+/// Calculate total used memory of this batch.
+///
+/// This function is used to estimate the physical memory usage of the 
`RecordBatch`. The implementation will add up all unique `Buffer`'s memory
+/// size, due to:
+/// - The data pointers inside `Buffer` are memory regions returned by the
+///   global memory allocator; those regions can't have overlap.
+/// - The actual used range of `ArrayRef`s inside `RecordBatch` can have 
overlap
+///   or reuse the same `Buffer`. For example: taking a slice from `Array`.
+///
+/// Example:
+/// For a `RecordBatch` with two columns: `col1` and `col2`, two columns are 
pointing
+/// to a sub-region of the same buffer.
+///
+/// {xxxxxxxxxxxxxxxxxxx} <--- buffer
+///       ^    ^  ^    ^
+///       |    |  |    |
+/// col1->{    }  |    |    
+/// col2--------->{    }
+///
+/// In the above case, `get_record_batch_memory_size` will return the size of
+/// the buffer, instead of the sum of `col1` and `col2`'s actual memory size.
+///
+/// Note: The current `RecordBatch::get_array_memory_size()` will double count the
+/// buffer memory size if multiple arrays within the batch are sharing the same
+/// `Buffer`. This method provides temporary fix until the issue is resolved:
+/// <https://github.com/apache/arrow-rs/issues/6439>
+pub fn get_record_batch_memory_size(batch: &RecordBatch) -> usize {
+    // Store pointers to `Buffer`'s start memory address (instead of actual
+    // used data region's pointer represented by current `Array`)
+    let mut counted_buffers: HashSet<NonNull<u8>> = HashSet::new();
+    let mut total_size = 0;
+
+    for array in batch.columns() {
+        let array_data = array.to_data();
+        count_array_data_memory_size(&array_data, &mut counted_buffers, &mut 
total_size);
+    }
+
+    total_size
+}
+
+/// Count the memory usage of `array_data` and its children recursively.
+fn count_array_data_memory_size(
+    array_data: &ArrayData,
+    counted_buffers: &mut HashSet<NonNull<u8>>,
+    total_size: &mut usize,
+) {
+    // Count memory usage for `array_data`
+    for buffer in array_data.buffers() {
+        if counted_buffers.insert(buffer.data_ptr()) {
+            *total_size += buffer.capacity();
+        } // Otherwise the buffer's memory is already counted
+    }
+
+    if let Some(null_buffer) = array_data.nulls() {
+        if counted_buffers.insert(null_buffer.inner().inner().data_ptr()) {
+            *total_size += null_buffer.inner().inner().capacity();
+        }
+    }
+
+    // Count all children `ArrayData` recursively
+    for child in array_data.child_data() {
+        count_array_data_memory_size(child, counted_buffers, total_size);
+    }

Review Comment:
   yes I agree we don't need to annotate all recursive function calls -- only 
the ones that will become very large/deep



##########
datafusion/physical-plan/src/spill.rs:
##########
@@ -175,11 +247,103 @@ mod tests {
         )?;
 
         let file = BufReader::new(File::open(spill_file.path())?);
-        let reader = arrow::ipc::reader::FileReader::try_new(file, None)?;
+        let reader = FileReader::try_new(file, None)?;
 
         assert_eq!(reader.num_batches(), 4);
         assert_eq!(reader.schema(), schema);
 
         Ok(())
     }
+
+    #[test]
+    fn test_get_record_batch_memory_size() {
+        // Create a simple record batch with two columns
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("ints", DataType::Int32, true),
+            Field::new("float64", DataType::Float64, false),
+        ]));
+
+        let int_array =
+            Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), 
Some(5)]);
+        let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
+
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![Arc::new(int_array), Arc::new(float64_array)],
+        )
+        .unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        assert_eq!(size, 60);

Review Comment:
   Finding a way to automatically update the memory accounting is certainly a 
good idea in my mind. As we have mentioned, I think the most important thing 
will be to find a way to account for arrow buffers completely. Then we can 
work it into DataFusion.



##########
datafusion/physical-plan/src/spill.rs:
##########
@@ -109,10 +111,80 @@ pub fn spill_record_batch_by_size(
     Ok(())
 }
 
+/// Calculate total used memory of this batch.
+///
+/// This function is used to estimate the physical memory usage of the 
`RecordBatch`. The implementation will add up all unique `Buffer`'s memory
+/// size, due to:
+/// - The data pointers inside `Buffer` are memory regions returned by the
+///   global memory allocator; those regions can't have overlap.
+/// - The actual used range of `ArrayRef`s inside `RecordBatch` can have 
overlap
+///   or reuse the same `Buffer`. For example: taking a slice from `Array`.
+///
+/// Example:
+/// For a `RecordBatch` with two columns: `col1` and `col2`, two columns are 
pointing
+/// to a sub-region of the same buffer.
+///
+/// {xxxxxxxxxxxxxxxxxxx} <--- buffer
+///       ^    ^  ^    ^
+///       |    |  |    |
+/// col1->{    }  |    |    
+/// col2--------->{    }
+///
+/// In the above case, `get_record_batch_memory_size` will return the size of
+/// the buffer, instead of the sum of `col1` and `col2`'s actual memory size.
+///
+/// Note: The current `RecordBatch::get_array_memory_size()` will double count the
+/// buffer memory size if multiple arrays within the batch are sharing the same
+/// `Buffer`. This method provides temporary fix until the issue is resolved:
+/// <https://github.com/apache/arrow-rs/issues/6439>
+pub fn get_record_batch_memory_size(batch: &RecordBatch) -> usize {

Review Comment:
   Cool -- can you possibly file a ticket to track any work that you know 
about?  I can help file it / help with the explanation as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to