bharath-techie commented on code in PR #19444:
URL: https://github.com/apache/datafusion/pull/19444#discussion_r2639817545
##########
datafusion/physical-plan/src/spill/mod.rs:
##########
@@ -799,4 +890,426 @@ mod tests {
assert_eq!(alignment, 8);
Ok(())
}
+
+ #[test]
+ fn test_gc_string_view_before_spill() -> Result<()> {
+ use arrow::array::StringViewArray;
+
+ let strings: Vec<String> = (0..1000)
+ .map(|i| {
+ if i % 2 == 0 {
+ "short_string".to_string()
+ } else {
+                    "this_is_a_much_longer_string_that_will_not_be_inlined".to_string()
+ }
+ })
+ .collect();
+
+ let string_array = StringViewArray::from(strings);
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "strings",
+ DataType::Utf8View,
+ false,
+ )]));
+
+ let batch = RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![Arc::new(string_array) as ArrayRef],
+ )?;
+ let sliced_batch = batch.slice(0, 100);
+ let gc_batch = gc_view_arrays(&sliced_batch)?;
+
+ assert_eq!(gc_batch.num_rows(), sliced_batch.num_rows());
+ assert_eq!(gc_batch.num_columns(), sliced_batch.num_columns());
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_gc_binary_view_before_spill() -> Result<()> {
+ use arrow::array::BinaryViewArray;
+
+ let binaries: Vec<Vec<u8>> = (0..1000)
+ .map(|i| {
+ if i % 2 == 0 {
+ vec![1, 2, 3, 4]
+ } else {
+ vec![1; 50]
+ }
+ })
+ .collect();
+
+ let binary_array =
+            BinaryViewArray::from_iter(binaries.iter().map(|b| Some(b.as_slice())));
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "binaries",
+ DataType::BinaryView,
+ false,
+ )]));
+
+ let batch = RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![Arc::new(binary_array) as ArrayRef],
+ )?;
+ let sliced_batch = batch.slice(0, 100);
+ let gc_batch = gc_view_arrays(&sliced_batch)?;
+
+ assert_eq!(gc_batch.num_rows(), sliced_batch.num_rows());
+ assert_eq!(gc_batch.num_columns(), sliced_batch.num_columns());
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_gc_skips_small_arrays() -> Result<()> {
+ use arrow::array::StringViewArray;
+
+        let strings: Vec<String> = (0..10).map(|i| format!("string_{i}")).collect();
+
+ let string_array = StringViewArray::from(strings);
+ let array_ref: ArrayRef = Arc::new(string_array);
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "strings",
+ DataType::Utf8View,
+ false,
+ )]));
+
+        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![array_ref])?;
+
+ // GC should return the original batch for small arrays
+ let gc_batch = gc_view_arrays(&batch)?;
+
+ // The batch should be unchanged (cloned, not GC'd)
+ assert_eq!(gc_batch.num_rows(), batch.num_rows());
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_gc_with_mixed_columns() -> Result<()> {
+ use arrow::array::{Int32Array, StringViewArray};
+
+ let strings: Vec<String> = (0..200)
+ .map(|i| format!("long_string_for_gc_testing_{i}"))
+ .collect();
+
+ let string_array = StringViewArray::from(strings);
+ let int_array = Int32Array::from((0..200).collect::<Vec<i32>>());
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("strings", DataType::Utf8View, false),
+ Field::new("ints", DataType::Int32, false),
+ ]));
+
+ let batch = RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![
+ Arc::new(string_array) as ArrayRef,
+ Arc::new(int_array) as ArrayRef,
+ ],
+ )?;
+
+ let sliced_batch = batch.slice(0, 50);
+ let gc_batch = gc_view_arrays(&sliced_batch)?;
+
+ assert_eq!(gc_batch.num_columns(), 2);
+ assert_eq!(gc_batch.num_rows(), 50);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_verify_gc_triggers_for_sliced_arrays() -> Result<()> {
+ let strings: Vec<String> = (0..1000)
+ .map(|i| {
+ format!(
+                    "http://example.com/very/long/path/that/exceeds/inline/threshold/{i}"
+ )
+ })
+ .collect();
+
+ let string_array = StringViewArray::from(strings);
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "url",
+ DataType::Utf8View,
+ false,
+ )]));
+
+ let batch = RecordBatch::try_new(
+ schema,
+ vec![Arc::new(string_array.clone()) as ArrayRef],
+ )?;
+
+ let sliced = batch.slice(0, 100);
+
+ let sliced_array = sliced
+ .column(0)
+ .as_any()
+ .downcast_ref::<StringViewArray>()
+ .unwrap();
+ let should_gc =
+            should_gc_view_array(sliced_array.len(), sliced_array.data_buffers());
+ let waste_ratio = calculate_string_view_waste_ratio(sliced_array);
+
+ assert!(
+ waste_ratio > 0.8,
+ "Waste ratio should be > 0.8 for sliced array"
+ );
+ assert!(
+ should_gc,
+ "GC should trigger for sliced array with high waste"
+ );
+
+ Ok(())
+ }
+
+ #[test]
+    fn test_reproduce_issue_19414_string_view_spill_without_gc() -> Result<()> {
+ use arrow::array::StringViewArray;
+ use std::fs;
+
+ let num_rows = 5000;
+ let mut strings = Vec::with_capacity(num_rows);
+
+ for i in 0..num_rows {
+ let url = match i % 5 {
+ 0 => format!(
+                    "http://irr.ru/index.php?showalbum/login-leniya7777294,938303130/{i}"
+ ),
+                1 => format!("http://komme%2F27.0.1453.116/very/long/path/{i}"),
+ 2 => format!("https://produkty%2Fproduct/category/item/{i}"),
+ 3 => format!(
+                    "http://irr.ru/index.php?showalbum/login-kapusta-advert2668/{i}"
+ ),
+ 4 => format!(
+                    "http://irr.ru/index.php?showalbum/login-kapustic/product/{i}"
+ ),
+ _ => unreachable!(),
+ };
+ strings.push(url);
+ }
+
+ let string_array = StringViewArray::from(strings);
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "URL",
+ DataType::Utf8View,
+ false,
+ )]));
+
+ let original_batch = RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![Arc::new(string_array.clone()) as ArrayRef],
+ )?;
+
+ let total_buffer_size: usize = string_array
+ .data_buffers()
+ .iter()
+ .map(|buffer| buffer.capacity())
+ .sum();
+
+ let mut sliced_batches = Vec::new();
+ let slice_size = 100;
+
+ for i in (0..num_rows).step_by(slice_size) {
+ let len = std::cmp::min(slice_size, num_rows - i);
+ let sliced = original_batch.slice(i, len);
+ sliced_batches.push(sliced);
+ }
+
+ let env = Arc::new(RuntimeEnv::default());
+ let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
+ let spill_manager = SpillManager::new(env, metrics, schema);
+
+        let mut in_progress_file = spill_manager.create_in_progress_file("Test GC")?;
+
+ for batch in &sliced_batches {
+ in_progress_file.append_batch(batch)?;
+ }
+
+ let spill_file = in_progress_file.finish()?.unwrap();
+ let file_size = fs::metadata(spill_file.path())?.len() as usize;
+
+ let theoretical_without_gc = total_buffer_size * sliced_batches.len();
+ let reduction_percent = ((theoretical_without_gc - file_size) as f64
+ / theoretical_without_gc as f64)
+ * 100.0;
+
+ assert!(
+ reduction_percent > 80.0,
+            "GC should reduce spill file size by >80%, got {reduction_percent:.1}%"
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_exact_clickbench_issue_19414() -> Result<()> {
+ use arrow::array::StringViewArray;
+ use std::fs;
+
+ // Test for clickbench issue: 820MB -> 33MB spill reduction
+ let unique_urls = vec![
+ "http://irr.ru/index.php?showalbum/login-leniya7777294,938303130",
+ "http://komme%2F27.0.1453.116",
+ "https://produkty%2Fproduct",
+            "http://irr.ru/index.php?showalbum/login-kapusta-advert2668]=0&order_by=0",
+ "http://irr.ru/index.php?showalbum/login-kapustic/product_name",
+ "http://irr.ru/index.php",
+ "https://produkty%2F",
+ "http://irr.ru/index.php?showalbum/login",
+ "https://produkty/kurortmag",
+ "https://produkty%2Fpulove.ru/album/login",
+ ];
+
+ let mut urls = Vec::with_capacity(200_000);
Review Comment:
This might be quite heavy - maybe we can just keep a minimal reproducible
version to verify that the changes are working as expected (like the test
above this one).
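
Something like this rough, untested sketch might be enough. It just mirrors the helper calls already used in `test_reproduce_issue_19414_string_view_spill_without_gc` above (SpillManager, create_in_progress_file, append_batch, finish); the test name, the 10_000-row count, and the 10 MB bound are placeholders I made up, not values from this PR:

```rust
// Rough sketch only (not run): a scaled-down variant of the test above.
#[test]
fn test_clickbench_like_string_view_spill_small() -> Result<()> {
    use arrow::array::StringViewArray;
    use std::fs;

    // A few long URLs repeated over a modest row count still produce sliced
    // batches whose shared data buffers are mostly wasted space.
    let unique_urls = [
        "http://irr.ru/index.php?showalbum/login-leniya7777294,938303130",
        "http://komme%2F27.0.1453.116",
        "https://produkty%2Fproduct",
    ];
    let num_rows: usize = 10_000; // placeholder, far below 200_000
    let strings: Vec<String> = (0..num_rows)
        .map(|i| format!("{}/{i}", unique_urls[i % unique_urls.len()]))
        .collect();

    let string_array = StringViewArray::from(strings);
    let schema = Arc::new(Schema::new(vec![Field::new(
        "URL",
        DataType::Utf8View,
        false,
    )]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(string_array) as ArrayRef],
    )?;

    // Spill many small slices of the same batch, mimicking the sort path.
    let env = Arc::new(RuntimeEnv::default());
    let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
    let spill_manager = SpillManager::new(env, metrics, schema);
    let mut in_progress_file =
        spill_manager.create_in_progress_file("Test GC small")?;
    for start in (0..num_rows).step_by(100) {
        let len = std::cmp::min(100, num_rows - start);
        in_progress_file.append_batch(&batch.slice(start, len))?;
    }
    let spill_file = in_progress_file.finish()?.unwrap();

    // With GC each written slice only carries its own strings, so the file
    // should stay around ~1 MB rather than ~100x that; 10 MB is a loose,
    // arbitrary bound to keep the assertion non-flaky.
    let file_size = fs::metadata(spill_file.path())?.len() as usize;
    assert!(
        file_size < 10 * 1024 * 1024,
        "spill file unexpectedly large: {file_size} bytes"
    );

    Ok(())
}
```

Even at ~10k rows this should still show the "full buffer copied per slice" blow-up without the GC change, while keeping the test runtime negligible.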