blaginin commented on code in PR #13377:
URL: https://github.com/apache/datafusion/pull/13377#discussion_r1838621019


##########
datafusion/physical-plan/src/spill.rs:
##########
@@ -175,11 +247,103 @@ mod tests {
         )?;
 
         let file = BufReader::new(File::open(spill_file.path())?);
-        let reader = arrow::ipc::reader::FileReader::try_new(file, None)?;
+        let reader = FileReader::try_new(file, None)?;
 
         assert_eq!(reader.num_batches(), 4);
         assert_eq!(reader.schema(), schema);
 
         Ok(())
     }
+
+    #[test]
+    fn test_get_record_batch_memory_size() {
+        // Create a simple record batch with two columns
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("ints", DataType::Int32, true),
+            Field::new("float64", DataType::Float64, false),
+        ]));
+
+        let int_array =
+            Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), 
Some(5)]);
+        let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
+
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![Arc::new(int_array), Arc::new(float64_array)],
+        )
+        .unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        assert_eq!(size, 60);

Review Comment:
   My biggest concern with this PR is that the result of 
`get_record_batch_memory_size` differs from `get_array_memory_size`. For 
example, here `batch.get_array_memory_size()` would return 252 instead of 60.
   
   This could be dangerous because the project would end up with two different 
methods of calculating memory sizes. I can imagine a scenario where we reserve 
memory based on one calculation method and shrink it using the result from the 
other. While the difference may not be large each time, over many repetitions 
or a large dataset, it could behave almost like a data leak, making debugging 
very challenging...



##########
datafusion/physical-plan/src/spill.rs:
##########
@@ -109,10 +111,80 @@ pub fn spill_record_batch_by_size(
     Ok(())
 }
 
