This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b2a95ac96 arrow: add oversized coalesce take benchmarks (#9799)
2b2a95ac96 is described below

commit 2b2a95ac966ba73b3d020259d43111d2f76c557b
Author: ClSlaid <[email protected]>
AuthorDate: Wed May 27 03:36:47 2026 +0800

    arrow: add oversized coalesce take benchmarks (#9799)
    
    ## Summary
    - add `coalesce_kernels` take benchmarks alongside the existing filter
    coverage for primitive, view, utf8, and dictionary schemas
    - add oversized repeated-index benchmark cases that stress the
    materialized `push_batch_with_indices` fallback with a configured
    `biggest_coalesce_batch_size`
    - extend the benchmark harness so take benchmarks can use explicit
    output lengths and optionally drain all completed batches while running
    
    ## Verification
    - cargo fmt -p arrow
    - cargo clippy -p arrow --bench coalesce_kernels --features test_utils
    -- -D warnings
    - cargo bench -p arrow --bench coalesce_kernels --features test_utils --
    'extra_large_repeat'
    
    Signed-off-by: 蔡略 <[email protected]>
---
 arrow/benches/coalesce_kernels.rs | 394 +++++++++++++++++++++++++++++++++++++-
 1 file changed, 393 insertions(+), 1 deletion(-)

diff --git a/arrow/benches/coalesce_kernels.rs b/arrow/benches/coalesce_kernels.rs
index b85c5cc532..0816d1a2e8 100644
--- a/arrow/benches/coalesce_kernels.rs
+++ b/arrow/benches/coalesce_kernels.rs
@@ -25,6 +25,7 @@ use arrow_array::types::{Float64Type, Int32Type, TimestampNanosecondType};
 use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
 use arrow_select::coalesce::BatchCoalescer;
 use criterion::{Criterion, criterion_group, criterion_main};
+use rand::{Rng, SeedableRng, rngs::StdRng};
 
 /// Benchmarks for generating evently sized output RecordBatches
 /// from a sequence of filtered source batches
@@ -158,7 +159,155 @@ fn add_all_filter_benchmarks(c: &mut Criterion) {
     }
 }
 
-criterion_group!(benches, add_all_filter_benchmarks);
+fn add_all_take_benchmarks(c: &mut Criterion) {
+    let batch_size = 8192;
+
+    let primitive_schema = SchemaRef::new(Schema::new(vec![
+        Field::new("int32_val", DataType::Int32, true),
+        Field::new("float_val", DataType::Float64, true),
+        Field::new(
+            "timestamp_val",
+            DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
+            true,
+        ),
+    ]));
+
+    let single_schema = SchemaRef::new(Schema::new(vec![Field::new(
+        "value",
+        DataType::Utf8View,
+        true,
+    )]));
+
+    let single_binaryview_schema = SchemaRef::new(Schema::new(vec![Field::new(
+        "value",
+        DataType::BinaryView,
+        true,
+    )]));
+
+    let mixed_utf8view_schema = SchemaRef::new(Schema::new(vec![
+        Field::new("int32_val", DataType::Int32, true),
+        Field::new("float_val", DataType::Float64, true),
+        Field::new("utf8view_val", DataType::Utf8View, true),
+    ]));
+
+    let mixed_binaryview_schema = SchemaRef::new(Schema::new(vec![
+        Field::new("int32_val", DataType::Int32, true),
+        Field::new("float_val", DataType::Float64, true),
+        Field::new("binaryview_val", DataType::BinaryView, true),
+    ]));
+
+    let mixed_utf8_schema = SchemaRef::new(Schema::new(vec![
+        Field::new("int32_val", DataType::Int32, true),
+        Field::new("float_val", DataType::Float64, true),
+        Field::new("utf8", DataType::Utf8, true),
+    ]));
+
+    let mixed_dict_schema = SchemaRef::new(Schema::new(vec![
+        Field::new(
+            "string_dict",
+            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
+            true,
+        ),
+        Field::new("float_val1", DataType::Float64, true),
+        Field::new("float_val2", DataType::Float64, true),
+    ]));
+
+    for null_density in [0.0, 0.1] {
+        for selectivity in [0.001, 0.01, 0.1, 0.8] {
+            for scenario in [
+                TakeBenchmarkScenario {
+                    name: "primitive",
+                    num_output_batches: 50,
+                    max_string_len: 30,
+                    schema: &primitive_schema,
+                },
+                TakeBenchmarkScenario {
+                    name: "single_utf8view",
+                    num_output_batches: 50,
+                    max_string_len: 30,
+                    schema: &single_schema,
+                },
+                TakeBenchmarkScenario {
+                    name: "single_binaryview",
+                    num_output_batches: 50,
+                    max_string_len: 30,
+                    schema: &single_binaryview_schema,
+                },
+                TakeBenchmarkScenario {
+                    name: "mixed_utf8view (max_string_len=20)",
+                    num_output_batches: 20,
+                    max_string_len: 20,
+                    schema: &mixed_utf8view_schema,
+                },
+                TakeBenchmarkScenario {
+                    name: "mixed_utf8view (max_string_len=128)",
+                    num_output_batches: 20,
+                    max_string_len: 128,
+                    schema: &mixed_utf8view_schema,
+                },
+                TakeBenchmarkScenario {
+                    name: "mixed_binaryview (max_string_len=20)",
+                    num_output_batches: 20,
+                    max_string_len: 20,
+                    schema: &mixed_binaryview_schema,
+                },
+                TakeBenchmarkScenario {
+                    name: "mixed_binaryview (max_string_len=128)",
+                    num_output_batches: 20,
+                    max_string_len: 128,
+                    schema: &mixed_binaryview_schema,
+                },
+                TakeBenchmarkScenario {
+                    name: "mixed_utf8",
+                    num_output_batches: 20,
+                    max_string_len: 30,
+                    schema: &mixed_utf8_schema,
+                },
+                TakeBenchmarkScenario {
+                    name: "mixed_dict",
+                    num_output_batches: 10,
+                    max_string_len: 30,
+                    schema: &mixed_dict_schema,
+                },
+            ] {
+                TakeBenchmarkBuilder::from_scenario(
+                    c,
+                    batch_size,
+                    null_density,
+                    selectivity,
+                    scenario,
+                )
+                .build();
+            }
+        }
+    }
+
+    // Repeated indices make the taken batch much larger than the source batch,
+    // which exercises the materialized fallback path for unsupported schemas.
+    for (name, schema) in [
+        ("primitive extra_large_repeat", &primitive_schema),
+        ("mixed_utf8 extra_large_repeat", &mixed_utf8_schema),
+    ] {
+        TakeBenchmarkBuilder::from_scenario(
+            c,
+            batch_size,
+            0.0,
+            1.0,
+            TakeBenchmarkScenario {
+                name,
+                num_output_batches: 64,
+                max_string_len: 30,
+                schema,
+            },
+        )
+        .with_biggest_coalesce_batch_size(1024)
+        .with_index_output_len(131_072)
+        .with_drain_all_completed_batches()
+        .build();
+    }
+}
+
+criterion_group!(benches, add_all_filter_benchmarks, add_all_take_benchmarks);
 criterion_main!(benches);
 
 /// Run the filters with a batch_size, null_density, selectivity, and schema
@@ -222,6 +371,129 @@ impl FilterBenchmarkBuilder<'_> {
     }
 }
 
