This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 8c65a41fe4 Expand sql_planner benchmark for benchmarking physical and logical optimization. (#17276)
8c65a41fe4 is described below

commit 8c65a41fe42cb0e5ebac9c8a83a794754e5e3545
Author: Bruce Ritchie <bruce.ritc...@veeva.com>
AuthorDate: Sun Aug 24 06:46:34 2025 -0400

    Expand sql_planner benchmark for benchmarking physical and logical optimization. (#17276)
    
    * Updating physical and adding a logical benchmark for large # of columns.
    
    * Removed unused import.
---
 datafusion/core/benches/sql_planner.rs | 225 +++++++++++++++++++++++++++++++--
 1 file changed, 212 insertions(+), 13 deletions(-)

diff --git a/datafusion/core/benches/sql_planner.rs b/datafusion/core/benches/sql_planner.rs
index d02478d2b4..b8413344e4 100644
--- a/datafusion/core/benches/sql_planner.rs
+++ b/datafusion/core/benches/sql_planner.rs
@@ -25,11 +25,18 @@ mod data_utils;
 use crate::criterion::Criterion;
 use arrow::array::{ArrayRef, RecordBatch};
 use arrow::datatypes::{DataType, Field, Fields, Schema};
+use arrow_schema::TimeUnit::Nanosecond;
 use criterion::Bencher;
 use datafusion::datasource::MemTable;
 use datafusion::execution::context::SessionContext;
+use datafusion::prelude::DataFrame;
 use datafusion_common::ScalarValue;
-use datafusion_expr::col;
+use datafusion_expr::Expr::Literal;
+use datafusion_expr::{cast, col, lit, not, try_cast, when};
+use datafusion_functions::expr_fn::{
+    btrim, length, regexp_like, regexp_replace, to_timestamp, upper,
+};
+use std::ops::Rem;
 use std::path::PathBuf;
 use std::sync::Arc;
 use test_utils::tpcds::tpcds_schemas;
@@ -58,6 +65,150 @@ fn physical_plan(ctx: &SessionContext, rt: &Runtime, sql: &str) {
     }));
 }
 
