This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new d5116a1b perf: Add criterion benchmark for aggregate expressions (#948)
d5116a1b is described below
commit d5116a1bcba8a7acc9399d5af469fed3262500a1
Author: Andy Grove <[email protected]>
AuthorDate: Wed Sep 18 16:02:35 2024 -0600
perf: Add criterion benchmark for aggregate expressions (#948)
---
native/Cargo.lock | 2 +
native/core/Cargo.toml | 6 +-
native/core/benches/aggregate.rs | 202 +++++++++++++++++++++++++++++++++++++++
3 files changed, 209 insertions(+), 1 deletion(-)
diff --git a/native/Cargo.lock b/native/Cargo.lock
index 480fb6f1..e55ea46e 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -701,6 +701,7 @@ dependencies = [
"ciborium",
"clap",
"criterion-plot",
+ "futures",
"is-terminal",
"itertools 0.10.5",
"num-traits",
@@ -713,6 +714,7 @@ dependencies = [
"serde_derive",
"serde_json",
"tinytemplate",
+ "tokio",
"walkdir",
]
diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml
index 58fe00e7..13f6b135 100644
--- a/native/core/Cargo.toml
+++ b/native/core/Cargo.toml
@@ -80,7 +80,7 @@ datafusion-comet-proto = { workspace = true }
[dev-dependencies]
pprof = { version = "0.13.0", features = ["flamegraph"] }
-criterion = "0.5.1"
+criterion = { version = "0.5.1", features = ["async_tokio"] }
jni = { version = "0.21", features = ["invocation"] }
lazy_static = "1.4"
assertables = "7"
@@ -122,3 +122,7 @@ harness = false
[[bench]]
name = "filter"
harness = false
+
+[[bench]]
+name = "aggregate"
+harness = false
diff --git a/native/core/benches/aggregate.rs b/native/core/benches/aggregate.rs
new file mode 100644
index 00000000..e6b3e315
--- /dev/null
+++ b/native/core/benches/aggregate.rs
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow_array::builder::{Decimal128Builder, StringBuilder};
+use arrow_array::{ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use comet::execution::datafusion::expressions::avg_decimal::AvgDecimal;
+use comet::execution::datafusion::expressions::sum_decimal::SumDecimal;
+use criterion::{black_box, criterion_group, criterion_main, Criterion};
+use datafusion::functions_aggregate::average::avg_udaf;
+use datafusion::functions_aggregate::sum::sum_udaf;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode,
PhysicalGroupBy};
+use datafusion::physical_plan::memory::MemoryExec;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion_execution::TaskContext;
+use datafusion_expr::AggregateUDF;
+use datafusion_physical_expr::aggregate::AggregateExprBuilder;
+use datafusion_physical_expr::expressions::Column;
+use futures::StreamExt;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::runtime::Runtime;
+
+/// Registers the "aggregate" benchmark group, which compares DataFusion's
+/// built-in `avg`/`sum` UDAFs against Comet's `AvgDecimal`/`SumDecimal`.
+///
+/// Every case drives `agg_test` on a Tokio runtime over the same input: a
+/// single partition made of ten copies of one 8192-row batch, grouped on
+/// column `c0` and aggregated over column `c1`.
+///
+/// # Panics
+///
+/// Panics if the Tokio runtime cannot be created.
+fn criterion_benchmark(c: &mut Criterion) {
+    let mut group = c.benchmark_group("aggregate");
+    let num_rows = 8192;
+    let batch = create_record_batch(num_rows);
+    // One partition holding ten copies of the same batch.
+    let partitions = &[vec![batch; 10]];
+    let c0: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c0", 0));
+    let c1: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c1", 1));
+
+    let rt = Runtime::new().unwrap();
+
+    group.bench_function("avg_decimal_datafusion", |b| {
+        let datafusion_avg_decimal = avg_udaf();
+        b.to_async(&rt).iter(|| {
+            black_box(agg_test(
+                partitions,
+                Arc::clone(&c0),
+                Arc::clone(&c1),
+                Arc::clone(&datafusion_avg_decimal),
+                "avg",
+            ))
+        })
+    });
+
+    group.bench_function("avg_decimal_comet", |b| {
+        let comet_avg_decimal = Arc::new(AggregateUDF::new_from_impl(AvgDecimal::new(
+            Arc::clone(&c1),
+            "avg",
+            DataType::Decimal128(38, 10),
+            DataType::Decimal128(38, 10),
+        )));
+        b.to_async(&rt).iter(|| {
+            black_box(agg_test(
+                partitions,
+                Arc::clone(&c0),
+                Arc::clone(&c1),
+                Arc::clone(&comet_avg_decimal),
+                "avg",
+            ))
+        })
+    });
+
+    group.bench_function("sum_decimal_datafusion", |b| {
+        let datafusion_sum_decimal = sum_udaf();
+        b.to_async(&rt).iter(|| {
+            black_box(agg_test(
+                partitions,
+                Arc::clone(&c0),
+                Arc::clone(&c1),
+                Arc::clone(&datafusion_sum_decimal),
+                "sum",
+            ))
+        })
+    });
+
+    group.bench_function("sum_decimal_comet", |b| {
+        let comet_sum_decimal = Arc::new(AggregateUDF::new_from_impl(SumDecimal::new(
+            "sum",
+            Arc::clone(&c1),
+            DataType::Decimal128(38, 10),
+        )));
+        b.to_async(&rt).iter(|| {
+            black_box(agg_test(
+                partitions,
+                Arc::clone(&c0),
+                Arc::clone(&c1),
+                Arc::clone(&comet_sum_decimal),
+                "sum",
+            ))
+        })
+    });
+
+    group.finish();
+}
+
+/// Runs one aggregation over `partitions` and drains its output.
+///
+/// Builds a `MemoryExec` scan over `partitions`, wraps it with the plan
+/// returned by `create_aggregate` (grouping on `c0`, applying `aggregate_udf`
+/// to `c1` under the name `alias`), executes partition 0 and consumes every
+/// batch of the resulting stream.
+///
+/// # Panics
+///
+/// Panics if `partitions` or its first partition is empty (the schema is taken
+/// from the first batch), if the plan cannot be built or executed, or if the
+/// stream yields an error.
+async fn agg_test(
+    partitions: &[Vec<RecordBatch>],
+    c0: Arc<dyn PhysicalExpr>,
+    c1: Arc<dyn PhysicalExpr>,
+    aggregate_udf: Arc<AggregateUDF>,
+    alias: &str,
+) {
+    let schema = &partitions[0][0].schema();
+    let scan: Arc<dyn ExecutionPlan> =
+        Arc::new(MemoryExec::try_new(partitions, Arc::clone(schema), None).unwrap());
+    // `c0` and `c1` are owned and not used again, so move them instead of cloning.
+    let aggregate = create_aggregate(scan, c0, c1, schema, aggregate_udf, alias);
+    let mut stream = aggregate
+        .execute(0, Arc::new(TaskContext::default()))
+        .unwrap();
+    while let Some(batch) = stream.next().await {
+        let _batch = batch.unwrap();
+    }
+}
+
+/// Builds an `AggregateExec` in `AggregateMode::Partial` on top of `scan`.
+///
+/// The plan groups by `c0` (output name `"c0"`) and evaluates a single
+/// aggregate expression: `aggregate_udf` applied to `c1`, named `alias`, with
+/// `ignore_nulls` and `distinct` both disabled and no filter expression.
+/// `schema` is used both for the aggregate expression and as the input schema.
+///
+/// # Panics
+///
+/// Panics if the aggregate expression or the `AggregateExec` cannot be built.
+fn create_aggregate(
+    scan: Arc<dyn ExecutionPlan>,
+    c0: Arc<dyn PhysicalExpr>,
+    c1: Arc<dyn PhysicalExpr>,
+    schema: &SchemaRef,
+    aggregate_udf: Arc<AggregateUDF>,
+    alias: &str,
+) -> Arc<AggregateExec> {
+    let aggregate_expr = AggregateExprBuilder::new(aggregate_udf, vec![c1])
+        .schema(Arc::clone(schema))
+        .alias(alias)
+        .with_ignore_nulls(false)
+        .with_distinct(false)
+        .build()
+        .unwrap();
+
+    let group_by = PhysicalGroupBy::new_single(vec![(c0, String::from("c0"))]);
+    // One entry per aggregate expression; `None` means no filter.
+    let filters = vec![None];
+
+    let exec = AggregateExec::try_new(
+        AggregateMode::Partial,
+        group_by,
+        vec![aggregate_expr],
+        filters,
+        scan,
+        Arc::clone(schema),
+    )
+    .unwrap();
+    Arc::new(exec)
+}
+
+/// Creates a two-column `RecordBatch` with `num_rows` rows for the benchmark.
+///
+/// * `c0` - non-nullable `Utf8`; row `i` holds `"this is string #{i % 1024}"`,
+///   so the column has at most 1024 distinct values.
+/// * `c1` - non-nullable, declared as `Decimal128(38, 10)` in the schema; row
+///   `i` is appended as the raw `i128` value `i`.
+///
+/// # Panics
+///
+/// Panics if `RecordBatch::try_new` rejects the schema/column combination.
+fn create_record_batch(num_rows: usize) -> RecordBatch {
+    let mut strings = StringBuilder::with_capacity(num_rows, num_rows * 32);
+    let mut decimals = Decimal128Builder::with_capacity(num_rows);
+    for row in 0..num_rows {
+        decimals.append_value(row as i128);
+        strings.append_value(format!("this is string #{}", row % 1024));
+    }
+
+    // Field order must match the column order below: string first, then decimal.
+    let schema = Schema::new(vec![
+        Field::new("c0", DataType::Utf8, false),
+        Field::new("c1", DataType::Decimal128(38, 10), false),
+    ]);
+
+    let decimal_column: ArrayRef = Arc::new(decimals.finish());
+    let string_column: ArrayRef = Arc::new(strings.finish());
+
+    RecordBatch::try_new(Arc::new(schema), vec![string_column, decimal_column]).unwrap()
+}
+
+/// Criterion configuration for this benchmark: a 500 ms warm-up followed by a
+/// 500 ms measurement window per benchmark function.
+fn config() -> Criterion {
+    let half_second = Duration::from_millis(500);
+    Criterion::default()
+        .warm_up_time(half_second)
+        .measurement_time(half_second)
+}
+
+// Declares the `benches` group: runs `criterion_benchmark` using the settings
+// returned by `config()`.
+criterion_group! {
+ name = benches;
+ config = config();
+ targets = criterion_benchmark
+}
+// Generates the benchmark binary's entry point for the `benches` group.
+criterion_main!(benches);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]