+struct TakeBenchmarkBuilder<'a> {
+    c: &'a mut Criterion,
+    name: &'a str,
+    batch_size: usize,
+    num_output_batches: usize,
+    null_density: f32,
+    selectivity: f32,
+    max_string_len: usize,
+    schema: &'a SchemaRef,
+    biggest_coalesce_batch_size: Option<usize>,
+    index_output_len: Option<usize>,
+    drain_all_completed_batches: bool,
+}
+
+#[derive(Clone, Copy)]
+struct TakeBenchmarkScenario<'a> {
+    name: &'a str,
+    num_output_batches: usize,
+    max_string_len: usize,
+    schema: &'a SchemaRef,
+}
+
+impl<'a> TakeBenchmarkBuilder<'a> {
+    fn from_scenario(
+        c: &'a mut Criterion,
+        batch_size: usize,
+        null_density: f32,
+        selectivity: f32,
+        scenario: TakeBenchmarkScenario<'a>,
+    ) -> TakeBenchmarkBuilder<'a> {
+        let TakeBenchmarkScenario {
+            name,
+            num_output_batches,
+            max_string_len,
+            schema,
+        } = scenario;
+
+        TakeBenchmarkBuilder {
+            c,
+            name,
+            batch_size,
+            num_output_batches,
+            null_density,
+            selectivity,
+            max_string_len,
+            schema,
+            biggest_coalesce_batch_size: None,
+            index_output_len: None,
+            drain_all_completed_batches: false,
+        }
+    }
+
+    fn with_biggest_coalesce_batch_size(mut self, limit: usize) -> Self {
+        self.biggest_coalesce_batch_size = Some(limit);
+        self
+    }
+
+    fn with_index_output_len(mut self, output_len: usize) -> Self {
+        self.index_output_len = Some(output_len);
+        self
+    }
+
+    fn with_drain_all_completed_batches(mut self) -> Self {
+        self.drain_all_completed_batches = true;
+        self
+    }
+
+    fn build(self) {
+        let Self {
+            c,
+            name,
+            batch_size,
+            num_output_batches,
+            null_density,
+            selectivity,
+            max_string_len,
+            schema,
+            biggest_coalesce_batch_size,
+            index_output_len,
+            drain_all_completed_batches,
+        } = self;
+
+        let output_len = index_output_len
+            .unwrap_or_else(|| ((batch_size as f32) * selectivity).round().max(1.0) as usize);
+
+        let indices = match index_output_len {
+            Some(_) => IndexStreamBuilder::new()
+                .with_batch_size(batch_size)
+                .with_output_len(output_len)
+                .build(),
+            None => IndexStreamBuilder::new()
+                .with_batch_size(batch_size)
+                .with_selectivity(selectivity)
+                .build(),
+        };
+
+        let data = DataStreamBuilder::new(Arc::clone(schema))
+            .with_batch_size(batch_size)
+            .with_null_density(null_density)
+            .with_max_string_len(max_string_len)
+            .build();
+
+        let id = if index_output_len.is_some() || biggest_coalesce_batch_size.is_some() {
+            format!(
+                "take: {name}, input: {batch_size}, output: {output_len}, nulls: {null_density}, biggest: {biggest_coalesce_batch_size:?}"
+            )
+        } else {
+            format!("take: {name}, {batch_size}, nulls: {null_density}, selectivity: {selectivity}")
+        };
+        c.bench_function(&id, |b| {
+            b.iter(|| {
+                take_streams(
+                    num_output_batches,
+                    indices.clone(),
+                    data.clone(),
+                    biggest_coalesce_batch_size,
+                    drain_all_completed_batches,
+                );
+            })
+        });
+    }
+}
+
 /// Pull RecordBatches from a data stream and apply a sequence of
 /// filters from a filter stream until we have a specified number of output
 /// batches.
