martin-g commented on code in PR #21633:
URL: https://github.com/apache/datafusion/pull/21633#discussion_r3084310770


##########
datafusion/physical-plan/Cargo.toml:
##########
@@ -48,6 +48,7 @@ name = "datafusion_physical_plan"
 
 [dependencies]
 arrow = { workspace = true }
+arrow-data = { workspace = true }

Review Comment:
   This dependency seems unused.
   The only occurrence of `arrow_data` is in a test helper:
   https://github.com/apache/datafusion/pull/21633/changes#diff-1f7d15c867929af294664ebbde4e8c9038186222cbb95ed86e527406cf066e84R463



##########
datafusion/physical-plan/src/spill/spill_manager.rs:
##########


Review Comment:
   Isn't the same needed for BinaryViewArray?



##########
datafusion/physical-plan/src/spill/mod.rs:
##########
@@ -344,6 +346,153 @@ fn get_max_alignment_for_schema(schema: &Schema) -> usize {
     max_alignment
 }
 
+/// Size of a single view structure in StringView/BinaryView arrays (in bytes).
+/// Each view is 16 bytes: 4 bytes length + 4 bytes prefix + 8 bytes buffer ID/offset.
+const VIEW_SIZE_BYTES: usize = 16;
+
+/// Performs garbage collection on StringView and BinaryView arrays before spilling to reduce memory usage.
+///
+/// # Why GC is needed
+///
+/// StringView and BinaryView arrays can accumulate significant memory waste when sliced.
+/// When a large array is sliced (e.g., taking first 100 rows of 1000), the view array
+/// still references the original data buffers containing all 1000 rows of data.
+///
+/// For example, in the ClickBench benchmark (issue #19414), repeated slicing of StringView
+/// arrays resulted in 820MB of spill files that could be reduced to just 33MB after GC -
+/// a 96% reduction in size.
+///
+/// # How it works
+///
+/// The GC process:
+/// 1. Identifies view arrays (StringView/BinaryView) in the batch
+/// 2. Checks if their data buffers exceed a memory threshold
+/// 3. If exceeded, calls the Arrow `gc()` method which creates new compact buffers
+///    containing only the data referenced by the current views
+/// 4. Returns a new batch with GC'd arrays (or original arrays if GC not needed)
+///
+/// # When GC is triggered
+///
+/// GC is only performed when data buffers exceed a threshold (currently 10KB).
+/// This balances memory savings against the CPU overhead of garbage collection.
+/// Small arrays are passed through unchanged since the GC overhead would exceed
+/// any memory savings.
+///
+/// # Performance considerations
+///
+/// The function always returns a new RecordBatch for API consistency, but:
+/// - If no view arrays are present, it's a cheap clone (just Arc increments)
+/// - GC is skipped for small buffers to avoid unnecessary CPU overhead
+/// - The Arrow `gc()` method itself is optimized and only copies referenced data
+pub(crate) fn gc_view_arrays(batch: &RecordBatch) -> Result<RecordBatch> {
+    // Early return optimization: Skip GC entirely if the batch contains no view arrays.
+    // This avoids unnecessary processing for batches with only primitive types.
+    let has_view_arrays = batch.columns().iter().any(|array| {
+        matches!(
+            array.data_type(),
+            arrow::datatypes::DataType::Utf8View | arrow::datatypes::DataType::BinaryView
+        )
+    });
+
+    if !has_view_arrays {
+        // RecordBatch::clone() is cheap - just Arc reference count bumps
+        return Ok(batch.clone());
+    }
+
+    let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(batch.num_columns());
+
+    for array in batch.columns() {
+        let gc_array = match array.data_type() {
+            arrow::datatypes::DataType::Utf8View => {
+                let string_view = array
+                    .as_any()
+                    .downcast_ref::<StringViewArray>()
+                    .expect("Utf8View array should downcast to StringViewArray");
+                // Only perform GC if the array appears to be sliced (has potential waste).
+                // The gc() method internally checks if GC is beneficial.
+                if should_gc_view_array(string_view) {
+                    Arc::new(string_view.gc()) as Arc<dyn Array>
+                } else {
+                    Arc::clone(array)
+                }
+            }
+            arrow::datatypes::DataType::BinaryView => {
+                let binary_view = array
+                    .as_any()
+                    .downcast_ref::<BinaryViewArray>()
+                    .expect("BinaryView array should downcast to BinaryViewArray");
+                // Only perform GC if the array appears to be sliced (has potential waste).
+                // The gc() method internally checks if GC is beneficial.
+                if should_gc_view_array(binary_view) {
+                    Arc::new(binary_view.gc()) as Arc<dyn Array>
+                } else {
+                    Arc::clone(array)
+                }
+            }
+            // Non-view arrays are passed through unchanged
+            _ => Arc::clone(array),
+        };
+        new_columns.push(gc_array);
+    }
+
+    // Always return a new batch for consistency
+    Ok(RecordBatch::try_new(batch.schema(), new_columns)?)

Review Comment:
   Would there be a noticeable gain if the batch were simply cloned when no
   column was actually garbage-collected?
   I.e.
   ```rust
   let mut mutated = false;
   ...
   if should_gc_view_array(a_view) {
      mutated = true;
      ...
   }
   ...
   
   if mutated {
       Ok(RecordBatch::try_new(batch.schema(), new_columns)?)
   } else {
       Ok(batch.clone())
   }
   ```
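
To make the suggestion above concrete without pulling in Arrow, here is a minimal std-only sketch of the "track whether anything was rewritten" pattern. `Column`, `Batch`, `should_gc`, `gc_batch`, and the `threshold` parameter are hypothetical stand-ins invented for illustration; they are not the PR's types or Arrow's API. The sketch assumes `live_bytes` never exceeds the buffer length.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a view column: a shared data buffer plus the
/// number of bytes the current slice actually references.
#[derive(Clone)]
struct Column {
    data: Arc<Vec<u8>>,
    live_bytes: usize,
}

/// Hypothetical stand-in for `RecordBatch`.
#[derive(Clone)]
struct Batch {
    columns: Vec<Column>,
}

/// Hypothetical stand-in for `should_gc_view_array`: compact only when the
/// buffer exceeds `threshold` bytes and holds unreferenced data.
fn should_gc(col: &Column, threshold: usize) -> bool {
    col.data.len() > threshold && col.live_bytes < col.data.len()
}

/// Returns the output batch and a flag saying whether any column was rewritten.
fn gc_batch(batch: &Batch, threshold: usize) -> (Batch, bool) {
    let mut mutated = false;
    let mut new_columns = Vec::with_capacity(batch.columns.len());
    for col in &batch.columns {
        if should_gc(col, threshold) {
            mutated = true;
            // Copy only the referenced prefix into a fresh, compact buffer.
            let compact = col.data[..col.live_bytes].to_vec();
            new_columns.push(Column {
                data: Arc::new(compact),
                live_bytes: col.live_bytes,
            });
        } else {
            // Cheap: only the reference count of the shared buffer is bumped.
            new_columns.push(col.clone());
        }
    }
    if mutated {
        // Stands in for `RecordBatch::try_new`, which re-validates the columns.
        (Batch { columns: new_columns }, true)
    } else {
        // Nothing changed, so hand back a clone of the input batch.
        (batch.clone(), false)
    }
}
```

Whether skipping the rebuild is measurable in the real code depends on how expensive batch construction is relative to the spill itself, which this model does not capture.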



##########
datafusion/physical-plan/src/spill/mod.rs:
##########
@@ -866,4 +1015,312 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_gc_string_view_before_spill() -> Result<()> {
+        use arrow::array::StringViewArray;
+
+        let strings: Vec<String> = (0..200)
+            .map(|i| {
+                if i % 2 == 0 {
+                    "short_string".to_string()
+                } else {
+                    "this_is_a_much_longer_string_that_will_not_be_inlined".to_string()
+                }
+            })
+            .collect();
+
+        let string_array = StringViewArray::from(strings);
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "strings",
+            DataType::Utf8View,
+            false,
+        )]));
+
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(string_array) as ArrayRef],
+        )?;
+        let sliced_batch = batch.slice(0, 20);
+        let gc_batch = gc_view_arrays(&sliced_batch)?;
+
+        assert_eq!(gc_batch.num_rows(), sliced_batch.num_rows());
+        assert_eq!(gc_batch.num_columns(), sliced_batch.num_columns());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_gc_binary_view_before_spill() -> Result<()> {
+        use arrow::array::BinaryViewArray;
+
+        let binaries: Vec<Vec<u8>> = (0..200)
+            .map(|i| {
+                if i % 2 == 0 {
+                    vec![1, 2, 3, 4]
+                } else {
+                    vec![1; 50]
+                }
+            })
+            .collect();
+
+        let binary_array =
+            BinaryViewArray::from_iter(binaries.iter().map(|b| Some(b.as_slice())));
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "binaries",
+            DataType::BinaryView,
+            false,
+        )]));
+
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(binary_array) as ArrayRef],
+        )?;
+        let sliced_batch = batch.slice(0, 20);
+        let gc_batch = gc_view_arrays(&sliced_batch)?;
+
+        assert_eq!(gc_batch.num_rows(), sliced_batch.num_rows());
+        assert_eq!(gc_batch.num_columns(), sliced_batch.num_columns());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_gc_skips_small_arrays() -> Result<()> {
+        use arrow::array::StringViewArray;
+
+        let strings: Vec<String> = (0..10).map(|i| format!("string_{i}")).collect();
+
+        let string_array = StringViewArray::from(strings);
+        let array_ref: ArrayRef = Arc::new(string_array);
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "strings",
+            DataType::Utf8View,
+            false,
+        )]));
+
+        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![array_ref])?;
+
+        // GC should return the original batch for small arrays
+        let gc_batch = gc_view_arrays(&batch)?;
+
+        // The batch should be unchanged (cloned, not GC'd)
+        assert_eq!(gc_batch.num_rows(), batch.num_rows());

Review Comment:
   This would be true even if there was a GC, no?
   Wouldn't it be better to assert `let should_gc = should_gc_view_array(string_array); assert!(!should_gc);`?
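
A small std-only sketch of why a row-count assertion cannot tell the two paths apart, and what kind of assertion can. `ViewColumn`, `should_gc`, `maybe_gc`, and the `threshold` parameter are hypothetical stand-ins for illustration only, not the PR's code. In the real test, comparing the column pointers of the input and output batches might play the same role as the buffer check here, but that is an assumption not verified against the PR.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a view array: shared data buffer, number of
/// bytes still referenced, and row count.
struct ViewColumn {
    data: Arc<Vec<u8>>,
    live_bytes: usize,
    rows: usize,
}

/// Hypothetical threshold predicate (the PR's version is `should_gc_view_array`).
fn should_gc(col: &ViewColumn, threshold: usize) -> bool {
    col.data.len() > threshold
}

/// Compacts the buffer when the predicate fires; otherwise shares it.
/// The row count is preserved on both paths.
fn maybe_gc(col: &ViewColumn, threshold: usize) -> ViewColumn {
    let data = if should_gc(col, threshold) {
        // Copy only the referenced prefix into a new buffer.
        Arc::new(col.data[..col.live_bytes].to_vec())
    } else {
        // Share the original buffer unchanged.
        Arc::clone(&col.data)
    };
    ViewColumn {
        data,
        live_bytes: col.live_bytes,
        rows: col.rows,
    }
}
```

Because `rows` is copied on both branches, only a check on the predicate itself or on buffer identity (for example `Arc::ptr_eq`) distinguishes "skipped" from "compacted".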



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