+/// Build a dataframe for testing logical plan optimization
+fn build_test_data_frame(ctx: &SessionContext, rt: &Runtime) -> DataFrame {
+    register_string_table(ctx, 100, 1000);
+
+    rt.block_on(async {
+        let mut df = ctx.table("t").await.unwrap();
+        // add some columns in
+        for i in 100..150 {
+            df = df
+                .with_column(&format!("c{i}"), Literal(ScalarValue::Utf8(None), None))
+                .unwrap();
+        }
+        // add in some columns with string encoded timestamps
+        for i in 150..175 {
+            df = df
+                .with_column(
+                    &format!("c{i}"),
+                    Literal(ScalarValue::Utf8(Some("2025-08-21 09:43:17".into())), None),
+                )
+                .unwrap();
+        }
+        // do a bunch of ops on the columns
+        for i in 0..175 {
+            // trim the columns
+            df = df
+                .with_column(&format!("c{i}"), btrim(vec![col(format!("c{i}"))]))
+                .unwrap();
+        }
+
+        for i in 0..175 {
+            let c_name = format!("c{i}");
+            let c = col(&c_name);
+
+            // random ops
+            if i % 5 == 0 && i < 150 {
+                // the actual ops here are largely unimportant as they are just a sample
+                // of ops that could occur on a dataframe
+                df = df
+                    .with_column(&c_name, cast(c.clone(), DataType::Utf8))
+                    .unwrap()
+                    .with_column(
+                        &c_name,
+                        when(
+                            cast(c.clone(), DataType::Int32).gt(lit(135)),
+                            cast(
+                                cast(c.clone(), DataType::Int32) - lit(i + 3),
+                                DataType::Utf8,
+                            ),
+                        )
+                        .otherwise(c.clone())
+                        .unwrap(),
+                    )
+                    .unwrap()
+                    .with_column(
+                        &c_name,
+                        when(
+                            c.clone().is_not_null().and(
+                                cast(c.clone(), DataType::Int32)
+                                    .between(lit(120), lit(130)),
+                            ),
+                            Literal(ScalarValue::Utf8(None), None),
+                        )
+                        .otherwise(
+                            when(
+                                c.clone().is_not_null().and(regexp_like(
+                                    cast(c.clone(), DataType::Utf8View),
+                                    lit("[0-9]*"),
+                                    None,
+                                )),
+                                upper(c.clone()),
+                            )
+                            .otherwise(c.clone())
+                            .unwrap(),
+                        )
+                        .unwrap(),
+                    )
+                    .unwrap()
+                    .with_column(
+                        &c_name,
+                        when(
+                            c.clone().is_not_null().and(
+                                cast(c.clone(), DataType::Int32)
+                                    .between(lit(90), lit(100)),
+                            ),
+                            cast(c.clone(), DataType::Utf8View),
+                        )
+                        .otherwise(Literal(ScalarValue::Date32(None), None))
+                        .unwrap(),
+                    )
+                    .unwrap()
+                    .with_column(
+                        &c_name,
+                        when(
+                            c.clone().is_not_null().and(
+                                cast(c.clone(), DataType::Int32).rem(lit(10)).gt(lit(7)),
+                            ),
+                            regexp_replace(
+                                cast(c.clone(), DataType::Utf8View),
+                                lit("1"),
+                                lit("a"),
+                                None,
+                            ),
+                        )
+                        .otherwise(Literal(ScalarValue::Date32(None), None))
+                        .unwrap(),
+                    )
+                    .unwrap()
+            }
+            if i >= 150 {
+                df = df
+                    .with_column(
+                        &c_name,
+                        try_cast(
+                            to_timestamp(vec![c.clone(), lit("%Y-%m-%d %H:%M:%S")]),
+                            DataType::Timestamp(Nanosecond, Some("UTC".into())),
+                        ),
+                    )
+                    .unwrap()
+                    .with_column(&c_name, try_cast(c.clone(), DataType::Date32))
+                    .unwrap()
+            }
+
+            // add in a few unions
+            if i % 30 == 0 {
+                let df1 = df
+                    .clone()
+                    .filter(length(c.clone()).gt(lit(2)))
+                    .unwrap()
+                    .with_column(&format!("c{i}_filtered"), lit(true))
+                    .unwrap();
+                let df2 = df
+                    .filter(not(length(c.clone()).gt(lit(2))))
+                    .unwrap()
+                    .with_column(&format!("c{i}_filtered"), lit(false))
+                    .unwrap();
+
+                df = df1.union_by_name(df2).unwrap()
+            }
+        }
+
+        df
+    })
+}
+
 /// Create schema with the specified number of columns
 fn create_schema(column_prefix: &str, num_columns: usize) -> Schema {
     let fields: Fields = (0..num_columns)
@@ -180,13 +331,40 @@ fn register_union_order_table(ctx: &SessionContext, num_columns: usize, num_rows
     ctx.register_table("t", Arc::new(table)).unwrap();
 }
 
+/// Registers a table like this:
+/// c0,c1,c2...,c99
+/// "c0","c1"..."c99"
+/// "c100","c101"..."c199"
+/// "c200","c201"..."c299"
+fn register_string_table(ctx: &SessionContext, num_columns: usize, num_rows: usize) {
+    // ("c0", ["c0", "c100", ...])
+    // ("c1", ["c1", "c101", ...])
+    // etc
+    let iter = (0..num_columns).map(|i| i as u64).map(|i| {
+        let array: ArrayRef = Arc::new(arrow::array::StringViewArray::from_iter_values(
+            (0..num_rows)
+                .map(|j| format!("c{}", j as u64 * 100 + i))
+                .collect::<Vec<_>>(),
+        ));
+        (format!("c{i}"), array)
+    });
+    let batch = RecordBatch::try_from_iter(iter).unwrap();
+    let schema = batch.schema();
+    let partitions = vec![vec![batch]];
+
+    // create the table
+    let table = MemTable::try_new(schema, partitions).unwrap();
+
+    ctx.register_table("t", Arc::new(table)).unwrap();
+}
+
 /// return a query like
 /// ```sql
-/// select c1, null as c2, ... null as cn from t ORDER BY c1
+/// select c1, 2 as c2, ... n as cn from t ORDER BY c1
 ///   UNION ALL
-/// select null as c1, c2, ... null as cn from t ORDER BY c2
+/// select 1 as c1, c2, ... n as cn from t ORDER BY c2
 /// ...
-/// select null as c1, null as c2, ... cn from t ORDER BY cn
+/// select 1 as c1, 2 as c2, ... cn from t ORDER BY cn
 ///  ORDER BY c1, c2 ... CN
 /// ```
 fn union_orderby_query(n: usize) -> String {
@@ -200,7 +378,7 @@ fn union_orderby_query(n: usize) -> String {
                 if i == j {
                     format!("c{j}")
                 } else {
-                    format!("null as c{j}")
+                    format!("{j} as c{j}")
                 }
             })
             .collect::<Vec<_>>()
@@ -370,16 +548,37 @@ fn criterion_benchmark(c: &mut Criterion) {
     });
 
     // -- Sorted Queries --
-    register_union_order_table(&ctx, 100, 1000);
-
-    // this query has many expressions in its sort order so stresses
-    // order equivalence validation
-    c.bench_function("physical_sorted_union_orderby", |b| {
-        // SELECT ... UNION ALL ...
-        let query = union_orderby_query(20);
-        b.iter(|| physical_plan(&ctx, &rt, &query))
+    for column_count in [10, 50, 100, 200, 300] {
+        register_union_order_table(&ctx, column_count, 1000);
+
+        // this query has many expressions in its sort order so stresses
+        // order equivalence validation
+        c.bench_function(
+            &format!("physical_sorted_union_order_by_{column_count}"),
+            |b| {
+                // SELECT ... UNION ALL ...
+                let query = union_orderby_query(column_count);
+                b.iter(|| physical_plan(&ctx, &rt, &query))
+            },
+        );
+
+        let _ = ctx.deregister_table("t");
+    }
+
+    // -- validate logical plan optimize performance
+    let df = build_test_data_frame(&ctx, &rt);
+
+    c.bench_function("logical_plan_optimize", |b| {
+        b.iter(|| {
+            let df_clone = df.clone();
+            criterion::black_box(
+                rt.block_on(async { df_clone.into_optimized_plan().unwrap() }),
+            );
+        })
     });
 
+    let _ = ctx.deregister_table("t");
+
     // --- TPC-H ---
 
     let tpch_ctx = register_defs(SessionContext::new(), tpch_schemas());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to