This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d303f5817f chore: Add end-to-end benchmark for array_agg, code cleanup
(#20496)
d303f5817f is described below
commit d303f5817f696fc6250bf67a71d3ea22b0628124
Author: Neil Conway <[email protected]>
AuthorDate: Mon Feb 23 13:26:16 2026 -0500
chore: Add end-to-end benchmark for array_agg, code cleanup (#20496)
## Which issue does this PR close?
- Prep work for #20465
## Rationale for this change
- Add three queries to measure the end-to-end performance of
`array_agg()`, as prep work for optimizing its performance.
## What changes are included in this PR?
This PR also cleans up the `data_utils` benchmark code:
- Seed the RNG once and use it for all data generation. The previous
coding seeded an RNG but only used it for some data, and also used the
same seed for every batch, which led to repeated data (... I assume
this was not the intent?)
- The previous code made `u64_wide` a nullable field, but passed `9.0`
for the `value_density` when generating data, which meant that no NULL
values would ever be generated. Switch to making `u64_wide`
non-nullable.
- Fix up comments, remove a clippy suppress, various other cleanups.
## Are these changes tested?
Yes.
## Are there any user-facing changes?
No.
---
datafusion/core/benches/aggregate_query_sql.rs | 33 ++++++++
datafusion/core/benches/data_utils/mod.rs | 101 ++++++++++++-------------
2 files changed, 80 insertions(+), 54 deletions(-)
diff --git a/datafusion/core/benches/aggregate_query_sql.rs
b/datafusion/core/benches/aggregate_query_sql.rs
index f785c94580..b47512e5e9 100644
--- a/datafusion/core/benches/aggregate_query_sql.rs
+++ b/datafusion/core/benches/aggregate_query_sql.rs
@@ -251,6 +251,39 @@ fn criterion_benchmark(c: &mut Criterion) {
)
})
});
+
+ c.bench_function("array_agg_query_group_by_few_groups", |b| {
+ b.iter(|| {
+ query(
+ ctx.clone(),
+ &rt,
+ "SELECT u64_narrow, array_agg(f64) \
+ FROM t GROUP BY u64_narrow",
+ )
+ })
+ });
+
+ c.bench_function("array_agg_query_group_by_mid_groups", |b| {
+ b.iter(|| {
+ query(
+ ctx.clone(),
+ &rt,
+ "SELECT u64_mid, array_agg(f64) \
+ FROM t GROUP BY u64_mid",
+ )
+ })
+ });
+
+ c.bench_function("array_agg_query_group_by_many_groups", |b| {
+ b.iter(|| {
+ query(
+ ctx.clone(),
+ &rt,
+ "SELECT u64_wide, array_agg(f64) \
+ FROM t GROUP BY u64_wide",
+ )
+ })
+ });
}
criterion_group!(benches, criterion_benchmark);
diff --git a/datafusion/core/benches/data_utils/mod.rs
b/datafusion/core/benches/data_utils/mod.rs
index accd51ae58..a30ada4205 100644
--- a/datafusion/core/benches/data_utils/mod.rs
+++ b/datafusion/core/benches/data_utils/mod.rs
@@ -45,7 +45,7 @@ pub fn create_table_provider(
) -> Result<Arc<MemTable>> {
let schema = Arc::new(create_schema());
let partitions =
- create_record_batches(schema.clone(), array_len, partitions_len,
batch_size);
+ create_record_batches(&schema, array_len, partitions_len, batch_size);
// declare a table in memory. In spark API, this corresponds to
createDataFrame(...).
MemTable::try_new(schema, partitions).map(Arc::new)
}
@@ -56,21 +56,19 @@ pub fn create_schema() -> Schema {
Field::new("utf8", DataType::Utf8, false),
Field::new("f32", DataType::Float32, false),
Field::new("f64", DataType::Float64, true),
- // This field will contain integers randomly selected from a large
- // range of values, i.e. [0, u64::MAX], such that there are none (or
- // very few) repeated values.
- Field::new("u64_wide", DataType::UInt64, true),
- // This field will contain integers randomly selected from a narrow
- // range of values such that there are a few distinct values, but they
- // are repeated often.
+ // Integers randomly selected from a wide range of values, i.e. [0,
+ // u64::MAX], such that there are ~no repeated values.
+ Field::new("u64_wide", DataType::UInt64, false),
+ // Integers randomly selected from a mid-range of values [0, 1000),
+ // providing ~1000 distinct groups.
+ Field::new("u64_mid", DataType::UInt64, false),
+ // Integers randomly selected from a narrow range of values such that
+ // there are a few distinct values, but they are repeated often.
Field::new("u64_narrow", DataType::UInt64, false),
])
}
-fn create_data(size: usize, null_density: f64) -> Vec<Option<f64>> {
- // use random numbers to avoid spurious compiler optimizations wrt to
branching
- let mut rng = StdRng::seed_from_u64(42);
-
+fn create_data(rng: &mut StdRng, size: usize, null_density: f64) ->
Vec<Option<f64>> {
(0..size)
.map(|_| {
if rng.random::<f64>() > null_density {
@@ -82,56 +80,43 @@ fn create_data(size: usize, null_density: f64) ->
Vec<Option<f64>> {
.collect()
}
-fn create_integer_data(
- rng: &mut StdRng,
- size: usize,
- value_density: f64,
-) -> Vec<Option<u64>> {
- (0..size)
- .map(|_| {
- if rng.random::<f64>() > value_density {
- None
- } else {
- Some(rng.random::<u64>())
- }
- })
- .collect()
-}
-
fn create_record_batch(
schema: SchemaRef,
rng: &mut StdRng,
batch_size: usize,
- i: usize,
+ batch_index: usize,
) -> RecordBatch {
- // the 4 here is the number of different keys.
- // a higher number increase sparseness
- let vs = [0, 1, 2, 3];
- let keys: Vec<String> = (0..batch_size)
- .map(
- // use random numbers to avoid spurious compiler optimizations wrt
to branching
- |_| format!("hi{:?}", vs.choose(rng)),
- )
- .collect();
- let keys: Vec<&str> = keys.iter().map(|e| &**e).collect();
+ // Randomly choose from 4 distinct key values; a higher number increases
sparseness.
+ let key_suffixes = [0, 1, 2, 3];
+ let keys = StringArray::from_iter_values(
+ (0..batch_size).map(|_| format!("hi{}",
key_suffixes.choose(rng).unwrap())),
+ );
- let values = create_data(batch_size, 0.5);
+ let values = create_data(rng, batch_size, 0.5);
// Integer values between [0, u64::MAX].
- let integer_values_wide = create_integer_data(rng, batch_size, 9.0);
+ let integer_values_wide = (0..batch_size)
+ .map(|_| rng.random::<u64>())
+ .collect::<Vec<_>>();
- // Integer values between [0, 9].
+ // Integer values between [0, 1000).
+ let integer_values_mid = (0..batch_size)
+ .map(|_| rng.random_range(0..1000))
+ .collect::<Vec<_>>();
+
+ // Integer values between [0, 10).
let integer_values_narrow = (0..batch_size)
- .map(|_| rng.random_range(0_u64..10))
+ .map(|_| rng.random_range(0..10))
.collect::<Vec<_>>();
RecordBatch::try_new(
schema,
vec![
- Arc::new(StringArray::from(keys)),
- Arc::new(Float32Array::from(vec![i as f32; batch_size])),
+ Arc::new(keys),
+ Arc::new(Float32Array::from(vec![batch_index as f32; batch_size])),
Arc::new(Float64Array::from(values)),
Arc::new(UInt64Array::from(integer_values_wide)),
+ Arc::new(UInt64Array::from(integer_values_mid)),
Arc::new(UInt64Array::from(integer_values_narrow)),
],
)
@@ -140,21 +125,29 @@ fn create_record_batch(
/// Create record batches of `partitions_len` partitions and `batch_size` for
each batch,
/// with a total number of `array_len` records
-#[expect(clippy::needless_pass_by_value)]
pub fn create_record_batches(
- schema: SchemaRef,
+ schema: &SchemaRef,
array_len: usize,
partitions_len: usize,
batch_size: usize,
) -> Vec<Vec<RecordBatch>> {
let mut rng = StdRng::seed_from_u64(42);
- (0..partitions_len)
- .map(|_| {
- (0..array_len / batch_size / partitions_len)
- .map(|i| create_record_batch(schema.clone(), &mut rng,
batch_size, i))
- .collect::<Vec<_>>()
- })
- .collect::<Vec<_>>()
+ let mut partitions = Vec::with_capacity(partitions_len);
+ let batches_per_partition = array_len / batch_size / partitions_len;
+
+ for _ in 0..partitions_len {
+ let mut batches = Vec::with_capacity(batches_per_partition);
+ for batch_index in 0..batches_per_partition {
+ batches.push(create_record_batch(
+ schema.clone(),
+ &mut rng,
+ batch_size,
+ batch_index,
+ ));
+ }
+ partitions.push(batches);
+ }
+ partitions
}
/// An enum that wraps either a regular StringBuilder or a
GenericByteViewBuilder
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]