adriangb commented on code in PR #19444:
URL: https://github.com/apache/datafusion/pull/19444#discussion_r2813210726


##########
datafusion/physical-plan/src/spill/mod.rs:
##########
@@ -344,6 +344,138 @@ fn get_max_alignment_for_schema(schema: &Schema) -> usize 
{
     max_alignment
 }
 
+/// Size of a single view structure in StringView/BinaryView arrays (in bytes).
+/// Each view is 16 bytes: 4 bytes length + 4 bytes prefix + 8 bytes buffer ID/offset.
+#[cfg(test)]
+const VIEW_SIZE_BYTES: usize = 16;
+
+/// Performs garbage collection on StringView and BinaryView arrays before spilling
+/// to reduce memory usage.
+///
+/// # Why GC is needed
+///
+/// StringView and BinaryView arrays can accumulate significant memory waste when sliced.
+/// When a large array is sliced (e.g., taking the first 100 rows of 1000), the view array
+/// still references the original data buffers containing all 1000 rows of data.
+///
+/// For example, in the ClickBench benchmark (issue #19414), repeated slicing of StringView
+/// arrays resulted in 820MB of spill files that could be reduced to just 33MB after GC:
+/// a 96% reduction in size.
+///
+/// # How it works
+///
+/// The GC process creates new compact buffers containing only the data referenced by
+/// the current views, eliminating unreferenced data from sliced arrays.
+pub(crate) fn gc_view_arrays(batch: &RecordBatch) -> Result<RecordBatch> {
+    // Early-return optimization: skip GC entirely if the batch contains no view arrays.
+    // This avoids unnecessary processing for batches with only primitive types.
+    let has_view_arrays = batch.columns().iter().any(|array| {
+        matches!(
+            array.data_type(),
+            arrow::datatypes::DataType::Utf8View | arrow::datatypes::DataType::BinaryView
+        )
+    });
+
+    if !has_view_arrays {
+        // RecordBatch::clone() is cheap - just Arc reference count bumps
+        return Ok(batch.clone());
+    }
+
+    let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(batch.num_columns());
+
+    for array in batch.columns() {
+        let gc_array = match array.data_type() {
+            arrow::datatypes::DataType::Utf8View => {
+                let string_view = array
+                    .as_any()
+                    .downcast_ref::<StringViewArray>()
+                    .expect("Utf8View array should downcast to StringViewArray");
+                // Only perform GC if the data buffers exceed our size threshold.
+                // This balances memory savings against GC overhead.
+                if should_gc_view_array(string_view.data_buffers()) {
+                    Arc::new(string_view.gc()) as Arc<dyn Array>
+                } else {
+                    Arc::clone(array)
+                }
+            }
+            arrow::datatypes::DataType::BinaryView => {
+                let binary_view = array
+                    .as_any()
+                    .downcast_ref::<BinaryViewArray>()
+                    .expect("BinaryView array should downcast to BinaryViewArray");
+                // Only perform GC if the data buffers exceed our size threshold.
+                // This balances memory savings against GC overhead.
+                if should_gc_view_array(binary_view.data_buffers()) {
+                    Arc::new(binary_view.gc()) as Arc<dyn Array>
+                } else {
+                    Arc::clone(array)
+                }
+            }
+            // Non-view arrays are passed through unchanged
+            _ => Arc::clone(array),
+        };
+        new_columns.push(gc_array);
+    }
+
+    // Always return a new batch for consistency
+    Ok(RecordBatch::try_new(batch.schema(), new_columns)?)
+}
+
+/// Determines whether a view array should be garbage collected based on its buffer usage.
+///
+/// Uses a minimum buffer size threshold to avoid unnecessary GC on small arrays.
+/// This prevents the overhead of GC for arrays with a negligible memory footprint,
+/// while still capturing cases where sliced arrays carry large unreferenced buffers.
+///
+/// # Why not use get_record_batch_memory_size
+///
+/// We use manual buffer size calculation here because:
+/// - `get_record_batch_memory_size` operates on entire arrays, not just the data buffers
+/// - We need to check buffer capacity specifically to determine GC potential
+/// - The data buffers are what gets compacted during GC, so their size is the key metric
+fn should_gc_view_array(data_buffers: &[arrow::buffer::Buffer]) -> bool {

Review Comment:
   Can we check if there is anything to gc (i.e. if the array is sliced)? Or is 
`gc()` a no-op in that case?
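   For context, one way to answer "is there anything to gc" is to compare the bytes the current views actually reference against the total capacity of the underlying data buffers: an unsliced array references most of its buffers (GC would be a near no-op), while a heavily sliced one references only a small fraction. Below is a plain-Rust sketch of that heuristic with hypothetical names and thresholds (`worth_gc`, `MIN_BUFFER_SIZE`, and `MAX_REFERENCED_RATIO` are not arrow-rs APIs; in practice the referenced-byte count would have to be derived from the view structs):

   ```rust
   /// Minimum total buffer size (bytes) below which GC is never worth the copy.
   /// Illustrative value, not the threshold used in this PR.
   const MIN_BUFFER_SIZE: usize = 1024 * 1024; // 1 MiB

   /// GC only when the views reference less than this fraction of buffer capacity.
   /// Illustrative value, not the threshold used in this PR.
   const MAX_REFERENCED_RATIO: f64 = 0.5;

   /// Hypothetical helper: decide whether gc() would reclaim meaningful memory.
   fn worth_gc(referenced_bytes: usize, total_buffer_bytes: usize) -> bool {
       if total_buffer_bytes < MIN_BUFFER_SIZE {
           // Small buffers: the copy overhead outweighs any savings.
           return false;
       }
       // Sliced arrays reference only a fraction of their buffers; GC pays off
       // when that fraction is small.
       (referenced_bytes as f64) < (total_buffer_bytes as f64) * MAX_REFERENCED_RATIO
   }

   fn main() {
       // Unsliced-like array: views cover nearly the whole buffer -> skip GC.
       assert!(!worth_gc(3_900_000, 4_000_000));
       // Sliced array: views cover ~2% of a 4 MB buffer -> GC reclaims ~98%.
       assert!(worth_gc(80_000, 4_000_000));
       // Tiny buffer: never GC, regardless of ratio.
       assert!(!worth_gc(10, 1_000));
   }
   ```

   With such a check, a freshly built (unsliced) array would skip the `gc()` copy entirely rather than relying on `gc()` being cheap in that case.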



##########
datafusion/physical-plan/src/spill/in_progress_spill_file.rs:
##########
@@ -50,6 +50,7 @@ impl InProgressSpillFile {
     }
 
     /// Appends a `RecordBatch` to the spill file, initializing the writer if necessary.
+    /// Performs garbage collection on StringView/BinaryView arrays to reduce spill file size.

Review Comment:
   @EeshanBembi could you enhance the comment here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

