This is an automated email from the ASF dual-hosted git repository.

milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 34f75131a feat: make query parameter optional in tpch benchmark (#1391)
34f75131a is described below

commit 34f75131a273e953ffbbc31aa48d6c60fade7dd9
Author: Andy Grove <[email protected]>
AuthorDate: Sat Jan 17 15:50:38 2026 -0700

    feat: make query parameter optional in tpch benchmark (#1391)
    
    When running the tpch benchmark, the --query parameter is now optional.
    If not specified, all 22 TPC-H queries will be run sequentially.
    
    Changes:
    - Make --query optional for both datafusion and ballista benchmarks
    - Run all 22 queries when --query is not specified
    - Only print SQL queries when --debug flag is enabled
    - Write a single JSON output file for the entire benchmark run
    - Fix parquet file path resolution for datafusion benchmarks
    - Simplify output when iterations=1 (no iteration number, no average)
    
    Co-authored-by: Claude Opus 4.5 <[email protected]>
---
 benchmarks/src/bin/tpch.rs | 271 +++++++++++++++++++++++++++++----------------
 1 file changed, 174 insertions(+), 97 deletions(-)

diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index d1401a658..4bfc133ef 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -28,11 +28,14 @@ use datafusion::datasource::{MemTable, TableProvider};
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::SessionStateBuilder;
 use datafusion::execution::context::SessionState;
+#[cfg(test)]
 use datafusion::logical_expr::LogicalPlan;
 use datafusion::logical_expr::{Expr, expr::Cast};
 use datafusion::parquet::basic::Compression;
 use datafusion::parquet::file::properties::WriterProperties;
+#[cfg(test)]
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
+#[cfg(test)]
 use datafusion::physical_plan::{collect, displayable};
 use datafusion::prelude::*;
 use datafusion::{
@@ -60,6 +63,7 @@ use std::{
     time::{Instant, SystemTime},
 };
 use structopt::StructOpt;
+#[cfg(test)]
 use tokio::task::JoinHandle;
 
 #[cfg(feature = "snmalloc")]
@@ -72,9 +76,9 @@ static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 #[derive(Debug, StructOpt, Clone)]
 struct BallistaBenchmarkOpt {
-    /// Query number
+    /// Query number (1-22). If not specified, runs all queries.
     #[structopt(short, long)]
-    query: usize,
+    query: Option<usize>,
 
     /// Activate debug mode to see query results
     #[structopt(short, long)]
@@ -122,9 +126,9 @@ struct BallistaBenchmarkOpt {
 
 #[derive(Debug, StructOpt, Clone)]
 struct DataFusionBenchmarkOpt {
-    /// Query number
+    /// Query number (1-22). If not specified, runs all queries.
     #[structopt(short, long)]
-    query: usize,
+    query: Option<usize>,
 
     /// Activate debug mode to see query results
     #[structopt(short, long)]
@@ -283,7 +287,6 @@ async fn main() -> Result<()> {
 #[allow(clippy::await_holding_lock)]
 async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> 
Result<Vec<RecordBatch>> {
     println!("Running benchmarks with the following options: {opt:?}");
-    let mut benchmark_run = BenchmarkRun::new(opt.query);
     let config = SessionConfig::new()
         .with_target_partitions(opt.partitions)
         .with_batch_size(opt.batch_size);
@@ -319,30 +322,65 @@ async fn benchmark_datafusion(opt: 
DataFusionBenchmarkOpt) -> Result<Vec<RecordB
         }
     }
 
-    let mut millis = vec![];
-    // run benchmark
+    // Determine which queries to run
+    let query_numbers: Vec<usize> = opt
+        .query
+        .map(|q| vec![q])
+        .unwrap_or_else(|| (1..=22).collect());
+
+    let mut benchmark_run = BenchmarkRun::new();
     let mut result: Vec<RecordBatch> = Vec::with_capacity(1);
-    for i in 0..opt.iterations {
-        let start = Instant::now();
-        let plans = create_logical_plans(&ctx, opt.query).await?;
-        for plan in plans {
-            result = execute_query(&ctx, &plan, opt.debug).await?;
+
+    for query in query_numbers {
+        let mut query_run = QueryRun::new(query);
+        let mut millis = vec![];
+
+        // run benchmark
+        let sqls = get_query_sql(query)?;
+        if opt.debug {
+            println!("Query {query}:\n{sqls:?}");
+        }
+        for i in 0..opt.iterations {
+            let start = Instant::now();
+            // Execute each SQL statement sequentially (required for queries 
like q15
+            // that create views and then reference them)
+            for sql in &sqls {
+                if opt.debug {
+                    println!("Executing: {sql}");
+                }
+                let df = ctx.sql(sql).await?;
+                result = df.collect().await?;
+            }
+            let elapsed = start.elapsed().as_secs_f64() * 1000.0;
+            if opt.debug {
+                pretty::print_batches(&result)?;
+            }
+            millis.push(elapsed);
+            let row_count = result.iter().map(|b| b.num_rows()).sum();
+            if opt.iterations == 1 {
+                println!(
+                    "Query {} took {:.1} ms and returned {} rows",
+                    query, elapsed, row_count
+                );
+            } else {
+                println!(
+                    "Query {} iteration {} took {:.1} ms and returned {} rows",
+                    query, i, elapsed, row_count
+                );
+            }
+            query_run.add_result(elapsed, row_count);
         }
-        let elapsed = start.elapsed().as_secs_f64() * 1000.0;
-        millis.push(elapsed);
-        let row_count = result.iter().map(|b| b.num_rows()).sum();
-        println!(
-            "Query {} iteration {} took {:.1} ms and returned {} rows",
-            opt.query, i, elapsed, row_count
-        );
-        benchmark_run.add_result(elapsed, row_count);
-    }
 
-    let avg = millis.iter().sum::<f64>() / millis.len() as f64;
-    println!("Query {} avg time: {:.2} ms", opt.query, avg);
+        if opt.iterations > 1 {
+            let avg = millis.iter().sum::<f64>() / millis.len() as f64;
+            println!("Query {} avg time: {:.1} ms", query, avg);
+        }
+
+        benchmark_run.add_query_run(query_run);
+    }
 
     if let Some(path) = &opt.output_path {
-        write_summary_json(&mut benchmark_run, path)?;
+        write_summary_json(&benchmark_run, path)?;
     }
 
     Ok(result)
@@ -350,93 +388,112 @@ async fn benchmark_datafusion(opt: 
DataFusionBenchmarkOpt) -> Result<Vec<RecordB
 
 async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
     println!("Running benchmarks with the following options: {opt:?}");
-    let mut benchmark_run = BenchmarkRun::new(opt.query);
 
-    let config = SessionConfig::new_with_ballista()
-        .with_target_partitions(opt.partitions)
-        .with_ballista_job_name(&format!("Query derived from TPC-H q{}", 
opt.query))
-        .with_batch_size(opt.batch_size)
-        .with_collect_statistics(true);
-
-    let state = SessionStateBuilder::new()
-        .with_default_features()
-        .with_config(config)
-        .build();
     let address = format!(
         "df://{}:{}",
         opt.host.clone().unwrap().as_str(),
         opt.port.unwrap()
     );
-    let ctx = SessionContext::remote_with_state(&address, state).await?;
 
-    // register tables with Ballista context
-    let path = opt.path.to_str().unwrap();
-    let file_format = opt.file_format.as_str();
+    // Determine which queries to run
+    let query_numbers: Vec<usize> = opt
+        .query
+        .map(|q| vec![q])
+        .unwrap_or_else(|| (1..=22).collect());
 
-    register_tables(path, file_format, &ctx, opt.debug).await?;
+    let mut benchmark_run = BenchmarkRun::new();
 
-    let mut millis = vec![];
+    for query in query_numbers {
+        let mut query_run = QueryRun::new(query);
 
-    // run benchmark
-    let queries = get_query_sql(opt.query)?;
-    println!(
-        "Running benchmark with queries {}:\n {:?}",
-        opt.query, queries
-    );
-    let mut batches = vec![];
-    for i in 0..opt.iterations {
-        let start = Instant::now();
-        for sql in &queries {
-            let df = ctx
-                .sql(sql)
-                .await
-                .map_err(|e| DataFusionError::Plan(format!("{e:?}")))
-                .unwrap();
-            let plan = df.clone().into_optimized_plan()?;
+        let config = SessionConfig::new_with_ballista()
+            .with_target_partitions(opt.partitions)
+            .with_ballista_job_name(&format!("Query derived from TPC-H q{}", 
query))
+            .with_batch_size(opt.batch_size)
+            .with_collect_statistics(true);
+
+        let state = SessionStateBuilder::new()
+            .with_default_features()
+            .with_config(config)
+            .build();
+        let ctx = SessionContext::remote_with_state(&address, state).await?;
+
+        // register tables with Ballista context
+        let path = opt.path.to_str().unwrap();
+        let file_format = opt.file_format.as_str();
+
+        register_tables(path, file_format, &ctx, opt.debug).await?;
+
+        let mut millis = vec![];
+
+        // run benchmark
+        let sqls = get_query_sql(query)?;
+        if opt.debug {
+            println!("Running benchmark with query {}:\n {:?}", query, sqls);
+        }
+        let mut batches = vec![];
+        for i in 0..opt.iterations {
+            let start = Instant::now();
+            for sql in &sqls {
+                let df = ctx
+                    .sql(sql)
+                    .await
+                    .map_err(|e| DataFusionError::Plan(format!("{e:?}")))
+                    .unwrap();
+                let plan = df.clone().into_optimized_plan()?;
+                if opt.debug {
+                    println!("=== Optimized logical plan ===\n{plan:?}\n");
+                }
+                batches = df
+                    .collect()
+                    .await
+                    .map_err(|e| DataFusionError::Plan(format!("{e:?}")))
+                    .unwrap();
+            }
+            let elapsed = start.elapsed().as_secs_f64() * 1000.0;
+            millis.push(elapsed);
+            let row_count = batches.iter().map(|b| b.num_rows()).sum();
+            if opt.iterations == 1 {
+                println!(
+                    "Query {} took {:.1} ms and returned {} rows",
+                    query, elapsed, row_count
+                );
+            } else {
+                println!(
+                    "Query {} iteration {} took {:.1} ms and returned {} rows",
+                    query, i, elapsed, row_count
+                );
+            }
+            query_run.add_result(elapsed, row_count);
             if opt.debug {
-                println!("=== Optimized logical plan ===\n{plan:?}\n");
+                pretty::print_batches(&batches)?;
+            }
+
+            if let Some(expected_results_path) = opt.expected_results.as_ref() 
{
+                let expected = get_expected_results(query, 
expected_results_path).await?;
+                assert_expected_results(&expected, &batches)
             }
-            batches = df
-                .collect()
-                .await
-                .map_err(|e| DataFusionError::Plan(format!("{e:?}")))
-                .unwrap();
-        }
-        let elapsed = start.elapsed().as_secs_f64() * 1000.0;
-        millis.push(elapsed);
-        let row_count = batches.iter().map(|b| b.num_rows()).sum();
-        println!(
-            "Query {} iteration {} took {:.1} ms and returned {} rows",
-            opt.query, i, elapsed, row_count
-        );
-        benchmark_run.add_result(elapsed, row_count);
-        if opt.debug {
-            pretty::print_batches(&batches)?;
         }
 
-        if let Some(expected_results_path) = opt.expected_results.as_ref() {
-            let expected = get_expected_results(opt.query, 
expected_results_path).await?;
-            assert_expected_results(&expected, &batches)
+        if opt.iterations > 1 {
+            let avg = millis.iter().sum::<f64>() / millis.len() as f64;
+            println!("Query {} avg time: {:.1} ms", query, avg);
         }
-    }
 
-    let avg = millis.iter().sum::<f64>() / millis.len() as f64;
-    println!("Query {} avg time: {:.2} ms", opt.query, avg);
+        benchmark_run.add_query_run(query_run);
+    }
 
     if let Some(path) = &opt.output_path {
-        write_summary_json(&mut benchmark_run, path)?;
+        write_summary_json(&benchmark_run, path)?;
     }
 
     Ok(())
 }
 
-fn write_summary_json(benchmark_run: &mut BenchmarkRun, path: &Path) -> 
Result<()> {
+fn write_summary_json(benchmark_run: &BenchmarkRun, path: &Path) -> Result<()> 
{
     let json =
         serde_json::to_string_pretty(&benchmark_run).expect("summary is 
serializable");
-    let filename = format!(
-        "tpch-q{}-{}.json",
-        benchmark_run.query, benchmark_run.start_time
-    );
+    let filename = format!("tpch-{}.json", benchmark_run.start_time);
     let path = path.join(filename);
     println!(
         "Writing summary file to {}",
@@ -662,6 +719,7 @@ fn get_query_sql(query: usize) -> Result<Vec<String>> {
 }
 
 /// Create a logical plan for each query in the specified query file
+#[cfg(test)]
 async fn create_logical_plans(
     ctx: &SessionContext,
     query: usize,
@@ -687,6 +745,7 @@ async fn create_logical_plans(
         .collect()
 }
 
+#[cfg(test)]
 async fn execute_query(
     ctx: &SessionContext,
     plan: &LogicalPlan,
@@ -833,7 +892,7 @@ async fn get_table(
             )
         }
         "parquet" => {
-            let path = format!("{path}/{table}");
+            let path = find_path(path, table, "parquet")?;
             let format = ParquetFormat::default().with_enable_pruning(true);
 
             (
@@ -971,6 +1030,27 @@ pub fn get_tbl_tpch_table_schema(table: &str) -> Schema {
     schema.finish()
 }
 
+#[derive(Debug, Serialize)]
+struct QueryRun {
+    /// query number
+    query: usize,
+    /// list of individual run times and row counts
+    iterations: Vec<QueryResult>,
+}
+
+impl QueryRun {
+    fn new(query: usize) -> Self {
+        Self {
+            query,
+            iterations: vec![],
+        }
+    }
+
+    fn add_result(&mut self, elapsed: f64, row_count: usize) {
+        self.iterations.push(QueryResult { elapsed, row_count })
+    }
+}
+
 #[derive(Debug, Serialize)]
 struct BenchmarkRun {
     /// Benchmark crate version
@@ -983,14 +1063,12 @@ struct BenchmarkRun {
     start_time: u64,
     /// CLI arguments
     arguments: Vec<String>,
-    /// query number
-    query: usize,
-    /// list of individual run times and row counts
-    iterations: Vec<QueryResult>,
+    /// Results for each query
+    queries: Vec<QueryRun>,
 }
 
 impl BenchmarkRun {
-    fn new(query: usize) -> Self {
+    fn new() -> Self {
         Self {
             benchmark_version: env!("CARGO_PKG_VERSION").to_owned(),
             datafusion_version: DATAFUSION_VERSION.to_owned(),
@@ -1000,13 +1078,12 @@ impl BenchmarkRun {
                 .expect("current time is later than UNIX_EPOCH")
                 .as_secs(),
             arguments: std::env::args().skip(1).collect::<Vec<String>>(),
-            query,
-            iterations: vec![],
+            queries: vec![],
         }
     }
 
-    fn add_result(&mut self, elapsed: f64, row_count: usize) {
-        self.iterations.push(QueryResult { elapsed, row_count })
+    fn add_query_run(&mut self, query_run: QueryRun) {
+        self.queries.push(query_run)
     }
 }
 
@@ -1637,7 +1714,7 @@ mod tests {
 
             // run the query to compute actual results of the query
             let opt = DataFusionBenchmarkOpt {
-                query: n,
+                query: Some(n),
                 debug: false,
                 iterations: 1,
                 partitions: 2,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to