This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 2b2a95ac96 arrow: add oversized coalesce take benchmarks (#9799)
2b2a95ac96 is described below
commit 2b2a95ac966ba73b3d020259d43111d2f76c557b
Author: ClSlaid <[email protected]>
AuthorDate: Wed May 27 03:36:47 2026 +0800
arrow: add oversized coalesce take benchmarks (#9799)
## Summary
- add `coalesce_kernels` take benchmarks alongside the existing filter
coverage for primitive, view, utf8, and dictionary schemas
- add oversized repeated-index benchmark cases that stress the
materialized `push_batch_with_indices` fallback with a configured
`biggest_coalesce_batch_size`
- extend the benchmark harness so take benchmarks can use explicit
output lengths and optionally drain all completed batches while running
## Verification
- cargo fmt -p arrow
- cargo clippy -p arrow --bench coalesce_kernels --features test_utils -- -D warnings
- cargo bench -p arrow --bench coalesce_kernels --features test_utils -- 'extra_large_repeat'
Signed-off-by: 蔡略 <[email protected]>
---
arrow/benches/coalesce_kernels.rs | 394 +++++++++++++++++++++++++++++++++++++-
1 file changed, 393 insertions(+), 1 deletion(-)
diff --git a/arrow/benches/coalesce_kernels.rs
b/arrow/benches/coalesce_kernels.rs
index b85c5cc532..0816d1a2e8 100644
--- a/arrow/benches/coalesce_kernels.rs
+++ b/arrow/benches/coalesce_kernels.rs
@@ -25,6 +25,7 @@ use arrow_array::types::{Float64Type, Int32Type,
TimestampNanosecondType};
use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
use arrow_select::coalesce::BatchCoalescer;
use criterion::{Criterion, criterion_group, criterion_main};
+use rand::{Rng, SeedableRng, rngs::StdRng};
/// Benchmarks for generating evenly sized output RecordBatches
/// from a sequence of filtered source batches
@@ -158,7 +159,155 @@ fn add_all_filter_benchmarks(c: &mut Criterion) {
}
}
-criterion_group!(benches, add_all_filter_benchmarks);
+/// Registers every `take`-based coalesce benchmark with Criterion.
+///
+/// Two groups are registered:
+/// 1. each scenario in the table below, for every combination of null density
+///    and selectivity
+/// 2. "extra_large_repeat" cases that request far more indices than there are
+///    rows in a source batch
+fn add_all_take_benchmarks(c: &mut Criterion) {
+    let batch_size = 8192;
+
+    // Every benchmark column is nullable, so the schemas differ only in
+    // column names and data types.
+    let nullable = |name: &str, data_type: DataType| Field::new(name, data_type, true);
+    let schema_of = |fields: Vec<Field>| SchemaRef::new(Schema::new(fields));
+
+    let primitive_schema = schema_of(vec![
+        nullable("int32_val", DataType::Int32),
+        nullable("float_val", DataType::Float64),
+        nullable(
+            "timestamp_val",
+            DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
+        ),
+    ]);
+
+    let single_schema = schema_of(vec![nullable("value", DataType::Utf8View)]);
+
+    let single_binaryview_schema = schema_of(vec![nullable("value", DataType::BinaryView)]);
+
+    let mixed_utf8view_schema = schema_of(vec![
+        nullable("int32_val", DataType::Int32),
+        nullable("float_val", DataType::Float64),
+        nullable("utf8view_val", DataType::Utf8View),
+    ]);
+
+    let mixed_binaryview_schema = schema_of(vec![
+        nullable("int32_val", DataType::Int32),
+        nullable("float_val", DataType::Float64),
+        nullable("binaryview_val", DataType::BinaryView),
+    ]);
+
+    let mixed_utf8_schema = schema_of(vec![
+        nullable("int32_val", DataType::Int32),
+        nullable("float_val", DataType::Float64),
+        nullable("utf8", DataType::Utf8),
+    ]);
+
+    let mixed_dict_schema = schema_of(vec![
+        nullable(
+            "string_dict",
+            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
+        ),
+        nullable("float_val1", DataType::Float64),
+        nullable("float_val2", DataType::Float64),
+    ]);
+
+    // The scenario table does not depend on null density or selectivity, so it
+    // is built once and copied into each loop iteration (scenarios are `Copy`).
+    let scenarios = [
+        TakeBenchmarkScenario {
+            name: "primitive",
+            num_output_batches: 50,
+            max_string_len: 30,
+            schema: &primitive_schema,
+        },
+        TakeBenchmarkScenario {
+            name: "single_utf8view",
+            num_output_batches: 50,
+            max_string_len: 30,
+            schema: &single_schema,
+        },
+        TakeBenchmarkScenario {
+            name: "single_binaryview",
+            num_output_batches: 50,
+            max_string_len: 30,
+            schema: &single_binaryview_schema,
+        },
+        TakeBenchmarkScenario {
+            name: "mixed_utf8view (max_string_len=20)",
+            num_output_batches: 20,
+            max_string_len: 20,
+            schema: &mixed_utf8view_schema,
+        },
+        TakeBenchmarkScenario {
+            name: "mixed_utf8view (max_string_len=128)",
+            num_output_batches: 20,
+            max_string_len: 128,
+            schema: &mixed_utf8view_schema,
+        },
+        TakeBenchmarkScenario {
+            name: "mixed_binaryview (max_string_len=20)",
+            num_output_batches: 20,
+            max_string_len: 20,
+            schema: &mixed_binaryview_schema,
+        },
+        TakeBenchmarkScenario {
+            name: "mixed_binaryview (max_string_len=128)",
+            num_output_batches: 20,
+            max_string_len: 128,
+            schema: &mixed_binaryview_schema,
+        },
+        TakeBenchmarkScenario {
+            name: "mixed_utf8",
+            num_output_batches: 20,
+            max_string_len: 30,
+            schema: &mixed_utf8_schema,
+        },
+        TakeBenchmarkScenario {
+            name: "mixed_dict",
+            num_output_batches: 10,
+            max_string_len: 30,
+            schema: &mixed_dict_schema,
+        },
+    ];
+
+    for null_density in [0.0, 0.1] {
+        for selectivity in [0.001, 0.01, 0.1, 0.8] {
+            for scenario in scenarios {
+                let builder = TakeBenchmarkBuilder::from_scenario(
+                    c,
+                    batch_size,
+                    null_density,
+                    selectivity,
+                    scenario,
+                );
+                builder.build();
+            }
+        }
+    }
+
+    // Repeated indices make the taken batch much larger than the source batch,
+    // which exercises the materialized fallback path for unsupported schemas.
+    for (name, schema) in [
+        ("primitive extra_large_repeat", &primitive_schema),
+        ("mixed_utf8 extra_large_repeat", &mixed_utf8_schema),
+    ] {
+        let scenario = TakeBenchmarkScenario {
+            name,
+            num_output_batches: 64,
+            max_string_len: 30,
+            schema,
+        };
+        TakeBenchmarkBuilder::from_scenario(c, batch_size, 0.0, 1.0, scenario)
+            .with_biggest_coalesce_batch_size(1024)
+            .with_index_output_len(131_072)
+            .with_drain_all_completed_batches()
+            .build();
+    }
+}
+
+criterion_group!(benches, add_all_filter_benchmarks, add_all_take_benchmarks);
criterion_main!(benches);
/// Run the filters with a batch_size, null_density, selectivity, and schema
@@ -222,6 +371,129 @@ impl FilterBenchmarkBuilder<'_> {
}
}
+/// Configures and registers a single `take` benchmark with Criterion.
+struct TakeBenchmarkBuilder<'a> {
+    /// Criterion instance the benchmark is registered with
+    c: &'a mut Criterion,
+    /// Scenario name, used as part of the benchmark id
+    name: &'a str,
+    /// Batch size handed to the index and data stream builders
+    batch_size: usize,
+    /// Number of output batches `take_streams` produces per iteration
+    num_output_batches: usize,
+    /// Null density handed to the data stream builder
+    null_density: f32,
+    /// Determines the number of indices per batch when `index_output_len` is unset
+    selectivity: f32,
+    /// Maximum string length handed to the data stream builder
+    max_string_len: usize,
+    /// Schema of the generated input batches
+    schema: &'a SchemaRef,
+    /// Optional limit forwarded to `take_streams`
+    biggest_coalesce_batch_size: Option<usize>,
+    /// Explicit number of indices per batch; overrides `selectivity` when set
+    index_output_len: Option<usize>,
+    /// Whether `take_streams` drains all completed batches after each push
+    drain_all_completed_batches: bool,
+}
+
+/// The per-scenario inputs of a take benchmark: its name, how many output
+/// batches to produce, the maximum string length, and the input schema.
+#[derive(Clone, Copy)]
+struct TakeBenchmarkScenario<'a> {
+    name: &'a str,
+    num_output_batches: usize,
+    max_string_len: usize,
+    schema: &'a SchemaRef,
+}
+
+impl<'a> TakeBenchmarkBuilder<'a> {
+    /// Creates a builder for `scenario` with every optional knob left unset.
+    fn from_scenario(
+        c: &'a mut Criterion,
+        batch_size: usize,
+        null_density: f32,
+        selectivity: f32,
+        scenario: TakeBenchmarkScenario<'a>,
+    ) -> TakeBenchmarkBuilder<'a> {
+        Self {
+            c,
+            name: scenario.name,
+            batch_size,
+            num_output_batches: scenario.num_output_batches,
+            null_density,
+            selectivity,
+            max_string_len: scenario.max_string_len,
+            schema: scenario.schema,
+            biggest_coalesce_batch_size: None,
+            index_output_len: None,
+            drain_all_completed_batches: false,
+        }
+    }
+
+    /// Sets the `biggest_coalesce_batch_size` limit forwarded to `take_streams`.
+    fn with_biggest_coalesce_batch_size(self, limit: usize) -> Self {
+        Self {
+            biggest_coalesce_batch_size: Some(limit),
+            ..self
+        }
+    }
+
+    /// Requests exactly `output_len` indices per batch instead of deriving the
+    /// count from the selectivity.
+    fn with_index_output_len(self, output_len: usize) -> Self {
+        Self {
+            index_output_len: Some(output_len),
+            ..self
+        }
+    }
+
+    /// Makes the benchmark drain all completed batches after each push.
+    fn with_drain_all_completed_batches(self) -> Self {
+        Self {
+            drain_all_completed_batches: true,
+            ..self
+        }
+    }
+
+    /// Builds the index and data streams and registers the benchmark.
+    fn build(self) {
+        let Self {
+            c,
+            name,
+            batch_size,
+            num_output_batches,
+            null_density,
+            selectivity,
+            max_string_len,
+            schema,
+            biggest_coalesce_batch_size,
+            index_output_len,
+            drain_all_completed_batches,
+        } = self;
+
+        // An explicit length wins; otherwise derive it from the selectivity,
+        // never going below one index per batch.
+        let output_len = match index_output_len {
+            Some(len) => len,
+            None => ((batch_size as f32) * selectivity).round().max(1.0) as usize,
+        };
+
+        let index_builder = IndexStreamBuilder::new().with_batch_size(batch_size);
+        let index_builder = if index_output_len.is_some() {
+            index_builder.with_output_len(output_len)
+        } else {
+            index_builder.with_selectivity(selectivity)
+        };
+        let indices = index_builder.build();
+
+        let data = DataStreamBuilder::new(Arc::clone(schema))
+            .with_batch_size(batch_size)
+            .with_null_density(null_density)
+            .with_max_string_len(max_string_len)
+            .build();
+
+        // Benchmarks with an explicit output length or coalesce limit get a
+        // longer id that spells both out; all others report the selectivity.
+        let id = match (index_output_len, biggest_coalesce_batch_size) {
+            (None, None) => {
+                format!("take: {name}, {batch_size}, nulls: {null_density}, selectivity: {selectivity}")
+            }
+            _ => format!(
+                "take: {name}, input: {batch_size}, output: {output_len}, nulls: {null_density}, biggest: {biggest_coalesce_batch_size:?}"
+            ),
+        };
+
+        c.bench_function(&id, |b| {
+            // Each iteration runs on fresh clones of the pre-built streams.
+            b.iter(|| {
+                take_streams(
+                    num_output_batches,
+                    indices.clone(),
+                    data.clone(),
+                    biggest_coalesce_batch_size,
+                    drain_all_completed_batches,
+                )
+            })
+        });
+    }
+}
+
/// Pull RecordBatches from a data stream and apply a sequence of
/// filters from a filter stream until we have a specified number of output
/// batches.
@@ -247,6 +519,34 @@ fn filter_streams(
}
}
+/// Pull RecordBatches from a data stream and push them, together with the
+/// next index array from an index stream, into a `BatchCoalescer` until
+/// `num_output_batches` completed batches have been popped.
+///
+/// When `drain_all_completed_batches` is false, at most one completed batch
+/// is popped per pushed input batch; otherwise completed batches are popped
+/// until none remain or the target count is reached.
+fn take_streams(
+    mut num_output_batches: usize,
+    mut index_stream: IndexStream,
+    mut data_stream: DataStream,
+    biggest_coalesce_batch_size: Option<usize>,
+    drain_all_completed_batches: bool,
+) {
+    let target_batch_size = data_stream.batch_size();
+    let mut coalescer = BatchCoalescer::new(Arc::clone(data_stream.schema()), target_batch_size)
+        .with_biggest_coalesce_batch_size(biggest_coalesce_batch_size);
+
+    while num_output_batches > 0 {
+        let indices = index_stream.next_indices();
+        let batch = data_stream.next_batch();
+        coalescer
+            .push_batch_with_indices(batch.clone(), indices)
+            .unwrap();
+
+        // Upper bound on pops for this push: everything still wanted when
+        // draining, otherwise a single batch.
+        let max_pops = if drain_all_completed_batches {
+            num_output_batches
+        } else {
+            1
+        };
+        for _ in 0..max_pops {
+            if coalescer.next_completed_batch().is_none() {
+                break;
+            }
+            num_output_batches -= 1;
+        }
+    }
+}
+
/// Stream of filters to apply to a sequence of input RecordBatches
///
/// This pre-computes a sequence of filters and then repeats it forever.
@@ -257,6 +557,25 @@ struct FilterStream {
batches: Arc<[BooleanArray]>,
}
+/// Stream of index arrays to apply to a sequence of input RecordBatches
+///
+/// This holds a pre-computed sequence of index arrays and repeats it forever.
+#[derive(Debug, Clone)]
+struct IndexStream {
+    /// Position of the index array returned by the next `next_indices` call
+    index: usize,
+    /// The pre-computed index arrays
+    batches: Arc<[UInt32Array]>,
+}
+
+impl IndexStream {
+    /// Returns the current index array and advances the stream, wrapping
+    /// around to the first array after the last one.
+    ///
+    /// Panics if the stream holds no index arrays.
+    fn next_indices(&mut self) -> &UInt32Array {
+        let position = self.index;
+        let following = position + 1;
+        self.index = if following >= self.batches.len() {
+            0
+        } else {
+            following
+        };
+        self.batches
+            .get(position)
+            .expect("No more index batches available")
+    }
+}
+
impl FilterStream {
pub fn next_filter(&mut self) -> &BooleanArray {
let current_index = self.index;
@@ -324,6 +643,68 @@ impl FilterStreamBuilder {
}
}
+/// Builder for an `IndexStream` of seeded pseudo-random index arrays.
+#[derive(Debug)]
+struct IndexStreamBuilder {
+    /// Exclusive upper bound of the generated index values
+    batch_size: usize,
+    /// Number of index arrays pre-computed for the stream
+    num_batches: usize,
+    /// Explicit number of indices per array; overrides `selectivity` when set
+    output_len: Option<usize>,
+    /// Fraction of `batch_size` used as the array length when `output_len` is unset
+    selectivity: f32,
+}
+
+impl IndexStreamBuilder {
+    /// Creates a builder with the default settings.
+    fn new() -> Self {
+        Self {
+            batch_size: 8192,
+            num_batches: 11,
+            output_len: None,
+            selectivity: 0.5,
+        }
+    }
+
+    /// Sets the batch size, which bounds the generated index values.
+    fn with_batch_size(self, batch_size: usize) -> Self {
+        Self { batch_size, ..self }
+    }
+
+    /// Sets the selectivity; panics unless it lies within `0.0..=1.0`.
+    fn with_selectivity(self, selectivity: f32) -> Self {
+        assert!((0.0..=1.0).contains(&selectivity));
+        Self {
+            selectivity,
+            ..self
+        }
+    }
+
+    /// Sets an explicit number of indices per array, raised to at least one.
+    fn with_output_len(self, output_len: usize) -> Self {
+        Self {
+            output_len: Some(output_len.max(1)),
+            ..self
+        }
+    }
+
+    /// Pre-computes `num_batches` index arrays, seeding each one with its
+    /// position in the sequence.
+    fn build(self) -> IndexStream {
+        let output_len = match self.output_len {
+            Some(len) => len,
+            None => ((self.batch_size as f32) * self.selectivity).round().max(1.0) as usize,
+        };
+
+        let batches: Vec<UInt32Array> = (0..self.num_batches)
+            .map(|seed| create_index_array(self.batch_size, output_len, seed as u64))
+            .collect();
+
+        IndexStream {
+            index: 0,
+            batches: batches.into(),
+        }
+    }
+}
+
+/// Creates an array of `output_len` pseudo-random indices, each drawn from
+/// `0..input_len`.
+///
+/// The generator is seeded with `seed`, so the result is reproducible for a
+/// given set of arguments.
+///
+/// # Panics
+///
+/// Panics if `input_len` does not fit in a `u32`, or if `input_len` is zero
+/// while `output_len` is not (the sampling range would be empty).
+fn create_index_array(input_len: usize, output_len: usize, seed: u64) -> UInt32Array {
+    // The upper bound is loop invariant: convert it once rather than once per index.
+    let upper_bound = u32::try_from(input_len).expect("input_len must fit in u32");
+    let mut rng = StdRng::seed_from_u64(seed);
+    UInt32Array::from_iter_values((0..output_len).map(|_| rng.random_range(0..upper_bound)))
+}
+
#[derive(Debug, Clone)]
struct DataStream {
schema: SchemaRef,
@@ -455,6 +836,17 @@ impl DataStreamBuilder {
self.max_string_len,
)) // TODO seed
}
+ DataType::BinaryView => Arc::new(BinaryViewArray::from_iter(
+ create_binary_array_with_len_range_and_prefix_and_seed::<i32>(
+ self.batch_size,
+ self.null_density,
+ 0,
+ self.max_string_len,
+ b"",
+ seed,
+ )
+ .iter(),
+ )),
DataType::Dictionary(key_type, value_type)
if key_type.as_ref() == &DataType::Int32
&& value_type.as_ref() == &DataType::Utf8 =>