kosiew commented on code in PR #21073:
URL: https://github.com/apache/datafusion/pull/21073#discussion_r2964649276


##########
datafusion/core/benches/topk_aggregate.rs:
##########
@@ -290,152 +290,107 @@ fn criterion_benchmark(c: &mut Criterion) {
     let limit = LIMIT;
     let partitions = 10;
     let samples = 1_000_000;
+    let total_rows = partitions * samples;
+
+    // Numeric aggregate benchmarks
+    // (asc, use_topk, use_view, run_asc)
+    let numeric_cases: &[(bool, bool, bool, bool, &str)] = &[
+        (
+            false,
+            false,
+            false,
+            false,
+            "aggregate {rows} time-series rows",
+        ),
+        (true, false, false, true, "aggregate {rows} worst-case rows"),
+        (
+            false,
+            true,
+            false,
+            false,
+            "top k={limit} aggregate {rows} time-series rows",
+        ),
+        (
+            true,
+            true,
+            false,
+            true,
+            "top k={limit} aggregate {rows} worst-case rows",
+        ),
+        (
+            false,
+            true,
+            true,
+            false,
+            "top k={limit} aggregate {rows} time-series rows [Utf8View]",
+        ),
+        (
+            true,
+            true,
+            true,
+            true,
+            "top k={limit} aggregate {rows} worst-case rows [Utf8View]",
+        ),
+    ];
+    for &(asc, use_topk, use_view, run_asc, name_tpl) in numeric_cases {

Review Comment:
   Could this tuple be simplified a bit? `run_asc` currently mirrors `asc` in 
every entry, so carrying both booleans makes the benchmark matrix harder to 
audit and easier to desynchronize later. Passing `asc` straight through (or 
switching to a small case struct with named fields) would make the intent clearer.



##########
datafusion/core/benches/topk_aggregate.rs:
##########
@@ -290,152 +290,107 @@ fn criterion_benchmark(c: &mut Criterion) {
     let limit = LIMIT;
     let partitions = 10;
     let samples = 1_000_000;
+    let total_rows = partitions * samples;
+
+    // Numeric aggregate benchmarks
+    // (asc, use_topk, use_view, run_asc)
+    let numeric_cases: &[(bool, bool, bool, bool, &str)] = &[
+        (
+            false,
+            false,
+            false,
+            false,
+            "aggregate {rows} time-series rows",
+        ),
+        (true, false, false, true, "aggregate {rows} worst-case rows"),
+        (
+            false,
+            true,
+            false,
+            false,
+            "top k={limit} aggregate {rows} time-series rows",
+        ),
+        (
+            true,
+            true,
+            false,
+            true,
+            "top k={limit} aggregate {rows} worst-case rows",
+        ),
+        (
+            false,
+            true,
+            true,
+            false,
+            "top k={limit} aggregate {rows} time-series rows [Utf8View]",
+        ),
+        (
+            true,
+            true,
+            true,
+            true,
+            "top k={limit} aggregate {rows} worst-case rows [Utf8View]",
+        ),
+    ];
+    for &(asc, use_topk, use_view, run_asc, name_tpl) in numeric_cases {
+        let name = name_tpl
+            .replace("{rows}", &total_rows.to_string())
+            .replace("{limit}", &limit.to_string());
+        let ctx = rt
+            .block_on(create_context(partitions, samples, asc, use_topk, 
use_view))
+            .unwrap();
+        c.bench_function(&name, |b| {
+            b.iter(|| run(&rt, ctx.clone(), limit, use_topk, run_asc))
+        });
+    }
 
-    let ctx = rt
-        .block_on(create_context(partitions, samples, false, false, false))
-        .unwrap();
-    c.bench_function(
-        format!("aggregate {} time-series rows", partitions * 
samples).as_str(),
-        |b| b.iter(|| run(&rt, ctx.clone(), limit, false, false)),
-    );
-
-    let ctx = rt
-        .block_on(create_context(partitions, samples, true, false, false))
-        .unwrap();
-    c.bench_function(
-        format!("aggregate {} worst-case rows", partitions * samples).as_str(),
-        |b| b.iter(|| run(&rt, ctx.clone(), limit, false, true)),
-    );
-
-    let ctx = rt
-        .block_on(create_context(partitions, samples, false, true, false))
-        .unwrap();
-    c.bench_function(
-        format!(
-            "top k={limit} aggregate {} time-series rows",
-            partitions * samples
-        )
-        .as_str(),
-        |b| b.iter(|| run(&rt, ctx.clone(), limit, true, false)),
-    );
-
-    let ctx = rt
-        .block_on(create_context(partitions, samples, true, true, false))
-        .unwrap();
-    c.bench_function(
-        format!(
-            "top k={limit} aggregate {} worst-case rows",
-            partitions * samples
-        )
-        .as_str(),
-        |b| b.iter(|| run(&rt, ctx.clone(), limit, true, true)),
-    );
-
-    // Utf8View schema,time-series rows
-    let ctx = rt
-        .block_on(create_context(partitions, samples, false, true, true))
-        .unwrap();
-    c.bench_function(
-        format!(
-            "top k={limit} aggregate {} time-series rows [Utf8View]",
-            partitions * samples
-        )
-        .as_str(),
-        |b| b.iter(|| run(&rt, ctx.clone(), limit, true, false)),
-    );
-
-    // Utf8View schema,worst-case rows
-    let ctx = rt
-        .block_on(create_context(partitions, samples, true, true, true))
-        .unwrap();
-    c.bench_function(
-        format!(
-            "top k={limit} aggregate {} worst-case rows [Utf8View]",
-            partitions * samples
-        )
-        .as_str(),
-        |b| b.iter(|| run(&rt, ctx.clone(), limit, true, true)),
-    );
-
-    // String aggregate benchmarks - grouping by timestamp, aggregating string 
column
-    let ctx = rt
-        .block_on(create_context(partitions, samples, false, true, false))
-        .unwrap();
-    c.bench_function(
-        format!(
-            "top k={limit} string aggregate {} time-series rows [Utf8]",
-            partitions * samples
-        )
-        .as_str(),
-        |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
-    );
-
-    let ctx = rt
-        .block_on(create_context(partitions, samples, true, true, false))
-        .unwrap();
-    c.bench_function(
-        format!(
-            "top k={limit} string aggregate {} worst-case rows [Utf8]",
-            partitions * samples
-        )
-        .as_str(),
-        |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
-    );
-
-    let ctx = rt
-        .block_on(create_context(partitions, samples, false, true, true))
-        .unwrap();
-    c.bench_function(
-        format!(
-            "top k={limit} string aggregate {} time-series rows [Utf8View]",
-            partitions * samples
-        )
-        .as_str(),
-        |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
-    );
-
-    let ctx = rt
-        .block_on(create_context(partitions, samples, true, true, true))
-        .unwrap();
-    c.bench_function(
-        format!(
-            "top k={limit} string aggregate {} worst-case rows [Utf8View]",
-            partitions * samples
-        )
-        .as_str(),
-        |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
-    );
+    // String aggregate benchmarks
+    // (asc, use_topk, use_view, scenario)
+    let string_cases: &[(bool, bool, bool)] = &[
+        (false, false, false),
+        (true, false, false),
+        (false, false, true),
+        (true, false, true),
+        (false, true, false),
+        (true, true, false),
+        (false, true, true),
+        (true, true, true),
+    ];
+    for &(asc, use_topk, use_view) in string_cases {
+        let scenario = if asc { "worst-case" } else { "time-series" };
+        let type_label = if use_view { "Utf8View" } else { "Utf8" };
+        let name = if use_topk {
+            format!(
+                "top k={limit} string aggregate {total_rows} {scenario} rows 
[{type_label}]"
+            )
+        } else {
+            format!("string aggregate {total_rows} {scenario} rows 
[{type_label}]")
+        };
+        let ctx = rt
+            .block_on(create_context(partitions, samples, asc, use_topk, 
use_view))
+            .unwrap();
+        c.bench_function(&name, |b| {
+            b.iter(|| run_string(&rt, ctx.clone(), limit, use_topk))
+        });
+    }
 
     // DISTINCT benchmarks
-    let ctx = rt.block_on(async {
-        create_context_distinct(partitions, samples, false)
-            .await
-            .unwrap()
-    });
-    c.bench_function(
-        format!("distinct {} rows desc [no TopK]", partitions * 
samples).as_str(),
-        |b| b.iter(|| run_distinct(&rt, ctx.clone(), limit, false, false)),
-    );
-
-    c.bench_function(
-        format!("distinct {} rows asc [no TopK]", partitions * 
samples).as_str(),
-        |b| b.iter(|| run_distinct(&rt, ctx.clone(), limit, false, true)),
-    );
-
-    let ctx_topk = rt.block_on(async {
-        create_context_distinct(partitions, samples, true)
-            .await
-            .unwrap()
-    });
-    c.bench_function(
-        format!("distinct {} rows desc [TopK]", partitions * samples).as_str(),
-        |b| b.iter(|| run_distinct(&rt, ctx_topk.clone(), limit, true, false)),
-    );
-
-    c.bench_function(
-        format!("distinct {} rows asc [TopK]", partitions * samples).as_str(),
-        |b| b.iter(|| run_distinct(&rt, ctx_topk.clone(), limit, true, true)),
-    );
+    // (use_topk, asc)
+    let distinct_cases: &[(bool, bool)] =
+        &[(false, false), (false, true), (true, false), (true, true)];
+    for &(use_topk, asc) in distinct_cases {
+        let dir = if asc { "asc" } else { "desc" };
+        let topk_label = if use_topk { "TopK" } else { "no TopK" };
+        let name = format!("distinct {total_rows} rows {dir} [{topk_label}]");
+        let ctx = rt.block_on(async {

Review Comment:
   This refactor now rebuilds the DISTINCT input/context once per (use_topk, 
asc) pair, even though asc only affects the query text and the old version 
shared one context per TopK mode. It is outside the timed loop, so not a 
benchmark-result bug, but it does add a lot of setup work for 10M rows. Could 
we hoist context creation by use_topk again, or extract a small helper that 
caches the two contexts?



##########
datafusion/core/benches/topk_aggregate.rs:
##########
@@ -290,152 +290,107 @@ fn criterion_benchmark(c: &mut Criterion) {
     let limit = LIMIT;
     let partitions = 10;
     let samples = 1_000_000;
+    let total_rows = partitions * samples;
+
+    // Numeric aggregate benchmarks
+    // (asc, use_topk, use_view, run_asc)
+    let numeric_cases: &[(bool, bool, bool, bool, &str)] = &[
+        (
+            false,
+            false,
+            false,
+            false,
+            "aggregate {rows} time-series rows",
+        ),
+        (true, false, false, true, "aggregate {rows} worst-case rows"),
+        (
+            false,
+            true,
+            false,
+            false,
+            "top k={limit} aggregate {rows} time-series rows",
+        ),
+        (
+            true,
+            true,
+            false,
+            true,
+            "top k={limit} aggregate {rows} worst-case rows",
+        ),
+        (
+            false,
+            true,
+            true,
+            false,
+            "top k={limit} aggregate {rows} time-series rows [Utf8View]",
+        ),
+        (
+            true,
+            true,
+            true,
+            true,
+            "top k={limit} aggregate {rows} worst-case rows [Utf8View]",
+        ),
+    ];
+    for &(asc, use_topk, use_view, run_asc, name_tpl) in numeric_cases {
+        let name = name_tpl
+            .replace("{rows}", &total_rows.to_string())
+            .replace("{limit}", &limit.to_string());
+        let ctx = rt
+            .block_on(create_context(partitions, samples, asc, use_topk, 
use_view))
+            .unwrap();
+        c.bench_function(&name, |b| {
+            b.iter(|| run(&rt, ctx.clone(), limit, use_topk, run_asc))
+        });
+    }
 
-    let ctx = rt
-        .block_on(create_context(partitions, samples, false, false, false))
-        .unwrap();
-    c.bench_function(
-        format!("aggregate {} time-series rows", partitions * 
samples).as_str(),
-        |b| b.iter(|| run(&rt, ctx.clone(), limit, false, false)),
-    );
-
-    let ctx = rt
-        .block_on(create_context(partitions, samples, true, false, false))
-        .unwrap();
-    c.bench_function(
-        format!("aggregate {} worst-case rows", partitions * samples).as_str(),
-        |b| b.iter(|| run(&rt, ctx.clone(), limit, false, true)),
-    );
-
-    let ctx = rt
-        .block_on(create_context(partitions, samples, false, true, false))
-        .unwrap();
-    c.bench_function(
-        format!(
-            "top k={limit} aggregate {} time-series rows",
-            partitions * samples
-        )
-        .as_str(),
-        |b| b.iter(|| run(&rt, ctx.clone(), limit, true, false)),
-    );
-
-    let ctx = rt
-        .block_on(create_context(partitions, samples, true, true, false))
-        .unwrap();
-    c.bench_function(
-        format!(
-            "top k={limit} aggregate {} worst-case rows",
-            partitions * samples
-        )
-        .as_str(),
-        |b| b.iter(|| run(&rt, ctx.clone(), limit, true, true)),
-    );
-
-    // Utf8View schema,time-series rows
-    let ctx = rt
-        .block_on(create_context(partitions, samples, false, true, true))
-        .unwrap();
-    c.bench_function(
-        format!(
-            "top k={limit} aggregate {} time-series rows [Utf8View]",
-            partitions * samples
-        )
-        .as_str(),
-        |b| b.iter(|| run(&rt, ctx.clone(), limit, true, false)),
-    );
-
-    // Utf8View schema,worst-case rows
-    let ctx = rt
-        .block_on(create_context(partitions, samples, true, true, true))
-        .unwrap();
-    c.bench_function(
-        format!(
-            "top k={limit} aggregate {} worst-case rows [Utf8View]",
-            partitions * samples
-        )
-        .as_str(),
-        |b| b.iter(|| run(&rt, ctx.clone(), limit, true, true)),
-    );
-
-    // String aggregate benchmarks - grouping by timestamp, aggregating string 
column
-    let ctx = rt
-        .block_on(create_context(partitions, samples, false, true, false))
-        .unwrap();
-    c.bench_function(
-        format!(
-            "top k={limit} string aggregate {} time-series rows [Utf8]",
-            partitions * samples
-        )
-        .as_str(),
-        |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
-    );
-
-    let ctx = rt
-        .block_on(create_context(partitions, samples, true, true, false))
-        .unwrap();
-    c.bench_function(
-        format!(
-            "top k={limit} string aggregate {} worst-case rows [Utf8]",
-            partitions * samples
-        )
-        .as_str(),
-        |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
-    );
-
-    let ctx = rt
-        .block_on(create_context(partitions, samples, false, true, true))
-        .unwrap();
-    c.bench_function(
-        format!(
-            "top k={limit} string aggregate {} time-series rows [Utf8View]",
-            partitions * samples
-        )
-        .as_str(),
-        |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
-    );
-
-    let ctx = rt
-        .block_on(create_context(partitions, samples, true, true, true))
-        .unwrap();
-    c.bench_function(
-        format!(
-            "top k={limit} string aggregate {} worst-case rows [Utf8View]",
-            partitions * samples
-        )
-        .as_str(),
-        |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
-    );
+    // String aggregate benchmarks
+    // (asc, use_topk, use_view, scenario)
+    let string_cases: &[(bool, bool, bool)] = &[
+        (false, false, false),
+        (true, false, false),
+        (false, false, true),
+        (true, false, true),
+        (false, true, false),
+        (true, true, false),
+        (false, true, true),
+        (true, true, true),
+    ];
+    for &(asc, use_topk, use_view) in string_cases {
+        let scenario = if asc { "worst-case" } else { "time-series" };
+        let type_label = if use_view { "Utf8View" } else { "Utf8" };
+        let name = if use_topk {
+            format!(
+                "top k={limit} string aggregate {total_rows} {scenario} rows 
[{type_label}]"
+            )
+        } else {
+            format!("string aggregate {total_rows} {scenario} rows 
[{type_label}]")
+        };
+        let ctx = rt
+            .block_on(create_context(partitions, samples, asc, use_topk, 
use_view))
+            .unwrap();
+        c.bench_function(&name, |b| {

Review Comment:
   These new TopK-disabled string cases expand the matrix, but they still only 
go through `run_string()`/`aggregate_string()`, which currently checks only the 
row count and whether the physical plan contains `lim=[...]`. That means this PR 
does not actually verify correctness across Utf8 and Utf8View group keys, even 
though that is part of the motivation. Can we strengthen the string benchmark 
path with an expected-result assertion (or add a dedicated helper/test that 
compares Utf8 vs Utf8View output for both TopK modes) so the new variants catch 
ordering/value regressions instead of only plan-shape changes?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to