This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new e13c6537c Fix SortExec bench case and Add SortExec input cases to bench for SortPreservingMergeExec (#5308)
e13c6537c is described below
commit e13c6537c5222a9254a6d614b0edfefd167a5b0e
Author: Jay Miller <[email protected]>
AuthorDate: Mon Feb 20 08:27:05 2023 -0500
Fix SortExec bench case and Add SortExec input cases to bench for SortPreservingMergeExec (#5308)
* add SortExec input case to each merge bench case
* fix lil typo error in sort bench
* fix sort bench to actually use full data set in non-preserve partition case
---
datafusion/core/benches/merge.rs | 72 ++++++++++++++++++++++++++++++++++++++++
datafusion/core/benches/sort.rs | 6 ++--
2 files changed, 76 insertions(+), 2 deletions(-)
diff --git a/datafusion/core/benches/merge.rs b/datafusion/core/benches/merge.rs
index a7ac6cd41..f1c473603 100644
--- a/datafusion/core/benches/merge.rs
+++ b/datafusion/core/benches/merge.rs
@@ -80,6 +80,7 @@ use arrow::{
/// Benchmarks for SortPreservingMerge stream
use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::{
execution::context::TaskContext,
physical_plan::{
@@ -136,11 +137,22 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(move || case.run())
});
+ c.bench_function("merge i64 SortExec input", |b| {
+ let case = MergeBenchCase::new_with_sort_input(&I64_STREAMS);
+
+ b.iter(move || case.run())
+ });
+
c.bench_function("merge f64", |b| {
let case = MergeBenchCase::new(&F64_STREAMS);
b.iter(move || case.run())
});
+ c.bench_function("merge f64 SortExec input", |b| {
+ let case = MergeBenchCase::new_with_sort_input(&F64_STREAMS);
+
+ b.iter(move || case.run())
+ });
c.bench_function("merge utf8 low cardinality", |b| {
let case = MergeBenchCase::new(&UTF8_LOW_CARDINALITY_STREAMS);
@@ -148,39 +160,79 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(move || case.run())
});
+ c.bench_function("merge utf8 low cardinality SortExec", |b| {
+ let case = MergeBenchCase::new_with_sort_input(&UTF8_LOW_CARDINALITY_STREAMS);
+
+ b.iter(move || case.run())
+ });
+
c.bench_function("merge utf8 high cardinality", |b| {
let case = MergeBenchCase::new(&UTF8_HIGH_CARDINALITY_STREAMS);
b.iter(move || case.run())
});
+ c.bench_function("merge utf8 high cardinality SortExec input", |b| {
+ let case = MergeBenchCase::new_with_sort_input(&UTF8_HIGH_CARDINALITY_STREAMS);
+
+ b.iter(move || case.run())
+ });
+
c.bench_function("merge utf8 tuple", |b| {
let case = MergeBenchCase::new(&UTF8_TUPLE_STREAMS);
b.iter(move || case.run())
});
+ c.bench_function("merge utf8 tuple SortExec input", |b| {
+ let case = MergeBenchCase::new_with_sort_input(&UTF8_TUPLE_STREAMS);
+
+ b.iter(move || case.run())
+ });
+
c.bench_function("merge utf8 dictionary", |b| {
let case = MergeBenchCase::new(&DICTIONARY_STREAMS);
b.iter(move || case.run())
});
+ c.bench_function("merge utf8 dictionary SortExec input", |b| {
+ let case = MergeBenchCase::new_with_sort_input(&DICTIONARY_STREAMS);
+
+ b.iter(move || case.run())
+ });
+
c.bench_function("merge utf8 dictionary tuple", |b| {
let case = MergeBenchCase::new(&DICTIONARY_TUPLE_STREAMS);
b.iter(move || case.run())
});
+ c.bench_function("merge utf8 dictionary tuple SortExec input", |b| {
+ let case = MergeBenchCase::new_with_sort_input(&DICTIONARY_TUPLE_STREAMS);
+ b.iter(move || case.run())
+ });
+
c.bench_function("merge mixed utf8 dictionary tuple", |b| {
let case = MergeBenchCase::new(&MIXED_DICTIONARY_TUPLE_STREAMS);
b.iter(move || case.run())
});
+ c.bench_function("merge mixed utf8 dictionary tuple SortExec input", |b| {
+ let case = MergeBenchCase::new_with_sort_input(&MIXED_DICTIONARY_TUPLE_STREAMS);
+ b.iter(move || case.run())
+ });
+
c.bench_function("merge mixed tuple", |b| {
let case = MergeBenchCase::new(&MIXED_TUPLE_STREAMS);
b.iter(move || case.run())
});
+
+ c.bench_function("merge mixed tuple SortExec input", |b| {
+ let case = MergeBenchCase::new_with_sort_input(&MIXED_TUPLE_STREAMS);
+
+ b.iter(move || case.run())
+ });
}
/// Encapsulates running each test case
@@ -214,6 +266,26 @@ impl MergeBenchCase {
}
}
+ fn new_with_sort_input(partitions: &[Vec<RecordBatch>]) -> Self {
+ let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
+ let session_ctx = SessionContext::new();
+ let task_ctx = session_ctx.task_ctx();
+
+ let schema = partitions[0][0].schema();
+ let sort = make_sort_exprs(schema.as_ref());
+
+ let projection = None;
+ let exec = Arc::new(MemoryExec::try_new(partitions, schema, projection).unwrap());
+ let sort_exec = SortExec::try_new(sort.to_owned(), exec, None).unwrap();
+ let plan = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(sort_exec)));
+
+ Self {
+ runtime,
+ task_ctx,
+ plan,
+ }
+ }
+
/// runs the specified plan to completion, draining all input and
/// panic'ing on error
fn run(&self) {
diff --git a/datafusion/core/benches/sort.rs b/datafusion/core/benches/sort.rs
index 2d9417d8b..0507a9308 100644
--- a/datafusion/core/benches/sort.rs
+++ b/datafusion/core/benches/sort.rs
@@ -29,6 +29,7 @@ use arrow::{
/// Benchmarks for SortExec
use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::{
execution::context::TaskContext,
physical_plan::{memory::MemoryExec, sorts::sort::SortExec, ExecutionPlan},
@@ -104,7 +105,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(move || case.run())
});
c.bench_function("sort utf8 low cardinality preserve partitioning", |b| {
- let case = SortBenchCase::new(&UTF8_LOW_CARDINALITY_STREAMS);
+ let case = SortBenchCasePreservePartitioning::new(&UTF8_LOW_CARDINALITY_STREAMS);
b.iter(move || case.run())
});
@@ -199,7 +200,8 @@ impl SortBenchCase {
let projection = None;
let exec = MemoryExec::try_new(partitions, schema, projection).unwrap();
- let plan = Arc::new(SortExec::try_new(sort, Arc::new(exec), None).unwrap());
+ let exec = Arc::new(CoalescePartitionsExec::new(Arc::new(exec)));
+ let plan = Arc::new(SortExec::try_new(sort, exec, None).unwrap());
Self {
runtime,