This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new e13c6537c Fix SortExec bench case and Add SortExec input cases to 
bench for SortPreservingMergeExec (#5308)
e13c6537c is described below

commit e13c6537c5222a9254a6d614b0edfefd167a5b0e
Author: Jay Miller <[email protected]>
AuthorDate: Mon Feb 20 08:27:05 2023 -0500

    Fix SortExec bench case and Add SortExec input cases to bench for 
SortPreservingMergeExec (#5308)
    
    * add SortExec input case to each merge bench case
    
    * fix small typo in sort bench
    
    * fix sort bench to actually use the full data set in the non-preserve-partitioning case
---
 datafusion/core/benches/merge.rs | 72 ++++++++++++++++++++++++++++++++++++++++
 datafusion/core/benches/sort.rs  |  6 ++--
 2 files changed, 76 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/benches/merge.rs b/datafusion/core/benches/merge.rs
index a7ac6cd41..f1c473603 100644
--- a/datafusion/core/benches/merge.rs
+++ b/datafusion/core/benches/merge.rs
@@ -80,6 +80,7 @@ use arrow::{
 
 /// Benchmarks for SortPreservingMerge stream
 use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::{
     execution::context::TaskContext,
     physical_plan::{
@@ -136,11 +137,22 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(move || case.run())
     });
 
+    c.bench_function("merge i64 SortExec input", |b| {
+        let case = MergeBenchCase::new_with_sort_input(&I64_STREAMS);
+
+        b.iter(move || case.run())
+    });
+
     c.bench_function("merge f64", |b| {
         let case = MergeBenchCase::new(&F64_STREAMS);
 
         b.iter(move || case.run())
     });
+    c.bench_function("merge f64 SortExec input", |b| {
+        let case = MergeBenchCase::new_with_sort_input(&F64_STREAMS);
+
+        b.iter(move || case.run())
+    });
 
     c.bench_function("merge utf8 low cardinality", |b| {
         let case = MergeBenchCase::new(&UTF8_LOW_CARDINALITY_STREAMS);
@@ -148,39 +160,79 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(move || case.run())
     });
 
+    c.bench_function("merge utf8 low cardinality SortExec input", |b| {
+        let case = MergeBenchCase::new_with_sort_input(&UTF8_LOW_CARDINALITY_STREAMS);
+
+        b.iter(move || case.run())
+    });
+
     c.bench_function("merge utf8 high cardinality", |b| {
         let case = MergeBenchCase::new(&UTF8_HIGH_CARDINALITY_STREAMS);
 
         b.iter(move || case.run())
     });
 
+    c.bench_function("merge utf8 high cardinality SortExec input", |b| {
+        let case = 
MergeBenchCase::new_with_sort_input(&UTF8_HIGH_CARDINALITY_STREAMS);
+
+        b.iter(move || case.run())
+    });
+
     c.bench_function("merge utf8 tuple", |b| {
         let case = MergeBenchCase::new(&UTF8_TUPLE_STREAMS);
 
         b.iter(move || case.run())
     });
 
+    c.bench_function("merge utf8 tuple SortExec input", |b| {
+        let case = MergeBenchCase::new_with_sort_input(&UTF8_TUPLE_STREAMS);
+
+        b.iter(move || case.run())
+    });
+
     c.bench_function("merge utf8 dictionary", |b| {
         let case = MergeBenchCase::new(&DICTIONARY_STREAMS);
 
         b.iter(move || case.run())
     });
 
+    c.bench_function("merge utf8 dictionary SortExec input", |b| {
+        let case = MergeBenchCase::new_with_sort_input(&DICTIONARY_STREAMS);
+
+        b.iter(move || case.run())
+    });
+
     c.bench_function("merge utf8 dictionary tuple", |b| {
         let case = MergeBenchCase::new(&DICTIONARY_TUPLE_STREAMS);
         b.iter(move || case.run())
     });
 
+    c.bench_function("merge utf8 dictionary tuple SortExec input", |b| {
+        let case = 
MergeBenchCase::new_with_sort_input(&DICTIONARY_TUPLE_STREAMS);
+        b.iter(move || case.run())
+    });
+
     c.bench_function("merge mixed utf8 dictionary tuple", |b| {
         let case = MergeBenchCase::new(&MIXED_DICTIONARY_TUPLE_STREAMS);
         b.iter(move || case.run())
     });
 
+    c.bench_function("merge mixed utf8 dictionary tuple SortExec input", |b| {
+        let case = 
MergeBenchCase::new_with_sort_input(&MIXED_DICTIONARY_TUPLE_STREAMS);
+        b.iter(move || case.run())
+    });
+
     c.bench_function("merge mixed tuple", |b| {
         let case = MergeBenchCase::new(&MIXED_TUPLE_STREAMS);
 
         b.iter(move || case.run())
     });
+
+    c.bench_function("merge mixed tuple SortExec input", |b| {
+        let case = MergeBenchCase::new_with_sort_input(&MIXED_TUPLE_STREAMS);
+
+        b.iter(move || case.run())
+    });
 }
 
 /// Encapsulates running each test case
@@ -214,6 +266,26 @@ impl MergeBenchCase {
         }
     }
 
+    /// Builds `MemoryExec -> SortExec -> SortPreservingMergeExec` over `partitions`; the
+    /// SortExec keeps the input partitioning so the merge sees one stream per partition.
+    fn new_with_sort_input(partitions: &[Vec<RecordBatch>]) -> Self {
+        let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+        let schema = partitions[0][0].schema();
+        let sort = make_sort_exprs(schema.as_ref());
+        let exec = Arc::new(MemoryExec::try_new(partitions, schema, None).unwrap());
+        // A non-preserving SortExec yields a single partition without all of the input
+        // (hence CoalescePartitionsExec in sort.rs), leaving nothing for the merge to merge.
+        let sort_exec = SortExec::new_with_partitioning(sort.clone(), exec, true, None);
+        let plan = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(sort_exec)));
+        Self {
+            runtime,
+            task_ctx,
+            plan,
+        }
+    }
+
     /// runs the specified plan to completion, draining all input and
     /// panic'ing on error
     fn run(&self) {
diff --git a/datafusion/core/benches/sort.rs b/datafusion/core/benches/sort.rs
index 2d9417d8b..0507a9308 100644
--- a/datafusion/core/benches/sort.rs
+++ b/datafusion/core/benches/sort.rs
@@ -29,6 +29,7 @@ use arrow::{
 
 /// Benchmarks for SortExec
 use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::{
     execution::context::TaskContext,
     physical_plan::{memory::MemoryExec, sorts::sort::SortExec, ExecutionPlan},
@@ -104,7 +105,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(move || case.run())
     });
     c.bench_function("sort utf8 low cardinality preserve partitioning", |b| {
-        let case = SortBenchCase::new(&UTF8_LOW_CARDINALITY_STREAMS);
+        let case = 
SortBenchCasePreservePartitioning::new(&UTF8_LOW_CARDINALITY_STREAMS);
 
         b.iter(move || case.run())
     });
@@ -199,7 +200,8 @@ impl SortBenchCase {
 
         let projection = None;
         let exec = MemoryExec::try_new(partitions, schema, 
projection).unwrap();
-        let plan = Arc::new(SortExec::try_new(sort, Arc::new(exec), 
None).unwrap());
+        let exec = Arc::new(CoalescePartitionsExec::new(Arc::new(exec)));
+        let plan = Arc::new(SortExec::try_new(sort, exec, None).unwrap());
 
         Self {
             runtime,

Reply via email to