2010YOUY01 commented on code in PR #19444:
URL: https://github.com/apache/datafusion/pull/19444#discussion_r2641790336


##########
datafusion/physical-plan/src/spill/in_progress_spill_file.rs:
##########
@@ -50,6 +50,7 @@ impl InProgressSpillFile {
     }
 
     /// Appends a `RecordBatch` to the spill file, initializing the writer if 
necessary.
+    /// Performs garbage collection on StringView/BinaryView arrays to reduce 
spill file size.

Review Comment:
   I recommend adding more comments to explain the rationale for the views GC; 
perhaps just copy and paste from 
https://github.com/apache/datafusion/blob/bb9a4a7ea19896fab7da6554cf5dd103e50d8e74/datafusion/physical-plan/src/sorts/sort.rs#L483



##########
datafusion/physical-plan/src/spill/mod.rs:
##########
@@ -799,4 +890,426 @@ mod tests {
         assert_eq!(alignment, 8);
         Ok(())
     }
+
+    #[test]
+    fn test_gc_string_view_before_spill() -> Result<()> {
+        use arrow::array::StringViewArray;
+
+        let strings: Vec<String> = (0..1000)
+            .map(|i| {
+                if i % 2 == 0 {
+                    "short_string".to_string()
+                } else {
+                    
"this_is_a_much_longer_string_that_will_not_be_inlined".to_string()
+                }
+            })
+            .collect();
+
+        let string_array = StringViewArray::from(strings);
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "strings",
+            DataType::Utf8View,
+            false,
+        )]));
+
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(string_array) as ArrayRef],
+        )?;
+        let sliced_batch = batch.slice(0, 100);
+        let gc_batch = gc_view_arrays(&sliced_batch)?;
+
+        assert_eq!(gc_batch.num_rows(), sliced_batch.num_rows());
+        assert_eq!(gc_batch.num_columns(), sliced_batch.num_columns());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_gc_binary_view_before_spill() -> Result<()> {
+        use arrow::array::BinaryViewArray;
+
+        let binaries: Vec<Vec<u8>> = (0..1000)
+            .map(|i| {
+                if i % 2 == 0 {
+                    vec![1, 2, 3, 4]
+                } else {
+                    vec![1; 50]
+                }
+            })
+            .collect();
+
+        let binary_array =
+            BinaryViewArray::from_iter(binaries.iter().map(|b| 
Some(b.as_slice())));
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "binaries",
+            DataType::BinaryView,
+            false,
+        )]));
+
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(binary_array) as ArrayRef],
+        )?;
+        let sliced_batch = batch.slice(0, 100);
+        let gc_batch = gc_view_arrays(&sliced_batch)?;
+
+        assert_eq!(gc_batch.num_rows(), sliced_batch.num_rows());
+        assert_eq!(gc_batch.num_columns(), sliced_batch.num_columns());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_gc_skips_small_arrays() -> Result<()> {
+        use arrow::array::StringViewArray;
+
+        let strings: Vec<String> = (0..10).map(|i| 
format!("string_{i}")).collect();
+
+        let string_array = StringViewArray::from(strings);
+        let array_ref: ArrayRef = Arc::new(string_array);
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "strings",
+            DataType::Utf8View,
+            false,
+        )]));
+
+        let batch = RecordBatch::try_new(Arc::clone(&schema), 
vec![array_ref])?;
+
+        // GC should return the original batch for small arrays
+        let gc_batch = gc_view_arrays(&batch)?;
+
+        // The batch should be unchanged (cloned, not GC'd)
+        assert_eq!(gc_batch.num_rows(), batch.num_rows());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_gc_with_mixed_columns() -> Result<()> {
+        use arrow::array::{Int32Array, StringViewArray};
+
+        let strings: Vec<String> = (0..200)
+            .map(|i| format!("long_string_for_gc_testing_{i}"))
+            .collect();
+
+        let string_array = StringViewArray::from(strings);
+        let int_array = Int32Array::from((0..200).collect::<Vec<i32>>());
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("strings", DataType::Utf8View, false),
+            Field::new("ints", DataType::Int32, false),
+        ]));
+
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(string_array) as ArrayRef,
+                Arc::new(int_array) as ArrayRef,
+            ],
+        )?;
+
+        let sliced_batch = batch.slice(0, 50);
+        let gc_batch = gc_view_arrays(&sliced_batch)?;
+
+        assert_eq!(gc_batch.num_columns(), 2);
+        assert_eq!(gc_batch.num_rows(), 50);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_verify_gc_triggers_for_sliced_arrays() -> Result<()> {
+        let strings: Vec<String> = (0..1000)
+            .map(|i| {
+                format!(
+                    
"http://example.com/very/long/path/that/exceeds/inline/threshold/{i}";
+                )
+            })
+            .collect();
+
+        let string_array = StringViewArray::from(strings);
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "url",
+            DataType::Utf8View,
+            false,
+        )]));
+
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![Arc::new(string_array.clone()) as ArrayRef],
+        )?;
+
+        let sliced = batch.slice(0, 100);
+
+        let sliced_array = sliced
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringViewArray>()
+            .unwrap();
+        let should_gc =
+            should_gc_view_array(sliced_array.len(), 
sliced_array.data_buffers());
+        let waste_ratio = calculate_string_view_waste_ratio(sliced_array);
+
+        assert!(
+            waste_ratio > 0.8,
+            "Waste ratio should be > 0.8 for sliced array"
+        );
+        assert!(
+            should_gc,
+            "GC should trigger for sliced array with high waste"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_reproduce_issue_19414_string_view_spill_without_gc() -> Result<()> 
{
+        use arrow::array::StringViewArray;
+        use std::fs;
+
+        let num_rows = 5000;
+        let mut strings = Vec::with_capacity(num_rows);
+
+        for i in 0..num_rows {
+            let url = match i % 5 {
+                0 => format!(
+                    
"http://irr.ru/index.php?showalbum/login-leniya7777294,938303130/{i}";
+                ),
+                1 => 
format!("http://komme%2F27.0.1453.116/very/long/path/{i}";),
+                2 => format!("https://produkty%2Fproduct/category/item/{i}";),
+                3 => format!(
+                    
"http://irr.ru/index.php?showalbum/login-kapusta-advert2668/{i}";
+                ),
+                4 => format!(
+                    
"http://irr.ru/index.php?showalbum/login-kapustic/product/{i}";
+                ),
+                _ => unreachable!(),
+            };
+            strings.push(url);
+        }
+
+        let string_array = StringViewArray::from(strings);
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "URL",
+            DataType::Utf8View,
+            false,
+        )]));
+
+        let original_batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(string_array.clone()) as ArrayRef],
+        )?;
+
+        let total_buffer_size: usize = string_array
+            .data_buffers()
+            .iter()
+            .map(|buffer| buffer.capacity())
+            .sum();
+
+        let mut sliced_batches = Vec::new();
+        let slice_size = 100;
+
+        for i in (0..num_rows).step_by(slice_size) {
+            let len = std::cmp::min(slice_size, num_rows - i);
+            let sliced = original_batch.slice(i, len);
+            sliced_batches.push(sliced);
+        }
+
+        let env = Arc::new(RuntimeEnv::default());
+        let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
+        let spill_manager = SpillManager::new(env, metrics, schema);
+
+        let mut in_progress_file = spill_manager.create_in_progress_file("Test 
GC")?;
+
+        for batch in &sliced_batches {
+            in_progress_file.append_batch(batch)?;
+        }
+
+        let spill_file = in_progress_file.finish()?.unwrap();
+        let file_size = fs::metadata(spill_file.path())?.len() as usize;
+
+        let theoretical_without_gc = total_buffer_size * sliced_batches.len();
+        let reduction_percent = ((theoretical_without_gc - file_size) as f64
+            / theoretical_without_gc as f64)
+            * 100.0;
+
+        assert!(
+            reduction_percent > 80.0,
+            "GC should reduce spill file size by >80%, got 
{reduction_percent:.1}%"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_exact_clickbench_issue_19414() -> Result<()> {
+        use arrow::array::StringViewArray;
+        use std::fs;
+
+        // Test for clickbench issue: 820MB -> 33MB spill reduction
+        let unique_urls = vec![
+            "http://irr.ru/index.php?showalbum/login-leniya7777294,938303130";,
+            "http://komme%2F27.0.1453.116";,
+            "https://produkty%2Fproduct";,
+            
"http://irr.ru/index.php?showalbum/login-kapusta-advert2668]=0&order_by=0";,
+            "http://irr.ru/index.php?showalbum/login-kapustic/product_name";,
+            "http://irr.ru/index.php";,
+            "https://produkty%2F";,
+            "http://irr.ru/index.php?showalbum/login";,
+            "https://produkty/kurortmag";,
+            "https://produkty%2Fpulove.ru/album/login";,
+        ];
+
+        let mut urls = Vec::with_capacity(200_000);

Review Comment:
   Yes, it would be great to make the tests faster and have them use less memory.



##########
datafusion/physical-plan/src/spill/mod.rs:
##########
@@ -339,6 +339,97 @@ fn get_max_alignment_for_schema(schema: &Schema) -> usize {
     max_alignment
 }
 
+#[cfg(test)]
+const VIEW_SIZE_BYTES: usize = 16;
+#[cfg(test)]
+const INLINE_THRESHOLD: usize = 12;
+
+/// Performs garbage collection on view arrays before spilling.
+pub(crate) fn gc_view_arrays(batch: &RecordBatch) -> Result<RecordBatch> {
+    let mut new_columns: Vec<Arc<dyn Array>> = 
Vec::with_capacity(batch.num_columns());
+    let mut any_gc_performed = false;
+
+    for array in batch.columns() {
+        let gc_array = match array.data_type() {
+            arrow::datatypes::DataType::Utf8View => {
+                let string_view = array
+                    .as_any()
+                    .downcast_ref::<StringViewArray>()
+                    .expect("Utf8View array should downcast to 
StringViewArray");
+                if should_gc_view_array(string_view.len(), 
string_view.data_buffers()) {
+                    any_gc_performed = true;
+                    Arc::new(string_view.gc()) as Arc<dyn Array>
+                } else {
+                    Arc::clone(array)
+                }
+            }
+            arrow::datatypes::DataType::BinaryView => {
+                let binary_view = array
+                    .as_any()
+                    .downcast_ref::<BinaryViewArray>()
+                    .expect("BinaryView array should downcast to 
BinaryViewArray");
+                if should_gc_view_array(binary_view.len(), 
binary_view.data_buffers()) {
+                    any_gc_performed = true;
+                    Arc::new(binary_view.gc()) as Arc<dyn Array>
+                } else {
+                    Arc::clone(array)
+                }
+            }
+            _ => Arc::clone(array),
+        };
+        new_columns.push(gc_array);
+    }
+
+    if any_gc_performed {
+        Ok(RecordBatch::try_new(batch.schema(), new_columns)?)

Review Comment:
   nit: I think we can get rid of the `any_gc_performed` condition and always 
take this branch, to make it a little bit simpler.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to