+/// Calculate total used memory of this batch.
+///
+/// This function is used to estimate the physical memory usage of the 
`RecordBatch`. The implementation will add up all unique `Buffer`'s memory
+/// size, due to:
+/// - The data pointer inside `Buffer` are memory regions returned by global 
memory
+///   allocator, those regions can't have overlap.
+/// - The actual used range of `ArrayRef`s inside `RecordBatch` can have 
overlap
+///   or reuse the same `Buffer`. For example: taking a slice from `Array`.
+///
+/// Example:
+/// For a `RecordBatch` with two columns: `col1` and `col2`, two columns are 
pointing
+/// to a sub-region of the same buffer.
+///
+/// {xxxxxxxxxxxxxxxxxxx} <--- buffer
+///       ^    ^  ^    ^
+///       |    |  |    |
+/// col1->{    }  |    |    
+/// col2--------->{    }
+///
+/// In the above case, `get_record_batch_memory_size` will return the size of
+/// the buffer, instead of the sum of `col1` and `col2`'s actual memory size.
+///
+/// Note: Current `RecordBatch`.get_array_memory_size()` will double count the
+/// buffer memory size if multiple arrays within the batch are sharing the same
+/// `Buffer`. This method provides temporary fix until the issue is resolved:
+/// <https://github.com/apache/arrow-rs/issues/6439>
+pub fn get_record_batch_memory_size(batch: &RecordBatch) -> usize {
+    // Store pointers to `Buffer`'s start memory address (instead of actual
+    // used data region's pointer represented by current `Array`)
+    let mut counted_buffers: HashSet<NonNull<u8>> = HashSet::new();
+    let mut total_size = 0;
+
+    for array in batch.columns() {
+        let array_data = array.to_data();
+        count_array_data_memory_size(&array_data, &mut counted_buffers, &mut 
total_size);
+    }
+
+    total_size
+}
+
+/// Count the memory usage of `array_data` and its children recursively.
+fn count_array_data_memory_size(
+    array_data: &ArrayData,
+    counted_buffers: &mut HashSet<NonNull<u8>>,
+    total_size: &mut usize,
+) {
+    // Count memory usage for `array_data`
+    for buffer in array_data.buffers() {
+        if counted_buffers.insert(buffer.data_ptr()) {
+            *total_size += buffer.capacity();
+        } // Otherwise the buffer's memory is already counted
+    }
+
+    if let Some(null_buffer) = array_data.nulls() {
+        if counted_buffers.insert(null_buffer.inner().inner().data_ptr()) {
+            *total_size += null_buffer.inner().inner().capacity();
+        }
+    }
+
+    // Count all children `ArrayData` recursively
+    for child in array_data.child_data() {
+        count_array_data_memory_size(child, counted_buffers, total_size);
+    }

Review Comment:
   Does it make sense to use `#[recursive]` to protect from cases with large 
nested data types?



##########
datafusion/physical-plan/src/spill.rs:
##########
@@ -109,10 +111,80 @@ pub fn spill_record_batch_by_size(
     Ok(())
 }
 
+/// Calculate total used memory of this batch.
+///
+/// This function is used to estimate the physical memory usage of the 
`RecordBatch`. The implementation will add up all unique `Buffer`'s memory
+/// size, due to:
+/// - The data pointer inside `Buffer` are memory regions returned by global 
memory
+///   allocator, those regions can't have overlap.
+/// - The actual used range of `ArrayRef`s inside `RecordBatch` can have 
overlap
+///   or reuse the same `Buffer`. For example: taking a slice from `Array`.
+///
+/// Example:
+/// For a `RecordBatch` with two columns: `col1` and `col2`, two columns are 
pointing
+/// to a sub-region of the same buffer.
+///
+/// {xxxxxxxxxxxxxxxxxxx} <--- buffer
+///       ^    ^  ^    ^
+///       |    |  |    |
+/// col1->{    }  |    |    
+/// col2--------->{    }
+///
+/// In the above case, `get_record_batch_memory_size` will return the size of
+/// the buffer, instead of the sum of `col1` and `col2`'s actual memory size.
+///
+/// Note: Current `RecordBatch`.get_array_memory_size()` will double count the
+/// buffer memory size if multiple arrays within the batch are sharing the same
+/// `Buffer`. This method provides temporary fix until the issue is resolved:
+/// <https://github.com/apache/arrow-rs/issues/6439>
+pub fn get_record_batch_memory_size(batch: &RecordBatch) -> usize {
+    // Store pointers to `Buffer`'s start memory address (instead of actual
+    // used data region's pointer represented by current `Array`)
+    let mut counted_buffers: HashSet<NonNull<u8>> = HashSet::new();
+    let mut total_size = 0;
+
+    for array in batch.columns() {
+        let array_data = array.to_data();
+        count_array_data_memory_size(&array_data, &mut counted_buffers, &mut 
total_size);
+    }
+
+    total_size
+}
+
+/// Count the memory usage of `array_data` and its children recursively.
+fn count_array_data_memory_size(
+    array_data: &ArrayData,
+    counted_buffers: &mut HashSet<NonNull<u8>>,
+    total_size: &mut usize,
+) {
+    // Count memory usage for `array_data`

Review Comment:
   nit, but you can probably add size of `array_data.data_type` itself



##########
datafusion/physical-plan/src/spill.rs:
##########
@@ -175,11 +247,103 @@ mod tests {
         )?;
 
         let file = BufReader::new(File::open(spill_file.path())?);
-        let reader = arrow::ipc::reader::FileReader::try_new(file, None)?;
+        let reader = FileReader::try_new(file, None)?;
 
         assert_eq!(reader.num_batches(), 4);
         assert_eq!(reader.schema(), schema);
 
         Ok(())
     }
+
+    #[test]
+    fn test_get_record_batch_memory_size() {
+        // Create a simple record batch with two columns
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("ints", DataType::Int32, true),
+            Field::new("float64", DataType::Float64, false),
+        ]));
+
+        let int_array =
+            Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), 
Some(5)]);
+        let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
+
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![Arc::new(int_array), Arc::new(float64_array)],
+        )
+        .unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        assert_eq!(size, 60);

Review Comment:
   Should we completely switch to the new method, blocking the usage of the old 
one? Should we try to make two numbers match closely? 



##########
datafusion/physical-plan/src/spill.rs:
##########
@@ -109,10 +111,80 @@ pub fn spill_record_batch_by_size(
     Ok(())
 }
 
