This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 3a65be0dca Add benchmark for planning sorted unions (#14157)
3a65be0dca is described below
commit 3a65be0dca36c9da36d1cd9509fa89b78e49dacc
Author: Andrew Lamb <[email protected]>
AuthorDate: Sun Jan 19 13:35:11 2025 -0500
Add benchmark for planning sorted unions (#14157)
---
datafusion/core/benches/sql_planner.rs | 84 ++++++++++++++++++++++++++++++++++
1 file changed, 84 insertions(+)
diff --git a/datafusion/core/benches/sql_planner.rs
b/datafusion/core/benches/sql_planner.rs
index 44320e7a28..a1b339eea3 100644
--- a/datafusion/core/benches/sql_planner.rs
+++ b/datafusion/core/benches/sql_planner.rs
@@ -24,10 +24,12 @@ mod data_utils;
use crate::criterion::Criterion;
use arrow::datatypes::{DataType, Field, Fields, Schema};
+use arrow_array::{ArrayRef, RecordBatch};
use criterion::Bencher;
use datafusion::datasource::MemTable;
use datafusion::execution::context::SessionContext;
use datafusion_common::ScalarValue;
+use datafusion_expr::col;
use itertools::Itertools;
use std::fs::File;
use std::io::{BufRead, BufReader};
@@ -147,6 +149,77 @@ fn benchmark_with_param_values_many_columns(ctx:
&SessionContext, b: &mut Benche
});
}
+/// Registers a table like this:
+/// c0,c1,c2...,c99
+/// 0,100...9900
+/// 0,200...19800
+/// 0,300...29700
+fn register_union_order_table(ctx: &SessionContext, num_columns: usize,
num_rows: usize) {
+ // ("c0", [0, 0, ...])
+ // ("c1": [100, 200, ...])
+ // etc
+ let iter = (0..num_columns).map(|i| i as u64).map(|i| {
+ let array: ArrayRef =
Arc::new(arrow::array::UInt64Array::from_iter_values(
+ (0..num_rows)
+ .map(|j| j as u64 * 100 + i)
+ .collect::<Vec<_>>(),
+ ));
+ (format!("c{}", i), array)
+ });
+ let batch = RecordBatch::try_from_iter(iter).unwrap();
+ let schema = batch.schema();
+ let partitions = vec![vec![batch]];
+
+ // tell DataFusion that the table is sorted by all columns
+ let sort_order = (0..num_columns)
+ .map(|i| col(format!("c{}", i)).sort(true, true))
+ .collect::<Vec<_>>();
+
+ // create the table
+ let table = MemTable::try_new(schema, partitions)
+ .unwrap()
+ .with_sort_order(vec![sort_order]);
+
+ ctx.register_table("t", Arc::new(table)).unwrap();
+}
+
/// Returns a query that `UNION ALL`s `n` individually sorted subqueries and then
/// sorts the combined result by every column. For example, for `n = 3`:
///
/// ```sql
/// (SELECT c0, null as c1, null as c2 FROM t ORDER BY c0)
///  UNION ALL
/// (SELECT null as c0, c1, null as c2 FROM t ORDER BY c1)
///  UNION ALL
/// (SELECT null as c0, null as c1, c2 FROM t ORDER BY c2)
/// ORDER BY c0, c1, c2
/// ```
///
/// Branch `i` projects the real column `c{i}` and `NULL` for every other
/// column, and is sorted by `c{i}`. The columns `c0 .. c{n-1}` are expected to
/// exist in table `t`.
///
/// `n` should be at least 1. For `n == 0` the result is just `"\nORDER BY "`,
/// which is not valid SQL.
fn union_orderby_query(n: usize) -> String {
    // `fmt::Write` lets us format directly into `query` without building a
    // temporary `String` for every `format!` call.
    use std::fmt::Write;

    let columns: Vec<String> = (0..n).map(|i| format!("c{i}")).collect();
    let mut query = String::new();
    for (i, sort_col) in columns.iter().enumerate() {
        if i != 0 {
            query.push_str("\n UNION ALL \n");
        }
        let select_list = columns
            .iter()
            .enumerate()
            .map(|(j, c)| {
                if i == j {
                    c.clone()
                } else {
                    format!("null as {c}")
                }
            })
            .collect::<Vec<_>>()
            .join(", ");
        write!(query, "(SELECT {select_list} FROM t ORDER BY {sort_col})")
            .expect("writing to a String cannot fail");
    }
    write!(query, "\nORDER BY {}", columns.join(", "))
        .expect("writing to a String cannot fail");
    query
}
+
fn criterion_benchmark(c: &mut Criterion) {
// verify that we can load the clickbench data prior to running the
benchmark
if
!PathBuf::from(format!("{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}")).exists()
@@ -289,6 +362,17 @@ fn criterion_benchmark(c: &mut Criterion) {
});
});
+ // -- Sorted Queries --
+ register_union_order_table(&ctx, 100, 1000);
+
+ // this query has many expressions in its sort order so stresses
+ // order equivalence validation
+ c.bench_function("physical_sorted_union_orderby", |b| {
+ // SELECT ... UNION ALL ...
+ let query = union_orderby_query(20);
+ b.iter(|| physical_plan(&ctx, &query))
+ });
+
// --- TPC-H ---
let tpch_ctx = register_defs(SessionContext::new(), tpch_schemas());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]