buraksenn commented on code in PR #21073:
URL: https://github.com/apache/datafusion/pull/21073#discussion_r2967379268
##########
datafusion/core/benches/topk_aggregate.rs:
##########
@@ -290,152 +290,107 @@ fn criterion_benchmark(c: &mut Criterion) {
let limit = LIMIT;
let partitions = 10;
let samples = 1_000_000;
+ let total_rows = partitions * samples;
+
+ // Numeric aggregate benchmarks
+ // (asc, use_topk, use_view, run_asc)
+ let numeric_cases: &[(bool, bool, bool, bool, &str)] = &[
+ (
+ false,
+ false,
+ false,
+ false,
+ "aggregate {rows} time-series rows",
+ ),
+ (true, false, false, true, "aggregate {rows} worst-case rows"),
+ (
+ false,
+ true,
+ false,
+ false,
+ "top k={limit} aggregate {rows} time-series rows",
+ ),
+ (
+ true,
+ true,
+ false,
+ true,
+ "top k={limit} aggregate {rows} worst-case rows",
+ ),
+ (
+ false,
+ true,
+ true,
+ false,
+ "top k={limit} aggregate {rows} time-series rows [Utf8View]",
+ ),
+ (
+ true,
+ true,
+ true,
+ true,
+ "top k={limit} aggregate {rows} worst-case rows [Utf8View]",
+ ),
+ ];
+ for &(asc, use_topk, use_view, run_asc, name_tpl) in numeric_cases {
+ let name = name_tpl
+ .replace("{rows}", &total_rows.to_string())
+ .replace("{limit}", &limit.to_string());
+ let ctx = rt
+ .block_on(create_context(partitions, samples, asc, use_topk,
use_view))
+ .unwrap();
+ c.bench_function(&name, |b| {
+ b.iter(|| run(&rt, ctx.clone(), limit, use_topk, run_asc))
+ });
+ }
- let ctx = rt
- .block_on(create_context(partitions, samples, false, false, false))
- .unwrap();
- c.bench_function(
- format!("aggregate {} time-series rows", partitions *
samples).as_str(),
- |b| b.iter(|| run(&rt, ctx.clone(), limit, false, false)),
- );
-
- let ctx = rt
- .block_on(create_context(partitions, samples, true, false, false))
- .unwrap();
- c.bench_function(
- format!("aggregate {} worst-case rows", partitions * samples).as_str(),
- |b| b.iter(|| run(&rt, ctx.clone(), limit, false, true)),
- );
-
- let ctx = rt
- .block_on(create_context(partitions, samples, false, true, false))
- .unwrap();
- c.bench_function(
- format!(
- "top k={limit} aggregate {} time-series rows",
- partitions * samples
- )
- .as_str(),
- |b| b.iter(|| run(&rt, ctx.clone(), limit, true, false)),
- );
-
- let ctx = rt
- .block_on(create_context(partitions, samples, true, true, false))
- .unwrap();
- c.bench_function(
- format!(
- "top k={limit} aggregate {} worst-case rows",
- partitions * samples
- )
- .as_str(),
- |b| b.iter(|| run(&rt, ctx.clone(), limit, true, true)),
- );
-
- // Utf8View schema,time-series rows
- let ctx = rt
- .block_on(create_context(partitions, samples, false, true, true))
- .unwrap();
- c.bench_function(
- format!(
- "top k={limit} aggregate {} time-series rows [Utf8View]",
- partitions * samples
- )
- .as_str(),
- |b| b.iter(|| run(&rt, ctx.clone(), limit, true, false)),
- );
-
- // Utf8View schema,worst-case rows
- let ctx = rt
- .block_on(create_context(partitions, samples, true, true, true))
- .unwrap();
- c.bench_function(
- format!(
- "top k={limit} aggregate {} worst-case rows [Utf8View]",
- partitions * samples
- )
- .as_str(),
- |b| b.iter(|| run(&rt, ctx.clone(), limit, true, true)),
- );
-
- // String aggregate benchmarks - grouping by timestamp, aggregating string
column
- let ctx = rt
- .block_on(create_context(partitions, samples, false, true, false))
- .unwrap();
- c.bench_function(
- format!(
- "top k={limit} string aggregate {} time-series rows [Utf8]",
- partitions * samples
- )
- .as_str(),
- |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
- );
-
- let ctx = rt
- .block_on(create_context(partitions, samples, true, true, false))
- .unwrap();
- c.bench_function(
- format!(
- "top k={limit} string aggregate {} worst-case rows [Utf8]",
- partitions * samples
- )
- .as_str(),
- |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
- );
-
- let ctx = rt
- .block_on(create_context(partitions, samples, false, true, true))
- .unwrap();
- c.bench_function(
- format!(
- "top k={limit} string aggregate {} time-series rows [Utf8View]",
- partitions * samples
- )
- .as_str(),
- |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
- );
-
- let ctx = rt
- .block_on(create_context(partitions, samples, true, true, true))
- .unwrap();
- c.bench_function(
- format!(
- "top k={limit} string aggregate {} worst-case rows [Utf8View]",
- partitions * samples
- )
- .as_str(),
- |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
- );
+ // String aggregate benchmarks
+ // (asc, use_topk, use_view, scenario)
+ let string_cases: &[(bool, bool, bool)] = &[
+ (false, false, false),
+ (true, false, false),
+ (false, false, true),
+ (true, false, true),
+ (false, true, false),
+ (true, true, false),
+ (false, true, true),
+ (true, true, true),
+ ];
+ for &(asc, use_topk, use_view) in string_cases {
+ let scenario = if asc { "worst-case" } else { "time-series" };
+ let type_label = if use_view { "Utf8View" } else { "Utf8" };
+ let name = if use_topk {
+ format!(
+ "top k={limit} string aggregate {total_rows} {scenario} rows
[{type_label}]"
+ )
+ } else {
+ format!("string aggregate {total_rows} {scenario} rows
[{type_label}]")
+ };
+ let ctx = rt
+ .block_on(create_context(partitions, samples, asc, use_topk,
use_view))
+ .unwrap();
+ c.bench_function(&name, |b| {
+ b.iter(|| run_string(&rt, ctx.clone(), limit, use_topk))
+ });
+ }
// DISTINCT benchmarks
- let ctx = rt.block_on(async {
- create_context_distinct(partitions, samples, false)
- .await
- .unwrap()
- });
- c.bench_function(
- format!("distinct {} rows desc [no TopK]", partitions *
samples).as_str(),
- |b| b.iter(|| run_distinct(&rt, ctx.clone(), limit, false, false)),
- );
-
- c.bench_function(
- format!("distinct {} rows asc [no TopK]", partitions *
samples).as_str(),
- |b| b.iter(|| run_distinct(&rt, ctx.clone(), limit, false, true)),
- );
-
- let ctx_topk = rt.block_on(async {
- create_context_distinct(partitions, samples, true)
- .await
- .unwrap()
- });
- c.bench_function(
- format!("distinct {} rows desc [TopK]", partitions * samples).as_str(),
- |b| b.iter(|| run_distinct(&rt, ctx_topk.clone(), limit, true, false)),
- );
-
- c.bench_function(
- format!("distinct {} rows asc [TopK]", partitions * samples).as_str(),
- |b| b.iter(|| run_distinct(&rt, ctx_topk.clone(), limit, true, true)),
- );
+ // (use_topk, asc)
+ let distinct_cases: &[(bool, bool)] =
+ &[(false, false), (false, true), (true, false), (true, true)];
+ for &(use_topk, asc) in distinct_cases {
+ let dir = if asc { "asc" } else { "desc" };
+ let topk_label = if use_topk { "TopK" } else { "no TopK" };
+ let name = format!("distinct {total_rows} rows {dir} [{topk_label}]");
+ let ctx = rt.block_on(async {
Review Comment:
I understand. I think I've resolved this by using and cloning the session
(a shallow copy, as far as I can see), but could you check it if possible?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]