@@ -247,6 +519,34 @@ fn filter_streams(
     }
 }
 
+fn take_streams(
+    mut num_output_batches: usize,
+    mut index_stream: IndexStream,
+    mut data_stream: DataStream,
+    biggest_coalesce_batch_size: Option<usize>,
+    drain_all_completed_batches: bool,
+) {
+    let schema = data_stream.schema();
+    let batch_size = data_stream.batch_size();
+    let mut coalescer = BatchCoalescer::new(Arc::clone(schema), batch_size)
+        .with_biggest_coalesce_batch_size(biggest_coalesce_batch_size);
+
+    while num_output_batches > 0 {
+        let indices = index_stream.next_indices();
+        let batch = data_stream.next_batch();
+        coalescer
+            .push_batch_with_indices(batch.clone(), indices)
+            .unwrap();
+        if drain_all_completed_batches {
+            while num_output_batches > 0 && coalescer.next_completed_batch().is_some() {
+                num_output_batches -= 1;
+            }
+        } else if coalescer.next_completed_batch().is_some() {
+            num_output_batches -= 1;
+        }
+    }
+}
+
 /// Stream of filters to apply to a sequence of input RecordBatches
 ///
 /// This pre-computes a sequence of filters and then repeats it forever.
@@ -257,6 +557,25 @@ struct FilterStream {
     batches: Arc<[BooleanArray]>,
 }
 
