This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 8c65a41fe4 Expand sql_planner benchmark for benchmarking physical and logical optimization. (#17276) 8c65a41fe4 is described below commit 8c65a41fe42cb0e5ebac9c8a83a794754e5e3545 Author: Bruce Ritchie <bruce.ritc...@veeva.com> AuthorDate: Sun Aug 24 06:46:34 2025 -0400 Expand sql_planner benchmark for benchmarking physical and logical optimization. (#17276) * Updating physical and adding a logical benchmark for large # of columns. * Removed unused import. --- datafusion/core/benches/sql_planner.rs | 225 +++++++++++++++++++++++++++++++-- 1 file changed, 212 insertions(+), 13 deletions(-) diff --git a/datafusion/core/benches/sql_planner.rs b/datafusion/core/benches/sql_planner.rs index d02478d2b4..b8413344e4 100644 --- a/datafusion/core/benches/sql_planner.rs +++ b/datafusion/core/benches/sql_planner.rs @@ -25,11 +25,18 @@ mod data_utils; use crate::criterion::Criterion; use arrow::array::{ArrayRef, RecordBatch}; use arrow::datatypes::{DataType, Field, Fields, Schema}; +use arrow_schema::TimeUnit::Nanosecond; use criterion::Bencher; use datafusion::datasource::MemTable; use datafusion::execution::context::SessionContext; +use datafusion::prelude::DataFrame; use datafusion_common::ScalarValue; -use datafusion_expr::col; +use datafusion_expr::Expr::Literal; +use datafusion_expr::{cast, col, lit, not, try_cast, when}; +use datafusion_functions::expr_fn::{ + btrim, length, regexp_like, regexp_replace, to_timestamp, upper, +}; +use std::ops::Rem; use std::path::PathBuf; use std::sync::Arc; use test_utils::tpcds::tpcds_schemas; @@ -58,6 +65,150 @@ fn physical_plan(ctx: &SessionContext, rt: &Runtime, sql: &str) { })); } +/// Build a dataframe for testing logical plan optimization +fn build_test_data_frame(ctx: &SessionContext, rt: &Runtime) -> DataFrame { + register_string_table(ctx, 100, 1000); + + rt.block_on(async { + let mut df = ctx.table("t").await.unwrap(); + // add some columns in + for i in 
100..150 { + df = df + .with_column(&format!("c{i}"), Literal(ScalarValue::Utf8(None), None)) + .unwrap(); + } + // add in some columns with string encoded timestamps + for i in 150..175 { + df = df + .with_column( + &format!("c{i}"), + Literal(ScalarValue::Utf8(Some("2025-08-21 09:43:17".into())), None), + ) + .unwrap(); + } + // do a bunch of ops on the columns + for i in 0..175 { + // trim the columns + df = df + .with_column(&format!("c{i}"), btrim(vec![col(format!("c{i}"))])) + .unwrap(); + } + + for i in 0..175 { + let c_name = format!("c{i}"); + let c = col(&c_name); + + // random ops + if i % 5 == 0 && i < 150 { + // the actual ops here are largely unimportant as they are just a sample + // of ops that could occur on a dataframe + df = df + .with_column(&c_name, cast(c.clone(), DataType::Utf8)) + .unwrap() + .with_column( + &c_name, + when( + cast(c.clone(), DataType::Int32).gt(lit(135)), + cast( + cast(c.clone(), DataType::Int32) - lit(i + 3), + DataType::Utf8, + ), + ) + .otherwise(c.clone()) + .unwrap(), + ) + .unwrap() + .with_column( + &c_name, + when( + c.clone().is_not_null().and( + cast(c.clone(), DataType::Int32) + .between(lit(120), lit(130)), + ), + Literal(ScalarValue::Utf8(None), None), + ) + .otherwise( + when( + c.clone().is_not_null().and(regexp_like( + cast(c.clone(), DataType::Utf8View), + lit("[0-9]*"), + None, + )), + upper(c.clone()), + ) + .otherwise(c.clone()) + .unwrap(), + ) + .unwrap(), + ) + .unwrap() + .with_column( + &c_name, + when( + c.clone().is_not_null().and( + cast(c.clone(), DataType::Int32) + .between(lit(90), lit(100)), + ), + cast(c.clone(), DataType::Utf8View), + ) + .otherwise(Literal(ScalarValue::Date32(None), None)) + .unwrap(), + ) + .unwrap() + .with_column( + &c_name, + when( + c.clone().is_not_null().and( + cast(c.clone(), DataType::Int32).rem(lit(10)).gt(lit(7)), + ), + regexp_replace( + cast(c.clone(), DataType::Utf8View), + lit("1"), + lit("a"), + None, + ), + ) + 
.otherwise(Literal(ScalarValue::Date32(None), None)) + .unwrap(), + ) + .unwrap() + } + if i >= 150 { + df = df + .with_column( + &c_name, + try_cast( + to_timestamp(vec![c.clone(), lit("%Y-%m-%d %H:%M:%S")]), + DataType::Timestamp(Nanosecond, Some("UTC".into())), + ), + ) + .unwrap() + .with_column(&c_name, try_cast(c.clone(), DataType::Date32)) + .unwrap() + } + + // add in a few unions + if i % 30 == 0 { + let df1 = df + .clone() + .filter(length(c.clone()).gt(lit(2))) + .unwrap() + .with_column(&format!("c{i}_filtered"), lit(true)) + .unwrap(); + let df2 = df + .filter(not(length(c.clone()).gt(lit(2)))) + .unwrap() + .with_column(&format!("c{i}_filtered"), lit(false)) + .unwrap(); + + df = df1.union_by_name(df2).unwrap() + } + } + + df + }) +} + /// Create schema with the specified number of columns fn create_schema(column_prefix: &str, num_columns: usize) -> Schema { let fields: Fields = (0..num_columns) @@ -180,13 +331,40 @@ fn register_union_order_table(ctx: &SessionContext, num_columns: usize, num_rows ctx.register_table("t", Arc::new(table)).unwrap(); } +/// Registers a table like this: +/// c0,c1,c2...,c99 +/// "0","100"..."9900" +/// "0","200"..."19800" +/// "0","300"..."29700" +fn register_string_table(ctx: &SessionContext, num_columns: usize, num_rows: usize) { + // ("c0", ["0", "0", ...]) + // ("c1": ["100", "200", ...]) + // etc + let iter = (0..num_columns).map(|i| i as u64).map(|i| { + let array: ArrayRef = Arc::new(arrow::array::StringViewArray::from_iter_values( + (0..num_rows) + .map(|j| format!("c{}", j as u64 * 100 + i)) + .collect::<Vec<_>>(), + )); + (format!("c{i}"), array) + }); + let batch = RecordBatch::try_from_iter(iter).unwrap(); + let schema = batch.schema(); + let partitions = vec![vec![batch]]; + + // create the table + let table = MemTable::try_new(schema, partitions).unwrap(); + + ctx.register_table("t", Arc::new(table)).unwrap(); +} + /// return a query like /// ```sql -/// select c1, null as c2, ... 
null as cn from t ORDER BY c1 +/// select c1, 2 as c2, ... n as cn from t ORDER BY c1 /// UNION ALL -/// select null as c1, c2, ... null as cn from t ORDER BY c2 +/// select 1 as c1, c2, ... n as cn from t ORDER BY c2 /// ... -/// select null as c1, null as c2, ... cn from t ORDER BY cn +/// select 1 as c1, 2 as c2, ... cn from t ORDER BY cn /// ORDER BY c1, c2 ... CN /// ``` fn union_orderby_query(n: usize) -> String { @@ -200,7 +378,7 @@ fn union_orderby_query(n: usize) -> String { if i == j { format!("c{j}") } else { - format!("null as c{j}") + format!("{j} as c{j}") } }) .collect::<Vec<_>>() @@ -370,16 +548,37 @@ fn criterion_benchmark(c: &mut Criterion) { }); // -- Sorted Queries -- - register_union_order_table(&ctx, 100, 1000); - - // this query has many expressions in its sort order so stresses - // order equivalence validation - c.bench_function("physical_sorted_union_orderby", |b| { - // SELECT ... UNION ALL ... - let query = union_orderby_query(20); - b.iter(|| physical_plan(&ctx, &rt, &query)) + for column_count in [10, 50, 100, 200, 300] { + register_union_order_table(&ctx, column_count, 1000); + + // this query has many expressions in its sort order so stresses + // order equivalence validation + c.bench_function( + &format!("physical_sorted_union_order_by_{column_count}"), + |b| { + // SELECT ... UNION ALL ... 
+ let query = union_orderby_query(column_count); + b.iter(|| physical_plan(&ctx, &rt, &query)) + }, + ); + + let _ = ctx.deregister_table("t"); + } + + // -- validate logical plan optimize performance + let df = build_test_data_frame(&ctx, &rt); + + c.bench_function("logical_plan_optimize", |b| { + b.iter(|| { + let df_clone = df.clone(); + criterion::black_box( + rt.block_on(async { df_clone.into_optimized_plan().unwrap() }), + ); + }) }); + let _ = ctx.deregister_table("t"); + // --- TPC-H --- let tpch_ctx = register_defs(SessionContext::new(), tpch_schemas()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org