+/// Calculate total used memory of this batch.
+///
+/// This function is used to estimate the physical memory usage of the 
`RecordBatch`. The implementation will add up all unique `Buffer`'s memory
+/// size, due to:
+/// - The data pointer inside `Buffer` are memory regions returned by global 
memory
+///   allocator, those regions can't have overlap.
+/// - The actual used range of `ArrayRef`s inside `RecordBatch` can have 
overlap
+///   or reuse the same `Buffer`. For example: taking a slice from `Array`.
+///
+/// Example:
+/// For a `RecordBatch` with two columns: `col1` and `col2`, two columns are 
pointing
+/// to a sub-region of the same buffer.
+///
+/// {xxxxxxxxxxxxxxxxxxx} <--- buffer
+///       ^    ^  ^    ^
+///       |    |  |    |
+/// col1->{    }  |    |    
+/// col2--------->{    }
+///
+/// In the above case, `get_record_batch_memory_size` will return the size of
+/// the buffer, instead of the sum of `col1` and `col2`'s actual memory size.
+///
+/// Note: Current `RecordBatch`.get_array_memory_size()` will double count the
+/// buffer memory size if multiple arrays within the batch are sharing the same
+/// `Buffer`. This method provides temporary fix until the issue is resolved:
+/// <https://github.com/apache/arrow-rs/issues/6439>
+pub fn get_record_batch_memory_size(batch: &RecordBatch) -> usize {

Review Comment:
   in TopK, `RecordBatchStore` still uses `get_array_memory_size`, do you think 
we should switch to `get_record_batch_memory_size` there as well?



##########
datafusion/physical-plan/src/spill.rs:
##########
@@ -175,11 +247,103 @@ mod tests {
         )?;
 
         let file = BufReader::new(File::open(spill_file.path())?);
-        let reader = arrow::ipc::reader::FileReader::try_new(file, None)?;
+        let reader = FileReader::try_new(file, None)?;
 
         assert_eq!(reader.num_batches(), 4);
         assert_eq!(reader.schema(), schema);
 
         Ok(())
     }
+
+    #[test]
+    fn test_get_record_batch_memory_size() {
+        // Create a simple record batch with two columns
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("ints", DataType::Int32, true),
+            Field::new("float64", DataType::Float64, false),
+        ]));
+
+        let int_array =
+            Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), 
Some(5)]);
+        let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
+
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![Arc::new(int_array), Arc::new(float64_array)],
+        )
+        .unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        assert_eq!(size, 60);
+    }
+
+    #[test]
+    fn test_get_record_batch_memory_size_empty() {
+        // Test with empty record batch
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ints",
+            DataType::Int32,
+            false,
+        )]));
+
+        let int_array: Int32Array = Int32Array::from(vec![] as Vec<i32>);
+        let batch = RecordBatch::try_new(schema, 
vec![Arc::new(int_array)]).unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        assert_eq!(size, 0, "Empty batch should have 0 memory size");
+    }
+
+    #[test]
+    fn test_get_record_batch_memory_size_shared_buffer() {
+        // Test with slices that share the same underlying buffer
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("slice1", DataType::Int32, false),
+            Field::new("slice2", DataType::Int32, false),
+        ]));
+
+        let original = Int32Array::from(vec![1, 2, 3, 4, 5]);
+        let slice1 = original.slice(0, 3);
+        let slice2 = original.slice(2, 3);
+
+        let batch =
+            RecordBatch::try_new(schema, vec![Arc::new(slice1), 
Arc::new(slice2)])
+                .unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        // The size should only count the shared buffer once
+        assert_eq!(size, 20);
+    }
+
+    #[test]
+    fn test_get_record_batch_memory_size_nested_array() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "nested_int",
+                DataType::List(Arc::new(Field::new("item", DataType::Int32, 
true))),
+                false,
+            ),
+            Field::new(
+                "nested_int2",
+                DataType::List(Arc::new(Field::new("item", DataType::Int32, 
true))),
+                false,
+            ),
+        ]));
+
+        let int_list_array = ListArray::from_iter_primitive::<Int32Type, _, 
_>(vec![
+            Some(vec![Some(1), Some(2), Some(3)]),
+        ]);
+
+        let int_list_array2 = ListArray::from_iter_primitive::<Int32Type, _, 
_>(vec![
+            Some(vec![Some(4), Some(5), Some(6)]),
+        ]);
+
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![Arc::new(int_list_array), Arc::new(int_list_array2)],
+        )
+        .unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        assert_eq!(size, 8320);
+    }
 }

Review Comment:
   <img width="649" alt="image" 
src="https://github.com/user-attachments/assets/6b518f0e-6835-4001-b1a6-cd55c845a864";>
   
   I think this line isn't covered, because I commented it out and all tests in 
this file passed. Let's add one more test? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to