+#[derive(Debug, Clone)]
+struct IndexStream {
+    index: usize,
+    batches: Arc<[UInt32Array]>,
+}
+
+impl IndexStream {
+    fn next_indices(&mut self) -> &UInt32Array {
+        let current_index = self.index;
+        self.index += 1;
+        if self.index >= self.batches.len() {
+            self.index = 0;
+        }
+        self.batches
+            .get(current_index)
+            .expect("No more index batches available")
+    }
+}
+
 impl FilterStream {
     pub fn next_filter(&mut self) -> &BooleanArray {
         let current_index = self.index;
@@ -324,6 +643,68 @@ impl FilterStreamBuilder {
     }
 }
 
+#[derive(Debug)]
+struct IndexStreamBuilder {
+    batch_size: usize,
+    num_batches: usize,
+    output_len: Option<usize>,
+    selectivity: f32,
+}
+
+impl IndexStreamBuilder {
+    fn new() -> Self {
+        Self {
+            batch_size: 8192,
+            num_batches: 11,
+            output_len: None,
+            selectivity: 0.5,
+        }
+    }
+
+    fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
+    fn with_selectivity(mut self, selectivity: f32) -> Self {
+        assert!((0.0..=1.0).contains(&selectivity));
+        self.selectivity = selectivity;
+        self
+    }
+
+    fn with_output_len(mut self, output_len: usize) -> Self {
+        self.output_len = Some(output_len.max(1));
+        self
+    }
+
+    fn build(self) -> IndexStream {
+        let Self {
+            batch_size,
+            num_batches,
+            output_len,
+            selectivity,
+        } = self;
+
+        let output_len = output_len
+            .unwrap_or_else(|| ((batch_size as f32) * selectivity).round().max(1.0) as usize);
+        let batches = (0..num_batches)
+            .map(|seed| create_index_array(batch_size, output_len, seed as u64))
+            .collect::<Vec<_>>();
+
+        IndexStream {
+            index: 0,
+            batches: Arc::from(batches),
+        }
+    }
+}
+
+fn create_index_array(input_len: usize, output_len: usize, seed: u64) -> UInt32Array {
+    let mut rng = StdRng::seed_from_u64(seed);
+    UInt32Array::from_iter_values(
+        (0..output_len).map(|_| rng.random_range(0..u32::try_from(input_len).unwrap())),
+    )
+}
+
 #[derive(Debug, Clone)]
 struct DataStream {
     schema: SchemaRef,
@@ -455,6 +836,17 @@ impl DataStreamBuilder {
                     self.max_string_len,
                 )) // TODO seed
             }
+            DataType::BinaryView => Arc::new(BinaryViewArray::from_iter(
+                create_binary_array_with_len_range_and_prefix_and_seed::<i32>(
+                    self.batch_size,
+                    self.null_density,
+                    0,
+                    self.max_string_len,
+                    b"",
+                    seed,
+                )
+                .iter(),
+            )),
             DataType::Dictionary(key_type, value_type)
                 if key_type.as_ref() == &DataType::Int32
                     && value_type.as_ref() == &DataType::Utf8 =>

Reply via email to